|
from core.ops.ops_trace_manager import OpsTraceManager, provider_config_map |
|
from extensions.ext_database import db |
|
from models.model import App, TraceAppConfig |
|
|
|
|
|
class OpsService: |
|
@classmethod |
|
def get_tracing_app_config(cls, app_id: str, tracing_provider: str): |
|
""" |
|
Get tracing app config |
|
:param app_id: app id |
|
:param tracing_provider: tracing provider |
|
:return: |
|
""" |
|
trace_config_data: TraceAppConfig = ( |
|
db.session.query(TraceAppConfig) |
|
.filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider) |
|
.first() |
|
) |
|
|
|
if not trace_config_data: |
|
return None |
|
|
|
|
|
tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id |
|
decrypt_tracing_config = OpsTraceManager.decrypt_tracing_config( |
|
tenant_id, tracing_provider, trace_config_data.tracing_config |
|
) |
|
new_decrypt_tracing_config = OpsTraceManager.obfuscated_decrypt_token(tracing_provider, decrypt_tracing_config) |
|
|
|
if tracing_provider == "langfuse" and ( |
|
"project_key" not in decrypt_tracing_config or not decrypt_tracing_config.get("project_key") |
|
): |
|
try: |
|
project_key = OpsTraceManager.get_trace_config_project_key(decrypt_tracing_config, tracing_provider) |
|
new_decrypt_tracing_config.update( |
|
{ |
|
"project_url": "{host}/project/{key}".format( |
|
host=decrypt_tracing_config.get("host"), key=project_key |
|
) |
|
} |
|
) |
|
except Exception: |
|
new_decrypt_tracing_config.update( |
|
{"project_url": "{host}/".format(host=decrypt_tracing_config.get("host"))} |
|
) |
|
|
|
if tracing_provider == "langsmith" and ( |
|
"project_url" not in decrypt_tracing_config or not decrypt_tracing_config.get("project_url") |
|
): |
|
try: |
|
project_url = OpsTraceManager.get_trace_config_project_url(decrypt_tracing_config, tracing_provider) |
|
new_decrypt_tracing_config.update({"project_url": project_url}) |
|
except Exception: |
|
new_decrypt_tracing_config.update({"project_url": "https://smith.langchain.com/"}) |
|
|
|
trace_config_data.tracing_config = new_decrypt_tracing_config |
|
return trace_config_data.to_dict() |
|
|
|
@classmethod |
|
def create_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict): |
|
""" |
|
Create tracing app config |
|
:param app_id: app id |
|
:param tracing_provider: tracing provider |
|
:param tracing_config: tracing config |
|
:return: |
|
""" |
|
if tracing_provider not in provider_config_map and tracing_provider: |
|
return {"error": f"Invalid tracing provider: {tracing_provider}"} |
|
|
|
config_class, other_keys = ( |
|
provider_config_map[tracing_provider]["config_class"], |
|
provider_config_map[tracing_provider]["other_keys"], |
|
) |
|
default_config_instance = config_class(**tracing_config) |
|
for key in other_keys: |
|
if key in tracing_config and tracing_config[key] == "": |
|
tracing_config[key] = getattr(default_config_instance, key, None) |
|
|
|
|
|
if not OpsTraceManager.check_trace_config_is_effective(tracing_config, tracing_provider): |
|
return {"error": "Invalid Credentials"} |
|
|
|
|
|
if tracing_provider == "langfuse": |
|
project_key = OpsTraceManager.get_trace_config_project_key(tracing_config, tracing_provider) |
|
project_url = "{host}/project/{key}".format(host=tracing_config.get("host"), key=project_key) |
|
elif tracing_provider == "langsmith": |
|
project_url = OpsTraceManager.get_trace_config_project_url(tracing_config, tracing_provider) |
|
else: |
|
project_url = None |
|
|
|
|
|
trace_config_data: TraceAppConfig = ( |
|
db.session.query(TraceAppConfig) |
|
.filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider) |
|
.first() |
|
) |
|
|
|
if trace_config_data: |
|
return None |
|
|
|
|
|
tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id |
|
tracing_config = OpsTraceManager.encrypt_tracing_config(tenant_id, tracing_provider, tracing_config) |
|
if project_url: |
|
tracing_config["project_url"] = project_url |
|
trace_config_data = TraceAppConfig( |
|
app_id=app_id, |
|
tracing_provider=tracing_provider, |
|
tracing_config=tracing_config, |
|
) |
|
db.session.add(trace_config_data) |
|
db.session.commit() |
|
|
|
return {"result": "success"} |
|
|
|
@classmethod |
|
def update_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict): |
|
""" |
|
Update tracing app config |
|
:param app_id: app id |
|
:param tracing_provider: tracing provider |
|
:param tracing_config: tracing config |
|
:return: |
|
""" |
|
if tracing_provider not in provider_config_map: |
|
raise ValueError(f"Invalid tracing provider: {tracing_provider}") |
|
|
|
|
|
current_trace_config = ( |
|
db.session.query(TraceAppConfig) |
|
.filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider) |
|
.first() |
|
) |
|
|
|
if not current_trace_config: |
|
return None |
|
|
|
|
|
tenant_id = db.session.query(App).filter(App.id == app_id).first().tenant_id |
|
tracing_config = OpsTraceManager.encrypt_tracing_config( |
|
tenant_id, tracing_provider, tracing_config, current_trace_config.tracing_config |
|
) |
|
|
|
|
|
|
|
decrypt_tracing_config = OpsTraceManager.decrypt_tracing_config(tenant_id, tracing_provider, tracing_config) |
|
if not OpsTraceManager.check_trace_config_is_effective(decrypt_tracing_config, tracing_provider): |
|
raise ValueError("Invalid Credentials") |
|
|
|
current_trace_config.tracing_config = tracing_config |
|
db.session.commit() |
|
|
|
return current_trace_config.to_dict() |
|
|
|
@classmethod |
|
def delete_tracing_app_config(cls, app_id: str, tracing_provider: str): |
|
""" |
|
Delete tracing app config |
|
:param app_id: app id |
|
:param tracing_provider: tracing provider |
|
:return: |
|
""" |
|
trace_config = ( |
|
db.session.query(TraceAppConfig) |
|
.filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider) |
|
.first() |
|
) |
|
|
|
if not trace_config: |
|
return None |
|
|
|
db.session.delete(trace_config) |
|
db.session.commit() |
|
|
|
return True |
|
|