import os import time import requests import random from threading import Thread from typing import List, Dict, Union import torch import gradio as gr from bs4 import BeautifulSoup from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer from functools import lru_cache import re import io import json # Model Loading (Done once at startup) MODEL_ID = "meta-llama/Llama-3.2-3B-Instruct" tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.float16, device_map="auto", low_cpu_mem_usage=True ).eval() # Path to example texts (Updated to remove image and video examples) examples_path = os.path.dirname(__file__) EXAMPLES = [ [ { "text": "What is Friction? Explain in Detail.", } ], [ { "text": "Write me a Python function to generate unique passwords.", } ], [ { "text": "What's the latest price of Bitcoin?", } ], [ { "text": "Search and give me list of spaces trending on HuggingFace.", } ], [ { "text": "Create a Beautiful Picture of Eiffel at Night.", } ], [ { "text": "What unusual happens in this video.", "files": [f"{examples_path}/example_video/accident.gif"], } ], # Removed other image and video related examples ] # Set bot avatar image BOT_AVATAR = "OpenAI_logo.png" # Perform a Google search and return the results @lru_cache(maxsize=128) def extract_text_from_webpage(html_content): """Extracts visible text from HTML content using BeautifulSoup.""" soup = BeautifulSoup(html_content, "html.parser") for tag in soup(["script", "style", "header", "footer", "nav", "form", "svg"]): tag.extract() visible_text = soup.get_text(separator=' ', strip=True) return visible_text def search(query): term = query all_results = [] max_chars_per_page = 8000 with requests.Session() as session: resp = session.get( url="https://www.google.com/search", headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"}, params={"q": term, "num": 4}, timeout=5, verify=False, ) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") result_block = soup.find_all("div", attrs={"class": "g"}) for result in result_block: link = result.find("a", href=True) if link and 'href' in link.attrs: link = link["href"] try: webpage = session.get(link, headers={"User-Agent": "Mozilla/5.0"}, timeout=5, verify=False) webpage.raise_for_status() visible_text = extract_text_from_webpage(webpage.text) if len(visible_text) > max_chars_per_page: visible_text = visible_text[:max_chars_per_page] all_results.append({"link": link, "text": visible_text}) except requests.exceptions.RequestException: all_results.append({"link": link, "text": None}) return all_results def generate_response(prompt, chat_history): # Construct the conversation history conversation = "" for user, assistant in chat_history: conversation += f"User: {user}\nAssistant: {assistant}\n" conversation += f"User: {prompt}\nAssistant:" inputs = tokenizer(conversation, return_tensors="pt").to("cuda") streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) thread = Thread(target=model.generate, args=(inputs.input_ids,), kwargs={ "max_new_tokens": 512, "do_sample": True, "top_p": 0.95, "temperature": 0.8, "streamer": streamer }) thread.start() response = "" for new_text in streamer: response += new_text yield response @lru_cache(maxsize=128) def process_query(query, chat_history): # Here you can implement logic to decide between web_search, general_query, or hard_query # For simplicity, let's assume all queries go through general_query # You can expand this with your own logic functions_metadata = [ { "type": "function", "function": { "name": "web_search", "description": "Search query on google and find latest information.", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "Web search query"} }, "required": ["query"] } } }, { "type": "function", "function": { "name": "general_query", "description": "Reply general query with LLM.", "parameters": { "type": "object", "properties": { "prompt": {"type": "string", "description": "A detailed prompt"} }, "required": ["prompt"] } } }, { "type": "function", "function": { "name": "hard_query", "description": "Reply tough query using powerful LLM.", "parameters": { "type": "object", "properties": { "prompt": {"type": "string", "description": "A detailed prompt"} }, "required": ["prompt"] } } }, ] # Example logic to choose function (you can customize this) if "search" in query.lower(): function_name = "web_search" elif "explain" in query.lower() or "detail" in query.lower(): function_name = "general_query" else: function_name = "hard_query" return { "name": function_name, "arguments": { "query" if function_name == "web_search" else "prompt": query } } def handle_functions(function_call, chat_history): function_name = function_call["name"] arguments = function_call["arguments"] if function_name == "web_search": query = arguments["query"] web_results = search(query) web_summary = ' '.join([f"Link: {res['link']}\nText: {res['text']}\n\n" for res in web_results if res["text"]]) # Append web results to chat history or pass to the model as context # Here we directly return the summarized web results return f"Here are the search results:\n{web_summary}" elif function_name in ["general_query", "hard_query"]: prompt = arguments["prompt"] # Generate response using the local model response_generator = generate_response(prompt, chat_history) return response_generator else: return "Function not recognized." def model_inference(user_prompt, chat_history): prompt = user_prompt["text"] # Determine which function to call function_call = process_query(prompt, chat_history) if function_call["name"] == "web_search": yield "Performing web search..." result = handle_functions(function_call, chat_history) yield result elif function_call["name"] in ["general_query", "hard_query"]: yield "Generating response..." response_generator = handle_functions(function_call, chat_history) for response in response_generator: yield response else: yield "Invalid function call." # Create a chatbot interface chatbot = gr.Chatbot( label="OpenGPT-4o", avatar_images=[None, BOT_AVATAR], show_copy_button=True, layout="panel", height=400, ) input_box = gr.Textbox(label="Prompt") iface = gr.Interface( fn=model_inference, inputs=[input_box, chatbot], outputs=chatbot, live=True, examples=EXAMPLES, title="OpenGPT-4o Chatbot", description="A powerful AI assistant using local Llama-3.2 model.", ) if __name__ == "__main__": iface.launch()