Spaces:
Sleeping
Sleeping
import json | |
import gradio as gr | |
import requests | |
from lagent.schema import AgentStatusCode | |
PLANNER_HISTORY = [] | |
SEARCHER_HISTORY = [] | |
def rst_mem(history_planner: list, history_searcher: list): | |
''' | |
Reset the chatbot memory. | |
''' | |
history_planner = [] | |
history_searcher = [] | |
if PLANNER_HISTORY: | |
PLANNER_HISTORY.clear() | |
return history_planner, history_searcher | |
def format_response(gr_history, agent_return): | |
if agent_return['state'] in [ | |
AgentStatusCode.STREAM_ING, AgentStatusCode.ANSWER_ING | |
]: | |
gr_history[-1][1] = agent_return['response'] | |
elif agent_return['state'] == AgentStatusCode.PLUGIN_START: | |
thought = gr_history[-1][1].split('```')[0] | |
if agent_return['response'].startswith('```'): | |
gr_history[-1][1] = thought + '\n' + agent_return['response'] | |
elif agent_return['state'] == AgentStatusCode.PLUGIN_END: | |
thought = gr_history[-1][1].split('```')[0] | |
if isinstance(agent_return['response'], dict): | |
gr_history[-1][ | |
1] = thought + '\n' + f'```json\n{json.dumps(agent_return["response"], ensure_ascii=False, indent=4)}\n```' # noqa: E501 | |
elif agent_return['state'] == AgentStatusCode.PLUGIN_RETURN: | |
assert agent_return['inner_steps'][-1]['role'] == 'environment' | |
item = agent_return['inner_steps'][-1] | |
gr_history.append([ | |
None, | |
f"```json\n{json.dumps(item['content'], ensure_ascii=False, indent=4)}\n```" | |
]) | |
gr_history.append([None, '']) | |
return | |
def predict(history_planner, history_searcher): | |
def streaming(raw_response): | |
for chunk in raw_response.iter_lines(chunk_size=8192, | |
decode_unicode=False, | |
delimiter=b'\n'): | |
if chunk: | |
decoded = chunk.decode('utf-8') | |
if decoded == '\r': | |
continue | |
if decoded[:6] == 'data: ': | |
decoded = decoded[6:] | |
elif decoded.startswith(': ping - '): | |
continue | |
response = json.loads(decoded) | |
yield (response['response'], response['current_node']) | |
global PLANNER_HISTORY | |
PLANNER_HISTORY.append(dict(role='user', content=history_planner[-1][0])) | |
new_search_turn = True | |
url = 'http://localhost:8002/solve' | |
headers = {'Content-Type': 'application/json'} | |
data = {'inputs': PLANNER_HISTORY} | |
raw_response = requests.post(url, | |
headers=headers, | |
data=json.dumps(data), | |
timeout=20, | |
stream=True) | |
for resp in streaming(raw_response): | |
agent_return, node_name = resp | |
if node_name: | |
if node_name in ['root', 'response']: | |
continue | |
agent_return = agent_return['nodes'][node_name]['detail'] | |
if new_search_turn: | |
history_searcher.append([agent_return['content'], '']) | |
new_search_turn = False | |
format_response(history_searcher, agent_return) | |
if agent_return['state'] == AgentStatusCode.END: | |
new_search_turn = True | |
yield history_planner, history_searcher | |
else: | |
new_search_turn = True | |
format_response(history_planner, agent_return) | |
if agent_return['state'] == AgentStatusCode.END: | |
PLANNER_HISTORY = agent_return['inner_steps'] | |
yield history_planner, history_searcher | |
return history_planner, history_searcher | |
with gr.Blocks() as demo: | |
gr.HTML("""<h1 align="center">WebAgent Gradio Simple Demo</h1>""") | |
with gr.Row(): | |
with gr.Column(scale=10): | |
with gr.Row(): | |
with gr.Column(): | |
planner = gr.Chatbot(label='planner', | |
height=700, | |
show_label=True, | |
show_copy_button=True, | |
bubble_full_width=False, | |
render_markdown=True) | |
with gr.Column(): | |
searcher = gr.Chatbot(label='searcher', | |
height=700, | |
show_label=True, | |
show_copy_button=True, | |
bubble_full_width=False, | |
render_markdown=True) | |
with gr.Row(): | |
user_input = gr.Textbox(show_label=False, | |
placeholder='inputs...', | |
lines=5, | |
container=False) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
submitBtn = gr.Button('Submit') | |
with gr.Column(scale=1, min_width=20): | |
emptyBtn = gr.Button('Clear History') | |
def user(query, history): | |
return '', history + [[query, '']] | |
submitBtn.click(user, [user_input, planner], [user_input, planner], | |
queue=False).then(predict, [planner, searcher], | |
[planner, searcher]) | |
emptyBtn.click(rst_mem, [planner, searcher], [planner, searcher], | |
queue=False) | |
demo.queue() | |
demo.launch(server_name='127.0.0.1', | |
server_port=7882, | |
inbrowser=True, | |
share=True) | |