import io
import os
import ssl
from contextlib import closing
from typing import Optional, Tuple
import datetime
import boto3
import gradio as gr
import requests
# UNCOMMENT TO USE WHISPER
import warnings
import whisper
from langchain import ConversationChain, LLMChain
from langchain.agents import load_tools, initialize_agent
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.llms import OpenAI
from threading import Lock
# Console to variable
from io import StringIO
import sys
import re
from openai.error import AuthenticationError, InvalidRequestError, RateLimitError
# Pertains to Express-inator functionality
from langchain.prompts import PromptTemplate
from polly_utils import PollyVoiceData, NEURAL_ENGINE
from azure_utils import AzureVoiceData
# Pertains to question answering functionality
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from langchain.docstore.document import Document
from langchain.chains.question_answering import load_qa_chain
news_api_key = os.environ["NEWS_API_KEY"]
tmdb_bearer_token = os.environ["TMDB_BEARER_TOKEN"]
TOOLS_LIST = ['serpapi', 'wolfram-alpha', 'pal-math', 'pal-colored-objects'] #'google-search','news-api','tmdb-api','open-meteo-api'
TOOLS_DEFAULT_LIST = ['serpapi', 'pal-math']
BUG_FOUND_MSG = "Congratulations, you've found a bug in this application!"
# AUTH_ERR_MSG = "Please paste your OpenAI key from openai.com to use this application. It is not necessary to hit a button or key after pasting it."
AUTH_ERR_MSG = "Please paste your OpenAI key from openai.com to use this application. "
MAX_TOKENS = 512
LOOPING_TALKING_HEAD = "videos/Masahiro.mp4"
TALKING_HEAD_WIDTH = "192"
MAX_TALKING_HEAD_TEXT_LENGTH = 155
# Pertains to Express-inator functionality
NUM_WORDS_DEFAULT = 0
MAX_WORDS = 400
FORMALITY_DEFAULT = "N/A"
TEMPERATURE_DEFAULT = 0.5
EMOTION_DEFAULT = "N/A"
LANG_LEVEL_DEFAULT = "N/A"
TRANSLATE_TO_DEFAULT = "N/A"
LITERARY_STYLE_DEFAULT = "N/A"
PROMPT_TEMPLATE = PromptTemplate(
input_variables=["original_words", "num_words", "formality", "emotions", "lang_level", "translate_to",
"literary_style"],
template="Restate {num_words}{formality}{emotions}{lang_level}{translate_to}{literary_style}the following: \n{original_words}\n",
)
POLLY_VOICE_DATA = PollyVoiceData()
AZURE_VOICE_DATA = AzureVoiceData()
# Pertains to WHISPER functionality
WHISPER_DETECT_LANG = "Detect language"
# UNCOMMENT TO USE WHISPER
warnings.filterwarnings("ignore")
WHISPER_MODEL = whisper.load_model("tiny")
print("WHISPER_MODEL", WHISPER_MODEL)
# UNCOMMENT TO USE WHISPER
def transcribe(aud_inp, whisper_lang):
if aud_inp is None:
return ""
aud = whisper.load_audio(aud_inp)
aud = whisper.pad_or_trim(aud)
mel = whisper.log_mel_spectrogram(aud).to(WHISPER_MODEL.device)
_, probs = WHISPER_MODEL.detect_language(mel)
options = whisper.DecodingOptions()
if whisper_lang != WHISPER_DETECT_LANG:
whisper_lang_code = POLLY_VOICE_DATA.get_whisper_lang_code(whisper_lang)
options = whisper.DecodingOptions(language=whisper_lang_code)
result = whisper.decode(WHISPER_MODEL, mel, options)
print("result.text", result.text)
result_text = ""
if result and result.text:
result_text = result.text
return result_text
# Temporarily address Wolfram Alpha SSL certificate issue
ssl._create_default_https_context = ssl._create_unverified_context
# TEMPORARY FOR TESTING
def transcribe_dummy(aud_inp_tb, whisper_lang):
if aud_inp_tb is None:
return ""
# aud = whisper.load_audio(aud_inp)
# aud = whisper.pad_or_trim(aud)
# mel = whisper.log_mel_spectrogram(aud).to(WHISPER_MODEL.device)
# _, probs = WHISPER_MODEL.detect_language(mel)
# options = whisper.DecodingOptions()
# options = whisper.DecodingOptions(language="ja")
# result = whisper.decode(WHISPER_MODEL, mel, options)
result_text = "Whisper will detect language"
if whisper_lang != WHISPER_DETECT_LANG:
whisper_lang_code = POLLY_VOICE_DATA.get_whisper_lang_code(whisper_lang)
result_text = f"Whisper will use lang code: {whisper_lang_code}"
print("result_text", result_text)
return aud_inp_tb
# Pertains to Express-inator functionality
def transform_text(desc, express_chain, num_words, formality,
anticipation_level, joy_level, trust_level,
fear_level, surprise_level, sadness_level, disgust_level, anger_level,
lang_level, translate_to, literary_style):
num_words_prompt = ""
if num_words and int(num_words) != 0:
num_words_prompt = "using up to " + str(num_words) + " words, "
# Change some arguments to lower case
formality = formality.lower()
anticipation_level = anticipation_level.lower()
joy_level = joy_level.lower()
trust_level = trust_level.lower()
fear_level = fear_level.lower()
surprise_level = surprise_level.lower()
sadness_level = sadness_level.lower()
disgust_level = disgust_level.lower()
anger_level = anger_level.lower()
formality_str = ""
if formality != "n/a":
formality_str = "in a " + formality + " manner, "
# put all emotions into a list
emotions = []
if anticipation_level != "n/a":
emotions.append(anticipation_level)
if joy_level != "n/a":
emotions.append(joy_level)
if trust_level != "n/a":
emotions.append(trust_level)
if fear_level != "n/a":
emotions.append(fear_level)
if surprise_level != "n/a":
emotions.append(surprise_level)
if sadness_level != "n/a":
emotions.append(sadness_level)
if disgust_level != "n/a":
emotions.append(disgust_level)
if anger_level != "n/a":
emotions.append(anger_level)
emotions_str = ""
if len(emotions) > 0:
if len(emotions) == 1:
emotions_str = "with emotion of " + emotions[0] + ", "
else:
emotions_str = "with emotions of " + ", ".join(emotions[:-1]) + " and " + emotions[-1] + ", "
lang_level_str = ""
if lang_level != LANG_LEVEL_DEFAULT:
lang_level_str = "at a " + lang_level + " level, " if translate_to == TRANSLATE_TO_DEFAULT else ""
translate_to_str = ""
if translate_to != TRANSLATE_TO_DEFAULT:
translate_to_str = "translated to " + (
"" if lang_level == TRANSLATE_TO_DEFAULT else lang_level + " level ") + translate_to + ", "
literary_style_str = ""
if literary_style != LITERARY_STYLE_DEFAULT:
if literary_style == "Prose":
literary_style_str = "as prose, "
if literary_style == "Story":
literary_style_str = "as a story, "
elif literary_style == "Summary":
literary_style_str = "as a summary, "
elif literary_style == "Outline":
literary_style_str = "as an outline numbers and lower case letters, "
elif literary_style == "Bullets":
literary_style_str = "as bullet points using bullets, "
elif literary_style == "Poetry":
literary_style_str = "as a poem, "
elif literary_style == "Haiku":
literary_style_str = "as a haiku, "
elif literary_style == "Limerick":
literary_style_str = "as a limerick, "
elif literary_style == "Rap":
literary_style_str = "as a rap, "
elif literary_style == "Joke":
literary_style_str = "as a very funny joke with a setup and punchline, "
elif literary_style == "Knock-knock":
literary_style_str = "as a very funny knock-knock joke, "
elif literary_style == "FAQ":
literary_style_str = "as a FAQ with several questions and answers, "
formatted_prompt = PROMPT_TEMPLATE.format(
original_words=desc,
num_words=num_words_prompt,
formality=formality_str,
emotions=emotions_str,
lang_level=lang_level_str,
translate_to=translate_to_str,
literary_style=literary_style_str
)
trans_instr = num_words_prompt + formality_str + emotions_str + lang_level_str + translate_to_str + literary_style_str
if express_chain and len(trans_instr.strip()) > 0:
generated_text = express_chain.run(
{'original_words': desc, 'num_words': num_words_prompt, 'formality': formality_str,
'emotions': emotions_str, 'lang_level': lang_level_str, 'translate_to': translate_to_str,
'literary_style': literary_style_str}).strip()
else:
print("Not transforming text")
generated_text = desc
# replace all newlines with
in generated_text
generated_text = generated_text.replace("\n", "\n\n")
prompt_plus_generated = "GPT prompt: " + formatted_prompt + "\n\n" + generated_text
print("\n==== date/time: " + str(datetime.datetime.now() - datetime.timedelta(hours=5)) + " ====")
print("prompt_plus_generated: " + prompt_plus_generated)
return generated_text
def load_chain(tools_list, llm):
chain = None
express_chain = None
memory = None
if llm:
print("\ntools_list", tools_list)
tool_names = tools_list
tools = load_tools(tool_names, llm=llm, news_api_key=news_api_key, tmdb_bearer_token=tmdb_bearer_token)
memory = ConversationBufferMemory(memory_key="chat_history")
chain = initialize_agent(tools, llm, agent="conversational-react-description", verbose=True, memory=memory)
express_chain = LLMChain(llm=llm, prompt=PROMPT_TEMPLATE, verbose=True)
return chain, express_chain, memory
def set_openai_api_key(api_key):
"""Set the api key and return chain.
If no api_key, then None is returned.
"""
if api_key and api_key.startswith("sk-") and len(api_key) > 50:
os.environ["OPENAI_API_KEY"] = api_key
print("\n\n ++++++++++++++ Setting OpenAI API key ++++++++++++++ \n\n")
print(str(datetime.datetime.now()) + ": Before OpenAI, OPENAI_API_KEY length: " + str(
len(os.environ["OPENAI_API_KEY"])))
llm = OpenAI(temperature=0, max_tokens=MAX_TOKENS)
print(str(datetime.datetime.now()) + ": After OpenAI, OPENAI_API_KEY length: " + str(
len(os.environ["OPENAI_API_KEY"])))
chain, express_chain, memory = load_chain(TOOLS_DEFAULT_LIST, llm)
# Pertains to question answering functionality
embeddings = OpenAIEmbeddings()
qa_chain = load_qa_chain(OpenAI(temperature=0), chain_type="stuff")
print(str(datetime.datetime.now()) + ": After load_chain, OPENAI_API_KEY length: " + str(
len(os.environ["OPENAI_API_KEY"])))
os.environ["OPENAI_API_KEY"] = ""
return chain, express_chain, llm, embeddings, qa_chain, memory
return None, None, None, None, None, None
def run_chain(chain, inp, capture_hidden_text):
output = ""
hidden_text = None
if capture_hidden_text:
error_msg = None
tmp = sys.stdout
hidden_text_io = StringIO()
sys.stdout = hidden_text_io
try:
output = chain.run(input=inp)
except AuthenticationError as ae:
error_msg = AUTH_ERR_MSG + str(datetime.datetime.now()) + ". " + str(ae)
print("error_msg", error_msg)
except RateLimitError as rle:
error_msg = "\n\nRateLimitError: " + str(rle)
except ValueError as ve:
error_msg = "\n\nValueError: " + str(ve)
except InvalidRequestError as ire:
error_msg = "\n\nInvalidRequestError: " + str(ire)
except Exception as e:
error_msg = "\n\n" + BUG_FOUND_MSG + ":\n\n" + str(e)
sys.stdout = tmp
hidden_text = hidden_text_io.getvalue()
# remove escape characters from hidden_text
hidden_text = re.sub(r'\x1b[^m]*m', '', hidden_text)
# remove "Entering new AgentExecutor chain..." from hidden_text
hidden_text = re.sub(r"Entering new AgentExecutor chain...\n", "", hidden_text)
# remove "Finished chain." from hidden_text
hidden_text = re.sub(r"Finished chain.", "", hidden_text)
# Add newline after "Thought:" "Action:" "Observation:" "Input:" and "AI:"
hidden_text = re.sub(r"Thought:", "\n\nThought:", hidden_text)
hidden_text = re.sub(r"Action:", "\n\nAction:", hidden_text)
hidden_text = re.sub(r"Observation:", "\n\nObservation:", hidden_text)
hidden_text = re.sub(r"Input:", "\n\nInput:", hidden_text)
hidden_text = re.sub(r"AI:", "\n\nAI:", hidden_text)
if error_msg:
hidden_text += error_msg
print("hidden_text: ", hidden_text)
else:
try:
output = chain.run(input=inp)
except AuthenticationError as ae:
output = AUTH_ERR_MSG + str(datetime.datetime.now()) + ". " + str(ae)
print("output", output)
except RateLimitError as rle:
output = "\n\nRateLimitError: " + str(rle)
except ValueError as ve:
output = "\n\nValueError: " + str(ve)
except InvalidRequestError as ire:
output = "\n\nInvalidRequestError: " + str(ire)
except Exception as e:
output = "\n\n" + BUG_FOUND_MSG + ":\n\n" + str(e)
return output, hidden_text
def reset_memory(history, memory):
memory.clear()
history = []
return history, history, memory
class ChatWrapper:
def __init__(self):
self.lock = Lock()
def __call__(
self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain],
trace_chain: bool, speak_text: bool, talking_head: bool, monologue: bool, express_chain: Optional[LLMChain],
num_words, formality, anticipation_level, joy_level, trust_level,
fear_level, surprise_level, sadness_level, disgust_level, anger_level,
lang_level, translate_to, literary_style, qa_chain, docsearch, use_embeddings
):
"""Execute the chat functionality."""
self.lock.acquire()
try:
print("\n==== date/time: " + str(datetime.datetime.now()) + " ====")
print("inp: " + inp)
print("trace_chain: ", trace_chain)
print("speak_text: ", speak_text)
print("talking_head: ", talking_head)
print("monologue: ", monologue)
history = history or []
# If chain is None, that is because no API key was provided.
output = "Please paste your OpenAI key from openai.com to use this app. " + str(datetime.datetime.now())
hidden_text = output
if chain:
# Set OpenAI key
import openai
openai.api_key = api_key
if not monologue:
if use_embeddings:
if inp and inp.strip() != "":
if docsearch:
docs = docsearch.similarity_search(inp)
output = str(qa_chain.run(input_documents=docs, question=inp))
else:
output, hidden_text = "Please supply some text in the the Embeddings tab.", None
else:
output, hidden_text = "What's on your mind?", None
else:
output, hidden_text = run_chain(chain, inp, capture_hidden_text=trace_chain)
else:
output, hidden_text = inp, None
output = transform_text(output, express_chain, num_words, formality, anticipation_level, joy_level,
trust_level,
fear_level, surprise_level, sadness_level, disgust_level, anger_level,
lang_level, translate_to, literary_style)
text_to_display = output
if trace_chain:
text_to_display = hidden_text + "\n\n" + output
history.append((inp, text_to_display))
html_video, temp_file, html_audio, temp_aud_file = None, None, None, None
if speak_text:
if talking_head:
if len(output) <= MAX_TALKING_HEAD_TEXT_LENGTH:
html_video, temp_file = do_html_video_speak(output, translate_to)
else:
temp_file = LOOPING_TALKING_HEAD
html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
else:
html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
else:
if talking_head:
temp_file = LOOPING_TALKING_HEAD
html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
else:
# html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
# html_video = create_html_video(temp_file, "128")
pass
except Exception as e:
raise e
finally:
self.lock.release()
return history, history, html_video, temp_file, html_audio, temp_aud_file, ""
# return history, history, html_audio, temp_aud_file, ""
chat = ChatWrapper()
def do_html_audio_speak(words_to_speak, polly_language):
polly_client = boto3.Session(
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
region_name=os.environ["AWS_DEFAULT_REGION"]
).client('polly')
# voice_id, language_code, engine = POLLY_VOICE_DATA.get_voice(polly_language, "Female")
voice_id, language_code, engine = POLLY_VOICE_DATA.get_voice(polly_language, "Male")
if not voice_id:
# voice_id = "Joanna"
voice_id = "Matthew"
language_code = "en-US"
engine = NEURAL_ENGINE
response = polly_client.synthesize_speech(
Text=words_to_speak,
OutputFormat='mp3',
VoiceId=voice_id,
LanguageCode=language_code,
Engine=engine
)
html_audio = '
no audio' # Save the audio stream returned by Amazon Polly on Lambda's temp directory if "AudioStream" in response: with closing(response["AudioStream"]) as stream: # output = os.path.join("/tmp/", "speech.mp3") try: with open('audios/tempfile.mp3', 'wb') as f: f.write(stream.read()) temp_aud_file = gr.File("audios/tempfile.mp3") temp_aud_file_url = "/file=" + temp_aud_file.value['name'] html_audio = f'' except IOError as error: # Could not write to file, exit gracefully print(error) return None, None else: # The response didn't contain audio data, exit gracefully print("Could not stream audio") return None, None return html_audio, "audios/tempfile.mp3" def create_html_video(file_name, width): temp_file_url = "/file=" + tmp_file.value['name'] html_video = f'' return html_video def do_html_video_speak(words_to_speak, azure_language): azure_voice = AZURE_VOICE_DATA.get_voice(azure_language, "Male") if not azure_voice: azure_voice = "en-US-ChristopherNeural" headers = {"Authorization": f"Bearer {os.environ['EXHUMAN_API_KEY']}"} body = { 'bot_name': 'Masahiro', 'bot_response': words_to_speak, 'azure_voice': azure_voice, 'azure_style': 'friendly', 'animation_pipeline': 'high_speed', } api_endpoint = "https://api.exh.ai/animations/v1/generate_lipsync" res = requests.post(api_endpoint, json=body, headers=headers) print("res.status_code: ", res.status_code) html_video = '
no video' if isinstance(res.content, bytes): response_stream = io.BytesIO(res.content) print("len(res.content)): ", len(res.content)) with open('videos/tempfile.mp4', 'wb') as f: f.write(response_stream.read()) temp_file = gr.File("videos/tempfile.mp4") temp_file_url = "/file=" + temp_file.value['name'] html_video = f'' else: print('video url unknown') return html_video, "videos/tempfile.mp4" def update_selected_tools(widget, state, llm): if widget: state = widget chain, express_chain, memory = load_chain(state, llm) return state, llm, chain, express_chain def update_talking_head(widget, state): if widget: state = widget video_html_talking_head = create_html_video(LOOPING_TALKING_HEAD, TALKING_HEAD_WIDTH) return state, video_html_talking_head else: # return state, create_html_video(LOOPING_TALKING_HEAD, "32") return None, "" def update_foo(widget, state): if widget: state = widget return state # Pertains to question answering functionality def update_embeddings(embeddings_text, embeddings, qa_chain): if embeddings_text: text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) texts = text_splitter.split_text(embeddings_text) docsearch = FAISS.from_texts(texts, embeddings) print("Embeddings updated") return docsearch # Pertains to question answering functionality def update_use_embeddings(widget, state): if widget: state = widget return state with gr.Blocks(css=".gradio-container {background-color: lightgray}") as block: llm_state = gr.State() history_state = gr.State() chain_state = gr.State() express_chain_state = gr.State() tools_list_state = gr.State(TOOLS_DEFAULT_LIST) trace_chain_state = gr.State(False) speak_text_state = gr.State(False) talking_head_state = gr.State(True) monologue_state = gr.State(False) # Takes the input and repeats it back to the user, optionally transforming it. memory_state = gr.State() # Pertains to Express-inator functionality num_words_state = gr.State(NUM_WORDS_DEFAULT) formality_state = gr.State(FORMALITY_DEFAULT) anticipation_level_state = gr.State(EMOTION_DEFAULT) joy_level_state = gr.State(EMOTION_DEFAULT) trust_level_state = gr.State(EMOTION_DEFAULT) fear_level_state = gr.State(EMOTION_DEFAULT) surprise_level_state = gr.State(EMOTION_DEFAULT) sadness_level_state = gr.State(EMOTION_DEFAULT) disgust_level_state = gr.State(EMOTION_DEFAULT) anger_level_state = gr.State(EMOTION_DEFAULT) lang_level_state = gr.State(LANG_LEVEL_DEFAULT) translate_to_state = gr.State(TRANSLATE_TO_DEFAULT) literary_style_state = gr.State(LITERARY_STYLE_DEFAULT) # Pertains to WHISPER functionality whisper_lang_state = gr.State(WHISPER_DETECT_LANG) # Pertains to question answering functionality embeddings_state = gr.State() qa_chain_state = gr.State() docsearch_state = gr.State() use_embeddings_state = gr.State(False) with gr.Tab("Chat"): with gr.Row(): with gr.Column(): gr.HTML( """
This application, developed by James L. Weaver, demonstrates a conversational agent implemented with OpenAI GPT-3.5 and LangChain. When necessary, it leverages tools for complex math, searching the internet, and accessing news and weather. Uses talking heads from Ex-Human. For faster inference without waiting in queue, you may duplicate the space.
""") gr.HTML(""" """) gr.HTML("""