|
|
|
|
|
"""
@Time    : 2023/5/11 14:42
@Author  : alexanderwu
@File    : manager.py
"""
|
from metagpt.llm import LLM |
|
from metagpt.logs import logger |
|
from metagpt.schema import Message |
|
|
|
|
|
class Manager:
    """Route each message to the next role along a fixed hand-off chain."""

    def __init__(self, llm: "LLM | None" = None):
        """
        Set up the routing table and the routing prompt template.

        :param llm: language-model client stored on the instance; when omitted,
            a new ``LLM()`` is created for this manager.
        """
        # Build the default lazily: an ``LLM()`` default in the signature is
        # evaluated once at definition time and then shared by every Manager.
        self.llm = LLM() if llm is None else llm

        # Sender role -> profile of the role that receives the message next.
        self.role_directions = {
            "BOSS": "Product Manager",
            "Product Manager": "Architect",
            "Architect": "Engineer",
            "Engineer": "QA Engineer",
            "QA Engineer": "Product Manager",
        }

        # Prompt with {message} and {roles} placeholders. handle() does not use
        # it as written here; routing is done with role_directions instead.
        self.prompt_template = """
        Given the following message:
        {message}

        And the current status of roles:
        {roles}

        Which role should handle this message?
        """

    async def handle(self, message: "Message", environment):
        """
        Pass a message on to the next role in the hand-off chain.

        The target profile is looked up in ``role_directions`` by
        ``message.role``; the first role returned by
        ``environment.get_roles()`` whose ``profile`` equals it handles the
        message.

        :param message: message to route; its ``role`` attribute selects the route.
        :param environment: object whose ``get_roles()`` returns a mapping of
            roles (only the mapping's values are inspected).
        :return: the result of awaiting the chosen role's ``handle(message)``,
            or ``None`` (after logging an error) when the sender role has no
            route or no role with the target profile exists.
        """
        roles = environment.get_roles()

        # An unmapped sender role is reported the same way as a missing
        # receiver instead of escaping as a KeyError.
        next_role_profile = self.role_directions.get(message.role)
        if next_role_profile is None:
            logger.error(
                f"No route defined for sender role {message.role!r}; "
                f"cannot handle message: {message}."
            )
            return

        # First role whose profile matches the routing target, if any.
        next_role = next(
            (role for role in roles.values() if role.profile == next_role_profile),
            None,
        )
        if next_role is None:
            logger.error(f"No available role can handle message: {message}.")
            return

        return await next_role.handle(message)
|
|