"""Web-search + content-scraping utilities.

Fetches result URLs from Google Custom Search (or Semantic Scholar in
"scholar mode"), downloads the pages concurrently with httpx, and extracts
readable text with trafilatura (PDFs are handled via PyMuPDF/fitz).
"""

import asyncio
import os
import time

import fitz
import httpx
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from googleapiclient.discovery import build
from trafilatura import extract

load_dotenv()

API_KEY = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_KEY = os.environ.get("GOOGLE_SEARCH_CSE_ID")

# Maximum number of pages whose extracted content is kept per search.
NUM_PAGES = 20

# Month name -> two-digit month number, for building YYYYMMDD date strings.
months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}

domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]
skip_urls = ["youtube.com", "twitter.com", "facebook.com", "instagram.com", "x.com"]


def build_date(year=2024, month="March", day=1):
    """Return a YYYYMMDD string for Google CSE's ``sort=date:r:...`` filter.

    BUGFIX: the day is now zero-padded. Previously ``build_date(2024, "March", 1)``
    returned the 7-digit string ``"2024031"``, which is not a valid YYYYMMDD
    value for the date-restrict parameter.

    Args:
        year: four-digit year.
        month: full English month name (key of ``months``).
        day: day of month (int or numeric string).

    Returns:
        An eight-character ``YYYYMMDD`` string.
    """
    return f"{year}{months[month]}{int(day):02d}"


def build_results_beautifulsoup(url_list, scholar_abstracts: dict[str, str] | None = None):
    """Scrape ``url_list`` concurrently and extract readable text from each page.

    Args:
        url_list: URLs to fetch.
        scholar_abstracts: optional mapping of URL -> abstract text, used as a
            fallback when a page could not be fetched. (The previous annotation
            said ``list[str]``, but the value is used as a dict via ``.get``.)

    Returns:
        Dict mapping URL -> extracted text, capped at ``NUM_PAGES`` entries.
        Pages whose extracted text is 500 characters or shorter are skipped.
    """
    print("Starting to scrape URLs...")
    start_time = time.perf_counter()

    # Fetch all URLs in parallel.
    soups = asyncio.run(parallel_scrap(url_list))

    scraping_time = time.perf_counter() - start_time
    print(f"Scraping processing time: {scraping_time:.2f} seconds")

    result_content = {}
    count = 0
    print("Starting to process each URL...")
    for url, soup in zip(url_list, soups):
        if count >= NUM_PAGES:
            print(f"Reached the limit of {NUM_PAGES} pages. Stopping processing.")
            break
        if soup:
            print(f"Processing URL: {url}")
            text = extract(
                soup,
                include_tables=False,
                include_comments=False,
                output_format="txt",
            )
            # trafilatura may return None when it finds no main content.
            if text is None:
                print(f"Warning: Extraction returned None for URL: {url}")
            elif len(text) > 500:
                print(f"Adding content from URL: {url}, content length: {len(text)}")
                result_content[url] = text
                count += 1
            else:
                print(f"Skipped URL: {url}, content too short (length: {len(text)})")
        elif scholar_abstracts and scholar_abstracts.get(url):
            # Page fetch failed, but we have a Semantic Scholar abstract for it.
            print(f"Skipped URL: {url}, no soup content available. Returning scholar abstract instead.")
            result_content[url] = scholar_abstracts.get(url)
        else:
            print(f"Skipped URL: {url}, no soup content available.")
    print("Finished processing URLs.")
    return result_content


def build_results_extractor(url_list):
    """Extract page text via the extractorapi.com service.

    Falls back to ``build_results_beautifulsoup`` on any error (including a
    403 from the API, which is raised deliberately to trigger the fallback).

    Returns:
        Dict mapping URL -> extracted text, capped at ``NUM_PAGES`` entries.
    """
    try:
        endpoint = "https://extractorapi.com/api/v1/extractor"
        result_content = {}
        count = 0
        for url in url_list:
            if count >= NUM_PAGES:
                break
            params = {"apikey": os.environ.get("EXTRACTOR_API_KEY"), "url": url}
            r = requests.get(endpoint, params=params)
            if r.status_code == 200:
                text = r.json()["text"]
                if len(text) > 500:
                    result_content[url] = text
                    count += 1
            if r.status_code == 403:
                # Quota/auth failure: bail out to the local scraper.
                raise Exception("Error with API; using default implementation instead")
        return result_content
    except Exception as e:
        print(e)
        return build_results_beautifulsoup(url_list)


async def get_url_data(url, client):
    """Fetch ``url`` and return raw bytes suitable for trafilatura.

    PDFs (detected via Content-Type or a ``.pdf`` extension) are converted to
    text with ``extract_pdf_text``. Any error or non-200 response yields None.
    """
    try:
        r = await client.get(url, follow_redirects=True)
        print(f"URL: {url}, Response Code: {r.status_code}")
        if r.status_code == 200:
            content_type = r.headers.get("Content-Type", "").lower()
            # PDF detection using Content-Type and file extension.
            if "application/pdf" in content_type or url.lower().endswith(".pdf"):
                print(f"Detected PDF content via Content-Type or file extension at URL: {url}")
                pdf_content = await extract_pdf_text(r.content)
                return pdf_content
            else:
                return r.content
        else:
            print(f"Non-200 response for URL: {url}, status code: {r.status_code}")
            return None
    except Exception as e:
        print(f"Error fetching URL: {url}, Error: {str(e)}")
        return None


async def extract_pdf_text(content):
    """Extract text from PDF bytes and return it UTF-8 encoded.

    Returns None on any extraction failure. The result is wrapped so that it
    is parsable by trafilatura downstream.
    """
    try:
        with fitz.open(stream=content, filetype="pdf") as doc:
            text = ""
            for page in doc:
                text += page.get_text()
        html_content = f"""
{text}
"""
        html_bytes = html_content.encode("utf-8")
        return html_bytes  # Return in such a format that is parsable by trafilatura
    except Exception as e:
        print(f"Error extracting PDF text: {str(e)}")
        return None


async def parallel_scrap(urls):
    """Fetch all ``urls`` concurrently; returns one result per URL (None/exception on failure)."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    async with httpx.AsyncClient(timeout=30, headers=headers) as client:
        tasks = []
        for url in urls:
            tasks.append(get_url_data(url=url, client=client))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results


def scrap(urls):
    """Sequential (blocking) fallback scraper.

    BUGFIX: the previous version passed a *sync* ``httpx.Client`` into the
    async ``get_url_data`` coroutine and never awaited it, so it returned a
    list of un-run coroutine objects instead of page content. The coroutines
    are now driven to completion with an async client.
    """
    async def _fetch_all():
        async with httpx.AsyncClient(timeout=30) as client:
            return [await get_url_data(url=url, client=client) for url in urls]

    return asyncio.run(_fetch_all())


def google_search_urls(
    text,
    sorted_date,
    domains_to_include,
    api_key,
    cse_id,
    num_results=10,  # Number of results to fetch per page
    total_results=30,  # Total number of results to fetch
    skip_urls=None,  # List of URLs to skip
    **kwargs,
):
    """Collect result URLs from Google Custom Search, paging until ``total_results``.

    Args:
        text: search query.
        sorted_date: value for the CSE ``sort`` parameter (e.g. ``date:r:...``).
        domains_to_include: TLD fragments to allow (None = allow all).
        api_key / cse_id: Google API credentials.
        num_results: page size per API call (max 10 for CSE).
        total_results: stop once this many URLs are collected.
        skip_urls: exact URLs to exclude from the results.

    Returns:
        List of at most ``total_results`` unique URLs.
    """
    if skip_urls is None:
        skip_urls = []  # Initialize as empty list if not provided
    service = build("customsearch", "v1", developerKey=api_key)
    url_list = []
    start_index = 1  # Initial index for the search results
    while len(url_list) < total_results:
        # Fetch a page of results
        results = (
            service.cse()
            .list(
                q=text,
                cx=cse_id,
                sort=sorted_date,
                start=start_index,
                num=min(num_results, total_results - len(url_list)),
                **kwargs,
            )
            .execute()
        )
        if "items" in results and len(results["items"]) > 0:
            for count, link in enumerate(results["items"]):
                url = link["link"]
                # Skip if the URL is in the skip_urls list or doesn't match the domain filter
                if url in skip_urls:
                    continue
                if (domains_to_include is None) or any(
                    ("." + domain) in url for domain in domains_to_include
                ):
                    if url not in url_list:
                        url_list.append(url)
        else:
            # No more results
            break
        # Move to the next page of results
        start_index += num_results
    return url_list[:total_results]


def scrape_abstract(url, title):
    """Scrape a Semantic Scholar paper page for its TLDR/abstract text.

    Returns ``title + "\\n" + abstract`` or None when no abstract is found.
    """
    response = requests.get(url)
    soup = BeautifulSoup(response.content, "html.parser")
    abstract_section = soup.find(
        "div", class_="tldr-abstract-replacement paper-detail-page__tldr-abstract"
    )
    abstract = abstract_section.get_text().strip() if abstract_section else ""
    return title + "\n" + abstract if abstract != "" else None


def semantic_scholar_urls(
    text,
    sorted_date,
    total_results=30,  # Total number of results to fetch
    skip_urls=None,  # List of URLs to skip
    **kwargs,
):
    """Search Semantic Scholar and return (url_list, scholar_abstracts).

    ``sorted_date`` is the Google-style ``date:r:YYYYMMDD:YYYYMMDD`` string;
    only the years are used for the Semantic Scholar ``year`` filter.

    Returns:
        Tuple of (list of paper/PDF URLs, dict mapping URL -> abstract text).
    """
    ss_api_key = os.environ.get("SEMANTIC_SCHOLAR_API_KEY")
    semantic_scholar_endpoint = "http://api.semanticscholar.org/graph/v1/paper/search/"
    date_from, date_to = sorted_date.split(":r:")[1].split(":")
    year_from = date_from[:4]
    year_to = date_to[:4]
    success_count = 0
    print(f"Dates: {year_from}-{year_to}")
    query_params = {
        "query": text,
        "fields": "title,abstract,url,publicationTypes,publicationDate,openAccessPdf,fieldsOfStudy",
        "year": f"{year_from}-{year_to}",
        "limit": 3 * total_results,  # over-fetch; many rows lack abstracts/PDFs
    }
    headers = {"x-api-key": ss_api_key}
    response = requests.get(
        semantic_scholar_endpoint, params=query_params, headers=headers
    ).json()
    url_list = []
    scholar_abstracts = {}
    for row in response.get("data", []):
        if success_count >= total_results:
            break
        url = row.get("url")
        # Some rows nest the URL inside a dict; unwrap it.
        if isinstance(url, dict) and url.get("url"):
            url = url.get("url")
        url_list.append(url)
        abstract = row.get("abstract")
        if abstract:
            scholar_abstracts[url] = abstract
            success_count += 1
        if row.get("openAccessPdf") and row.get("url"):
            url_list.append(row.get("openAccessPdf").get("url"))
            success_count += 1
    return url_list, scholar_abstracts


def google_search(topic, sorted_date, domains_to_include, scholar_mode_check):
    """Run a search and return extracted page content keyed by URL.

    Args:
        topic: search query.
        sorted_date: ``date:r:YYYYMMDD:YYYYMMDD`` date-restrict string.
        domains_to_include: TLD fragments passed to ``google_search_urls``.
        scholar_mode_check: when truthy, query Semantic Scholar instead of
            Google Custom Search.

    Returns:
        Dict mapping URL -> extracted text (see ``build_results_beautifulsoup``).
    """
    api_key = os.environ.get("GOOGLE_SEARCH_API_KEY")
    cse_id = os.environ.get("GOOGLE_SEARCH_CSE_ID")
    start_time = time.perf_counter()
    # if scholar_mode_check:
    #     topic += " -filetype:pdf"
    scholar_abstracts = None
    if not scholar_mode_check:
        url_list = google_search_urls(
            topic,
            sorted_date,
            domains_to_include,
            api_key,
            cse_id,
        )
    else:
        url_list, scholar_abstracts = semantic_scholar_urls(topic, sorted_date)
    print("---")
    print(len(url_list))
    print(url_list)
    print("---")
    if scholar_mode_check:
        print("Semantic Scholar processing time: ", time.perf_counter() - start_time)
    else:
        print("Google Search processing time: ", time.perf_counter() - start_time)
    result_content = build_results_beautifulsoup(url_list, scholar_abstracts)
    return result_content


if __name__ == "__main__":
    res = google_search("Low Resource ", "date:r:20240101:20241231", domain_list, True)
    print(res.keys())
    print(len(res))
    print(res)