"""Expose a MetaGPT software company (a ``Team``) as a single ``Role``.

The role drives the hired team members one step at a time, reformats each
member's output as Markdown, and finally packs the generated project into a
zip archive that is uploaded to S3 or written to local storage.
"""
import asyncio
import datetime
import json
import os
import sys
from functools import partial
from pathlib import Path
from typing import Any, Coroutine, Optional

import aiofiles
from aiobotocore.session import get_session
from mdutils.mdutils import MdUtils
from pydantic import BaseModel, Field
from zipstream import AioZipStream

from metagpt.actions import Action
from metagpt.actions.action_output import ActionOutput
from metagpt.actions.design_api import WriteDesign
from metagpt.actions.prepare_documents import PrepareDocuments
from metagpt.actions.project_management import WriteTasks
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.actions.write_code import WriteCode
from metagpt.actions.write_prd import WritePRD
from metagpt.config import CONFIG
from metagpt.const import (
    COMPETITIVE_ANALYSIS_FILE_REPO,
    DATA_API_DESIGN_FILE_REPO,
    SEQ_FLOW_FILE_REPO,
    SERDESER_PATH,
)
from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager, Role
from metagpt.schema import Message
from metagpt.team import Team
from metagpt.utils.common import any_to_str, read_json_file, write_json_file
from metagpt.utils.git_repository import GitRepository

# Fallback stream logger: print chunks without a trailing newline.
_default_llm_stream_log = partial(print, end="")


def _add_header(mdfile: MdUtils, title: str) -> None:
    """Append a level-2 header named *title* without touching the TOC."""
    mdfile.new_header(2, title, add_table_of_contents=False)


async def _add_diagram(mdfile: MdUtils, data: dict, title: str, image_path: Path) -> None:
    """Append a diagram: the uploaded PNG if it exists, else the mermaid source.

    ``data[title]`` is only read when the image is missing, so a missing key
    does not matter when a rendered PNG is available.
    """
    if image_path.exists():
        # The storage key is the image path relative to two levels above the
        # git work directory.
        key = str(image_path.relative_to(CONFIG.git_repo.workdir.parent.parent))
        url = await upload_file_to_s3(image_path, key)
        mdfile.new_line(mdfile.new_inline_image(title, url))
    else:
        mdfile.insert_code(data[title], "mermaid")


class RoleRun(Action):
    """Placeholder action describing what the wrapped ``role`` will do next."""

    role: Role

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        action = self.role._rc.todo
        self.desc = f"{self.role.profile} {action.desc or str(action)}"


class PackProject(Action):
    """Zip the git work directory and publish it as a downloadable archive."""

    role: Role

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.desc = "Pack the project with prd, design, code and more."

    async def run(self, key: str):
        """Upload the archive under *key* and return a Markdown link to it."""
        url = await self.upload(key)
        mdfile = MdUtils(None)
        # Link text is the last path component of the URL (the archive name).
        mdfile.new_line(mdfile.new_inline_link(url, url.rsplit("/", 1)[-1]))
        return ActionOutput(mdfile.get_md_text(), BaseModel())

    async def upload(self, key: str):
        """Zip every file under ``CONFIG.git_repo.workdir`` and store it.

        Returns:
            The download URL produced by ``get_download_url``.
        """
        workspace = str(CONFIG.git_repo.workdir)
        files = []
        for root, _, filenames in os.walk(workspace):
            # Archive member names are relative to the workspace and always
            # use "/" so they are valid zip entries on every platform.
            rel_dir = Path(root).relative_to(workspace)
            for filename in filenames:
                files.append(
                    {
                        "file": os.path.join(root, filename),
                        "name": (rel_dir / filename).as_posix(),
                    }
                )
        # The whole archive is buffered because get_download_url takes bytes.
        chunks = []
        async for chunk in AioZipStream(files, chunksize=32768).stream():
            chunks.append(chunk)
        return await get_download_url(b"".join(chunks), key)


class SoftwareCompany(Role):
    """Wrap a software company as a role for quick integration with the agent store."""

    # Set once the project has been packed; no further work is scheduled.
    finish: bool = False
    # The team whose members are driven by this role.
    company: Team = Field(default_factory=Team)
    # The team member whose action is executed by the next `_act` call.
    active_role: Optional[Role] = None
    # Git repository copied from / to CONFIG.git_repo between steps.
    git_repo: Optional[GitRepository] = None
    # Copied into CONFIG.max_auto_summarize_code on every `_act`.
    max_auto_summarize_code: int = 0

    def __init__(self, use_code_review=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        engineer = Engineer(n_borg=5, use_code_review=use_code_review)
        self.company.hire([ProductManager(), Architect(), ProjectManager(), engineer])
        self._init_actions([PackProject(role=engineer)])

    def recv(self, message: Message) -> None:
        """Hand the message content to the team as the project to run."""
        self.company.run_project(message.content)

    async def _think(self) -> bool:
        """Pick the next thing to do.

        The software company needs 4 rounds to run:
        BOSS -> ProductManager -> Architect -> ProjectManager -> Engineer
        BossRequirement -> WritePRD -> WriteDesign -> WriteTasks -> WriteCode ->

        Returns:
            ``False`` once the project has been packed, ``True`` otherwise.
        """
        if self.finish:
            self._rc.todo = None
            return False

        if self.git_repo is not None:
            CONFIG.git_repo = self.git_repo

        environment = self.company.env
        for role in environment.roles.values():
            if await role._observe():
                await role._think()
                if isinstance(role._rc.todo, PrepareDocuments):
                    # Run document preparation right away, remember the
                    # repository it left in CONFIG, then look for more work.
                    self.active_role = role
                    await self.act()
                    self.git_repo = CONFIG.git_repo
                    return await self._think()

                if isinstance(role._rc.todo, SummarizeCode):
                    # Code summarisation is not surfaced as a step of its own.
                    return await self._think()

                self._rc.todo = RoleRun(role=role)
                self.active_role = role
                return True

        # No member observed anything new: select this role's first action
        # (PackProject, registered in __init__).
        self._set_state(0)
        return True

    async def _act(self) -> Message:
        """Run the pending step and return its output as a Markdown message.

        Either packs the project (when the todo is ``PackProject``) or lets
        the active team member act and reformats its output by cause.
        """
        if self.git_repo is not None:
            CONFIG.git_repo = self.git_repo
            CONFIG.src_workspace = CONFIG.git_repo.workdir / CONFIG.git_repo.workdir.name
        CONFIG.max_auto_summarize_code = self.max_auto_summarize_code

        if isinstance(self._rc.todo, PackProject):
            workdir = CONFIG.git_repo.workdir
            name = workdir.name
            uid = workdir.parent.parent.name
            now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            key = f"{uid}/metagpt-{name}-{now}.zip"
            output = await self._rc.todo.run(key)
            self.finish = True
            return Message(output.content, role=self.profile, cause_by=type(self._rc.todo))

        default_log_stream = CONFIG.get("LLM_STREAM_LOG", _default_llm_stream_log)

        start = False
        insert_code = False

        def log_stream(msg):
            """Forward *msg*; open a json code fence if the stream starts with '['."""
            nonlocal start, insert_code
            if not start:
                if msg.startswith("["):
                    msg = "```json\n" + msg
                    insert_code = True
                start = True
            return default_log_stream(msg)

        # NOTE(review): the wrapper is never uninstalled; if CONFIG.get returns
        # a previously installed wrapper, wrappers nest across calls - confirm.
        CONFIG.LLM_STREAM_LOG = log_stream

        output = await self.active_role._act()
        self.active_role._set_state(state=-1)
        self.active_role.publish_message(output)

        if insert_code:
            # Close the code fence opened by log_stream.
            default_log_stream("\n```\n")

        cause_by = output.cause_by
        if cause_by == any_to_str(WritePRD):
            output = await self.format_prd(output)
        elif cause_by == any_to_str(WriteDesign):
            output = await self.format_system_design(output)
        elif cause_by == any_to_str(WriteTasks):
            output = await self.format_tasks(output)
        elif cause_by == any_to_str(WriteCode):
            output = await self.format_code(output)
        elif cause_by == any_to_str(SummarizeCode):
            output = await self.format_code_summary(output)
        return output

    async def format_prd(self, msg: Message):
        """Render the first PRD document carried by *msg* as Markdown."""
        docs = [(k, v) for k, v in msg.instruct_content.docs.items()]
        prd_doc = docs[0][1]
        data = json.loads(prd_doc.content)

        mdfile = MdUtils(None)

        title = "Original Requirements"
        _add_header(mdfile, title)
        mdfile.new_paragraph(data[title])

        title = "Product Goals"
        _add_header(mdfile, title)
        mdfile.new_list(data[title], marked_with="1")

        title = "User Stories"
        _add_header(mdfile, title)
        mdfile.new_list(data[title], marked_with="1")

        title = "Competitive Analysis"
        _add_header(mdfile, title)
        # A two-column table is only possible when every entry splits into
        # exactly two parts on ":"; otherwise fall back to a numbered list.
        if all(i.count(":") == 1 for i in data[title]):
            mdfile.new_table(
                2,
                len(data[title]) + 1,
                ["Competitor", "Description", *(i for j in data[title] for i in j.split(":"))],
            )
        else:
            mdfile.new_list(data[title], marked_with="1")

        title = "Competitive Quadrant Chart"
        _add_header(mdfile, title)
        competitive_analysis_path = (
            CONFIG.git_repo.workdir
            / Path(COMPETITIVE_ANALYSIS_FILE_REPO)
            / Path(prd_doc.filename).with_suffix(".png")
        )
        await _add_diagram(mdfile, data, title, competitive_analysis_path)

        title = "Requirement Analysis"
        _add_header(mdfile, title)
        mdfile.new_paragraph(data[title])

        title = "Requirement Pool"
        _add_header(mdfile, title)
        # assumes every pool entry is a 2-item sequence - TODO confirm
        mdfile.new_table(
            2,
            len(data[title]) + 1,
            ["Task Description", "Priority", *(i for j in data[title] for i in j)],
        )

        title = "UI Design draft"
        _add_header(mdfile, title)
        mdfile.new_paragraph(data[title])

        title = "Anything UNCLEAR"
        _add_header(mdfile, title)
        mdfile.new_paragraph(data[title])

        content = mdfile.get_md_text()
        return Message(content, cause_by=msg.cause_by, role=msg.role)

    async def format_system_design(self, msg: Message):
        """Render the first system-design document carried by *msg* as Markdown."""
        system_designs = [(k, v) for k, v in msg.instruct_content.docs.items()]
        system_design_doc = system_designs[0][1]
        data = json.loads(system_design_doc.content)

        mdfile = MdUtils(None)

        title = "Implementation approach"
        _add_header(mdfile, title)
        mdfile.new_paragraph(data[title])

        title = "File list"
        _add_header(mdfile, title)
        mdfile.new_list(data[title], marked_with="1")

        title = "Data structures and interfaces"
        _add_header(mdfile, title)
        data_api_design_path = (
            CONFIG.git_repo.workdir
            / Path(DATA_API_DESIGN_FILE_REPO)
            / Path(system_design_doc.filename).with_suffix(".png")
        )
        await _add_diagram(mdfile, data, title, data_api_design_path)

        title = "Program call flow"
        _add_header(mdfile, title)
        seq_flow_path = (
            CONFIG.git_repo.workdir
            / SEQ_FLOW_FILE_REPO
            / Path(system_design_doc.filename).with_suffix(".png")
        )
        await _add_diagram(mdfile, data, title, seq_flow_path)

        title = "Anything UNCLEAR"
        _add_header(mdfile, title)
        mdfile.new_paragraph(data[title])

        content = mdfile.get_md_text()
        return Message(content, cause_by=msg.cause_by, role=msg.role)

    async def format_tasks(self, msg: Message):
        """Render the first task document carried by *msg* as Markdown."""
        tasks = [(k, v) for k, v in msg.instruct_content.docs.items()]
        task_doc = tasks[0][1]
        data = json.loads(task_doc.content)

        mdfile = MdUtils(None)

        title = "Required Python packages"
        _add_header(mdfile, title)
        mdfile.insert_code("\n".join(data[title]), "txt")

        title = "Required Other language third-party packages"
        _add_header(mdfile, title)
        mdfile.insert_code("\n".join(data[title]), "txt")

        title = "Logic Analysis"
        _add_header(mdfile, title)
        # assumes every entry is a 2-item sequence - TODO confirm
        mdfile.new_table(
            2,
            len(data[title]) + 1,
            ["Filename", "Class/Function Name", *(i for j in data[title] for i in j)],
        )

        title = "Task list"
        _add_header(mdfile, title)
        mdfile.new_list(data[title])

        title = "Full API spec"
        _add_header(mdfile, title)
        if data[title]:
            mdfile.insert_code(data[title], "json")

        title = "Shared Knowledge"
        _add_header(mdfile, title)
        mdfile.insert_code(data[title], "python")

        title = "Anything UNCLEAR"
        _add_header(mdfile, title)
        mdfile.insert_code(data[title], "python")

        content = mdfile.get_md_text()
        return Message(content, cause_by=msg.cause_by, role=msg.role)

    async def format_code(self, msg: Message):
        """Render every source file listed in *msg* (one name per line) as Markdown."""
        workdir = CONFIG.git_repo.workdir
        code_root = workdir / workdir.name

        mdfile = MdUtils(None)
        for filename in msg.content.splitlines():
            if not filename.strip():
                # A blank line would otherwise try to open the code root itself.
                continue
            mdfile.new_header(2, filename, add_table_of_contents=False)
            async with aiofiles.open(code_root / filename) as f:
                content = await f.read()
            # Use the real extension as the fence language; files without an
            # extension get an unlabelled fence instead of their whole name.
            suffix = Path(filename).suffix.lstrip(".")
            mdfile.insert_code(content, "python" if suffix == "py" else suffix)
        return Message(mdfile.get_md_text(), cause_by=msg.cause_by, role=msg.role)

    async def format_code_summary(self, msg: Message):
        """Return *msg* unchanged (formatting is not implemented yet)."""
        # TODO
        return msg

    async def think(self):
        """Choose the next step and return the pending todo (``None`` when done)."""
        await self._think()
        return self._rc.todo

    async def act(self):
        """Public wrapper around ``_act``."""
        return await self._act()

    def serialize(self, stg_path: Optional[Path] = None):
        """Write this role's fields and its company under *stg_path*.

        Defaults to ``SERDESER_PATH / "software_company"`` when *stg_path* is
        ``None``. The company is excluded from the JSON file and serialized
        separately into the ``company`` sub-directory.
        """
        stg_path = SERDESER_PATH.joinpath("software_company") if stg_path is None else stg_path

        team_info_path = stg_path.joinpath("software_company_info.json")
        write_json_file(team_info_path, self.dict(exclude={"company": True}))

        self.company.serialize(stg_path.joinpath("company"))  # save company alone

    @classmethod
    def deserialize(cls, stg_path: Path) -> "SoftwareCompany":
        """Rebuild an instance from the files written by ``serialize``.

        Raises:
            FileNotFoundError: if ``software_company_info.json`` is missing.
        """
        # recover the role's own fields
        software_company_info_path = stg_path.joinpath("software_company_info.json")
        if not software_company_info_path.exists():
            raise FileNotFoundError(
                "recover storage meta file `software_company_info.json` not exist, "
                "not to recover and please start a new project."
            )

        software_company_info: dict = read_json_file(software_company_info_path)

        # recover the company (team and environment)
        company = Team.deserialize(stg_path=stg_path.joinpath("company"))
        software_company_info.update({"company": company})

        return cls(**software_company_info)


async def upload_file_to_s3(filepath: str, key: str):
    """Read *filepath* as bytes and store it under *key*; return its URL.

    Despite the name, the destination is whatever ``get_download_url``
    selects (S3 or local storage).
    """
    async with aiofiles.open(filepath, "rb") as f:
        content = await f.read()
    return await get_download_url(content, key)


async def get_download_url(content: bytes, key: str) -> str:
    """Store *content* under *key* and return a URL for it.

    Uploads to S3 when ``STORAGE_TYPE`` is ``"S3"``; otherwise writes the
    bytes below ``LOCAL_ROOT`` and builds the URL from ``LOCAL_BASE_URL``.
    """
    if CONFIG.get("STORAGE_TYPE") == "S3":
        session = get_session()
        async with session.create_client(
            "s3",
            aws_secret_access_key=CONFIG.get("S3_SECRET_KEY"),
            aws_access_key_id=CONFIG.get("S3_ACCESS_KEY"),
            endpoint_url=CONFIG.get("S3_ENDPOINT_URL"),
            use_ssl=CONFIG.get("S3_SECURE"),
        ) as client:
            # upload object to amazon s3
            bucket = CONFIG.get("S3_BUCKET")
            await client.put_object(Bucket=bucket, Key=key, Body=content)
            return f"{CONFIG.get('S3_ENDPOINT_URL')}/{bucket}/{key}"
    else:
        storage = CONFIG.get("LOCAL_ROOT", "storage")
        base_url = CONFIG.get("LOCAL_BASE_URL", "storage")
        filepath = Path(storage) / key
        filepath.parent.mkdir(exist_ok=True, parents=True)
        async with aiofiles.open(filepath, "wb") as f:
            await f.write(content)
        return f"{base_url}/{key}"


async def main(idea, **kwargs):
    """Run a software company on *idea*, printing the output of every step."""
    sc = SoftwareCompany(**kwargs)
    sc.recv(Message(idea))
    while await sc.think():
        print(await sc.act())


if __name__ == "__main__":
    # main() requires an idea; take it from the command line.
    if len(sys.argv) < 2:
        sys.exit(f"usage: python {sys.argv[0]} <idea>")
    asyncio.run(main(" ".join(sys.argv[1:])))