cc-api / json_parser.py
Severian's picture
Upload 81 files
995af0f verified
from logger_config import setup_logger
from typing import Dict, Any, Optional, List, Union
from dataclasses import dataclass, asdict
from enum import Enum
import json
from dify_client_python.dify_client.models.stream import (
StreamEvent,
StreamResponse,
build_chat_stream_response
)
import re
logger = setup_logger()
class EventType(Enum):
AGENT_THOUGHT = "agent_thought"
AGENT_MESSAGE = "agent_message"
MESSAGE_END = "message_end"
PING = "ping"
@dataclass
class ToolCall:
tool_name: str
tool_input: Dict[str, Any]
tool_output: Optional[str]
tool_labels: Dict[str, Dict[str, str]]
@dataclass
class Citation:
dataset_id: str
dataset_name: str
document_id: str
document_name: str
segment_id: str
score: float
content: str
@dataclass
class ProcessedResponse:
event_type: EventType
task_id: str
message_id: str
conversation_id: str
content: str
tool_calls: List[ToolCall]
citations: List[Citation]
metadata: Dict[str, Any]
created_at: int
class EnumEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Enum):
return obj.value
if hasattr(obj, 'dict'):
return obj.dict()
return super().default(obj)
class SSEParser:
def __init__(self):
self.logger = setup_logger("sse_parser")
def parse_sse_event(self, data: str) -> Optional[Dict]:
"""Parse SSE event data and return cleaned dictionary"""
self.logger.debug("Parsing SSE event")
try:
# Extract the data portion
if "data:" in data:
data = data.split("data:", 1)[1].strip()
# Parse JSON data
parsed_data = json.loads(data)
# Clean tool outputs if present
if "observation" in parsed_data:
try:
observation = parsed_data["observation"]
if observation and isinstance(observation, str):
tool_data = json.loads(observation)
# Extract relevant tool output
for key, value in tool_data.items():
if isinstance(value, str) and "llm_result" in value:
tool_result = json.loads(value)["llm_result"]
parsed_data["observation"] = self.clean_tool_output(tool_result)
except:
pass # Keep original observation if parsing fails
return parsed_data
except json.JSONDecodeError as e:
self.logger.error(f"JSON decode error: {str(e)}")
return None
except Exception as e:
self.logger.error(f"Parse error: {str(e)}")
return None
def clean_tool_output(self, output: str) -> str:
"""Clean tool output by removing markdown and other formatting"""
# Remove markdown code blocks
output = re.sub(r'```.*?```', '', output, flags=re.DOTALL)
# Remove other markdown formatting
output = re.sub(r'[*_`#]', '', output)
# Clean up whitespace
output = re.sub(r'\n{3,}', '\n\n', output.strip())
return output