Spaces:
Running
Running
from logger_config import setup_logger | |
from typing import Dict, Any, Optional, List, Union | |
from dataclasses import dataclass, asdict | |
from enum import Enum | |
import json | |
from dify_client_python.dify_client.models.stream import ( | |
StreamEvent, | |
StreamResponse, | |
build_chat_stream_response | |
) | |
import re | |
# Module-level logger, built by the project's setup_logger helper called with
# no arguments. SSEParser does not use it; it creates its own "sse_parser" logger.
logger = setup_logger()
class EventType(Enum):
    """Kinds of streaming events a ProcessedResponse can carry.

    Each value is the literal event-name string. These presumably mirror the
    ``event`` field of the upstream chat stream payload; confirm against the API.
    """

    AGENT_THOUGHT = "agent_thought"
    AGENT_MESSAGE = "agent_message"
    MESSAGE_END = "message_end"
    PING = "ping"
@dataclass
class ToolCall:
    """One tool invocation reported in a streamed response.

    Declared as a dataclass so it can be constructed from field values and
    converted with ``dataclasses.asdict``. Without the decorator the bare
    annotations create no ``__init__``, and ``asdict`` rejects instances.
    """

    # Name of the invoked tool.
    tool_name: str
    # Arguments passed to the tool.
    tool_input: Dict[str, Any]
    # Tool result text, or None when no output is available.
    tool_output: Optional[str]
    # Nested label mapping. Presumably tool name -> {locale: label}; confirm upstream.
    tool_labels: Dict[str, Dict[str, str]]
@dataclass
class Citation:
    """A retrieved knowledge-base segment cited by a response.

    Declared as a dataclass so instances can be built from field values and
    serialized with ``dataclasses.asdict``.
    """

    dataset_id: str
    dataset_name: str
    document_id: str
    document_name: str
    segment_id: str
    # Presumably the retrieval relevance score; range/scale not visible here.
    score: float
    # Text of the cited segment.
    content: str
@dataclass
class ProcessedResponse:
    """A single streamed event after parsing, normalized into typed fields.

    Declared as a dataclass so it can be constructed from keyword arguments
    and serialized with ``dataclasses.asdict``.
    """

    # Which kind of stream event this is.
    event_type: EventType
    task_id: str
    message_id: str
    conversation_id: str
    # Text content of the event.
    content: str
    tool_calls: List[ToolCall]
    citations: List[Citation]
    # Free-form extra data attached to the event.
    metadata: Dict[str, Any]
    # Creation time. Presumably a Unix timestamp in seconds; confirm upstream.
    created_at: int
class EnumEncoder(json.JSONEncoder):
    """JSON encoder that also handles enums, dataclasses and ``.dict()`` models.

    Use as ``json.dumps(obj, cls=EnumEncoder)``.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*, or defer to the base class.

        - ``Enum`` members become their ``value``.
        - Dataclass instances (such as this module's response models) become
          dicts via ``dataclasses.asdict``. Any enums nested inside are then
          encoded by a further call to this method.
        - Objects exposing a callable ``dict()`` method (pydantic-style models)
          are converted with it.

        Raises:
            TypeError: from ``json.JSONEncoder.default`` for any other type.
        """
        # Local import keeps the block self-contained; the module already
        # depends on the dataclasses package.
        from dataclasses import is_dataclass

        if isinstance(obj, Enum):
            return obj.value
        # is_dataclass() is also true for dataclass *classes*; only instances
        # can be converted by asdict().
        if is_dataclass(obj) and not isinstance(obj, type):
            return asdict(obj)
        to_dict = getattr(obj, "dict", None)
        if callable(to_dict):
            return to_dict()
        return super().default(obj)
class SSEParser:
    """Parse server-sent-event (SSE) text into JSON dictionaries.

    Also cleans tool ``observation`` payloads by extracting their
    ``llm_result`` text and stripping markdown from it.
    """

    # Compiled once at class creation rather than on every clean_tool_output call.
    _CODE_BLOCK_RE = re.compile(r'```.*?```', flags=re.DOTALL)
    _MARKDOWN_CHARS_RE = re.compile(r'[*_`#]')
    _EXCESS_NEWLINES_RE = re.compile(r'\n{3,}')

    def __init__(self):
        self.logger = setup_logger("sse_parser")

    def parse_sse_event(self, data: str) -> Optional[Dict]:
        """Parse one SSE event and return its JSON payload as a dictionary.

        Args:
            data: Raw event text, e.g. ``'data: {"event": "ping"}'``. Several
                ``data:`` lines are joined with newlines. Text containing no
                ``data:`` line is treated as a bare JSON payload.

        Returns:
            The decoded JSON object, with any JSON-encoded tool ``observation``
            replaced by its cleaned ``llm_result`` text. Returns ``None`` when
            the input is not a string, carries no payload, is not valid JSON,
            or does not decode to a JSON object. Errors are logged, never raised.
        """
        self.logger.debug("Parsing SSE event")
        if not isinstance(data, str):
            self.logger.error("Parse error: expected str, got %s", type(data).__name__)
            return None
        try:
            payload = self._extract_payload(data)
            if not payload:
                self.logger.debug("SSE event has no data payload")
                return None
            parsed_data = json.loads(payload)
            if not isinstance(parsed_data, dict):
                # Honour the Optional[Dict] contract instead of leaking lists/scalars.
                self.logger.error("Unexpected SSE payload type: %s", type(parsed_data).__name__)
                return None
            self._clean_observation(parsed_data)
            return parsed_data
        except json.JSONDecodeError as e:
            self.logger.error("JSON decode error: %s", e)
            return None
        except Exception as e:
            # Boundary catch: one malformed event must not break the stream consumer.
            self.logger.error("Parse error: %s", e)
            return None

    @staticmethod
    def _extract_payload(data: str) -> str:
        """Return the JSON text carried by an SSE event.

        Only lines that *start* with ``data:`` are treated as data fields, so
        a bare JSON payload whose text happens to contain "data:" is not split
        apart. Multiple data lines are joined with ``\\n``, as SSE allows a
        payload to span several ``data:`` lines.
        """
        data_lines = []
        for line in data.splitlines():
            stripped = line.lstrip()
            if stripped.startswith("data:"):
                data_lines.append(stripped[len("data:"):].strip())
        if not data_lines:
            return data.strip()
        return "\n".join(data_lines)

    def _clean_observation(self, parsed_data: Dict[str, Any]) -> None:
        """Replace a JSON-encoded ``observation`` with its cleaned ``llm_result`` in place.

        Best-effort: the observation is left unchanged when it is not a
        JSON object. Individual tool entries that fail to decode are skipped
        rather than aborting the rest. If several entries yield a result, the
        last one wins.
        """
        observation = parsed_data.get("observation")
        if not observation or not isinstance(observation, str):
            return
        try:
            tool_data = json.loads(observation)
        except json.JSONDecodeError:
            # Plain-text observation; keep it verbatim.
            return
        if not isinstance(tool_data, dict):
            return
        for key, value in tool_data.items():
            if not (isinstance(value, str) and "llm_result" in value):
                continue
            try:
                tool_result = json.loads(value)["llm_result"]
            except (json.JSONDecodeError, TypeError, KeyError) as e:
                self.logger.debug("Skipping tool output %r: %s", key, e)
                continue
            if isinstance(tool_result, str):
                parsed_data["observation"] = self.clean_tool_output(tool_result)

    def clean_tool_output(self, output: str) -> str:
        """Strip markdown formatting from a tool's text output.

        Fenced code blocks are removed together with their contents. The
        characters ``*``, ``_``, a backtick and ``#`` are deleted wherever they
        occur. The text is stripped, and runs of three or more newlines
        collapse to one blank line.
        """
        # NOTE(review): deleting every '_' and '#' also mangles identifiers,
        # URLs and things like "C#" or "#42"; confirm this is acceptable.
        output = self._CODE_BLOCK_RE.sub('', output)
        output = self._MARKDOWN_CHARS_RE.sub('', output)
        return self._EXCESS_NEWLINES_RE.sub('\n\n', output.strip())