import gradio as gr import asyncio import aiohttp import logging import math import io import numpy as np from newspaper import Article import PyPDF2 from collections import Counter import json from datetime import datetime from sentence_transformers import SentenceTransformer from rank_bm25 import BM25Okapi from sentence_transformers.util import pytorch_cos_sim from enum import Enum from groq import Groq import os from typing import List, Dict, Any, Set from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Initialize Groq client groq_client = Groq(api_key=os.getenv("GROQ_API_KEY")) class ScoringMethod(Enum): BM25 = "bm25" TFIDF = "tfidf" COMBINED = "combined" async def get_available_engines(session, base_url, headers): """Fetch available search engines from SearxNG instance.""" try: # First try the search endpoint to get engines params = { "q": "test", "format": "json", "engines": "all" } async with session.get(f"{base_url}/search", headers=headers, params=params) as response: data = await response.json() available_engines = set() # Extract unique engine names from the response if "search" in data: for engine_data in data["search"]: if isinstance(engine_data, dict) and "engine" in engine_data: available_engines.add(engine_data["engine"]) # If no engines found, try alternate endpoint if not available_engines: async with session.get(f"{base_url}/engines", headers=headers) as response: engines_data = await response.json() available_engines = set(engine["name"] for engine in engines_data if engine.get("enabled", True)) return list(available_engines) except Exception as e: logging.error(f'Error fetching search engines: {e}') # Return default engines if unable to fetch return ["google", "bing", "duckduckgo", "brave", "wikipedia"] def select_search_engines(available_engines: List[str]) -> Set[str]: """Let user select search engines from available options.""" print("\nAvailable search engines:") engines_list = sorted(available_engines) for i, engine in enumerate(engines_list, 1): print(f"{i}. {engine}") print("\nEnter the numbers of engines you want to use (comma-separated), or 'all' for all engines:") selection = input("Your selection: ").strip().lower() if selection == 'all': return set(engines_list) try: selected_indices = [int(idx.strip()) - 1 for idx in selection.split(',')] return {engines_list[idx] for idx in selected_indices if 0 <= idx < len(engines_list)} except (ValueError, IndexError): logging.error("Invalid selection, using all engines as fallback") return set(engines_list) logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') async def scrape_url(url, max_chars): logging.info(f'Scraping URL: {url}') if url.endswith(".pdf"): return await scrape_pdf(url, max_chars) else: return await scrape_html(url, max_chars) async def scrape_html(url, max_chars): try: article = Article(url) article.download() article.parse() text = article.text[:max_chars] publish_date = article.publish_date logging.info(f'Scraped HTML content from {url}') return {"content": text, "publish_date": publish_date.isoformat() if publish_date else None} except Exception as e: logging.error(f'Error scraping HTML content from {url}: {e}') return None async def scrape_pdf(url, max_chars): try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: pdf_bytes = await response.read() pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes)) text = "" for page_num in range(len(pdf_reader.pages)): page = pdf_reader.pages[page_num] text += page.extract_text() text = text[:max_chars] logging.info(f'Scraped PDF content from {url}') return {"content": text, "publish_date": None} except Exception as e: logging.error(f'Error scraping PDF content from {url}: {e}') return None def normalize_scores(scores): """Normalize scores to [0, 1] range using min-max normalization""" if not isinstance(scores, np.ndarray): scores = np.array(scores) if len(scores) == 0: return [] min_score = np.min(scores) max_score = np.max(scores) if max_score - min_score > 0: normalized = (scores - min_score) / (max_score - min_score) else: normalized = np.ones_like(scores) return normalized.tolist() async def calculate_bm25(query, documents): """Calculate BM25 scores for documents.""" try: if not documents: return [] bm25 = BM25Okapi([doc.split() for doc in documents]) scores = bm25.get_scores(query.split()) return normalize_scores(scores) except Exception as e: logging.error(f'Error calculating BM25 scores: {e}') return [0] * len(documents) async def calculate_tfidf(query, documents, measure="cosine"): """Calculate TF-IDF based similarity scores.""" try: if not documents: return [] model = SentenceTransformer('all-MiniLM-L6-v2') query_embedding = model.encode(query) document_embeddings = model.encode(documents) # Normalize embeddings query_embedding = query_embedding / np.linalg.norm(query_embedding) document_embeddings = document_embeddings / np.linalg.norm(document_embeddings, axis=1)[:, np.newaxis] if measure == "cosine": # Calculate cosine similarity scores = np.dot(document_embeddings, query_embedding) return normalize_scores(scores) else: raise ValueError("Unsupported similarity measure.") except Exception as e: logging.error(f'Error calculating TF-IDF scores: {e}') return [0] * len(documents) def combine_scores(bm25_score, tfidf_score, weights=(0.5, 0.5)): """Combine scores using weighted average.""" return weights[0] * bm25_score + weights[1] * tfidf_score async def get_document_scores(query, documents, scoring_method: ScoringMethod): """Calculate document scores based on the chosen scoring method.""" if not documents: return [] if scoring_method == ScoringMethod.BM25: scores = await calculate_bm25(query, documents) return [(score, 0) for score in scores] elif scoring_method == ScoringMethod.TFIDF: scores = await calculate_tfidf(query, documents) return [(0, score) for score in scores] else: # COMBINED bm25_scores = await calculate_bm25(query, documents) tfidf_scores = await calculate_tfidf(query, documents) return list(zip(bm25_scores, tfidf_scores)) def get_total_score(scores, scoring_method: ScoringMethod): """Calculate total score based on the scoring method.""" bm25_score, tfidf_score = scores if scoring_method == ScoringMethod.BM25: return bm25_score elif scoring_method == ScoringMethod.TFIDF: return tfidf_score else: # COMBINED return combine_scores(bm25_score, tfidf_score) async def generate_summary(query: str, articles: List[Dict[str, Any]], temperature: float = 0.7) -> str: """ Generate a summary of the articles using Groq's LLama 3.1 8b model. """ try: # Format the articles into a structured JSON string json_input = json.dumps(articles, indent=2) system_prompt = """You are Sentinel, a world-class AI model who is expert at searching the web and answering user's queries. You are also an expert at summarizing web pages or documents and searching for content in them.""" user_prompt = f""" Please provide a comprehensive summary based on the following JSON input: {json_input} Original Query: {query} Instructions: 1. Analyze the query and the provided documents. 2. Write a detailed, long, and complete research document that is informative and relevant to the user's query based on provided context. 3. Use this context to answer the user's query in the best way possible. Use an unbiased and journalistic tone. 4. Use an unbiased and professional tone in your response. 5. Do not repeat text verbatim from the input. 6. Provide the answer in the response itself. 7. Use markdown to format your response. 8. Use bullet points to list information where appropriate. 9. Cite the answer using [number] notation along with the appropriate source URL embedded in the notation. 10. Place these citations at the end of the relevant sentences. 11. You can cite the same sentence multiple times if it's relevant. 12. Make sure the answer is not short and is informative. 13. Your response should be detailed, informative, accurate, and directly relevant to the user's query.""" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] response = groq_client.chat.completions.create( messages=messages, model="llama-3.1-70b-versatile", # Using LLama 3.1 8b model max_tokens=5000, temperature=temperature, top_p=0.9, presence_penalty=1.2, stream=False ) return response.choices[0].message.content.strip() except Exception as e: logging.error(f'Error generating summary: {e}') return f"Error generating summary: {str(e)}" class ChatBot: def __init__(self): self.scoring_method = ScoringMethod.COMBINED self.num_results = 10 self.max_chars = 10000 self.score_threshold = 0.8 self.temperature = 0.1 self.history = [] self.base_url = "https://shreyas094-searxng-local.hf.space/search" self.headers = { "X-Searx-API-Key": "f9f07f93b37b8483aadb5ba717f556f3a4ac507b281b4ca01e6c6288aa3e3ae5" } # Default search engines in case we can't fetch from SearxNG self.default_engines = ["google", "bing", "duckduckgo", "brave"] async def get_search_results(self, query: str, num_results: int, max_chars: int, score_threshold: float, temperature: float, scoring_method_str: str, selected_engines: List[str]) -> str: try: # Convert scoring method string to enum scoring_method_map = { "BM25": ScoringMethod.BM25, "TF-IDF": ScoringMethod.TFIDF, "Combined": ScoringMethod.COMBINED } self.scoring_method = scoring_method_map[scoring_method_str] async with aiohttp.ClientSession() as session: # Use the selected engines from the interface logging.info(f'Using engines: {", ".join(selected_engines)}') logging.info(f'Parameters: Results={num_results}, Chars={max_chars}, Threshold={score_threshold}, Temp={temperature}, Method={scoring_method_str}') # Perform search params = { "q": query, "format": "json", "engines": ",".join(selected_engines), "limit": num_results } try: async with session.get(f"{self.base_url}/search", headers=self.headers, params=params) as response: data = await response.json() except Exception as e: return f"Error: Could not connect to search service. Please check if SearxNG is running at {self.base_url}. Error: {str(e)}" if "results" not in data or not data["results"]: return "No results found." results = data["results"][:num_results] tasks = [scrape_url(result["url"], max_chars) for result in results] scraped_data = await asyncio.gather(*tasks) valid_results = [(result, article) for result, article in zip(results, scraped_data) if article is not None] if not valid_results: return "No valid articles found after scraping." results, scraped_data = zip(*valid_results) contents = [article["content"] for article in scraped_data] scores = await get_document_scores(query, contents, self.scoring_method) scored_articles = [] for i, (score_tuple, article) in enumerate(zip(scores, scraped_data)): total_score = get_total_score(score_tuple, self.scoring_method) if total_score >= self.score_threshold: scored_articles.append({ "url": results[i]["url"], "title": results[i]["title"], "content": article["content"], "publish_date": article["publish_date"], "score": round(total_score, 4), "bm25_score": round(score_tuple[0], 4), "tfidf_score": round(score_tuple[1], 4), "engine": results[i].get("engine", "unknown") }) scored_articles.sort(key=lambda x: x["score"], reverse=True) unique_articles = [] seen_content = set() for article in scored_articles: if article["content"] not in seen_content: seen_content.add(article["content"]) unique_articles.append(article) # Generate summary using Groq API summary = await generate_summary(query, unique_articles, self.temperature) # Format the response for chat response = f"**Search Parameters:**\n" response += f"- Results: {num_results}\n" response += f"- Max Characters: {max_chars}\n" response += f"- Score Threshold: {score_threshold}\n" response += f"- Temperature: {temperature}\n" response += f"- Scoring Method: {scoring_method_str}\n" response += f"- Search Engines: {', '.join(selected_engines)}\n\n" response += f"**Summary of Search Results:**\n\n{summary}\n\n" response += "\n**Sources:**\n" for i, article in enumerate(unique_articles, 1): response += f"{i}. [{article['title']}]({article['url']}) (Score: {article['score']})\n" return response except Exception as e: logging.error(f'Error in search_and_summarize: {e}') return f"Error occurred: {str(e)}" def chat(self, message: str, history: List[List[str]], num_results: int, max_chars: int, score_threshold: float, temperature: float, scoring_method: str, engines: List[str]) -> str: """ Process chat messages and return responses with custom parameters. """ # Run the async search function in the sync context response = asyncio.run(self.get_search_results( message, num_results, max_chars, score_threshold, temperature, scoring_method, engines )) return response def create_gradio_interface() -> gr.Interface: chatbot = ChatBot() with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Research Assistant") gr.Markdown("Enter your search query, and I'll search, analyze, and summarize relevant articles for you.") with gr.Row(): with gr.Column(scale=3): chatbot_interface = gr.ChatInterface( fn=chatbot.chat, additional_inputs=[ gr.Slider(minimum=5, maximum=30, value=10, step=1, label="Number of Results"), gr.Slider(minimum=1000, maximum=50000, value=10000, step=1000, label="Max Characters per Article"), gr.Slider(minimum=0.0, maximum=1.0, value=0.8, step=0.05, label="Score Threshold"), gr.Slider(minimum=0.0, maximum=1.0, value=0.1, step=0.05, label="Temperature"), gr.Radio(["BM25", "TF-IDF", "Combined"], value="Combined", label="Scoring Method"), gr.CheckboxGroup( choices=["google", "bing", "duckduckgo", "brave", "wikipedia"], value=["google", "bing", "duckduckgo"], label="Search Engines" ) ], examples=[ ["What are the latest developments in quantum computing?"], ["Explain the impact of artificial intelligence on healthcare"], ["What are the current trends in renewable energy?"] ] ) with gr.Column(scale=1): gr.Markdown("### Parameter Descriptions") gr.Markdown(""" - **Number of Results**: Number of search results to fetch - **Max Characters**: Maximum characters to analyze per article - **Score Threshold**: Minimum relevance score (0-1) for including articles - **Temperature**: Controls creativity in summary generation (0=focused, 1=creative) - **Scoring Method**: Algorithm for ranking article relevance - BM25: Traditional keyword-based ranking - TF-IDF: Semantic similarity-based ranking - Combined: Balanced approach using both methods - **Search Engines**: Select which search engines to use """) return demo if __name__ == "__main__": # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') # Create and launch the interface demo = create_gradio_interface() demo.launch(share=True)