Spaces:
Runtime error
Runtime error
import gradio as gr | |
from huggingface_hub import InferenceClient | |
""" | |
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference | |
""" | |
# client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") | |
from google.cloud import storage | |
from google.oauth2 import service_account | |
import json | |
import os | |
import requests | |
# upload image to google cloud storage | |
def upload_file_to_gcs_blob(file): | |
google_creds = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS_JSON") | |
creds_json = json.loads(google_creds) | |
credentials = service_account.Credentials.from_service_account_info(creds_json) | |
# Google Cloud credentials | |
storage_client = storage.Client(credentials=credentials, project=creds_json['project_id']) | |
bucket_name=os.environ.get('bucket_name') | |
bucket = storage_client.bucket(bucket_name) | |
destination_blob_name = os.path.basename(file) | |
blob = bucket.blob(destination_blob_name) | |
blob.upload_from_filename(file) | |
public_url = blob.public_url | |
return public_url | |
from PIL import Image | |
def is_image(file_path): | |
try: | |
Image.open(file_path) | |
return True | |
except IOError: | |
return False | |
from supabase import create_client, Client | |
def get_supabase_client(): | |
url = os.environ.get('supabase_url') | |
key = os.environ.get('supbase_key') | |
supabase = create_client(url, key) | |
return supabase | |
def supabase_insert_message(user_message,response_content,messages,response_data,user_name,user_oauth_token,ip,sign,cookie_value,content_type): | |
supabase = get_supabase_client() | |
data, count = supabase.table('messages').insert({"user_message": user_message, "response_content": response_content,"messages":messages,"response":response_data,"user_name":user_name,"user_oauth_token":user_oauth_token,"ip":ip,"sign":sign,"cookie":cookie_value,"content_type":content_type}).execute() | |
def supabase_insert_user(name,user_name,profile,picture,oauth_token): | |
supabase = get_supabase_client() | |
data, count = supabase.table('users').insert({"name":name,"user_name":user_name,"profile":profile,"picture":picture,"oauth_token":oauth_token}).execute() | |
def supabase_fetch_user(user_name): | |
supabase = get_supabase_client() | |
data,count = supabase.table('users').select("*").eq('user_name',user_name).execute() | |
return data | |
# def respond( | |
# message, | |
# history: list[tuple[str, str]], | |
# system_message, | |
# max_tokens, | |
# temperature, | |
# top_p, | |
# ): | |
# messages = [{"role": "system", "content": system_message}] | |
# for val in history: | |
# if val[0]: | |
# messages.append({"role": "user", "content": val[0]}) | |
# if val[1]: | |
# messages.append({"role": "assistant", "content": val[1]}) | |
# messages.append({"role": "user", "content": message}) | |
# response = "" | |
# for message in client.chat_completion( | |
# messages, | |
# max_tokens=max_tokens, | |
# stream=True, | |
# temperature=temperature, | |
# top_p=top_p, | |
# ): | |
# token = message.choices[0].delta.content | |
# response += token | |
# yield response | |
# def get_completion(message,history,profile: gr.OAuthProfile | None,oauth_token: gr.OAuthToken | None,request: gr.Request): | |
# if request: | |
# ip = request.client.host | |
# print("Query parameters:", dict(request.query_params)) | |
# sign = dict(request.query_params).get('__sign') | |
# # get cookie | |
# headers = request.headers.raw | |
# # find 'cookie' | |
# cookie_header = next((header for header in headers if header[0] == b'cookie'), None) | |
# if cookie_header: | |
# # extract cookie | |
# cookie_value = cookie_header[1].decode() | |
# print(f"Cookie: {cookie_value}") | |
# else: | |
# cookie_value = '' | |
# print("No cookie found in request headers") | |
# # check login | |
# if profile is None: | |
# # raise gr.Error('Click "Sign in with Hugging Face" to continue') | |
# user_name = 'unknown' | |
# user_oauth_token = '' | |
# name = 'unknown' | |
# pf = '' | |
# pic = '' | |
# else: | |
# user_name = profile.username | |
# user_oauth_token = oauth_token.token | |
# name = profile.name | |
# pf = profile.profile | |
# pic = profile.picture | |
# # check if user exists | |
# user_data = supabase_fetch_user(user_name) | |
# if not user_data[1]: | |
# supabase_insert_user(name,user_name,pf,pic,user_oauth_token) | |
# # check if messages are empty | |
# if message["text"].strip() == "" and not message["files"]: | |
# raise gr.Error("Please input a query and optionally image(s).") | |
# if message["text"].strip() == "" and message["files"]: | |
# raise gr.Error("Please input a text query along the image(s).") | |
# text = message['text'] | |
# user_message = [ | |
# {"type": "text", "text": text}, | |
# ] | |
# content_type = 'text' | |
# if message['files']: | |
# file = message['files'][0] | |
# public_url = upload_file_to_gcs_blob(file) | |
# if is_image(file): # only support image file now | |
# content_image = { | |
# "type": "image_url", | |
# "image_url": { | |
# "url": public_url, | |
# },} | |
# user_message.append(content_image) | |
# content_type = 'image' | |
# else: | |
# raise gr.Error("Only support image files now.") | |
# history_openai_format = [] | |
# for human, assistant in history: | |
# # check if there is image info in the history message or empty history messages | |
# if isinstance(human, tuple) or human == "" or assistant is None: | |
# continue | |
# history_openai_format.append({"role": "user", "content": human }) | |
# history_openai_format.append({"role": "assistant", "content":assistant}) | |
# history_openai_format.append({"role": "user", "content": user_message}) | |
# # print(history_openai_format) | |
# system_message = '''You are GPT-4o("o" for omni), OpenAI's new flagship model that can reason across audio, vision, and text in real time. | |
# GPT-4o matches GPT-4 Turbo performance on text in English and code, with significant improvement on text in non-English languages, while also being much faster. | |
# GPT-4o is especially better at vision and audio understanding compared to existing models. | |
# GPT-4o's text and image capabilities are avaliable for users now. More capabilities like audio and video will be rolled out iteratively in the future. | |
# ''' | |
# # headers | |
# openai_api_key = os.environ.get('openai_api_key') | |
# base_url = os.environ.get('base_url') | |
# headers = { | |
# 'Authorization': f'Bearer {openai_api_key}' | |
# } | |
# temperature = 0.7 | |
# max_tokens = 2048 | |
# init_message = [{"role": "system", "content": system_message}] | |
# messages = init_message + history_openai_format[-5:] #system message + latest 2 round dialogues + user input | |
# print(messages) | |
# # request body | |
# data = { | |
# 'model': 'gpt-4o', # we use gpt-4o here | |
# 'messages': messages, | |
# 'temperature':temperature, | |
# 'max_tokens':max_tokens, | |
# 'stream':True, | |
# # 'stream_options':{"include_usage": True}, # retrieving token usage for stream response | |
# } | |
# # get response | |
# # response = requests.post(base_url, headers=headers, json=data) | |
# # response_data = response.json() | |
# # print(response_data) | |
# # print('-----------------------------------\n') | |
# # if 'error' in response_data: | |
# # response_content = response_data['error']['message'] | |
# # else: | |
# # response_content = response_data['choices'][0]['message']['content'] | |
# # usage = response_data['usage'] | |
# # return response_content | |
# # get response with stream | |
# response = requests.post(base_url, headers=headers, json=data,stream=True) | |
# response_content = "" | |
# for line in response.iter_lines(): | |
# line = line.decode().strip() | |
# if line == "data: [DONE]": | |
# continue | |
# elif line.startswith("data: "): | |
# line = line[6:] # remove prefix "data: " | |
# try: | |
# data = json.loads(line) | |
# if "delta" in data["choices"][0]: | |
# content = data["choices"][0]["delta"].get("content", "") | |
# response_content += content | |
# yield response_content | |
# except json.JSONDecodeError: | |
# print(f"Error decoding line: {line}") | |
# print(response_content) | |
# print('-----------------------------------\n') | |
# response_data = {} | |
# supabase_insert_message(user_message,response_content,messages,response_data,user_name,user_oauth_token,ip,sign,cookie_value,content_type) | |
def get_completion(message,history): | |
print(message) | |
res = "**Important Announcement:** \n\nThis space is shutting down now. \n\nVisit [chatgpt-4o](https://chatgpt-4o.streamlit.app/) for an improved UI experience and future enhancements." | |
return res | |
""" | |
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface | |
""" | |
title = "ChatGPT-4o" | |
description = "This is GPT-4o, you can use the text and image capabilities now. More capabilities like audio and video will be rolled out iteratively in the future. Stay tuned." | |
with gr.Blocks(fill_height=True) as demo: | |
gr.Markdown( | |
"# ChatGPT-4o" | |
"\n\nThis is GPT-4o, you can use the text and image capabilities now. More capabilities like audio and video will be rolled out iteratively in the future. Stay tuned." | |
) | |
gr.LoginButton() | |
# gr.Markdown(""" | |
# ## This space will be shutting down soon. \n\n | |
# ## Visit [chatgpt-4o](https://chatgpt-4o.streamlit.app/) for an improved UI experience and future enhancements. | |
# """ | |
# ) | |
gr.ChatInterface( | |
get_completion, | |
multimodal=True, | |
# title = title, | |
# description = description | |
# additional_inputs=[ | |
# gr.Textbox(value="You are a friendly Chatbot.", label="System message"), | |
# gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), | |
# gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), | |
# ], | |
) | |
demo.queue(default_concurrency_limit=5) | |
if __name__ == "__main__": | |
demo.launch() |