Spaces:
Sleeping
Sleeping
#### For scraping/webpage processing | |
import requests | |
import json # specifically for wikipedia api | |
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.chrome.service import Service | |
from webdriver_manager.chrome import ChromeDriverManager | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.common.keys import Keys | |
from bs4 import BeautifulSoup | |
#### For timing | |
import time | |
#### For app | |
import streamlit as st | |
from collections import deque # for printouts | |
#### For semantic similarity model | |
# !pip install tensorflow tensorflow-hub | |
import tensorflow as tf | |
import tensorflow_hub as hub | |
import numpy as np | |
# Load the pre-trained Universal Sentence Encoder (version 4) from TensorFlow Hub;
# the model's page is accessible at the same link.
# `embed` is called with a list of strings by most_similar_sentence.
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
# # @st.experimental_singleton | |
# @st.cache_resource | |
# def get_driver(): | |
# return webdriver.Chrome(service = Service(ChromeDriverManager().install()), options = options) | |
# import os, sys | |
# @st.cache_resource | |
# def installff(): | |
# os.system('sbase install geckodriver') | |
# os.system('ln -s /home/appuser/venv/lib/python3.7/site-packages/seleniumbase/drivers/geckodriver /home/appuser/venv/bin/geckodriver') | |
# _ = installff() | |
# from selenium import webdriver | |
# from selenium.webdriver import FirefoxOptions | |
# opts = FirefoxOptions() | |
# opts.add_argument("--headless") | |
# driver = webdriver.Firefox(options=opts) | |
# driver_target = webdriver.Firefox(options=opts) | |
# browser.get('http://example.com') | |
# driver.get("http://example.com") | |
# from selenium import webdriver | |
# from selenium.common.exceptions import TimeoutException | |
# from selenium.webdriver.common.by import By | |
# from selenium.webdriver.firefox.options import Options | |
# from selenium.webdriver.firefox.service import Service | |
# from selenium.webdriver.support import expected_conditions as EC | |
# from selenium.webdriver.support.ui import WebDriverWait | |
# from webdriver_manager.firefox import GeckoDriverManager | |
# # URL = "" | |
# TIMEOUT = 20 | |
# # st.title("Test Selenium") | |
# firefoxOptions = Options() | |
# firefoxOptions.add_argument("--headless") | |
# service = Service(GeckoDriverManager().install()) | |
# driver = webdriver.Firefox( | |
# options=firefoxOptions, | |
# service=service, | |
# ) | |
# driver_target = webdriver.Firefox( | |
# options=firefoxOptions, | |
# service=service, | |
# ) | |
import streamlit as st | |
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.chrome.service import Service | |
from webdriver_manager.chrome import ChromeDriverManager | |
def get_driver():
    """Create a Chrome WebDriver configured with the module-level ``options``.

    The chromedriver location is whatever ``ChromeDriverManager().install()``
    returns; it is wrapped in a ``Service`` and handed to ``webdriver.Chrome``.

    Returns:
        The newly created ``webdriver.Chrome`` instance.
    """
    chromedriver_path = ChromeDriverManager().install()
    chrome_service = Service(chromedriver_path)
    return webdriver.Chrome(service=chrome_service, options=options)
# Chrome flags shared by every driver that get_driver() creates.
options = Options()
for chrome_flag in ("--disable-gpu", "--headless"):
    options.add_argument(chrome_flag)

# Two browser sessions: `driver` walks the link chain, `driver_target` reads the target page.
driver, driver_target = get_driver(), get_driver()
# driver.get('http://example.com') | |
# st.code(driver.page_source) | |
# Log of status messages for the app, starting empty; update_messages adds each new one with appendleft.
# maxlen = 1000: once full, every new appendleft discards the entry at the opposite end of the deque.
# The model should always time out before this, since most people won't have the patience to make it last this long.
messages = deque(maxlen = 1000)
def update_messages(message, placeholder=None):
    """Record a status message and render the whole message log.

    Args:
        message: Text to add to the front of the module-level ``messages``
            deque (so the log reads newest-first).
        placeholder: Optional Streamlit container (for example the object
            returned by ``st.empty()``) to draw the log into. Pass the same
            container on every call to refresh one on-screen log in place.
            When omitted, a new ``st.empty()`` container is created for this
            call.

    Returns:
        None.
    """
    # Add the new message to the start of the deque
    messages.appendleft(message)

    if placeholder is None:
        placeholder = st.empty()

    # A placeholder holds a single element, so writing the messages one by one
    # would leave only the last one written visible. Render the full log as a
    # single text block instead.
    placeholder.text("\n".join(str(msg) for msg in messages))
def most_similar_sentence(target_topic, labels_list):
    """Pick the label that scores highest against the target text.

    The target text and every label are embedded with the module-level
    ``embed`` model and compared with an inner product.
    NOTE(review): the inner product equals cosine similarity only if the
    embeddings are unit-length -- confirm for this model.

    Args:
        target_topic: Text to compare every label against.
        labels_list: Candidate label strings.

    Returns:
        A ``(label, score, index)`` tuple for the highest-scoring entry of
        ``labels_list``.
    """
    target_vector = embed([target_topic])[0]
    label_vectors = embed(labels_list)

    # One score per label, in the same order as labels_list
    scores = np.inner(target_vector, label_vectors)

    best_idx = np.argmax(scores)
    best_label = labels_list[best_idx]
    best_score = scores[best_idx]
    return best_label, best_score, best_idx
def search_wikipedia(search_term):
    """Resolve a free-text search term to an English Wikipedia page.

    Queries the Wikipedia search API and uses the first hit. If the first
    hit's snippet contains "may refer to" (a disambiguation page) and a second
    hit exists, the second hit is used instead.

    Args:
        search_term: Text to search for.

    Returns:
        A ``(page_url, page_title)`` tuple; the URL is built from the title
        with spaces replaced by underscores.

    Raises:
        IndexError: If the search returns no results.
        requests.RequestException: If the request times out, cannot be sent,
            or the server answers with an error status.
    """
    # Define the endpoint
    endpoint = "https://en.wikipedia.org/w/api.php"

    # Define the search parameters
    params = {
        "action": "query",
        "format": "json",
        "list": "search",
        "srsearch": search_term,
    }

    # Bound the wait so an unresponsive server cannot hang the app, and fail
    # loudly on HTTP errors instead of trying to parse an error page.
    response = requests.get(url = endpoint, params = params, timeout = 10)
    response.raise_for_status()

    # Parse the results as JSON
    data = json.loads(response.text)
    results = data["query"]["search"]
    if not results:
        raise IndexError(f"No Wikipedia search results for {search_term!r}")

    # Prefer the first result, but skip a disambiguation page when there is
    # another result to fall back on.
    best_result = results[0]
    if "may refer to" in best_result["snippet"].lower() and len(results) > 1:
        best_result = results[1]
    page_title = best_result["title"]

    # Construct the URL of the Wikipedia page
    page_url = "https://en.wikipedia.org/wiki/{}".format(page_title.replace(" ", "_"))
    return page_url, page_title
def get_topic_context(driver, more = False):
    """Return the opening text of the article currently loaded in ``driver``.

    The text comes from the first element matching
    ``div.mw-parser-output > p:not(.mw-empty-elt)`` and is split on ". ".

    Args:
        driver: WebDriver positioned on the page to read.
        more: When true, return up to the first five pieces re-joined with
            ". "; otherwise return only the first piece.

    Returns:
        The extracted context string.
    """
    intro_selector = "div.mw-parser-output > p:not(.mw-empty-elt)"
    intro_text = driver.find_element(By.CSS_SELECTOR, intro_selector).text
    pieces = intro_text.split(". ")
    if not more:
        return pieces[0]
    return ". ".join(pieces[:5])
# Words that refine_links filters out of link texts when censor = True:
# one entry per line of censored.txt, with surrounding whitespace stripped.
# The file is opened in a `with` block so the handle is closed after reading.
with open("censored.txt", "r", encoding="utf-8") as censored_file:
    bad_words = [word.strip() for word in censored_file]
def refine_links(topic, links, current_url_suffix, used_links, used_topics, censor = False):
    """Filter a page's anchors down to candidate article links.

    Args:
        topic: Text of the current topic. Links whose URL contains it are
            dropped; the match is case-insensitive and treats spaces and
            underscores alike.
        links: Iterable of anchor objects exposing ``.get('href')`` and
            ``.text`` (for example BeautifulSoup ``<a>`` tags).
        current_url_suffix: Last path segment of the current page URL; links
            whose URL contains it are dropped.
        used_links: URLs that were already chosen.
        used_topics: Topic texts that were already chosen (compared
            case-insensitively with each link's text).
        censor: When true, drop links whose text contains a word listed in
            the module-level ``bad_words`` (case-insensitive).

    Returns:
        A list of ``(absolute_url, link_text)`` tuples in page order.
    """
    topic_lower = topic.lower()
    # Article URLs use underscores where link texts use spaces, so look for
    # both spellings of the topic.
    topic_needles = {topic_lower, topic_lower.replace(" ", "_")}

    # Built once instead of once per link.
    visited_links = set(used_links)
    visited_topics = {used_topic.lower() for used_topic in used_topics}

    # Lower-cased so the censor check does not depend on the word list's case.
    censored_words = {word.lower() for word in bad_words} if censor else frozenset()

    links_texts = []
    for link in links:
        href = link.get('href')
        # Only site-relative article-style links are considered
        if not href or not href.startswith("/wiki/"):
            continue

        link_url = "https://en.wikipedia.org" + href
        link_text = link.text.strip()  # Get the text and remove leading/trailing spaces

        # Skip text-less links and links back to the current page
        if not link_text or current_url_suffix in link_url:
            continue

        # Skip anything already chosen, by URL or by text
        if link_url in visited_links or link_text.lower() in visited_topics:
            continue

        # Skip topic duplicates
        link_url_lower = link_url.lower()
        if any(needle in link_url_lower for needle in topic_needles):
            continue

        # Skip wiki-help pages (non-content pages, whose paths contain ":") and the main page
        if ":" in href or href.split("/")[-1] == "Main_Page":
            continue

        # Censoring if needed
        if censored_words and any(word.lower() in censored_words for word in link_text.split()):
            continue

        links_texts.append((link_url, link_text))

    return links_texts
def play_wiki_game_2(starting_topic: str, target_topic: str, limit: int = 100, delay: int = 0):
    """Walk Wikipedia links from ``starting_topic`` towards ``target_topic``.

    Both topics are resolved with ``search_wikipedia``. On every page the
    candidate links (``refine_links``) are scored against the opening text of
    the target article (``most_similar_sentence``), and the best-scoring link
    is followed. Progress is reported through ``update_messages``.

    The walk stops when the current URL equals the target URL, when ``limit``
    pages have been visited, or when a page offers no candidate links.

    The module-level ``driver`` and ``driver_target`` are used and are quit
    before this function returns, including when an error is raised, so they
    have to be recreated before another game can be played.

    Args:
        starting_topic: Search text for the page to start from.
        target_topic: Search text for the page to reach.
        limit: Maximum number of pages to visit.
        delay: Seconds to sleep after each page before moving on.

    Returns:
        None. All output goes through ``update_messages``.
    """
    #### Getting target url, topic, and context
    try:
        target_url, target_topic = search_wikipedia(search_term = target_topic)
        driver_target.get(target_url)
        target_context = get_topic_context(driver_target, more = True)
    finally:
        # The target browser is only needed for this one lookup
        driver_target.quit()

    num_pages = 0
    used_topics = []
    used_links = []
    next_link = None  # set once the first page has picked a link to follow
    start_time = time.time()
    divider = "-" * 150

    ### BEGIN ###
    update_messages(divider)
    update_messages("\nStarting!\n")
    update_messages(divider)

    try:
        url, topic = search_wikipedia(search_term = starting_topic)
        driver.get(url)
        used_topics.append(topic)
        used_links.append(driver.current_url)

        while True:
            # increment the page tracking by 1 for each new page
            num_pages += 1

            # if not the first page, navigate to the new page
            if next_link is not None:
                driver.get(next_link)

            # Best effort: a page without a readable first paragraph should not end the game
            try:
                context_sentence = get_topic_context(driver)
            except Exception:
                context_sentence = "Context could not be found from webpage"

            current_url = driver.current_url

            update_messages(f"\nPage: {num_pages}")
            update_messages(f"Current topic: '{topic.title()}'")
            update_messages(f"Current URL: '{current_url}'")
            update_messages(f"Current Topic Context: '{context_sentence}'")

            if current_url == target_url:  # because the target_url is now found through the API
                update_messages("\n" + divider)
                update_messages(f"\nFrom '{starting_topic.title()}', to '{target_topic.title()}' in {num_pages} pages, {round(time.time() - start_time, 2)} seconds!")
                update_messages(f"Starting topic: '{starting_topic.title()}': '{used_links[0]}'")
                update_messages(f"Target topic: '{target_topic.title()}': '{target_url}'\n")
                update_messages(divider)
                break

            # Checked before choosing another link, so the report below names
            # the last page actually visited. `>=` also ends the game for limit <= 0.
            if num_pages >= limit:
                update_messages("\n" + divider)
                update_messages(f"\nUnfortunately, the model couldn't get from '{starting_topic.title()}', to '{target_topic.title()}' in {num_pages} pages or less.")
                update_messages(f"In {round(time.time() - start_time, 2)} seconds, it got from '{starting_topic.title()}': '{used_links[0]}', to '{used_topics[-1].title()}': '{used_links[-1]}'")
                update_messages("\nTry a different combination to see if it can do it!\n")
                update_messages(divider)
                break

            # Parse the HTML Selenium already loaded and collect every anchor
            current_url_suffix = str(current_url).split("/")[-1]
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            links = soup.find_all('a')

            # get rid of any bloat in the links from the page
            links_texts = refine_links(topic, links, current_url_suffix, used_links, used_topics)

            # Dead end: nothing to score, so stop instead of failing inside the similarity model
            if not links_texts:
                update_messages("\n" + divider)
                update_messages(f"\nNo unvisited links were found on '{topic.title()}': '{current_url}', so the game stops here.")
                update_messages(divider)
                break

            best_label, best_score, loc_idx = most_similar_sentence(target_topic = target_context.lower(), labels_list = [text.lower() for link, text in links_texts])
            update_messages(f"Next topic: '{best_label.title()}'. Semantic similarity to '{target_topic.title()}': {round((best_score * 100), 2)}%")

            next_link, topic = links_texts[loc_idx]
            used_links.append(next_link)
            used_topics.append(topic)

            # delay things, if applicable
            time.sleep(delay)
    finally:
        # Close the browser on every exit path, including errors
        driver.quit()


###### Example
# starting_topic = 'soulja boy' | |
# target_topic = 'urine' | |
# play_wiki_game_2(starting_topic = starting_topic, target_topic = target_topic, limit = 50)