|
import json |
|
from typing import Optional |
|
|
|
from core.app.app_config.entities import ( |
|
DatasetEntity, |
|
DatasetRetrieveConfigEntity, |
|
EasyUIBasedAppConfig, |
|
ExternalDataVariableEntity, |
|
ModelConfigEntity, |
|
PromptTemplateEntity, |
|
VariableEntity, |
|
) |
|
from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfigManager |
|
from core.app.apps.chat.app_config_manager import ChatAppConfigManager |
|
from core.app.apps.completion.app_config_manager import CompletionAppConfigManager |
|
from core.file.models import FileExtraConfig |
|
from core.helper import encrypter |
|
from core.model_runtime.entities.llm_entities import LLMMode |
|
from core.model_runtime.utils.encoders import jsonable_encoder |
|
from core.prompt.simple_prompt_transform import SimplePromptTransform |
|
from core.workflow.nodes import NodeType |
|
from events.app_event import app_was_created |
|
from extensions.ext_database import db |
|
from models.account import Account |
|
from models.api_based_extension import APIBasedExtension, APIBasedExtensionPoint |
|
from models.model import App, AppMode, AppModelConfig |
|
from models.workflow import Workflow, WorkflowType |
|
|
|
|
|
class WorkflowConverter: |
|
""" |
|
App Convert to Workflow Mode |
|
""" |
|
|
|
def convert_to_workflow( |
|
self, app_model: App, account: Account, name: str, icon_type: str, icon: str, icon_background: str |
|
): |
|
""" |
|
Convert app to workflow |
|
|
|
- basic mode of chatbot app |
|
|
|
- expert mode of chatbot app |
|
|
|
- completion app |
|
|
|
:param app_model: App instance |
|
:param account: Account |
|
:param name: new app name |
|
:param icon: new app icon |
|
:param icon_type: new app icon type |
|
:param icon_background: new app icon background |
|
:return: new App instance |
|
""" |
|
|
|
if not app_model.app_model_config: |
|
raise ValueError("App model config is required") |
|
|
|
workflow = self.convert_app_model_config_to_workflow( |
|
app_model=app_model, app_model_config=app_model.app_model_config, account_id=account.id |
|
) |
|
|
|
|
|
new_app = App() |
|
new_app.tenant_id = app_model.tenant_id |
|
new_app.name = name or app_model.name + "(workflow)" |
|
new_app.mode = AppMode.ADVANCED_CHAT.value if app_model.mode == AppMode.CHAT.value else AppMode.WORKFLOW.value |
|
new_app.icon_type = icon_type or app_model.icon_type |
|
new_app.icon = icon or app_model.icon |
|
new_app.icon_background = icon_background or app_model.icon_background |
|
new_app.enable_site = app_model.enable_site |
|
new_app.enable_api = app_model.enable_api |
|
new_app.api_rpm = app_model.api_rpm |
|
new_app.api_rph = app_model.api_rph |
|
new_app.is_demo = False |
|
new_app.is_public = app_model.is_public |
|
new_app.created_by = account.id |
|
new_app.updated_by = account.id |
|
db.session.add(new_app) |
|
db.session.flush() |
|
db.session.commit() |
|
|
|
workflow.app_id = new_app.id |
|
db.session.commit() |
|
|
|
app_was_created.send(new_app, account=account) |
|
|
|
return new_app |
|
|
|
def convert_app_model_config_to_workflow(self, app_model: App, app_model_config: AppModelConfig, account_id: str): |
|
""" |
|
Convert app model config to workflow mode |
|
:param app_model: App instance |
|
:param app_model_config: AppModelConfig instance |
|
:param account_id: Account ID |
|
""" |
|
|
|
new_app_mode = self._get_new_app_mode(app_model) |
|
|
|
|
|
app_config = self._convert_to_app_config(app_model=app_model, app_model_config=app_model_config) |
|
|
|
|
|
graph = {"nodes": [], "edges": []} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
start_node = self._convert_to_start_node(variables=app_config.variables) |
|
|
|
graph["nodes"].append(start_node) |
|
|
|
|
|
external_data_variable_node_mapping = {} |
|
if app_config.external_data_variables: |
|
http_request_nodes, external_data_variable_node_mapping = self._convert_to_http_request_node( |
|
app_model=app_model, |
|
variables=app_config.variables, |
|
external_data_variables=app_config.external_data_variables, |
|
) |
|
|
|
for http_request_node in http_request_nodes: |
|
graph = self._append_node(graph, http_request_node) |
|
|
|
|
|
if app_config.dataset: |
|
knowledge_retrieval_node = self._convert_to_knowledge_retrieval_node( |
|
new_app_mode=new_app_mode, dataset_config=app_config.dataset, model_config=app_config.model |
|
) |
|
|
|
if knowledge_retrieval_node: |
|
graph = self._append_node(graph, knowledge_retrieval_node) |
|
|
|
|
|
llm_node = self._convert_to_llm_node( |
|
original_app_mode=AppMode.value_of(app_model.mode), |
|
new_app_mode=new_app_mode, |
|
graph=graph, |
|
model_config=app_config.model, |
|
prompt_template=app_config.prompt_template, |
|
file_upload=app_config.additional_features.file_upload, |
|
external_data_variable_node_mapping=external_data_variable_node_mapping, |
|
) |
|
|
|
graph = self._append_node(graph, llm_node) |
|
|
|
if new_app_mode == AppMode.WORKFLOW: |
|
|
|
end_node = self._convert_to_end_node() |
|
graph = self._append_node(graph, end_node) |
|
else: |
|
answer_node = self._convert_to_answer_node() |
|
graph = self._append_node(graph, answer_node) |
|
|
|
app_model_config_dict = app_config.app_model_config_dict |
|
|
|
|
|
if new_app_mode == AppMode.ADVANCED_CHAT: |
|
features = { |
|
"opening_statement": app_model_config_dict.get("opening_statement"), |
|
"suggested_questions": app_model_config_dict.get("suggested_questions"), |
|
"suggested_questions_after_answer": app_model_config_dict.get("suggested_questions_after_answer"), |
|
"speech_to_text": app_model_config_dict.get("speech_to_text"), |
|
"text_to_speech": app_model_config_dict.get("text_to_speech"), |
|
"file_upload": app_model_config_dict.get("file_upload"), |
|
"sensitive_word_avoidance": app_model_config_dict.get("sensitive_word_avoidance"), |
|
"retriever_resource": app_model_config_dict.get("retriever_resource"), |
|
} |
|
else: |
|
features = { |
|
"text_to_speech": app_model_config_dict.get("text_to_speech"), |
|
"file_upload": app_model_config_dict.get("file_upload"), |
|
"sensitive_word_avoidance": app_model_config_dict.get("sensitive_word_avoidance"), |
|
} |
|
|
|
|
|
workflow = Workflow( |
|
tenant_id=app_model.tenant_id, |
|
app_id=app_model.id, |
|
type=WorkflowType.from_app_mode(new_app_mode).value, |
|
version="draft", |
|
graph=json.dumps(graph), |
|
features=json.dumps(features), |
|
created_by=account_id, |
|
environment_variables=[], |
|
conversation_variables=[], |
|
) |
|
|
|
db.session.add(workflow) |
|
db.session.commit() |
|
|
|
return workflow |
|
|
|
def _convert_to_app_config(self, app_model: App, app_model_config: AppModelConfig) -> EasyUIBasedAppConfig: |
|
app_mode = AppMode.value_of(app_model.mode) |
|
if app_mode == AppMode.AGENT_CHAT or app_model.is_agent: |
|
app_model.mode = AppMode.AGENT_CHAT.value |
|
app_config = AgentChatAppConfigManager.get_app_config( |
|
app_model=app_model, app_model_config=app_model_config |
|
) |
|
elif app_mode == AppMode.CHAT: |
|
app_config = ChatAppConfigManager.get_app_config(app_model=app_model, app_model_config=app_model_config) |
|
elif app_mode == AppMode.COMPLETION: |
|
app_config = CompletionAppConfigManager.get_app_config( |
|
app_model=app_model, app_model_config=app_model_config |
|
) |
|
else: |
|
raise ValueError("Invalid app mode") |
|
|
|
return app_config |
|
|
|
def _convert_to_start_node(self, variables: list[VariableEntity]) -> dict: |
|
""" |
|
Convert to Start Node |
|
:param variables: list of variables |
|
:return: |
|
""" |
|
return { |
|
"id": "start", |
|
"position": None, |
|
"data": { |
|
"title": "START", |
|
"type": NodeType.START.value, |
|
"variables": [jsonable_encoder(v) for v in variables], |
|
}, |
|
} |
|
|
|
def _convert_to_http_request_node( |
|
self, app_model: App, variables: list[VariableEntity], external_data_variables: list[ExternalDataVariableEntity] |
|
) -> tuple[list[dict], dict[str, str]]: |
|
""" |
|
Convert API Based Extension to HTTP Request Node |
|
:param app_model: App instance |
|
:param variables: list of variables |
|
:param external_data_variables: list of external data variables |
|
:return: |
|
""" |
|
index = 1 |
|
nodes = [] |
|
external_data_variable_node_mapping = {} |
|
tenant_id = app_model.tenant_id |
|
for external_data_variable in external_data_variables: |
|
tool_type = external_data_variable.type |
|
if tool_type != "api": |
|
continue |
|
|
|
tool_variable = external_data_variable.variable |
|
tool_config = external_data_variable.config |
|
|
|
|
|
api_based_extension_id = tool_config.get("api_based_extension_id") |
|
if not api_based_extension_id: |
|
continue |
|
|
|
|
|
api_based_extension = self._get_api_based_extension( |
|
tenant_id=tenant_id, api_based_extension_id=api_based_extension_id |
|
) |
|
|
|
|
|
api_key = encrypter.decrypt_token(tenant_id=tenant_id, token=api_based_extension.api_key) |
|
|
|
inputs = {} |
|
for v in variables: |
|
inputs[v.variable] = "{{#start." + v.variable + "#}}" |
|
|
|
request_body = { |
|
"point": APIBasedExtensionPoint.APP_EXTERNAL_DATA_TOOL_QUERY.value, |
|
"params": { |
|
"app_id": app_model.id, |
|
"tool_variable": tool_variable, |
|
"inputs": inputs, |
|
"query": "{{#sys.query#}}" if app_model.mode == AppMode.CHAT.value else "", |
|
}, |
|
} |
|
|
|
request_body_json = json.dumps(request_body) |
|
request_body_json = request_body_json.replace(r"\{\{", "{{").replace(r"\}\}", "}}") |
|
|
|
http_request_node = { |
|
"id": f"http_request_{index}", |
|
"position": None, |
|
"data": { |
|
"title": f"HTTP REQUEST {api_based_extension.name}", |
|
"type": NodeType.HTTP_REQUEST.value, |
|
"method": "post", |
|
"url": api_based_extension.api_endpoint, |
|
"authorization": {"type": "api-key", "config": {"type": "bearer", "api_key": api_key}}, |
|
"headers": "", |
|
"params": "", |
|
"body": {"type": "json", "data": request_body_json}, |
|
}, |
|
} |
|
|
|
nodes.append(http_request_node) |
|
|
|
|
|
code_node = { |
|
"id": f"code_{index}", |
|
"position": None, |
|
"data": { |
|
"title": f"Parse {api_based_extension.name} Response", |
|
"type": NodeType.CODE.value, |
|
"variables": [{"variable": "response_json", "value_selector": [http_request_node["id"], "body"]}], |
|
"code_language": "python3", |
|
"code": "import json\n\ndef main(response_json: str) -> str:\n response_body = json.loads(" |
|
'response_json)\n return {\n "result": response_body["result"]\n }', |
|
"outputs": {"result": {"type": "string"}}, |
|
}, |
|
} |
|
|
|
nodes.append(code_node) |
|
|
|
external_data_variable_node_mapping[external_data_variable.variable] = code_node["id"] |
|
index += 1 |
|
|
|
return nodes, external_data_variable_node_mapping |
|
|
|
def _convert_to_knowledge_retrieval_node( |
|
self, new_app_mode: AppMode, dataset_config: DatasetEntity, model_config: ModelConfigEntity |
|
) -> Optional[dict]: |
|
""" |
|
Convert datasets to Knowledge Retrieval Node |
|
:param new_app_mode: new app mode |
|
:param dataset_config: dataset |
|
:param model_config: model config |
|
:return: |
|
""" |
|
retrieve_config = dataset_config.retrieve_config |
|
if new_app_mode == AppMode.ADVANCED_CHAT: |
|
query_variable_selector = ["sys", "query"] |
|
elif retrieve_config.query_variable: |
|
|
|
query_variable_selector = ["start", retrieve_config.query_variable] |
|
else: |
|
return None |
|
|
|
return { |
|
"id": "knowledge_retrieval", |
|
"position": None, |
|
"data": { |
|
"title": "KNOWLEDGE RETRIEVAL", |
|
"type": NodeType.KNOWLEDGE_RETRIEVAL.value, |
|
"query_variable_selector": query_variable_selector, |
|
"dataset_ids": dataset_config.dataset_ids, |
|
"retrieval_mode": retrieve_config.retrieve_strategy.value, |
|
"single_retrieval_config": { |
|
"model": { |
|
"provider": model_config.provider, |
|
"name": model_config.model, |
|
"mode": model_config.mode, |
|
"completion_params": { |
|
**model_config.parameters, |
|
"stop": model_config.stop, |
|
}, |
|
} |
|
} |
|
if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE |
|
else None, |
|
"multiple_retrieval_config": { |
|
"top_k": retrieve_config.top_k, |
|
"score_threshold": retrieve_config.score_threshold, |
|
"reranking_model": retrieve_config.reranking_model, |
|
} |
|
if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE |
|
else None, |
|
}, |
|
} |
|
|
|
def _convert_to_llm_node( |
|
self, |
|
original_app_mode: AppMode, |
|
new_app_mode: AppMode, |
|
graph: dict, |
|
model_config: ModelConfigEntity, |
|
prompt_template: PromptTemplateEntity, |
|
file_upload: Optional[FileExtraConfig] = None, |
|
external_data_variable_node_mapping: dict[str, str] | None = None, |
|
) -> dict: |
|
""" |
|
Convert to LLM Node |
|
:param original_app_mode: original app mode |
|
:param new_app_mode: new app mode |
|
:param graph: graph |
|
:param model_config: model config |
|
:param prompt_template: prompt template |
|
:param file_upload: file upload config (optional) |
|
:param external_data_variable_node_mapping: external data variable node mapping |
|
""" |
|
|
|
start_node = next(filter(lambda n: n["data"]["type"] == NodeType.START.value, graph["nodes"])) |
|
knowledge_retrieval_node = next( |
|
filter(lambda n: n["data"]["type"] == NodeType.KNOWLEDGE_RETRIEVAL.value, graph["nodes"]), None |
|
) |
|
|
|
role_prefix = None |
|
|
|
|
|
if model_config.mode == LLMMode.CHAT.value: |
|
if prompt_template.prompt_type == PromptTemplateEntity.PromptType.SIMPLE: |
|
if not prompt_template.simple_prompt_template: |
|
raise ValueError("Simple prompt template is required") |
|
|
|
prompt_transform = SimplePromptTransform() |
|
prompt_template_config = prompt_transform.get_prompt_template( |
|
app_mode=original_app_mode, |
|
provider=model_config.provider, |
|
model=model_config.model, |
|
pre_prompt=prompt_template.simple_prompt_template, |
|
has_context=knowledge_retrieval_node is not None, |
|
query_in_prompt=False, |
|
) |
|
|
|
template = prompt_template_config["prompt_template"].template |
|
if not template: |
|
prompts = [] |
|
else: |
|
template = self._replace_template_variables( |
|
template, start_node["data"]["variables"], external_data_variable_node_mapping |
|
) |
|
|
|
prompts = [{"role": "user", "text": template}] |
|
else: |
|
advanced_chat_prompt_template = prompt_template.advanced_chat_prompt_template |
|
|
|
prompts = [] |
|
if advanced_chat_prompt_template: |
|
for m in advanced_chat_prompt_template.messages: |
|
text = m.text |
|
text = self._replace_template_variables( |
|
text, start_node["data"]["variables"], external_data_variable_node_mapping |
|
) |
|
|
|
prompts.append({"role": m.role.value, "text": text}) |
|
|
|
else: |
|
if prompt_template.prompt_type == PromptTemplateEntity.PromptType.SIMPLE: |
|
if not prompt_template.simple_prompt_template: |
|
raise ValueError("Simple prompt template is required") |
|
|
|
prompt_transform = SimplePromptTransform() |
|
prompt_template_config = prompt_transform.get_prompt_template( |
|
app_mode=original_app_mode, |
|
provider=model_config.provider, |
|
model=model_config.model, |
|
pre_prompt=prompt_template.simple_prompt_template, |
|
has_context=knowledge_retrieval_node is not None, |
|
query_in_prompt=False, |
|
) |
|
|
|
template = prompt_template_config["prompt_template"].template |
|
template = self._replace_template_variables( |
|
template=template, |
|
variables=start_node["data"]["variables"], |
|
external_data_variable_node_mapping=external_data_variable_node_mapping, |
|
) |
|
|
|
prompts = {"text": template} |
|
|
|
prompt_rules = prompt_template_config["prompt_rules"] |
|
role_prefix = { |
|
"user": prompt_rules.get("human_prefix", "Human"), |
|
"assistant": prompt_rules.get("assistant_prefix", "Assistant"), |
|
} |
|
else: |
|
advanced_completion_prompt_template = prompt_template.advanced_completion_prompt_template |
|
if advanced_completion_prompt_template: |
|
text = advanced_completion_prompt_template.prompt |
|
text = self._replace_template_variables( |
|
template=text, |
|
variables=start_node["data"]["variables"], |
|
external_data_variable_node_mapping=external_data_variable_node_mapping, |
|
) |
|
else: |
|
text = "" |
|
|
|
text = text.replace("{{#query#}}", "{{#sys.query#}}") |
|
|
|
prompts = { |
|
"text": text, |
|
} |
|
|
|
if advanced_completion_prompt_template and advanced_completion_prompt_template.role_prefix: |
|
role_prefix = { |
|
"user": advanced_completion_prompt_template.role_prefix.user, |
|
"assistant": advanced_completion_prompt_template.role_prefix.assistant, |
|
} |
|
|
|
memory = None |
|
if new_app_mode == AppMode.ADVANCED_CHAT: |
|
memory = {"role_prefix": role_prefix, "window": {"enabled": False}} |
|
|
|
completion_params = model_config.parameters |
|
completion_params.update({"stop": model_config.stop}) |
|
return { |
|
"id": "llm", |
|
"position": None, |
|
"data": { |
|
"title": "LLM", |
|
"type": NodeType.LLM.value, |
|
"model": { |
|
"provider": model_config.provider, |
|
"name": model_config.model, |
|
"mode": model_config.mode, |
|
"completion_params": completion_params, |
|
}, |
|
"prompt_template": prompts, |
|
"memory": memory, |
|
"context": { |
|
"enabled": knowledge_retrieval_node is not None, |
|
"variable_selector": ["knowledge_retrieval", "result"] |
|
if knowledge_retrieval_node is not None |
|
else None, |
|
}, |
|
"vision": { |
|
"enabled": file_upload is not None, |
|
"variable_selector": ["sys", "files"] if file_upload is not None else None, |
|
"configs": {"detail": file_upload.image_config.detail} |
|
if file_upload is not None and file_upload.image_config is not None |
|
else None, |
|
}, |
|
}, |
|
} |
|
|
|
def _replace_template_variables( |
|
self, template: str, variables: list[dict], external_data_variable_node_mapping: dict[str, str] | None = None |
|
) -> str: |
|
""" |
|
Replace Template Variables |
|
:param template: template |
|
:param variables: list of variables |
|
:param external_data_variable_node_mapping: external data variable node mapping |
|
:return: |
|
""" |
|
for v in variables: |
|
template = template.replace("{{" + v["variable"] + "}}", "{{#start." + v["variable"] + "#}}") |
|
|
|
if external_data_variable_node_mapping: |
|
for variable, code_node_id in external_data_variable_node_mapping.items(): |
|
template = template.replace("{{" + variable + "}}", "{{#" + code_node_id + ".result#}}") |
|
|
|
return template |
|
|
|
def _convert_to_end_node(self) -> dict: |
|
""" |
|
Convert to End Node |
|
:return: |
|
""" |
|
|
|
return { |
|
"id": "end", |
|
"position": None, |
|
"data": { |
|
"title": "END", |
|
"type": NodeType.END.value, |
|
"outputs": [{"variable": "result", "value_selector": ["llm", "text"]}], |
|
}, |
|
} |
|
|
|
def _convert_to_answer_node(self) -> dict: |
|
""" |
|
Convert to Answer Node |
|
:return: |
|
""" |
|
|
|
return { |
|
"id": "answer", |
|
"position": None, |
|
"data": {"title": "ANSWER", "type": NodeType.ANSWER.value, "answer": "{{#llm.text#}}"}, |
|
} |
|
|
|
def _create_edge(self, source: str, target: str) -> dict: |
|
""" |
|
Create Edge |
|
:param source: source node id |
|
:param target: target node id |
|
:return: |
|
""" |
|
return {"id": f"{source}-{target}", "source": source, "target": target} |
|
|
|
def _append_node(self, graph: dict, node: dict) -> dict: |
|
""" |
|
Append Node to Graph |
|
|
|
:param graph: Graph, include: nodes, edges |
|
:param node: Node to append |
|
:return: |
|
""" |
|
previous_node = graph["nodes"][-1] |
|
graph["nodes"].append(node) |
|
graph["edges"].append(self._create_edge(previous_node["id"], node["id"])) |
|
return graph |
|
|
|
def _get_new_app_mode(self, app_model: App) -> AppMode: |
|
""" |
|
Get new app mode |
|
:param app_model: App instance |
|
:return: AppMode |
|
""" |
|
if app_model.mode == AppMode.COMPLETION.value: |
|
return AppMode.WORKFLOW |
|
else: |
|
return AppMode.ADVANCED_CHAT |
|
|
|
def _get_api_based_extension(self, tenant_id: str, api_based_extension_id: str): |
|
""" |
|
Get API Based Extension |
|
:param tenant_id: tenant id |
|
:param api_based_extension_id: api based extension id |
|
:return: |
|
""" |
|
api_based_extension = ( |
|
db.session.query(APIBasedExtension) |
|
.filter(APIBasedExtension.tenant_id == tenant_id, APIBasedExtension.id == api_based_extension_id) |
|
.first() |
|
) |
|
|
|
if not api_based_extension: |
|
raise ValueError(f"API Based Extension not found, id: {api_based_extension_id}") |
|
|
|
return api_based_extension |
|
|