liushaojie
Add application file
360d784
raw
history blame
2.31 kB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 14:42
@Author : alexanderwu
@File : manager.py
"""
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.schema import Message
class Manager:
def __init__(self, llm: LLM = LLM()):
self.llm = llm # Large Language Model
self.role_directions = {
"BOSS": "Product Manager",
"Product Manager": "Architect",
"Architect": "Engineer",
"Engineer": "QA Engineer",
"QA Engineer": "Product Manager"
}
self.prompt_template = """
Given the following message:
{message}
And the current status of roles:
{roles}
Which role should handle this message?
"""
async def handle(self, message: Message, environment):
"""
管理员处理信息,现在简单的将信息递交给下一个人
The administrator processes the information, now simply passes the information on to the next person
:param message:
:param environment:
:return:
"""
# Get all roles from the environment
roles = environment.get_roles()
# logger.debug(f"{roles=}, {message=}")
# Build a context for the LLM to understand the situation
# context = {
# "message": str(message),
# "roles": {role.name: role.get_info() for role in roles},
# }
# Ask the LLM to decide which role should handle the message
# chosen_role_name = self.llm.ask(self.prompt_template.format(context))
# FIXME: 现在通过简单的字典决定流向,但之后还是应该有思考过程
#The direction of flow is now determined by a simple dictionary, but there should still be a thought process afterwards
next_role_profile = self.role_directions[message.role]
# logger.debug(f"{next_role_profile}")
for _, role in roles.items():
if next_role_profile == role.profile:
next_role = role
break
else:
logger.error(f"No available role can handle message: {message}.")
return
# Find the chosen role and handle the message
return await next_role.handle(message)