Spaces:
Runtime error
Runtime error
import re | |
import io | |
import os | |
from typing import Optional, Tuple | |
import datetime | |
import sys | |
import gradio as gr | |
import requests | |
import json | |
from threading import Lock | |
from langchain import ConversationChain, LLMChain | |
from langchain.agents import load_tools, initialize_agent, Tool | |
from langchain.tools.bing_search.tool import BingSearchRun, BingSearchAPIWrapper | |
from langchain.chains.conversation.memory import ConversationBufferMemory | |
from langchain.llms import OpenAI | |
from langchain.chains import PALChain | |
from langchain.llms import AzureOpenAI | |
from langchain.utilities import ImunAPIWrapper, ImunMultiAPIWrapper | |
from langchain.utils import get_url_path | |
from openai.error import AuthenticationError, InvalidRequestError, RateLimitError | |
import argparse | |
import logging | |
from opencensus.ext.azure.log_exporter import AzureLogHandler | |
import uuid | |
logger = None | |
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") | |
BUG_FOUND_MSG = "Some Functionalities not supported yet. Please refresh and hit 'Click to wake up MM-REACT'" | |
AUTH_ERR_MSG = "OpenAI key needed" | |
REFRESH_MSG = "Please refresh and hit 'Click to wake up MM-REACT'" | |
MAX_TOKENS = 512 | |
def get_logger(): | |
global logger | |
if logger is None: | |
logger = logging.getLogger(__name__) | |
logger.addHandler(AzureLogHandler()) | |
return logger | |
# load chain | |
def load_chain(history, log_state): | |
global ARGS | |
if ARGS.openAIModel == 'openAIGPT35': | |
# openAI GPT 3.5 | |
llm = OpenAI(temperature=0, max_tokens=MAX_TOKENS) | |
elif ARGS.openAIModel == 'azureChatGPT': | |
# for Azure OpenAI ChatGPT | |
llm = AzureOpenAI(deployment_name="text-chat-davinci-002", model_name="text-chat-davinci-002", temperature=0, max_tokens=MAX_TOKENS) | |
elif ARGS.openAIModel == 'azureGPT35turbo': | |
# for Azure OpenAI gpt3.5 turbo | |
llm = AzureOpenAI(deployment_name="gpt-35-turbo-version-0301", model_name="gpt-35-turbo (version 0301)", temperature=0, max_tokens=MAX_TOKENS) | |
elif ARGS.openAIModel == 'azureTextDavinci003': | |
# for Azure OpenAI text davinci | |
llm = AzureOpenAI(deployment_name="text-davinci-003", model_name="text-davinci-003", temperature=0, max_tokens=MAX_TOKENS) | |
elif ARGS.openAIModel == 'azureGPT4': | |
# for Azure GPT4 private preview | |
llm = AzureOpenAI(deployment_name="gpt-4-32k-0314", temperature=0, chat_completion=True, max_tokens=MAX_TOKENS, openai_api_version="2023-03-15-preview") | |
memory = ConversationBufferMemory(memory_key="chat_history") | |
############################# | |
# loading all tools | |
imun_dense = ImunAPIWrapper( | |
imun_url=os.environ.get("IMUN_URL2"), | |
params=os.environ.get("IMUN_PARAMS2"), | |
imun_subscription_key=os.environ.get("IMUN_SUBSCRIPTION_KEY2")) | |
imun = ImunAPIWrapper() | |
imun = ImunMultiAPIWrapper(imuns=[imun, imun_dense]) | |
imun_celeb = ImunAPIWrapper( | |
imun_url=os.environ.get("IMUN_CELEB_URL"), | |
params="") | |
imun_read = ImunAPIWrapper( | |
imun_url=os.environ.get("IMUN_OCR_READ_URL"), | |
params=os.environ.get("IMUN_OCR_PARAMS"), | |
imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
imun_receipt = ImunAPIWrapper( | |
imun_url=os.environ.get("IMUN_OCR_RECEIPT_URL"), | |
params=os.environ.get("IMUN_OCR_PARAMS"), | |
imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
imun_businesscard = ImunAPIWrapper( | |
imun_url=os.environ.get("IMUN_OCR_BC_URL"), | |
params=os.environ.get("IMUN_OCR_PARAMS"), | |
imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
imun_layout = ImunAPIWrapper( | |
imun_url=os.environ.get("IMUN_OCR_LAYOUT_URL"), | |
params=os.environ.get("IMUN_OCR_PARAMS"), | |
imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
imun_invoice = ImunAPIWrapper( | |
imun_url=os.environ.get("IMUN_OCR_INVOICE_URL"), | |
params=os.environ.get("IMUN_OCR_PARAMS"), | |
imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY")) | |
bing = BingSearchAPIWrapper(k=2) | |
def edit_photo(query: str) -> str: | |
endpoint = os.environ.get("PHOTO_EDIT_ENDPOINT_URL") | |
query = query.strip() | |
url_idx, img_url = get_url_path(query) | |
if not img_url.startswith(("http://", "https://")): | |
return "Invalid image URL" | |
img_url = img_url.replace("0.0.0.0", os.environ.get("PHOTO_EDIT_ENDPOINT_URL_SHORT")) | |
instruction = query[:url_idx] | |
# This should be some internal IP to wherever the server runs | |
job = {"image_path": img_url, "instruction": instruction} | |
response = requests.post(endpoint, json=job) | |
if response.status_code != 200: | |
return "Could not finish the task try again later!" | |
return "Here is the edited image " + endpoint + response.json()["edited_image"] | |
# these tools should not step on each other's toes | |
tools = [ | |
Tool( | |
name="PAL-MATH", | |
func=PALChain.from_math_prompt(llm).run, | |
description=( | |
"A wrapper around calculator. " | |
"A language model that is really good at solving complex word math problems." | |
"Input should be a fully worded hard word math problem." | |
) | |
), | |
Tool( | |
name = "Image Understanding", | |
func=imun.run, | |
description=( | |
"A wrapper around Image Understanding. " | |
"Useful for when you need to understand what is inside an image (objects, texts, people)." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
Tool( | |
name = "OCR Understanding", | |
func=imun_read.run, | |
description=( | |
"A wrapper around OCR Understanding (Optical Character Recognition). " | |
"Useful after Image Understanding tool has found text or handwriting is present in the image tags." | |
"This tool can find the actual text, written name, or product name in the image." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
Tool( | |
name = "Receipt Understanding", | |
func=imun_receipt.run, | |
description=( | |
"A wrapper receipt understanding. " | |
"Useful after Image Understanding tool has recognized a receipt in the image tags." | |
"This tool can find the actual receipt text, prices and detailed items." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
Tool( | |
name = "Business Card Understanding", | |
func=imun_businesscard.run, | |
description=( | |
"A wrapper around business card understanding. " | |
"Useful after Image Understanding tool has recognized businesscard in the image tags." | |
"This tool can find the actual business card text, name, address, email, website on the card." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
Tool( | |
name = "Layout Understanding", | |
func=imun_layout.run, | |
description=( | |
"A wrapper around layout and table understanding. " | |
"Useful after Image Understanding tool has recognized businesscard in the image tags." | |
"This tool can find the actual business card text, name, address, email, website on the card." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
Tool( | |
name = "Invoice Understanding", | |
func=imun_invoice.run, | |
description=( | |
"A wrapper around invoice understanding. " | |
"Useful after Image Understanding tool has recognized businesscard in the image tags." | |
"This tool can find the actual business card text, name, address, email, website on the card." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
Tool( | |
name = "Celebrity Understanding", | |
func=imun_celeb.run, | |
description=( | |
"A wrapper around celebrity understanding. " | |
"Useful after Image Understanding tool has recognized people in the image tags that could be celebrities." | |
"This tool can find the name of celebrities in the image." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
BingSearchRun(api_wrapper=bing), | |
Tool( | |
name = "Photo Editing", | |
func=edit_photo, | |
description=( | |
"A wrapper around photo editing. " | |
"Useful to edit an image with a given instruction." | |
"Input should be an image url, or path to an image file (e.g. .jpg, .png)." | |
) | |
), | |
] | |
chain = initialize_agent(tools, llm, agent="conversational-assistant", verbose=True, memory=memory, return_intermediate_steps=True, max_iterations=4) | |
log_state = log_state or "" | |
print ("log_state {}".format(log_state)) | |
log_state = str(uuid.uuid1()) | |
print("langchain reloaded") | |
# eproperties = {'custom_dimensions': {'key_1': 'value_1', 'key_2': 'value_2'}} | |
properties = {'custom_dimensions': {'session': log_state}} | |
get_logger().warning("langchain reloaded", extra=properties) | |
history = [] | |
history.append(("Show me what you got!", "Hi Human, Please upload an image to get started!")) | |
return history, history, chain, log_state, \ | |
gr.Textbox.update(visible=True), \ | |
gr.Button.update(visible=True), \ | |
gr.UploadButton.update(visible=True), \ | |
gr.Row.update(visible=True), \ | |
gr.HTML.update(visible=True), \ | |
gr.Button.update(variant="secondary") | |
# executes input typed by human | |
def run_chain(chain, inp): | |
# global chain | |
output = "" | |
try: | |
output = chain.conversation(input=inp, keep_short=ARGS.noIntermediateConv) | |
# output = chain.run(input=inp) | |
except AuthenticationError as ae: | |
output = AUTH_ERR_MSG + str(datetime.datetime.now()) + ". " + str(ae) | |
print("output", output) | |
except RateLimitError as rle: | |
output = "\n\nRateLimitError: " + str(rle) | |
except ValueError as ve: | |
output = "\n\nValueError: " + str(ve) | |
except InvalidRequestError as ire: | |
output = "\n\nInvalidRequestError: " + str(ire) | |
except Exception as e: | |
output = "\n\n" + BUG_FOUND_MSG + ":\n\n" + str(e) | |
return output | |
# simple chat function wrapper | |
class ChatWrapper: | |
def __init__(self): | |
self.lock = Lock() | |
def __call__( | |
self, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain], log_state | |
): | |
"""Execute the chat functionality.""" | |
self.lock.acquire() | |
try: | |
print("\n==== date/time: " + str(datetime.datetime.now()) + " ====") | |
print("inp: " + inp) | |
properties = {'custom_dimensions': {'session': log_state}} | |
get_logger().warning("inp: " + inp, extra=properties) | |
history = history or [] | |
# If chain is None, that is because no API key was provided. | |
output = "Please paste your OpenAI key from openai.com to use this app. " + str(datetime.datetime.now()) | |
######################## | |
# multi line | |
outputs = run_chain(chain, inp) | |
outputs = process_chain_output(outputs) | |
print (" len(outputs) {}".format(len(outputs))) | |
for i, output in enumerate(outputs): | |
if i==0: | |
history.append((inp, output)) | |
else: | |
history.append((None, output)) | |
except Exception as e: | |
raise e | |
finally: | |
self.lock.release() | |
print (history) | |
properties = {'custom_dimensions': {'session': log_state}} | |
if outputs is None: | |
outputs = "" | |
get_logger().warning(str(json.dumps(outputs)), extra=properties) | |
return history, history, "" | |
def add_image_with_path(state, chain, imagepath, log_state): | |
global ARGS | |
state = state or [] | |
url_input_for_chain = "http://0.0.0.0:{}/file={}".format(ARGS.port, imagepath) | |
outputs = run_chain(chain, url_input_for_chain) | |
######################## | |
# multi line response handling | |
outputs = process_chain_output(outputs) | |
for i, output in enumerate(outputs): | |
if i==0: | |
# state.append((f"![](/file={imagepath})", output)) | |
state.append(((imagepath,), output)) | |
else: | |
state.append((None, output)) | |
print (state) | |
properties = {'custom_dimensions': {'session': log_state}} | |
get_logger().warning("url_input_for_chain: " + url_input_for_chain, extra=properties) | |
if outputs is None: | |
outputs = "" | |
get_logger().warning(str(json.dumps(outputs)), extra=properties) | |
return state, state | |
# upload image | |
def add_image(state, chain, image, log_state): | |
global ARGS | |
state = state or [] | |
# handling spaces in image path | |
imagepath = image.name.replace(" ", "%20") | |
url_input_for_chain = "http://0.0.0.0:{}/file={}".format(ARGS.port, imagepath) | |
outputs = run_chain(chain, url_input_for_chain) | |
######################## | |
# multi line response handling | |
outputs = process_chain_output(outputs) | |
for i, output in enumerate(outputs): | |
if i==0: | |
state.append(((imagepath,), output)) | |
else: | |
state.append((None, output)) | |
print (state) | |
properties = {'custom_dimensions': {'session': log_state}} | |
get_logger().warning("url_input_for_chain: " + url_input_for_chain, extra=properties) | |
if outputs is None: | |
outputs = "" | |
get_logger().warning(str(json.dumps(outputs)), extra=properties) | |
return state, state | |
# extract image url from response and process differently | |
def replace_with_image_markup(text): | |
img_url = None | |
text= text.strip() | |
url_idx = text.rfind(" ") | |
img_url = text[url_idx + 1:].strip() | |
if img_url.endswith((".", "?")): | |
img_url = img_url[:-1] | |
# if img_url is not None: | |
# img_url = f"![](/file={img_url})" | |
return img_url | |
# multi line response handling | |
def process_chain_output(outputs): | |
global ARGS | |
EMPTY_AI_REPLY = "AI:" | |
# print("outputs {}".format(outputs)) | |
if isinstance(outputs, str): # single line output | |
if outputs.strip() == EMPTY_AI_REPLY: | |
outputs = REFRESH_MSG | |
outputs = [outputs] | |
elif isinstance(outputs, list): # multi line output | |
if ARGS.noIntermediateConv: # remove the items with assistant in it. | |
cleanOutputs = [] | |
for output in outputs: | |
if output.strip() == EMPTY_AI_REPLY: | |
output = REFRESH_MSG | |
# found an edited image url to embed | |
img_url = None | |
# print ("type list: {}".format(output)) | |
if "assistant: here is the edited image " in output.lower(): | |
img_url = replace_with_image_markup(output) | |
cleanOutputs.append("Assistant: Here is the edited image") | |
if img_url is not None: | |
cleanOutputs.append((img_url,)) | |
else: | |
cleanOutputs.append(output) | |
# cleanOutputs = cleanOutputs + output+ "." | |
outputs = cleanOutputs | |
return outputs | |
def init_and_kick_off(): | |
global ARGS | |
# initalize chatWrapper | |
chat = ChatWrapper() | |
exampleTitle = """<h3>Examples to start conversation..</h3>""" | |
comingSoon = """<center><b><p style="color:Red;">MM-REACT: April 20th version with GPT4 and image understanding capabilities</p></b></center>""" | |
detailLinks = """ | |
<center> | |
<a href="https://multimodal-react.github.io/"> MM-ReAct Website</a> | |
· | |
<a href="https://arxiv.org/abs/2303.11381">MM-ReAct Paper</a> | |
· | |
<a href="https://github.com/microsoft/MM-REACT">MM-ReAct Code</a> | |
</center> | |
""" | |
with gr.Blocks(css="#tryButton {width: 120px;}") as block: | |
llm_state = gr.State() | |
history_state = gr.State() | |
chain_state = gr.State() | |
log_state = gr.State() | |
reset_btn = gr.Button(value="!!!CLICK to wake up MM-REACT!!!", variant="primary", elem_id="resetbtn").style(full_width=True) | |
gr.HTML(detailLinks) | |
gr.HTML(comingSoon) | |
example_image_size = 90 | |
col_min_width = 80 | |
button_variant = "primary" | |
with gr.Row(): | |
with gr.Column(scale=1.0, min_width=100): | |
chatbot = gr.Chatbot(elem_id="chatbot", label="MM-REACT Bot").style(height=620) | |
with gr.Column(scale=0.20, min_width=200, visible=False) as exampleCol: | |
with gr.Row(): | |
grExampleTitle = gr.HTML(exampleTitle, visible=False) | |
with gr.Row(): | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example3Image = gr.Image("images/receipt.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example3ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
# dummy text field to hold the path | |
example3ImagePath = gr.Text("images/receipt.png", interactive=False, visible=False) | |
with gr.Row(): | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example1Image = gr.Image("images/money.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example1ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
# dummy text field to hold the path | |
example1ImagePath = gr.Text("images/money.png", interactive=False, visible=False) | |
with gr.Row(): | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example2Image = gr.Image("images/cartoon.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example2ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
# dummy text field to hold the path | |
example2ImagePath = gr.Text("images/cartoon.png", interactive=False, visible=False) | |
with gr.Row(): | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example4Image = gr.Image("images/product.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example4ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
# dummy text field to hold the path | |
example4ImagePath = gr.Text("images/product.png", interactive=False, visible=False) | |
with gr.Row(): | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example5Image = gr.Image("images/celebrity.png", interactive=False).style(height=example_image_size, width=example_image_size) | |
with gr.Column(scale=0.50, min_width=col_min_width): | |
example5ImageButton = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True) | |
# dummy text field to hold the path | |
example5ImagePath = gr.Text("images/celebrity.png", interactive=False, visible=False) | |
with gr.Row(): | |
with gr.Column(scale=0.75): | |
message = gr.Textbox(label="Upload a pic and ask!", | |
placeholder="Type your question about the uploaded image", | |
lines=1, visible=False) | |
with gr.Column(scale=0.15): | |
submit = gr.Button(value="Send", variant="secondary", visible=False).style(full_width=True) | |
with gr.Column(scale=0.10, min_width=0): | |
btn = gr.UploadButton("🖼️", file_types=["image"], visible=False).style(full_width=True) | |
message.submit(chat, inputs=[message, history_state, chain_state, log_state], outputs=[chatbot, history_state, message]) | |
submit.click(chat, inputs=[message, history_state, chain_state, log_state], outputs=[chatbot, history_state, message]) | |
btn.upload(add_image, inputs=[history_state, chain_state, btn, log_state], outputs=[history_state, chatbot]) | |
# load the chain | |
reset_btn.click(load_chain, inputs=[history_state, log_state], outputs=[chatbot, history_state, chain_state, log_state, message, submit, btn, exampleCol, grExampleTitle, reset_btn]) | |
# setup listener click for the examples | |
example1ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example1ImagePath, log_state], outputs=[history_state, chatbot]) | |
example2ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example2ImagePath, log_state], outputs=[history_state, chatbot]) | |
example3ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example3ImagePath, log_state], outputs=[history_state, chatbot]) | |
example4ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example4ImagePath, log_state], outputs=[history_state, chatbot]) | |
example5ImageButton.click(add_image_with_path, inputs=[history_state, chain_state, example5ImagePath, log_state], outputs=[history_state, chatbot]) | |
# launch the app | |
block.launch(server_name="0.0.0.0", server_port = ARGS.port) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--port', type=int, required=False, default=7860) | |
parser.add_argument('--openAIModel', type=str, required=False, default='azureGPT4') | |
parser.add_argument('--noIntermediateConv', default=True, action='store_true', help='if this flag is turned on no intermediate conversation should be shown') | |
global ARGS | |
ARGS = parser.parse_args() | |
init_and_kick_off() |