|
|
|
|
|
""" |
|
@Time : 2023/5/11 14:43 |
|
@Author : alexanderwu |
|
@File : engineer.py |
|
""" |
|
import asyncio |
|
from collections import OrderedDict |
|
from pathlib import Path |
|
|
|
import aiofiles |
|
|
|
from metagpt.actions import WriteCode, WriteCodeReview, WriteDesign, WriteTasks |
|
from metagpt.config import CONFIG |
|
from metagpt.logs import logger |
|
from metagpt.roles import Role |
|
from metagpt.schema import Message |
|
from metagpt.utils.common import CodeParser |
|
from metagpt.utils.special_tokens import FILENAME_CODE_SEP, MSG_SEP |
|
|
|
|
|
async def gather_ordered_k(coros, k) -> list:
    """Run coroutines with at most ``k`` in flight and return results in input order.

    Coroutines are scheduled in the order given. Whenever ``k`` tasks are
    already running, scheduling pauses until at least one of them finishes.

    Args:
        coros: Iterable of coroutine objects. It is materialized into a list,
            so generators are accepted as well as sequences.
        k: Maximum number of tasks allowed to run at the same time (>= 1).

    Returns:
        A list where position ``i`` holds the result of the ``i``-th coroutine.

    Raises:
        ValueError: If there is work to do but ``k`` is smaller than 1.
        Exception: Whatever a coroutine raised; it propagates when that task's
            result is collected.
    """
    coros = list(coros)
    if not coros:
        return []
    if k < 1:
        # Without this guard asyncio.wait() would be called on an empty set
        # and fail with a message unrelated to the real problem.
        raise ValueError(f"k must be at least 1, got {k}")

    results = [None] * len(coros)
    in_flight = {}  # task -> position of its result in ``results``

    async def _collect(return_when):
        """Wait on the in-flight tasks and store every finished result."""
        done, _ = await asyncio.wait(in_flight.keys(), return_when=return_when)
        for finished in done:
            results[in_flight.pop(finished)] = finished.result()

    for position, coro in enumerate(coros):
        if len(in_flight) >= k:
            # The window is full: free at least one slot before scheduling more.
            await _collect(asyncio.FIRST_COMPLETED)
        in_flight[asyncio.create_task(coro)] = position

    if in_flight:
        await _collect(asyncio.ALL_COMPLETED)

    return results
|
|
|
|
|
class Engineer(Role):
    """Role that writes one source file per entry of the task list.

    The role watches ``WriteTasks`` messages; when one arrives, ``recv`` parses
    it into ``self.todos`` (a list of file names). ``_act`` then generates code
    for every todo, writes it into the workspace and returns a summary message.
    """

    def __init__(
        self,
        name="Alex",
        profile="Engineer",
        goal="Write elegant, readable, extensible, efficient code",
        constraints="The code you write should conform to code standard like PEP8, be modular, easy to read and maintain",
        n_borg=1,
        use_code_review=False,
    ):
        """Set up actions, watched messages and the (initially empty) todo list.

        Args:
            name: Passed through to ``Role.__init__``.
            profile: Passed through to ``Role.__init__``; also used as the
                ``role`` of every message this class creates.
            goal: Passed through to ``Role.__init__``.
            constraints: Passed through to ``Role.__init__``.
            n_borg: Concurrency limit handed to ``gather_ordered_k`` in ``_act_mp``.
            use_code_review: When true, ``WriteCodeReview`` is registered as an
                action and ``_act`` uses the reviewing code path.
        """
        super().__init__(name, profile, goal, constraints)
        self._init_actions([WriteCode])
        self.use_code_review = use_code_review
        if self.use_code_review:
            self._init_actions([WriteCode, WriteCodeReview])
        self._watch([WriteTasks])
        self.todos = []
        self.n_borg = n_borg

    @classmethod
    def parse_tasks(cls, task_msg: Message) -> list[str]:
        """Extract the "Task list" from a task message.

        Structured ``instruct_content`` is preferred; otherwise the list is
        parsed out of the raw message text.
        """
        if task_msg.instruct_content:
            return task_msg.instruct_content.dict().get("Task list")
        return CodeParser.parse_file_list(block="Task list", text=task_msg.content)

    @classmethod
    def parse_code(cls, code_text: str) -> str:
        """Extract the code portion of ``code_text`` via ``CodeParser.parse_code``."""
        return CodeParser.parse_code(block="", text=code_text)

    @classmethod
    def parse_workspace(cls, system_design_msg: Message) -> str:
        """Extract the "Python package name" from a system design message.

        When structured ``instruct_content`` is present, surrounding whitespace
        and quote characters are stripped from the value; otherwise the name is
        parsed out of the raw message text.
        """
        if system_design_msg.instruct_content:
            return system_design_msg.instruct_content.dict().get("Python package name").strip().strip("'").strip('"')
        return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)

    def get_workspace(self) -> Path:
        """Return the directory that generated files are written to.

        The directory is ``CONFIG.workspace`` joined with the package name from
        the most recent ``WriteDesign`` message, or with ``"src"`` when no such
        message is available.
        """
        design_msgs = self._rc.memory.get_by_action(WriteDesign)
        # Check for an empty result before indexing: taking [-1] of an empty
        # list would raise IndexError and the fallback would never be reached.
        latest = design_msgs[-1] if design_msgs else None
        if not latest:
            return CONFIG.workspace / "src"
        return CONFIG.workspace / self.parse_workspace(latest)

    async def write_file(self, filename: str, code: str):
        """Write ``code`` to ``filename`` inside the workspace and return the path.

        Double quotes and newlines are removed from ``filename`` first, and
        missing parent directories are created.
        """
        workspace = self.get_workspace()
        filename = filename.replace('"', "").replace("\n", "")
        file = workspace / filename
        file.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(file, "w") as f:
            await f.write(code)
        return file

    def recv(self, message: Message) -> None:
        """Store ``message`` in memory and, if it is an important one, reload the todos."""
        self._rc.memory.add(message)
        if message in self._rc.important_memory:
            self.todos = self.parse_tasks(message)

    async def _act_mp(self) -> Message:
        """Generate code for all todos concurrently (at most ``n_borg`` at a time).

        Each response is logged and stored in memory as its own message, and the
        corresponding todo is removed from ``self.todos``. Returns a final
        "all done." message.
        """
        # Snapshot the todo list: entries are deleted from ``self.todos`` below,
        # and deleting from a list while iterating over it skips elements.
        pending = list(self.todos)
        todo_coros = [
            WriteCode().run(
                context=self._rc.memory.get_by_actions([WriteTasks, WriteDesign]), filename=todo
            )
            for todo in pending
        ]

        rsps = await gather_ordered_k(todo_coros, self.n_borg)
        for todo, code_rsp in zip(pending, rsps):
            # NOTE(review): the parsed result is not used; the call is kept
            # because its side effects, if any, are not visible from here.
            self.parse_code(code_rsp)
            logger.info(todo)
            logger.info(code_rsp)

            code_msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo))
            self._rc.memory.add(code_msg)
            del self.todos[0]

        logger.info(f"Done {self.get_workspace()} generating.")
        msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
        return msg

    async def _act_sp(self) -> Message:
        """Generate and write code for each todo sequentially, without review.

        Returns a message addressed to "QaEngineer" whose content lists every
        todo with the path it was written to, and whose ``instruct_content``
        maps each todo to its generated code.
        """
        code_msg_all = []  # (todo, file_path) pairs, in generation order
        instruct_content = {}
        for todo in self.todos:
            code = await WriteCode().run(context=self._rc.history, filename=todo)

            file_path = await self.write_file(todo, code)
            code_msg = Message(content=code, role=self.profile, cause_by=type(self._rc.todo))
            self._rc.memory.add(code_msg)
            instruct_content[todo] = code

            code_msg_all.append((todo, file_path))

        logger.info(f"Done {self.get_workspace()} generating.")
        msg = Message(
            content=MSG_SEP.join(todo + FILENAME_CODE_SEP + str(file_path) for todo, file_path in code_msg_all),
            instruct_content=instruct_content,
            role=self.profile,
            cause_by=type(self._rc.todo),
            send_to="QaEngineer",
        )
        return msg

    async def _act_sp_precision(self) -> Message:
        """Generate code sequentially from a trimmed context, with optional review.

        Compared with ``_act_sp``, the prompt context is limited to the contents
        of ``WriteDesign``, ``WriteTasks`` and ``WriteCode`` messages, and when
        ``use_code_review`` is set each file is passed through
        ``WriteCodeReview`` before being written.

        Returns a message addressed to "QaEngineer" whose content lists every
        todo with the path it was written to, and whose ``instruct_content``
        maps each todo to its final code.
        """
        code_msg_all = []  # (todo, file_path) pairs, in generation order
        instruct_content = {}
        for todo in self.todos:
            # Pick only the necessary information from the history to keep the
            # prompt short (based on hands-on experience):
            #   1. Everything from the Architect
            #   2. Everything from the ProjectManager
            #   3. Is other code needed? (for now, yes)
            # TODO: The goal is to not need it. Once tasks are split clearly, a
            # single file should be writable from the design alone; if it is
            # not, the definitions need to be made clearer. This is the key to
            # being able to generate long code.
            #
            # The context is rebuilt on every iteration because each finished
            # file adds a new WriteCode message to memory.
            relevant_msgs = self._rc.memory.get_by_actions([WriteDesign, WriteTasks, WriteCode])
            context_str = "\n".join(m.content for m in relevant_msgs)

            code = await WriteCode().run(context=context_str, filename=todo)

            if self.use_code_review:
                try:
                    code = await WriteCodeReview().run(context=context_str, code=code, filename=todo)
                except Exception as e:
                    # Review is best-effort: on failure keep the un-reviewed
                    # code, but make sure the error itself reaches the log.
                    logger.error(f"code review failed! {e}")
            file_path = await self.write_file(todo, code)
            code_msg = Message(content=code, role=self.profile, cause_by=WriteCode)
            self._rc.memory.add(code_msg)
            instruct_content[todo] = code

            code_msg_all.append((todo, file_path))

        logger.info(f"Done {self.get_workspace()} generating.")
        msg = Message(
            content=MSG_SEP.join(todo + FILENAME_CODE_SEP + str(file_path) for todo, file_path in code_msg_all),
            instruct_content=instruct_content,
            role=self.profile,
            cause_by=type(self._rc.todo),
            send_to="QaEngineer",
        )
        return msg

    async def _act(self) -> Message:
        """Run the reviewing code path when code review is enabled, else the plain one."""
        if self.use_code_review:
            return await self._act_sp_precision()
        return await self._act_sp()
|
|