|
|
|
|
|
""" |
|
@Time : 2023/4/29 16:07 |
|
@Author : alexanderwu |
|
@File : common.py |
|
@Modified By: mashenquan, 2023-8-17, add `initalize_enviroment()` to load `config/config.yaml` to `os.environ` |
|
""" |
|
import ast |
|
import contextlib |
|
import inspect |
|
import os |
|
import re |
|
from pathlib import Path |
|
from typing import List, Tuple |
|
|
|
import yaml |
|
|
|
from metagpt.logs import logger |
|
|
|
|
|
def check_cmd_exists(command) -> int: |
|
""" 检查命令是否存在 |
|
:param command: 待检查的命令 |
|
:return: 如果命令存在,返回0,如果不存在,返回非0 |
|
""" |
|
check_command = 'command -v ' + command + ' >/dev/null 2>&1 || { echo >&2 "no mermaid"; exit 1; }' |
|
result = os.system(check_command) |
|
return result |
|
|
|
|
|
class OutputParser: |
|
|
|
@classmethod |
|
def parse_blocks(cls, text: str): |
|
|
|
blocks = text.split("##") |
|
|
|
|
|
block_dict = {} |
|
|
|
|
|
for block in blocks: |
|
|
|
if block.strip() != "": |
|
|
|
block_title, block_content = block.split("\n", 1) |
|
|
|
if block_title[-1] == ":": |
|
block_title = block_title[:-1] |
|
block_dict[block_title.strip()] = block_content.strip() |
|
|
|
return block_dict |
|
|
|
@classmethod |
|
def parse_code(cls, text: str, lang: str = "") -> str: |
|
pattern = rf'```{lang}.*?\s+(.*?)```' |
|
match = re.search(pattern, text, re.DOTALL) |
|
if match: |
|
code = match.group(1) |
|
else: |
|
raise Exception |
|
return code |
|
|
|
@classmethod |
|
def parse_str(cls, text: str): |
|
text = text.split("=")[-1] |
|
text = text.strip().strip("'").strip("\"") |
|
return text |
|
|
|
@classmethod |
|
def parse_file_list(cls, text: str) -> list[str]: |
|
|
|
pattern = r'\s*(.*=.*)?(\[.*\])' |
|
|
|
|
|
match = re.search(pattern, text, re.DOTALL) |
|
if match: |
|
tasks_list_str = match.group(2) |
|
|
|
|
|
tasks = ast.literal_eval(tasks_list_str) |
|
else: |
|
tasks = text.split("\n") |
|
return tasks |
|
|
|
@staticmethod |
|
def parse_python_code(text: str) -> str: |
|
for pattern in ( |
|
r'(.*?```python.*?\s+)?(?P<code>.*)(```.*?)', |
|
r'(.*?```python.*?\s+)?(?P<code>.*)', |
|
): |
|
match = re.search(pattern, text, re.DOTALL) |
|
if not match: |
|
continue |
|
code = match.group("code") |
|
if not code: |
|
continue |
|
with contextlib.suppress(Exception): |
|
ast.parse(code) |
|
return code |
|
raise ValueError("Invalid python code") |
|
|
|
@classmethod |
|
def parse_data(cls, data): |
|
block_dict = cls.parse_blocks(data) |
|
parsed_data = {} |
|
for block, content in block_dict.items(): |
|
|
|
try: |
|
content = cls.parse_code(text=content) |
|
except Exception: |
|
pass |
|
|
|
|
|
try: |
|
content = cls.parse_file_list(text=content) |
|
except Exception: |
|
pass |
|
parsed_data[block] = content |
|
return parsed_data |
|
|
|
@classmethod |
|
def parse_data_with_mapping(cls, data, mapping): |
|
block_dict = cls.parse_blocks(data) |
|
parsed_data = {} |
|
for block, content in block_dict.items(): |
|
|
|
try: |
|
content = cls.parse_code(text=content) |
|
except Exception: |
|
pass |
|
typing_define = mapping.get(block, None) |
|
if isinstance(typing_define, tuple): |
|
typing = typing_define[0] |
|
else: |
|
typing = typing_define |
|
if typing == List[str] or typing == List[Tuple[str, str]]: |
|
|
|
try: |
|
content = cls.parse_file_list(text=content) |
|
except Exception: |
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
parsed_data[block] = content |
|
return parsed_data |
|
|
|
|
|
class CodeParser: |
|
|
|
@classmethod |
|
def parse_block(cls, block: str, text: str) -> str: |
|
blocks = cls.parse_blocks(text) |
|
for k, v in blocks.items(): |
|
if block in k: |
|
return v |
|
return "" |
|
|
|
@classmethod |
|
def parse_blocks(cls, text: str): |
|
|
|
blocks = text.split("##") |
|
|
|
|
|
block_dict = {} |
|
|
|
|
|
for block in blocks: |
|
|
|
if block.strip() != "": |
|
|
|
block_title, block_content = block.split("\n", 1) |
|
block_dict[block_title.strip()] = block_content.strip() |
|
|
|
return block_dict |
|
|
|
@classmethod |
|
def parse_code(cls, block: str, text: str, lang: str = "") -> str: |
|
if block: |
|
text = cls.parse_block(block, text) |
|
pattern = rf'```{lang}.*?\s+(.*?)```' |
|
match = re.search(pattern, text, re.DOTALL) |
|
if match: |
|
code = match.group(1) |
|
else: |
|
logger.error(f"{pattern} not match following text:") |
|
logger.error(text) |
|
raise Exception |
|
return code |
|
|
|
@classmethod |
|
def parse_str(cls, block: str, text: str, lang: str = ""): |
|
code = cls.parse_code(block, text, lang) |
|
code = code.split("=")[-1] |
|
code = code.strip().strip("'").strip("\"") |
|
return code |
|
|
|
@classmethod |
|
def parse_file_list(cls, block: str, text: str, lang: str = "") -> list[str]: |
|
|
|
code = cls.parse_code(block, text, lang) |
|
|
|
pattern = r'\s*(.*=.*)?(\[.*\])' |
|
|
|
|
|
match = re.search(pattern, code, re.DOTALL) |
|
if match: |
|
tasks_list_str = match.group(2) |
|
|
|
|
|
tasks = ast.literal_eval(tasks_list_str) |
|
else: |
|
raise Exception |
|
return tasks |
|
|
|
|
|
class NoMoneyException(Exception): |
|
"""Raised when the operation cannot be completed due to insufficient funds""" |
|
|
|
def __init__(self, amount, message="Insufficient funds"): |
|
self.amount = amount |
|
self.message = message |
|
super().__init__(self.message) |
|
|
|
def __str__(self): |
|
return f'{self.message} -> Amount required: {self.amount}' |
|
|
|
|
|
def print_members(module, indent=0): |
|
""" |
|
https://stackoverflow.com/questions/1796180/how-can-i-get-a-list-of-all-classes-within-current-module-in-python |
|
:param module: |
|
:param indent: |
|
:return: |
|
""" |
|
prefix = ' ' * indent |
|
for name, obj in inspect.getmembers(module): |
|
print(name, obj) |
|
if inspect.isclass(obj): |
|
print(f'{prefix}Class: {name}') |
|
|
|
if name in ['__class__', '__base__']: |
|
continue |
|
print_members(obj, indent + 2) |
|
elif inspect.isfunction(obj): |
|
print(f'{prefix}Function: {name}') |
|
elif inspect.ismethod(obj): |
|
print(f'{prefix}Method: {name}') |
|
|
|
|
|
def parse_recipient(text): |
|
pattern = r"## Send To:\s*([A-Za-z]+)\s*?" |
|
recipient = re.search(pattern, text) |
|
return recipient.group(1) if recipient else "" |
|
|
|
|