#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2023/5/11 14:43
@Author  : alexanderwu
@File    : engineer.py
"""
import asyncio
from collections import OrderedDict
from pathlib import Path

import aiofiles

from metagpt.actions import WriteCode, WriteCodeReview, WriteDesign, WriteTasks
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.utils.common import CodeParser
from metagpt.utils.special_tokens import FILENAME_CODE_SEP, MSG_SEP


async def gather_ordered_k(coros, k) -> list:
    """Run coroutines with at most ``k`` in flight and return results in input order.

    Args:
        coros: A sized sequence of coroutine objects (``len()`` is taken of it).
        k: Maximum number of tasks allowed to run at once. Values below 1 are
            treated as 1, since waiting on an empty task set raises ``ValueError``.

    Returns:
        A list where ``results[i]`` is the result of ``coros[i]``.

    Raises:
        Whatever exception a coroutine raised, re-raised when its result is read.
    """
    k = max(1, k)
    tasks = OrderedDict()  # task -> index of the coroutine in ``coros``
    results = [None] * len(coros)

    for i, coro in enumerate(coros):
        if len(tasks) >= k:
            # At capacity: block until at least one running task finishes.
            done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                results[tasks.pop(task)] = task.result()
        tasks[asyncio.create_task(coro)] = i

    if tasks:
        # Drain whatever is still running once everything has been scheduled.
        done, _ = await asyncio.wait(tasks.keys())
        for task in done:
            results[tasks[task]] = task.result()

    return results


class Engineer(Role):
    """Role that runs ``WriteCode`` (optionally ``WriteCodeReview``) for each task file."""

    def __init__(
        self,
        name="Alex",
        profile="Engineer",
        goal="Write elegant, readable, extensible, efficient code",
        constraints="The code you write should conform to code standard like PEP8, be modular, easy to read and maintain",
        n_borg=1,
        use_code_review=False,
    ):
        """Set up actions, the watched action and the (initially empty) todo list.

        Args:
            name: Passed through to ``Role.__init__``.
            profile: Passed through to ``Role.__init__``; also used as message role.
            goal: Passed through to ``Role.__init__``.
            constraints: Passed through to ``Role.__init__``.
            n_borg: Concurrency limit handed to ``gather_ordered_k`` in ``_act_mp``.
            use_code_review: When true, ``WriteCodeReview`` is registered as well and
                ``_act`` takes the ``_act_sp_precision`` path.
        """
        super().__init__(name, profile, goal, constraints)
        self._init_actions([WriteCode])
        self.use_code_review = use_code_review
        if self.use_code_review:
            self._init_actions([WriteCode, WriteCodeReview])
        self._watch([WriteTasks])
        self.todos = []
        self.n_borg = n_borg

    @classmethod
    def parse_tasks(cls, task_msg: Message) -> list[str]:
        """Return the "Task list" from a message.

        Structured ``instruct_content`` is preferred; otherwise the list is parsed
        out of the message's raw text.
        """
        if task_msg.instruct_content:
            return task_msg.instruct_content.dict().get("Task list")
        return CodeParser.parse_file_list(block="Task list", text=task_msg.content)

    @classmethod
    def parse_code(cls, code_text: str) -> str:
        """Extract code from ``code_text`` via ``CodeParser.parse_code`` with an empty block name."""
        return CodeParser.parse_code(block="", text=code_text)

    @classmethod
    def parse_workspace(cls, system_design_msg: Message) -> str:
        """Return the "Python package name" from a design message.

        When structured ``instruct_content`` is present, surrounding whitespace and
        quote characters are stripped from the value; otherwise the name is parsed
        out of the message's raw text.
        """
        if system_design_msg.instruct_content:
            return system_design_msg.instruct_content.dict().get("Python package name").strip().strip("'").strip('"')
        return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)

    def get_workspace(self) -> Path:
        """Return the directory generated files are written to.

        Falls back to ``CONFIG.workspace / "src"`` when memory holds no
        ``WriteDesign`` message.
        """
        design_msgs = self._rc.memory.get_by_action(WriteDesign)
        # Check for an empty result before indexing; ``[-1]`` on an empty list raises IndexError.
        msg = design_msgs[-1] if design_msgs else None
        if not msg:
            return CONFIG.workspace / "src"
        workspace = self.parse_workspace(msg)
        # Original note: codes are written in workspace/{package_name}/{package_name}.
        # NOTE(review): only one {package_name} level is appended here — presumably
        # CONFIG.workspace already includes the outer one; confirm.
        return CONFIG.workspace / workspace

    async def write_file(self, filename: str, code: str) -> Path:
        """Write ``code`` to ``filename`` under the workspace and return the path.

        Double quotes and newlines are removed from ``filename`` first, and missing
        parent directories are created.
        """
        workspace = self.get_workspace()
        filename = filename.replace('"', "").replace("\n", "")
        file = workspace / filename
        file.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8 so non-ASCII characters in generated code do not depend on
        # the platform's default encoding.
        async with aiofiles.open(file, "w", encoding="utf-8") as f:
            await f.write(code)
        return file

    def recv(self, message: Message) -> None:
        """Store ``message`` in memory and, if it is in important memory, reload the todo list from it."""
        self._rc.memory.add(message)
        if message in self._rc.important_memory:
            self.todos = self.parse_tasks(message)

    async def _act_mp(self) -> Message:
        """Generate code for all todos concurrently (at most ``n_borg`` at a time).

        Each response is logged and added to memory; no file is written on this
        path (the ``write_file`` call is disabled). Returns an "all done." message.
        """
        # self.recreate_workspace()
        # Work on a snapshot: the loop below removes finished entries from
        # self.todos, and deleting from a list while iterating it skips items.
        pending = list(self.todos)
        todo_coros = []
        for todo in pending:
            todo_coro = WriteCode().run(
                context=self._rc.memory.get_by_actions([WriteTasks, WriteDesign]), filename=todo
            )
            todo_coros.append(todo_coro)

        rsps = await gather_ordered_k(todo_coros, self.n_borg)
        for todo, code_rsp in zip(pending, rsps):
            _ = self.parse_code(code_rsp)
            logger.info(todo)
            logger.info(code_rsp)
            # self.write_file(todo, code)
            msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo))
            self._rc.memory.add(msg)
            del self.todos[0]

        logger.info(f"Done {self.get_workspace()} generating.")
        msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
        return msg

    async def _act_sp(self) -> Message:
        """Generate and write code for each todo sequentially, using full history as context.

        Returns a message addressed to "QaEngineer" whose content lists each
        filename with its written path and whose ``instruct_content`` maps
        filename to code.
        """
        code_msg_all = []  # gather all code info, will pass to qa_engineer for tests later
        instruct_content = {}
        for todo in self.todos:
            code = await WriteCode().run(context=self._rc.history, filename=todo)
            # logger.info(todo)
            # logger.info(code_rsp)
            # code = self.parse_code(code_rsp)
            file_path = await self.write_file(todo, code)
            msg = Message(content=code, role=self.profile, cause_by=type(self._rc.todo))
            self._rc.memory.add(msg)
            instruct_content[todo] = code

            # code_msg = todo + FILENAME_CODE_SEP + str(file_path)
            code_msg = (todo, file_path)
            code_msg_all.append(code_msg)

        logger.info(f"Done {self.get_workspace()} generating.")
        msg = Message(
            content=MSG_SEP.join(todo + FILENAME_CODE_SEP + str(file_path) for todo, file_path in code_msg_all),
            instruct_content=instruct_content,
            role=self.profile,
            cause_by=type(self._rc.todo),
            send_to="QaEngineer",
        )
        return msg

    async def _act_sp_precision(self) -> Message:
        """Generate code sequentially from a reduced context, with optional code review.

        The context holds only the contents of ``WriteDesign``, ``WriteTasks`` and
        ``WriteCode`` messages. A failed review is logged and the unreviewed code
        is kept. Returns the same kind of "QaEngineer" message as ``_act_sp``.
        """
        code_msg_all = []  # gather all code info, will pass to qa_engineer for tests later
        instruct_content = {}
        for todo in self.todos:
            # Pick only the necessary information from history to keep the prompt
            # short (based on hands-on experience):
            #   1. Everything from the Architect
            #   2. Everything from the ProjectManager
            #   3. Is other code needed (for now, yes)?
            # TODO: the goal is to not need it. Once tasks are split clearly, a single
            # file should be writable from the design alone; if it is not, the
            # definitions need to be clearer. This is the key to writing long code.
            msgs = self._rc.memory.get_by_actions([WriteDesign, WriteTasks, WriteCode])
            context_str = "\n".join(m.content for m in msgs)

            # Write the code.
            code = await WriteCode().run(context=context_str, filename=todo)

            # Code review is best-effort: on failure keep the unreviewed code.
            if self.use_code_review:
                try:
                    rewrite_code = await WriteCodeReview().run(context=context_str, code=code, filename=todo)
                    code = rewrite_code
                except Exception as e:
                    # Put the exception in the message itself; as a bare extra
                    # argument with no placeholder it never reached the log.
                    logger.error(f"code review failed! {e}")

            file_path = await self.write_file(todo, code)
            msg = Message(content=code, role=self.profile, cause_by=WriteCode)
            self._rc.memory.add(msg)
            instruct_content[todo] = code

            code_msg = (todo, file_path)
            code_msg_all.append(code_msg)

        logger.info(f"Done {self.get_workspace()} generating.")
        msg = Message(
            content=MSG_SEP.join(todo + FILENAME_CODE_SEP + str(file_path) for todo, file_path in code_msg_all),
            instruct_content=instruct_content,
            role=self.profile,
            cause_by=type(self._rc.todo),
            send_to="QaEngineer",
        )
        return msg

    async def _act(self) -> Message:
        """Dispatch to ``_act_sp_precision`` when code review is enabled, else ``_act_sp``."""
        if self.use_code_review:
            return await self._act_sp_precision()
        return await self._act_sp()