AGYIntelligence / autogen_utils.py
thinkall's picture
Fix init_sender not defined
b2cf656
raw
history blame
16.6 kB
import asyncio
import os
import random
import sys
import textwrap
import threading
import time
from ast import literal_eval
import autogen
import chromadb
import isort
import panel as pn
from autogen import Agent, AssistantAgent, UserProxyAgent
from autogen.agentchat.contrib.compressible_agent import CompressibleAgent
from autogen.agentchat.contrib.gpt_assistant_agent import GPTAssistantAgent
from autogen.agentchat.contrib.llava_agent import LLaVAAgent
from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent
from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
from autogen.agentchat.contrib.teachable_agent import TeachableAgent
from autogen.code_utils import extract_code
from configs import (
DEFAULT_AUTO_REPLY,
DEFAULT_SYSTEM_MESSAGE,
Q1,
Q2,
Q3,
TIMEOUT,
TITLE,
)
# termcolor is an optional dependency. When it cannot be imported, define a
# stand-in with the same name so the rest of the module can keep calling
# colored(...) and simply get the uncolored text back.
try:
    from termcolor import colored
except ImportError:

    def colored(x, *args, **kwargs):
        # Fallback: ignore every styling argument and return the text as-is.
        return x
def get_retrieve_config(docs_path, model_name, collection_name):
    """Assemble the retrieval configuration dict for a retrieve agent.

    Args:
        docs_path: Text holding a Python literal; it is parsed with
            ``ast.literal_eval`` and stored under ``"docs_path"``.
        model_name: Stored under ``"model"``.
        collection_name: Stored under ``"collection_name"``.

    Returns:
        A dict with the parsed docs path, fixed chunk size and embedding
        model, and a chromadb persistent client rooted at ``.chromadb``.

    Raises:
        ValueError, SyntaxError: If ``docs_path`` is not a valid literal.
    """
    # Parse first so malformed input fails before a chromadb client is created.
    parsed_docs_path = literal_eval(docs_path)
    chroma_client = chromadb.PersistentClient(path=".chromadb")

    retrieve_config = {}
    retrieve_config["docs_path"] = parsed_docs_path
    retrieve_config["chunk_token_size"] = 1000
    retrieve_config["model"] = model_name
    retrieve_config["embedding_model"] = "all-mpnet-base-v2"
    retrieve_config["get_or_create"] = True
    retrieve_config["client"] = chroma_client
    retrieve_config["collection_name"] = collection_name
    return retrieve_config
# autogen.ChatCompletion.start_logging()
def termination_msg(x):
    """Check if a message is a termination message.

    A message terminates the chat when its ``"content"``, upper-cased and
    stripped of surrounding whitespace and periods, starts or ends with
    ``TERMINATE``.

    Args:
        x: The message, expected to be a dict with an optional ``"content"``
            key. Anything that is not a dict is never a termination message.

    Returns:
        bool: True when the message asks to terminate, False otherwise.
    """
    # Guard first: calling .get() on a non-dict would raise AttributeError
    # before the type check had a chance to reject the value.
    if not isinstance(x, dict):
        return False
    # str() tolerates a None/non-string content; .strip() already removes
    # newlines, so only the trailing/leading periods need a second pass.
    _msg = str(x.get("content", "")).upper().strip().strip(".")
    return _msg.endswith("TERMINATE") or _msg.startswith("TERMINATE")
def _is_termination_msg(message):
"""Check if a message is a termination message.
Terminate when no code block is detected. Currently only detect python code blocks.
"""
if isinstance(message, dict):
message = message.get("content")
if message is None:
return False
cb = extract_code(message)
contain_code = False
for c in cb:
# todo: support more languages
if c[0] == "python":
contain_code = True
break
return not contain_code
def new_generate_oai_reply(
    self,
    messages=None,
    sender=None,
    config=None,
):
    """Generate a reply using autogen.oai, with roles forced to alternate.

    Replacement reply function registered for ERNIE Bot models. Before the
    request is sent, every message after the first is assigned a role by
    position (odd index -> "user", unless it is a "function" message; even
    index -> "assistant"), and a default user message is appended when the
    total message count is odd.

    Args:
        self: The agent the reply function is registered on.
        messages: Conversation history; defaults to the history kept for
            ``sender``.
        sender: The agent whose history is used when ``messages`` is None.
        config: Optional client overriding ``self.client``.

    Returns:
        tuple: ``(False, None)`` when no client is available, otherwise
        ``(True, reply)`` with the first extracted text or function call.
    """
    client = self.client if config is None else config
    if client is None:
        return False, None
    if messages is None:
        messages = self._oai_messages[sender]

    # Works around ERNIE error 336006, see
    # https://cloud.baidu.com/doc/WENXINWORKSHOP/s/tlmyncueh
    # NOTE(review): popping "context" mutates the last stored message; this
    # is kept on purpose to mirror the original behavior.
    _context = messages[-1].pop("context", None)

    # Rewrite roles on shallow copies so the agent's stored history (and the
    # caller's list) keep their real roles; the request content is unchanged.
    _messages = [dict(msg) for msg in self._oai_system_message + messages]
    for idx, msg in enumerate(_messages):
        if idx == 0:
            # The first entry (the system message, when present) is untouched.
            continue
        if idx % 2 == 1:
            if msg.get("role") != "function":
                msg["role"] = "user"
        else:
            msg["role"] = "assistant"

    # Pad with a default user turn when the message count is odd.
    if len(_messages) % 2 == 1:
        _messages.append({"content": DEFAULT_AUTO_REPLY, "role": "user"})

    response = client.create(context=_context, messages=_messages)
    return True, client.extract_text_or_function_call(response)[0]
def initialize_agents(
    llm_config,
    agent_name,
    system_msg,
    agent_type,
    retrieve_config=None,
    code_execution_config=False,
):
    """Create one agent of the requested type.

    Args:
        llm_config: LLM configuration; its first ``config_list`` entry is
            also inspected to detect ERNIE models.
        agent_name: Agent name; surrounding whitespace is removed.
        system_msg: System message; surrounding whitespace is removed. Agent
            types that take an LLM config fall back to
            ``DEFAULT_SYSTEM_MESSAGE`` when it is empty.
        agent_type: Class name of the agent to build. Names without a
            dedicated branch are looked up in this module's globals.
        retrieve_config: Passed to ``RetrieveUserProxyAgent`` only.
        code_execution_config: Passed to the user-proxy style agents.

    Returns:
        The constructed agent.
    """
    name = agent_name.strip()
    prompt = system_msg.strip()
    prompt_or_default = prompt or DEFAULT_SYSTEM_MESSAGE

    if agent_type == "RetrieveUserProxyAgent":
        agent = RetrieveUserProxyAgent(
            name=name,
            system_message=prompt,
            is_termination_msg=_is_termination_msg,
            human_input_mode="TERMINATE",
            max_consecutive_auto_reply=5,
            retrieve_config=retrieve_config,
            # set to False if you don't want to execute the code
            code_execution_config=code_execution_config,
            default_auto_reply=DEFAULT_AUTO_REPLY,
        )
    elif agent_type == "GPTAssistantAgent":
        agent = GPTAssistantAgent(
            name=name,
            instructions=prompt_or_default,
            llm_config=llm_config,
            is_termination_msg=termination_msg,
        )
    elif agent_type == "CompressibleAgent":
        compress_config = {
            "mode": "COMPRESS",
            # set this to a large number for less frequent compression
            "trigger_count": 600,
            # print compression information: context before and after
            "verbose": True,
            "leave_last_n": 2,
        }
        agent = CompressibleAgent(
            name=name,
            system_message=prompt_or_default,
            llm_config=llm_config,
            compress_config=compress_config,
            is_termination_msg=termination_msg,
        )
    else:
        # Every remaining type is resolved by class name from module globals.
        agent_cls = globals()[agent_type]
        if "UserProxy" in agent_type:
            agent = agent_cls(
                name=name,
                is_termination_msg=termination_msg,
                human_input_mode="TERMINATE",
                system_message=prompt,
                default_auto_reply=DEFAULT_AUTO_REPLY,
                max_consecutive_auto_reply=5,
                code_execution_config=code_execution_config,
            )
        else:
            agent = agent_cls(
                name=name,
                is_termination_msg=termination_msg,
                human_input_mode="NEVER",
                system_message=prompt_or_default,
                llm_config=llm_config,
            )

    # Hack for ERNIE Bot models: only the first configured model is checked.
    # NOTE(review): the popped entry is presumably the stock OAI reply
    # function, which new_generate_oai_reply then replaces - confirm.
    first_model_name = llm_config["config_list"][0]["model"]
    if "ernie" in first_model_name.lower():
        agent._reply_func_list.pop(-1)
        agent.register_reply([Agent, None], new_generate_oai_reply, -1)
    return agent
async def get_human_input(name, prompt: str, instance=None) -> str:
    """Collect feedback from the human, via the console or a chat widget.

    Args:
        name: User name under which widgets and notices are sent to
            ``instance``.
        prompt: Prompt text (console prompt, or widget placeholder).
        instance: Chat UI object exposing ``send``; when None the console
            ``input`` is used instead.

    Returns:
        The text entered by the human, or ``"exit"`` when nothing was
        submitted within ``TIMEOUT`` seconds.
    """
    if instance is None:
        return input(prompt)

    feedback_box = pn.widgets.TextAreaInput(
        placeholder=prompt, name="", sizing_mode="stretch_width"
    )
    submit_box = pn.widgets.Checkbox(name="Check to Submit Feedback")
    instance.send(pn.Row(feedback_box, submit_box), user=name, respond=False)

    started_at = time.time()
    # Poll the widgets until the user submits or the timeout elapses.
    while time.time() - started_at <= TIMEOUT:
        submitted = feedback_box.value != "" and submit_box.value is True
        if submitted:
            # Lock the text area so the submitted feedback cannot be edited.
            feedback_box.disabled = True
            return feedback_box.value
        await asyncio.sleep(0.1)

    instance.send(
        f"You didn't provide your feedback in {TIMEOUT} seconds, exit.",
        user=name,
        respond=False,
    )
    return "exit"
async def check_termination_and_human_reply(
    self,
    messages=None,
    sender=None,
    config=None,
    instance=None,
):
    """Check if the conversation should be terminated, and if human reply is provided.

    Args:
        self: The agent this reply function runs for.
        messages: Conversation history; defaults to the history stored for
            ``sender``. Only the last message is inspected.
        sender: The agent that sent the last message; also the key of the
            auto-reply counters.
        config: Defaults to ``self``; not otherwise used in this function.
        instance: Forwarded to ``get_human_input`` (chat UI object, or None
            for console input).

    Returns:
        tuple: ``(True, None)`` to stop the conversation, ``(True, reply)``
        to send the human's reply, or ``(False, None)`` when no human reply
        was given and the auto-reply counter was incremented.
        NOTE(review): presumably the ``(final, reply)`` pair of autogen's
        reply-function protocol - confirm against the registering code.
    """
    if config is None:
        config = self
    if messages is None:
        messages = self._oai_messages[sender]
    message = messages[-1]
    reply = ""
    no_human_input_msg = ""
    if self.human_input_mode == "ALWAYS":
        # ALWAYS: ask the human on every turn.
        reply = await get_human_input(
            self.name,
            f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: ",
            instance,
        )
        no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
        # if the human input is empty, and the message is a termination message, then we will terminate the conversation
        reply = reply if reply or not self._is_termination_msg(message) else "exit"
    else:
        if (
            self._consecutive_auto_reply_counter[sender]
            >= self._max_consecutive_auto_reply_dict[sender]
        ):
            # The auto-reply budget for this sender is used up.
            if self.human_input_mode == "NEVER":
                reply = "exit"
            else:
                # self.human_input_mode == "TERMINATE":
                terminate = self._is_termination_msg(message)
                reply = await get_human_input(
                    self.name,
                    f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                    if terminate
                    else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: ",
                    instance,
                )
                no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                reply = reply if reply or not terminate else "exit"
        elif self._is_termination_msg(message):
            # Budget not exhausted, but the last message asks to terminate.
            if self.human_input_mode == "NEVER":
                reply = "exit"
            else:
                # self.human_input_mode == "TERMINATE":
                reply = await get_human_input(
                    self.name,
                    f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: ",
                    instance,
                )
                no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                reply = reply or "exit"
    # print the no_human_input_msg
    if no_human_input_msg:
        print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)
    # stop the conversation
    if reply == "exit":
        # reset the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] = 0
        return True, None
    # send the human reply
    # (an empty reply is also sent as-is when the sender's auto-reply limit is 0)
    if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
        # reset the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] = 0
        return True, reply
    # increment the consecutive_auto_reply_counter
    self._consecutive_auto_reply_counter[sender] += 1
    if self.human_input_mode != "NEVER":
        print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
    return False, None
async def format_code(code_to_format: str) -> str:
    """Sort the imports of a code snippet with isort (black profile).

    The snippet is written to a uniquely named temporary ``.py`` file in the
    current working directory, rewritten in place by ``isort.file`` and read
    back. Despite the "black" profile, black itself is not run.

    Args:
        code_to_format: Python source text.

    Returns:
        The source text after isort has processed it.

    Raises:
        OSError: If the temporary file cannot be written or read. Errors
            raised by isort also propagate; the temporary file is removed
            in every case.
    """
    filename = f"temp_code_{int(time.time())}_{random.randint(10000, 99999)}.py"
    try:
        # Explicit UTF-8: the snippet can embed non-ASCII prompts, and the
        # locale default encoding is not guaranteed to be able to store them.
        with open(filename, "w", encoding="utf-8") as file:
            file.write(code_to_format)
        isort.file(
            filename, profile="black", known_first_party=["autogen"], float_to_top=True
        )
        with open(filename, "r", encoding="utf-8") as file:
            return file.read()
    finally:
        # Never leave the scratch file behind, even when a step above fails.
        if os.path.exists(filename):
            os.remove(filename)
async def generate_code(agents, manager, contents, code_editor, groupchat):
    """Generate a standalone script that recreates the current chat setup.

    The script is assembled from text templates: a fixed header (imports,
    LLM config, termination helpers), one snippet per agent, the choice of
    the initial sender and recipient, and the ``initiate_chat`` call. The
    result is passed through ``format_code`` and stored in
    ``code_editor.value``.

    Args:
        agents: The live agents to reproduce.
        manager: Group chat manager; when truthy the script builds a
            ``GroupChat``/``GroupChatManager`` and uses it as recipient.
        contents: The initial message/problem to start the chat with.
        code_editor: Object whose ``value`` attribute receives the script.
        groupchat: Source of ``speaker_selection_method``; only read when
            ``manager`` is truthy.

    Returns:
        None. The script is delivered through ``code_editor.value``.
    """
    code = """import autogen
import os
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent
from autogen.code_utils import extract_code
config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
    file_location=".",
)
if not config_list:
    os.environ["MODEL"] = "<your model name>"
    os.environ["OPENAI_API_KEY"] = "<your openai api key>"
    os.environ["OPENAI_BASE_URL"] = "<your openai base url>" # optional
    config_list = autogen.config_list_from_models(
        model_list=[os.environ.get("MODEL", "gpt-35-turbo")],
    )
llm_config = {
    "timeout": 60,
    "cache_seed": 42,
    "config_list": config_list,
    "temperature": 0,
}
def termination_msg(x):
    _msg = str(x.get("content", "")).upper().strip().strip("\\n").strip(".")
    return isinstance(x, dict) and (_msg.endswith("TERMINATE") or _msg.startswith("TERMINATE"))
def _is_termination_msg(message):
    if isinstance(message, dict):
        message = message.get("content")
        if message is None:
            return False
    cb = extract_code(message)
    contain_code = False
    for c in cb:
        # todo: support more languages
        if c[0] == "python":
            contain_code = True
            break
    return not contain_code
agents = []
"""

    # NOTE(review): the more specific classes are tested before
    # UserProxyAgent / AssistantAgent, presumably because they subclass
    # them - keep this order.
    for agent in agents:
        if isinstance(agent, RetrieveUserProxyAgent):
            # Work on a copy: the placeholder below must not overwrite the
            # real chromadb client held in the live agent's configuration.
            _retrieve_config = dict(agent._retrieve_config)
            _retrieve_config["client"] = 'chromadb.PersistentClient(path=".chromadb")'
            _code = f"""from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
import chromadb
agent = RetrieveUserProxyAgent(
    name="{agent.name}",
    system_message=\"\"\"{agent.system_message}\"\"\",
    is_termination_msg=_is_termination_msg,
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=5,
    retrieve_config={_retrieve_config},
    code_execution_config={agent._code_execution_config}, # set to False if you don't want to execute the code
    default_auto_reply="{DEFAULT_AUTO_REPLY}",
)
"""
            # Turn the quoted placeholder back into a real constructor call.
            _code = _code.replace(
                """'chromadb.PersistentClient(path=".chromadb")'""",
                "chromadb.PersistentClient(path='.chromadb')",
            )
        elif isinstance(agent, GPTAssistantAgent):
            _code = f"""from autogen.agentchat.contrib.gpt_assistant_agent import GPTAssistantAgent
agent = GPTAssistantAgent(
    name="{agent.name}",
    instructions=\"\"\"{agent.system_message}\"\"\",
    llm_config=llm_config,
    is_termination_msg=termination_msg,
)
"""
        elif isinstance(agent, CompressibleAgent):
            _code = f"""from autogen.agentchat.contrib.compressible_agent import CompressibleAgent
compress_config = {{
    "mode": "COMPRESS",
    "trigger_count": 600, # set this to a large number for less frequent compression
    "verbose": True, # to allow printing of compression information: contex before and after compression
    "leave_last_n": 2,
}}
agent = CompressibleAgent(
    name="{agent.name}",
    system_message=\"\"\"{agent.system_message}\"\"\",
    llm_config=llm_config,
    compress_config=compress_config,
    is_termination_msg=termination_msg,
)
"""
        elif isinstance(agent, UserProxyAgent):
            _code = f"""from autogen import UserProxyAgent
agent = UserProxyAgent(
    name="{agent.name}",
    is_termination_msg=termination_msg,
    human_input_mode="TERMINATE",
    system_message=\"\"\"{agent.system_message}\"\"\",
    default_auto_reply="{DEFAULT_AUTO_REPLY}",
    max_consecutive_auto_reply=5,
    code_execution_config={agent._code_execution_config},
)
"""
        elif isinstance(agent, RetrieveAssistantAgent):
            _code = f"""from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent
agent = RetrieveAssistantAgent(
    name="{agent.name}",
    system_message=\"\"\"{agent.system_message}\"\"\",
    llm_config=llm_config,
    is_termination_msg=termination_msg,
)
"""
        elif isinstance(agent, AssistantAgent):
            _code = f"""from autogen import AssistantAgent
agent = AssistantAgent(
    name="{agent.name}",
    system_message=\"\"\"{agent.system_message}\"\"\",
    llm_config=llm_config,
    is_termination_msg=termination_msg,
)
"""
        else:
            # No template for this class. Leave a note in the script rather
            # than re-emitting the previous agent's snippet (or failing with
            # NameError when this is the first agent).
            code += (
                f"# NOTE: skipped agent {agent.name!r}: "
                f"no code template for {type(agent).__name__}\n\n"
            )
            continue
        code += _code + "\n" + "agents.append(agent)\n\n"

    _code = """
init_sender = None
for agent in agents:
    if "UserProxy" in str(type(agent)):
        init_sender = agent
        break
if not init_sender:
    init_sender = agents[0]
"""
    code += _code

    if manager:
        _code = f"""
groupchat = autogen.GroupChat(
    agents=agents, messages=[], max_round=12, speaker_selection_method="{groupchat.speaker_selection_method}", allow_repeat_speaker=False
) # todo: auto, sometimes message has no name
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)
recipient = manager
"""
    else:
        _code = """
recipient = agents[1] if agents[1] != init_sender else agents[0]
"""
    code += _code

    # Emit the message as a proper Python string literal so quotes,
    # backslashes and newlines in it cannot break the generated script.
    contents_literal = repr(str(contents))
    _code = f"""
if isinstance(init_sender, (RetrieveUserProxyAgent, MathUserProxyAgent)):
    init_sender.initiate_chat(recipient, problem={contents_literal})
else:
    init_sender.initiate_chat(recipient, message={contents_literal})
"""
    code += _code
    code = textwrap.dedent(code)
    code_editor.value = await format_code(code)