|
import random |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
|
|
|
_useragent_list = [ |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36", |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36", |
|
] |
|
|
|
|
|
def extract_text_from_webpage(html): |
|
print("Extracting text from webpage...") |
|
soup = BeautifulSoup(html, 'html.parser') |
|
for script in soup(["script", "style"]): |
|
script.extract() |
|
text = soup.get_text() |
|
lines = (line.strip() for line in text.splitlines()) |
|
chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) |
|
text = '\n'.join(chunk for chunk in chunks if chunk) |
|
print(f"Extracted text length: {len(text)}") |
|
return text |
|
|
|
|
|
def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None): |
|
"""Performs a Google search and returns the results.""" |
|
print(f"Searching for term: {term}") |
|
escaped_term = requests.utils.quote(term) |
|
start = 0 |
|
all_results = [] |
|
max_chars_per_page = 8000 |
|
|
|
with requests.Session() as session: |
|
while start < num_results: |
|
print(f"Fetching search results starting from: {start}") |
|
try: |
|
|
|
user_agent = random.choice(_useragent_list) |
|
headers = { |
|
'User-Agent': user_agent |
|
} |
|
print(f"Using User-Agent: {headers['User-Agent']}") |
|
|
|
resp = session.get( |
|
url="https://www.google.com/search", |
|
headers=headers, |
|
params={ |
|
"q": term, |
|
"num": num_results - start, |
|
"hl": lang, |
|
"start": start, |
|
"safe": safe, |
|
}, |
|
timeout=timeout, |
|
verify=ssl_verify, |
|
) |
|
resp.raise_for_status() |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error fetching search results: {e}") |
|
break |
|
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
result_block = soup.find_all("div", attrs={"class": "g"}) |
|
if not result_block: |
|
print("No more results found.") |
|
break |
|
for result in result_block: |
|
link = result.find("a", href=True) |
|
if link: |
|
link = link["href"] |
|
print(f"Found link: {link}") |
|
try: |
|
webpage = session.get(link, headers=headers, timeout=timeout) |
|
webpage.raise_for_status() |
|
visible_text = extract_text_from_webpage(webpage.text) |
|
if len(visible_text) > max_chars_per_page: |
|
visible_text = visible_text[:max_chars_per_page] + "..." |
|
all_results.append({"link": link, "text": visible_text}) |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error fetching or processing {link}: {e}") |
|
all_results.append({"link": link, "text": None}) |
|
else: |
|
print("No link found in result.") |
|
all_results.append({"link": None, "text": None}) |
|
start += len(result_block) |
|
print(f"Total results fetched: {len(all_results)}") |
|
return all_results |
|
|
|
|
|
model_name = 'mistralai/Mistral-7B-Instruct-v0.3' |
|
tokenizer = AutoTokenizer.from_pretrained(model_name) |
|
model = AutoModelForCausalLM.from_pretrained(model_name) |
|
|
|
|
|
search_term = "How did Tesla perform in Q1 2024" |
|
search_results = google_search(search_term, num_results=3) |
|
|
|
|
|
combined_text = "\n\n".join(result['text'] for result in search_results if result['text']) |
|
|
|
|
|
inputs = tokenizer(combined_text, return_tensors="pt") |
|
|
|
|
|
outputs = model.generate(**inputs, max_length=150, temperature=0.7, top_p=0.9, top_k=50) |
|
|
|
|
|
response = tokenizer.decode(outputs[0], skip_special_tokens=True) |
|
|
|
|
|
print(response) |