|
import asyncio |
|
import datetime |
|
import json |
|
import os |
|
from functools import partial |
|
from pathlib import Path |
|
from typing import Any, Coroutine, Optional |
|
|
|
import aiofiles |
|
from aiobotocore.session import get_session |
|
from mdutils.mdutils import MdUtils |
|
from metagpt.actions import Action |
|
from metagpt.actions.action_output import ActionOutput |
|
from metagpt.actions.design_api import WriteDesign |
|
from metagpt.actions.prepare_documents import PrepareDocuments |
|
from metagpt.actions.project_management import WriteTasks |
|
from metagpt.actions.summarize_code import SummarizeCode |
|
from metagpt.actions.write_code import WriteCode |
|
from metagpt.actions.write_prd import WritePRD |
|
from metagpt.config import CONFIG |
|
from metagpt.const import ( |
|
COMPETITIVE_ANALYSIS_FILE_REPO, |
|
DATA_API_DESIGN_FILE_REPO, |
|
SEQ_FLOW_FILE_REPO, |
|
SERDESER_PATH, |
|
) |
|
from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager, Role |
|
from metagpt.schema import Message |
|
from metagpt.team import Team |
|
from metagpt.utils.common import any_to_str, read_json_file, write_json_file |
|
from metagpt.utils.git_repository import GitRepository |
|
from pydantic import BaseModel, Field |
|
from zipstream import AioZipStream |
|
|
|
_default_llm_stream_log = partial(print, end="") |
|
|
|
|
|
class PackInfo(BaseModel): |
|
url: str |
|
|
|
|
|
class RoleRun(Action): |
|
role: Role |
|
|
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
action = self.role.rc.todo |
|
self.desc = f"{self.role.profile} {action.desc or str(action)}" |
|
|
|
|
|
class PackProject(Action): |
|
role: Role |
|
|
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
self.desc = "Pack the project with prd, design, code and more." |
|
|
|
async def run(self, key: str): |
|
url = await self.upload(key) |
|
info = PackInfo(url=url) |
|
mdfile = MdUtils(None) |
|
mdfile.new_line(mdfile.new_inline_link(url, url.rsplit("/", 1)[-1])) |
|
return ActionOutput(mdfile.get_md_text(), info) |
|
|
|
async def upload(self, key: str): |
|
files = [] |
|
workspace = CONFIG.git_repo.workdir |
|
workspace = str(workspace) |
|
for r, _, fs in os.walk(workspace): |
|
_r = r[len(workspace) :].lstrip("/") |
|
for f in fs: |
|
files.append({"file": os.path.join(r, f), "name": os.path.join(_r, f)}) |
|
|
|
chunks = [] |
|
async for chunk in AioZipStream(files, chunksize=32768).stream(): |
|
chunks.append(chunk) |
|
return await get_download_url(b"".join(chunks), key) |
|
|
|
|
|
class SoftwareCompany(Role): |
|
"""封装软件公司成角色,以快速接入agent store。""" |
|
|
|
finish: bool = False |
|
company: Team = Field(default_factory=Team) |
|
active_role: Optional[Role] = None |
|
git_repo: Optional[GitRepository] = None |
|
max_auto_summarize_code: int = 0 |
|
|
|
def __init__(self, use_code_review=False, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
engineer = Engineer(n_borg=5, use_code_review=use_code_review) |
|
self.company.hire([ProductManager(), Architect(), ProjectManager(), engineer]) |
|
self._init_actions([PackProject(role=engineer)]) |
|
|
|
def recv(self, message: Message) -> None: |
|
self.company.run_project(message.content) |
|
|
|
async def _think(self) -> Coroutine[Any, Any, bool]: |
|
"""软件公司运行需要4轮 |
|
|
|
BOSS -> ProductManager -> Architect -> ProjectManager -> Engineer |
|
BossRequirement -> WritePRD -> WriteDesign -> WriteTasks -> WriteCode -> |
|
""" |
|
if self.finish: |
|
self.rc.todo = None |
|
return False |
|
|
|
if self.git_repo is not None: |
|
CONFIG.git_repo = self.git_repo |
|
|
|
environment = self.company.env |
|
for role in environment.roles.values(): |
|
if await role._observe(): |
|
await role._think() |
|
if isinstance(role.rc.todo, PrepareDocuments): |
|
self.active_role = role |
|
await self.act() |
|
self.git_repo = CONFIG.git_repo |
|
return await self._think() |
|
|
|
if isinstance(role.rc.todo, SummarizeCode): |
|
return await self._think() |
|
|
|
self.rc.todo = RoleRun(role=role) |
|
self.active_role = role |
|
return True |
|
|
|
self._set_state(0) |
|
return True |
|
|
|
async def _act(self) -> Message: |
|
if self.git_repo is not None: |
|
CONFIG.git_repo = self.git_repo |
|
CONFIG.src_workspace = CONFIG.git_repo.workdir / CONFIG.git_repo.workdir.name |
|
CONFIG.max_auto_summarize_code = self.max_auto_summarize_code |
|
|
|
if isinstance(self.rc.todo, PackProject): |
|
workdir = CONFIG.git_repo.workdir |
|
name = workdir.name |
|
uid = workdir.parent.name |
|
now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") |
|
key = f"{uid}/metagpt-{name}-{now}.zip" |
|
output = await self.rc.todo.run(key) |
|
self.finish = True |
|
return Message(output.content, role=self.profile, cause_by=type(self.rc.todo)) |
|
|
|
default_log_stream = CONFIG.get("LLM_STREAM_LOG", _default_llm_stream_log) |
|
|
|
start = False |
|
insert_code = False |
|
|
|
def log_stream(msg): |
|
nonlocal start, insert_code |
|
if not start: |
|
if msg.startswith("["): |
|
msg = "```json\n" + msg |
|
insert_code = True |
|
start = True |
|
return default_log_stream(msg) |
|
|
|
CONFIG.LLM_STREAM_LOG = log_stream |
|
|
|
output = await self.active_role._act() |
|
self.active_role._set_state(state=-1) |
|
self.active_role.publish_message(output) |
|
|
|
if insert_code: |
|
default_log_stream("\n```\n") |
|
|
|
cause_by = output.cause_by |
|
|
|
if cause_by == any_to_str(WritePRD): |
|
output = await self.format_prd(output) |
|
elif cause_by == any_to_str(WriteDesign): |
|
output = await self.format_system_design(output) |
|
elif cause_by == any_to_str(WriteTasks): |
|
output = await self.format_tasks(output) |
|
elif cause_by == any_to_str(WriteCode): |
|
output = await self.format_code(output) |
|
elif cause_by == any_to_str(SummarizeCode): |
|
output = await self.format_code_summary(output) |
|
return output |
|
|
|
async def format_prd(self, msg: Message): |
|
docs = [(k, v) for k, v in msg.instruct_content.docs.items()] |
|
prd_doc = docs[0][1] |
|
data = json.loads(prd_doc.content) |
|
|
|
mdfile = MdUtils(None) |
|
title = "Original Requirements" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_paragraph(data[title]) |
|
|
|
title = "Product Goals" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_list(data[title], marked_with="1") |
|
|
|
title = "User Stories" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_list(data[title], marked_with="1") |
|
|
|
title = "Competitive Analysis" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
if all(i.count(":") == 1 for i in data[title]): |
|
mdfile.new_table( |
|
2, len(data[title]) + 1, ["Competitor", "Description", *(i for j in data[title] for i in j.split(":"))] |
|
) |
|
else: |
|
mdfile.new_list(data[title], marked_with="1") |
|
|
|
title = "Competitive Quadrant Chart" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
competitive_analysis_path = ( |
|
CONFIG.git_repo.workdir / Path(COMPETITIVE_ANALYSIS_FILE_REPO) / Path(prd_doc.filename).with_suffix(".png") |
|
) |
|
|
|
if competitive_analysis_path.exists(): |
|
key = str(competitive_analysis_path.relative_to(CONFIG.git_repo.workdir.parent.parent)) |
|
url = await upload_file_to_s3(competitive_analysis_path, key) |
|
mdfile.new_line(mdfile.new_inline_image(title, url)) |
|
else: |
|
mdfile.insert_code(data[title], "mermaid") |
|
|
|
title = "Requirement Analysis" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_paragraph(data[title]) |
|
|
|
title = "Requirement Pool" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_table( |
|
2, len(data[title]) + 1, ["Task Description", "Priority", *(i for j in data[title] for i in j)] |
|
) |
|
|
|
title = "UI Design draft" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_paragraph(data[title]) |
|
|
|
title = "Anything UNCLEAR" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_paragraph(data[title]) |
|
content = mdfile.get_md_text() |
|
return Message(content, cause_by=msg.cause_by, role=msg.role) |
|
|
|
async def format_system_design(self, msg: Message): |
|
system_designs = [(k, v) for k, v in msg.instruct_content.docs.items()] |
|
system_design_doc = system_designs[0][1] |
|
data = json.loads(system_design_doc.content) |
|
|
|
mdfile = MdUtils(None) |
|
|
|
title = "Implementation approach" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_paragraph(data[title]) |
|
|
|
title = "File list" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_list(data[title], marked_with="1") |
|
|
|
title = "Data structures and interfaces" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
|
|
data_api_design_path = ( |
|
CONFIG.git_repo.workdir |
|
/ Path(DATA_API_DESIGN_FILE_REPO) |
|
/ Path(system_design_doc.filename).with_suffix(".png") |
|
) |
|
if data_api_design_path.exists(): |
|
key = str(data_api_design_path.relative_to(CONFIG.git_repo.workdir.parent.parent)) |
|
url = await upload_file_to_s3(data_api_design_path, key) |
|
mdfile.new_line(mdfile.new_inline_image(title, url)) |
|
else: |
|
mdfile.insert_code(data[title], "mermaid") |
|
|
|
title = "Program call flow" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
seq_flow_path = ( |
|
CONFIG.git_repo.workdir / SEQ_FLOW_FILE_REPO / Path(system_design_doc.filename).with_suffix(".png") |
|
) |
|
if seq_flow_path.exists(): |
|
key = str(seq_flow_path.relative_to(CONFIG.git_repo.workdir.parent.parent)) |
|
url = await upload_file_to_s3(seq_flow_path, key) |
|
mdfile.new_line(mdfile.new_inline_image(title, url)) |
|
else: |
|
mdfile.insert_code(data[title], "mermaid") |
|
|
|
title = "Anything UNCLEAR" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_paragraph(data[title]) |
|
content = mdfile.get_md_text() |
|
return Message(content, cause_by=msg.cause_by, role=msg.role) |
|
|
|
async def format_tasks(self, msg: Message): |
|
tasks = [(k, v) for k, v in msg.instruct_content.docs.items()] |
|
task_doc = tasks[0][1] |
|
data = json.loads(task_doc.content) |
|
|
|
mdfile = MdUtils(None) |
|
title = "Required Python packages" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.insert_code("\n".join(data[title]), "txt") |
|
|
|
title = "Required Other language third-party packages" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.insert_code("\n".join(data[title]), "txt") |
|
|
|
title = "Logic Analysis" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_table( |
|
2, len(data[title]) + 1, ["Filename", "Class/Function Name", *(i for j in data[title] for i in j)] |
|
) |
|
|
|
title = "Task list" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.new_list(data[title]) |
|
|
|
title = "Full API spec" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
if data[title]: |
|
mdfile.insert_code(data[title], "json") |
|
|
|
title = "Shared Knowledge" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.insert_code(data[title], "python") |
|
|
|
title = "Anything UNCLEAR" |
|
mdfile.new_header(2, title, add_table_of_contents=False) |
|
mdfile.insert_code(data[title], "python") |
|
content = mdfile.get_md_text() |
|
return Message(content, cause_by=msg.cause_by, role=msg.role) |
|
|
|
async def format_code(self, msg: Message): |
|
data = msg.content.splitlines() |
|
workdir = CONFIG.git_repo.workdir |
|
code_root = workdir / workdir.name |
|
|
|
mdfile = MdUtils(None) |
|
|
|
for filename in data: |
|
mdfile.new_header(2, filename, add_table_of_contents=False) |
|
async with aiofiles.open(code_root / filename) as f: |
|
content = await f.read() |
|
suffix = filename.rsplit(".", maxsplit=1)[-1] |
|
mdfile.insert_code(content, "python" if suffix == "py" else suffix) |
|
return Message(mdfile.get_md_text(), cause_by=msg.cause_by, role=msg.role) |
|
|
|
async def format_code_summary(self, msg: Message): |
|
|
|
return msg |
|
|
|
async def think(self): |
|
await self._think() |
|
return self.rc.todo |
|
|
|
async def act(self): |
|
return await self._act() |
|
|
|
def serialize(self, stg_path: Path = None): |
|
stg_path = SERDESER_PATH.joinpath("software_company") if stg_path is None else stg_path |
|
|
|
team_info_path = stg_path.joinpath("software_company_info.json") |
|
write_json_file(team_info_path, self.model_dump(exclude={"company": True})) |
|
|
|
self.company.serialize(stg_path.joinpath("company")) |
|
|
|
@classmethod |
|
def deserialize(cls, stg_path: Path) -> "Team": |
|
"""stg_path = ./storage/team""" |
|
|
|
software_company_info_path = stg_path.joinpath("software_company_info.json") |
|
if not software_company_info_path.exists(): |
|
raise FileNotFoundError( |
|
"recover storage meta file `team_info.json` not exist, " |
|
"not to recover and please start a new project." |
|
) |
|
|
|
software_company_info: dict = read_json_file(software_company_info_path) |
|
|
|
|
|
company = Team.deserialize(stg_path=stg_path.joinpath("company")) |
|
software_company_info.update({"company": company}) |
|
|
|
return cls(**software_company_info) |
|
|
|
|
|
async def upload_file_to_s3(filepath: str, key: str): |
|
async with aiofiles.open(filepath, "rb") as f: |
|
content = await f.read() |
|
return await get_download_url(content, key) |
|
|
|
|
|
async def get_download_url(content: bytes, key: str) -> str: |
|
if CONFIG.get("STORAGE_TYPE") == "S3": |
|
session = get_session() |
|
async with session.create_client( |
|
"s3", |
|
aws_secret_access_key=CONFIG.get("S3_SECRET_KEY"), |
|
aws_access_key_id=CONFIG.get("S3_ACCESS_KEY"), |
|
endpoint_url=CONFIG.get("S3_ENDPOINT_URL"), |
|
use_ssl=CONFIG.get("S3_SECURE"), |
|
) as client: |
|
|
|
bucket = CONFIG.get("S3_BUCKET") |
|
await client.put_object(Bucket=bucket, Key=key, Body=content) |
|
return f"{CONFIG.get('S3_ENDPOINT_URL')}/{bucket}/{key}" |
|
else: |
|
storage = CONFIG.get("LOCAL_ROOT", "storage") |
|
base_url = CONFIG.get("LOCAL_BASE_URL", "storage") |
|
filepath = Path(storage) / key |
|
filepath.parent.mkdir(exist_ok=True, parents=True) |
|
async with aiofiles.open(filepath, "wb") as f: |
|
await f.write(content) |
|
return f"{base_url}/{key}" |
|
|
|
|
|
async def main(idea, **kwargs): |
|
sc = SoftwareCompany(**kwargs) |
|
sc.recv(Message(idea)) |
|
while await sc.think(): |
|
print(await sc.act()) |
|
|
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |
|
|