Wiki-Game / wikigame_app2.py
KAI MAURIN-JONES
beta version 2 - full updates
9511a2d
#### For scraping/webpage processing
import requests
import json # specifically for wikipedia api
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup
#### For timing
import time
#### For app
import streamlit as st
from collections import deque # for printouts
#### For semantic similarity model
# !pip install tensorflow tensorflow-hub
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4") # Load the pre-trained Universal Sentence Encoder -- accessible at same link
# # @st.experimental_singleton
# @st.cache_resource
# def get_driver():
# return webdriver.Chrome(service = Service(ChromeDriverManager().install()), options = options)
# import os, sys
# @st.cache_resource
# def installff():
# os.system('sbase install geckodriver')
# os.system('ln -s /home/appuser/venv/lib/python3.7/site-packages/seleniumbase/drivers/geckodriver /home/appuser/venv/bin/geckodriver')
# _ = installff()
# from selenium import webdriver
# from selenium.webdriver import FirefoxOptions
# opts = FirefoxOptions()
# opts.add_argument("--headless")
# driver = webdriver.Firefox(options=opts)
# driver_target = webdriver.Firefox(options=opts)
# browser.get('http://example.com')
# driver.get("http://example.com")
# from selenium import webdriver
# from selenium.common.exceptions import TimeoutException
# from selenium.webdriver.common.by import By
# from selenium.webdriver.firefox.options import Options
# from selenium.webdriver.firefox.service import Service
# from selenium.webdriver.support import expected_conditions as EC
# from selenium.webdriver.support.ui import WebDriverWait
# from webdriver_manager.firefox import GeckoDriverManager
# # URL = ""
# TIMEOUT = 20
# # st.title("Test Selenium")
# firefoxOptions = Options()
# firefoxOptions.add_argument("--headless")
# service = Service(GeckoDriverManager().install())
# driver = webdriver.Firefox(
# options=firefoxOptions,
# service=service,
# )
# driver_target = webdriver.Firefox(
# options=firefoxOptions,
# service=service,
# )
import streamlit as st
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
@st.cache_resource
def get_driver():
return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
options = Options()
options.add_argument('--disable-gpu')
options.add_argument('--headless')
driver = get_driver()
driver_target = get_driver()
# driver.get('http://example.com')
# st.code(driver.page_source)
# Initialize an empty deque
messages = deque(maxlen = 1000) # after 1000 links, it'll start popping things. The model should always timeout before this, since most people won't have the patience to make it last this long
def update_messages(message):
# Add the new message to the start of deque
messages.appendleft(message)
# Use a placeholder
placeholder = st.empty()
# Clear the placeholder and add all the messages from the deque
placeholder.text('') # clears the placeholder
for msg in messages:
placeholder.text(msg)
def most_similar_sentence(target_topic, labels_list):
# Encode the context sentence and all sentences in the list
context_embedding = embed([target_topic])[0]
sentence_embeddings = embed(labels_list)
# Calculate cosine similarities between the context sentence and each sentence in the list
similarities = np.inner(context_embedding, sentence_embeddings)
# Find the index of the most similar sentence
most_similar_index = np.argmax(similarities)
return labels_list[most_similar_index], similarities[most_similar_index], most_similar_index
def search_wikipedia(search_term):
# Define the endpoint
endpoint = "https://en.wikipedia.org/w/api.php"
# Define the search parameters
params = {
"action": "query",
"format": "json",
"list": "search",
"srsearch": search_term
}
# Send a GET request to the endpoint with your parameters
response = requests.get(url = endpoint, params = params)
# Parse the results as JSON
data = json.loads(response.text)
# Get the title of the first result (this will be used as the page title in the next step)
page_title = data["query"]["search"][0]["title"]
if "may refer to" in data["query"]["search"][0]["snippet"].lower():
page_title = data["query"]["search"][1]["title"]
# Construct the URL of the Wikipedia page
page_url = "https://en.wikipedia.org/wiki/{}".format(page_title.replace(" ", "_"))
return page_url, page_title
def get_topic_context(driver, more = False):
# Find the first paragraph of the main article
first_paragraph = driver.find_element(By.CSS_SELECTOR, "div.mw-parser-output > p:not(.mw-empty-elt)").text
if more:
context_sentence = ". ".join(first_paragraph.split(". ")[:5])
else:
context_sentence = first_paragraph.split(". ")[0]
return context_sentence
# bad_words = [word for word in open("censored.txt", "r").readlines()]
bad_words = [word.strip() for word in open("censored.txt", "r").readlines()]
def refine_links(topic, links, current_url_suffix, used_links, used_topics, censor = False):
links_texts = []
# Iterate through the links and extract their URLs
for link in links:
link_url = link.get('href')
if link_url and link_url.startswith("/wiki/"):
link_url = "https://en.wikipedia.org" + link_url
link_text = link.text.strip() # Get the text and remove leading/trailing spaces
# make sure they are both not None
if link_text and current_url_suffix not in link_url:
if link_url not in used_links and link_text.lower() not in [topic.lower() for topic in used_topics]:
# eliminates topic duplicates, non-wiki links, and wiki-help pages (non-content pages)
if topic.lower() not in link_url.lower() and "en.wikipedia.org/wiki/" in link_url and ":" not in "".join(link_url.split("/")[1:]) and "Main_Page" != str(link_url.split("/")[-1]):
# censoring if needed
if censor:
if not any(word1.lower() in bad_words for word1 in [word.lower() for word in link_text.split()]):
links_texts.append((link_url, link_text))
else:
links_texts.append((link_url, link_text))
return links_texts
def play_wiki_game_2(starting_topic: str, target_topic: str, limit: int = 100, delay: int = 0):
##### Setup Chrome options
# chrome_options = webdriver.ChromeOptions()
# chrome_options.add_argument("--headless") # Ensure GUI is off
# chrome_options.add_argument("--no-sandbox")
# chrome_options.add_argument("--disable-dev-shm-usage")
# driver = webdriver.Chrome(options = chrome_options)
# options = Options()
# options.add_argument('--disable-gpu')
# options.add_argument('--headless')
# driver = get_driver()
# driver = webdriver.Firefox(options=opts)
# driver_target = webdriver.Firefox(options=opts)
#### Getting target url, topic, and context
# driver_target = webdriver.Chrome(options = chrome_options)
# driver_target = get_driver()
target_url, target_topic = search_wikipedia(search_term = target_topic)
driver_target.get(target_url)
target_context = get_topic_context(driver_target, more = True)
# update_messages(target_context)
driver_target.quit()
topic = starting_topic
num_pages = 0
used_topics = []
used_links = []
start_time = time.time()
### BEGIN ###
update_messages("-" * 150)
update_messages(f"\nStarting!\n")
update_messages("-" * 150)
url, topic = search_wikipedia(search_term = starting_topic)
driver.get(url)
used_topics.append(topic)
used_links.append(driver.current_url)
while True:
# increment the page tracking by 1 for each new page
num_pages += 1
# if not the first page, navigate to the new page
if num_pages > 1:
driver.get(next_link)
try:
context_sentence = get_topic_context(driver)
except Exception as e:
context_sentence = "Context could not be found from webpage"
current_url = driver.current_url
current_url_suffix = str(current_url).split("/")[-1]
### Use BeautifulSoup and Requests instead of Selenium for link extraction
current_page = driver.page_source # html from Selenium instead of BeautifulSoup
soup = BeautifulSoup(current_page, 'html.parser')
links = soup.find_all('a')
# get rid of any bloat in the links from the page
links_texts = refine_links(topic, links, current_url_suffix, used_links, used_topics)
# best_label, best_score, loc_idx = most_similar_sentence(target_topic = target_topic, labels_list = [text for link, text in links_texts])
best_label, best_score, loc_idx = most_similar_sentence(target_topic = target_context.lower(), labels_list = [text.lower() for link, text in links_texts])
update_messages(f"\nPage: {num_pages}")
update_messages(f"Current topic: '{topic.title()}'")
update_messages(f"Current URL: '{current_url}'")
update_messages(f"Current Topic Context: '{context_sentence}'")
if current_url != target_url:
update_messages(f"Next topic: '{best_label.title()}'. Semantic similarity to '{target_topic.title()}': {round((best_score * 100), 2)}%")
next_link, topic = links_texts[loc_idx]
used_links.append(next_link)
used_topics.append(topic)
if current_url == target_url: # because the target_url is now found through the API
update_messages("\n" + "-" * 150)
update_messages(f"\nFrom '{starting_topic.title()}', to '{target_topic.title()}' in {num_pages} pages, {round(time.time() - start_time, 2)} seconds!")
update_messages(f"Starting topic: '{starting_topic.title()}': '{used_links[0]}'")
update_messages(f"Target topic: '{target_topic.title()}': '{target_url}'\n")
update_messages("-" * 150)
driver.quit()
break
if num_pages == limit:
update_messages("\n" + "-" * 150)
update_messages(f"\nUnfortunately, the model couldn't get from '{starting_topic.title()}', to '{target_topic.title()}' in {num_pages} pages or less.")
update_messages(f"In {round(time.time() - start_time, 2)} seconds, it got from '{starting_topic.title()}': '{used_links[0]}', to '{used_topics[-1].title()}': '{used_links[-1]}'")
update_messages(f"\nTry a different combination to see if it can do it!\n")
update_messages("-" * 150)
driver.quit()
break
# delay things, if applicable
###### Example
time.sleep(delay)
# starting_topic = 'soulja boy'
# target_topic = 'urine'
# play_wiki_game(starting_topic = starting_topic, target_topic = target_topic, limit = 50)