import os
import time
import requests
import random
from threading import Thread
from typing import List, Dict, Union
import torch
import gradio as gr
from bs4 import BeautifulSoup
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from functools import lru_cache
import re
import io
import json
# Model Loading (Done once at startup)
MODEL_ID = "meta-llama/Llama-3.2-3B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.float16,
    device_map="auto",
    low_cpu_mem_usage=True
).eval()
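# Deployment note (not part of the original configuration): device_map="auto" places the
# model on a GPU when one is available; on a CPU-only Space, half-precision math is slow,
# so dropping torch_dtype=torch.float16 (falling back to float32) may be preferable.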
# Path to example texts (Updated to remove image and video examples)
examples_path = os.path.dirname(__file__)
EXAMPLES = [
    [
        {
            "text": "What is Friction? Explain in Detail.",
        }
    ],
    [
        {
            "text": "Write me a Python function to generate unique passwords.",
        }
    ],
    [
        {
            "text": "What's the latest price of Bitcoin?",
        }
    ],
    [
        {
            "text": "Search and give me a list of Spaces trending on Hugging Face.",
        }
    ],
    [
        {
            "text": "Create a beautiful picture of the Eiffel Tower at night.",
        }
    ],
    [
        {
            "text": "What unusual thing happens in this video?",
            "files": [f"{examples_path}/example_video/accident.gif"],
        }
    ],
    # Removed other image and video related examples
]
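# Each example is a single multimodal message dict ({"text": ..., "files": [...]}),
# the format consumed by gr.ChatInterface(multimodal=True) further below.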
# Set bot avatar image
BOT_AVATAR = "OpenAI_logo.png"
def extract_text_from_webpage(html_content):
    """Extracts visible text from HTML content using BeautifulSoup."""
    soup = BeautifulSoup(html_content, "html.parser")
    for tag in soup(["script", "style", "header", "footer", "nav", "form", "svg"]):
        tag.extract()
    visible_text = soup.get_text(separator=' ', strip=True)
    return visible_text
# Perform a Google search and return the results
def search(query):
    term = query
    all_results = []
    max_chars_per_page = 8000
    with requests.Session() as session:
        resp = session.get(
            url="https://www.google.com/search",
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"},
            params={"q": term, "num": 4},
            timeout=5,
            verify=False,
        )
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        result_block = soup.find_all("div", attrs={"class": "g"})
        for result in result_block:
            link = result.find("a", href=True)
            if link and 'href' in link.attrs:
                link = link["href"]
                try:
                    webpage = session.get(link, headers={"User-Agent": "Mozilla/5.0"}, timeout=5, verify=False)
                    webpage.raise_for_status()
                    visible_text = extract_text_from_webpage(webpage.text)
                    if len(visible_text) > max_chars_per_page:
                        visible_text = visible_text[:max_chars_per_page]
                    all_results.append({"link": link, "text": visible_text})
                except requests.exceptions.RequestException:
                    all_results.append({"link": link, "text": None})
    return all_results
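# Illustrative return shape (not executed here): each result is a dict such as
#   {"link": "https://example.com/...", "text": "first 8000 characters of visible page text ..."}
# with "text" set to None when a page could not be fetched.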
def generate_response(prompt, chat_history):
    # Construct the conversation history
    conversation = ""
    for user, assistant in chat_history:
        conversation += f"User: {user}\nAssistant: {assistant}\n"
    conversation += f"User: {prompt}\nAssistant:"
    # Move inputs to whichever device the model was placed on (works without CUDA as well)
    inputs = tokenizer(conversation, return_tensors="pt").to(model.device)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    thread = Thread(target=model.generate, args=(inputs.input_ids,), kwargs={
        "attention_mask": inputs.attention_mask,
        "max_new_tokens": 512,
        "do_sample": True,
        "top_p": 0.95,
        "temperature": 0.8,
        "streamer": streamer
    })
    thread.start()
    response = ""
    for new_text in streamer:
        response += new_text
        yield response
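# Note that the loop yields the *accumulated* response on every new chunk, which is the
# streaming pattern Gradio's chat components expect for progressive display.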
def process_query(query, chat_history):
    # Decide between web_search, general_query, and hard_query.
    # Routing below is a simple keyword heuristic; functions_metadata documents the
    # available functions and could instead be given to the model for LLM-based routing.
    functions_metadata = [
        {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Search the query on Google and find the latest information.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Web search query"}
                    },
                    "required": ["query"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "general_query",
                "description": "Reply to a general query with the LLM.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "prompt": {"type": "string", "description": "A detailed prompt"}
                    },
                    "required": ["prompt"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "hard_query",
                "description": "Reply to a tough query using a more powerful LLM.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "prompt": {"type": "string", "description": "A detailed prompt"}
                    },
                    "required": ["prompt"]
                }
            }
        },
    ]
    # Simple keyword-based routing (customize as needed)
    if "search" in query.lower():
        function_name = "web_search"
    elif "explain" in query.lower() or "detail" in query.lower():
        function_name = "general_query"
    else:
        function_name = "hard_query"
    return {
        "name": function_name,
        "arguments": {
            "query" if function_name == "web_search" else "prompt": query
        }
    }
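# Illustrative routing results for the example prompts above (not executed here):
#   "Search and give me a list of Spaces trending on Hugging Face." -> {"name": "web_search", "arguments": {"query": ...}}
#   "What is Friction? Explain in Detail."                          -> {"name": "general_query", "arguments": {"prompt": ...}}
#   "What's the latest price of Bitcoin?"                           -> {"name": "hard_query", "arguments": {"prompt": ...}}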
def handle_functions(function_call, chat_history):
    function_name = function_call["name"]
    arguments = function_call["arguments"]
    if function_name == "web_search":
        query = arguments["query"]
        web_results = search(query)
        web_summary = ' '.join([f"Link: {res['link']}\nText: {res['text']}\n\n" for res in web_results if res["text"]])
        # Append web results to chat history or pass to the model as context
        # Here we directly return the summarized web results
        return f"Here are the search results:\n{web_summary}"
    elif function_name in ["general_query", "hard_query"]:
        prompt = arguments["prompt"]
        # Generate response using the local model
        response_generator = generate_response(prompt, chat_history)
        return response_generator
    else:
        return "Function not recognized."
def model_inference(user_prompt, chat_history):
    # The multimodal textbox delivers a dict ({"text": ..., "files": [...]}); fall back to
    # the raw string when a plain textbox is used instead.
    prompt = user_prompt["text"] if isinstance(user_prompt, dict) else user_prompt
    # Determine which function to call
    function_call = process_query(prompt, chat_history)
    if function_call["name"] == "web_search":
        yield "Performing web search..."
        result = handle_functions(function_call, chat_history)
        yield result
    elif function_call["name"] in ["general_query", "hard_query"]:
        yield "Generating response..."
        response_generator = handle_functions(function_call, chat_history)
        for response in response_generator:
            yield response
    else:
        yield "Invalid function call."
# Create a chatbot interface
chatbot = gr.Chatbot(
    label="OpenGPT-4o",
    avatar_images=[None, BOT_AVATAR],
    show_copy_button=True,
    layout="panel",
    height=400,
)
input_box = gr.MultimodalTextbox(label="Prompt")
# gr.ChatInterface matches model_inference's (message, history) signature, streams the
# yielded strings into the chat, and accepts the multimodal example format defined above.
iface = gr.ChatInterface(
    fn=model_inference,
    chatbot=chatbot,
    textbox=input_box,
    multimodal=True,
    examples=EXAMPLES,
    title="OpenGPT-4o Chatbot",
    description="A powerful AI assistant using local Llama-3.2 model.",
)
if __name__ == "__main__":
    iface.launch()
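# When running outside Hugging Face Spaces, launch(share=True) can be used to obtain a
# temporary public URL; on Spaces the default launch() is sufficient.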