"""Scamrakshak multi-agent chat application.

Wires three Groq-backed agents (CEO, Tech Support, Researcher) into a Gradio
chat UI, and gives every agent a scam-detection pipeline that scrapes images
from Bing, OCRs them, classifies the extracted text through a prediction API
and keeps running statistics on disk.
"""

import csv
import json
import logging
import os
import queue
import random
import shutil
import threading
import time
from datetime import datetime
from io import BytesIO
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote_plus

import colorama
import gradio as gr
import pandas as pd
import pytesseract
import requests
from colorama import Fore, Style
from googlesearch import search
from groq import Groq
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.by import By

# Initialize colorama for colored console output
colorama.init()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('agent_chat.log')
    ]
)

# Initialize Groq client.
# SECURITY: the API key used to be hard-coded here. It is now read from the
# environment; the previously committed key must be treated as compromised
# and revoked.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
client = Groq(api_key=GROQ_API_KEY)

# Model used for every chat completion in this module.
LLM_MODEL = "llama-3.2-90b-text-preview"

# Endpoint that classifies a message as "scam" or "ham".
PREDICTION_API_URL = "https://varun324242-sssssss.hf.space/predict"

# Statistic keys tracked by the scam pipeline, mapped to the labels used in
# the report files (the labels are also parsed back from the cumulative file).
STAT_LABELS = {
    'total_processed': 'Total Messages Processed',
    'scam_count': 'Scam Messages Detected',
    'ham_count': 'Ham Messages Detected',
    'failed_count': 'Failed Processing',
}

# Topics for the "Team Chat" tab; taken from the tab's own description.
TEAM_DISCUSSION_TOPICS = ("security", "features", "market trends")


def google_search(query: str, num_results: int = 5) -> List[str]:
    """Perform a Google search and return the result URLs.

    Returns an empty list (and logs the error) when the search fails.
    """
    try:
        return list(search(query, stop=num_results))
    except Exception as e:
        logging.error(f"Google search error: {str(e)}")
        return []


class ConversationManager:
    """Append agent conversations to a markdown file and a plain-text file."""

    def __init__(self):
        self.markdown_file = "conversation_history.md"
        self.text_file = "conversation_history.txt"
        # Session stamp is fixed at construction time and reused for every
        # markdown entry written by this manager.
        self.current_session = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def log_conversation(self, message: str, agent: str, is_task: bool = False):
        """Log conversation to both markdown and text files.

        Args:
            message: Text of the task or response.
            agent: Name of the agent receiving the task / giving the response.
            is_task: True when ``message`` is a task assigned by the CEO.
        """
        # Log to markdown file
        with open(self.markdown_file, "a", encoding="utf-8") as f:
            # A zero-size file has just been created: write the title once.
            if not os.path.getsize(self.markdown_file):
                f.write(f"# Scamrakshak Team Conversations\n\n")

            if is_task:
                f.write(f"\n### Task Assignment ({self.current_session})\n")
                f.write(f"**From CEO to {agent}**:\n")
                f.write(f"```\n{message}\n```\n")
            else:
                f.write(f"\n### {agent} Response ({self.current_session})\n")
                f.write(f"{message}\n")
            f.write("\n---\n")

        # Log to text file
        with open(self.text_file, "a", encoding="utf-8") as f:
            if not os.path.getsize(self.text_file):
                f.write("=== SCAMRAKSHAK TEAM CONVERSATIONS ===\n\n")

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if is_task:
                f.write(f"\n[{timestamp}] TASK ASSIGNMENT\n")
                f.write(f"From: CEO\n")
                f.write(f"To: {agent}\n")
                f.write(f"Task: {message}\n")
            else:
                f.write(f"\n[{timestamp}] {agent} RESPONSE\n")
                f.write(f"{message}\n")
            f.write("\n" + "=" * 50 + "\n")


class Agent:
    """A named LLM persona with its own history, task queue and scam tools."""

    def __init__(self, name: str, role: str, system_prompt: str,
                 conversation_manager: ConversationManager):
        self.name = name
        self.role = role
        self.system_prompt = system_prompt
        self.conversation_manager = conversation_manager
        self.conversation_history: List[Dict] = []
        self.task_queue = queue.Queue()
        self.research_results = {}
        # Flags shared with the background detection thread.
        self.detection_running = False
        self.stop_requested = False

    def get_response(self, user_input: str, from_agent: Optional[str] = None) -> str:
        """Answer ``user_input``, handling scam-detection commands first.

        Args:
            user_input: The message to answer.
            from_agent: Optional sender name, prepended as context for the LLM.

        Returns:
            ``"<agent name>: <reply>"`` on success, a command result for
            scam-detection commands, or an ``"Error: ..."`` string on failure.
        """
        # First check for scam detection commands
        scam_detection_response = self.handle_scam_detection(user_input)
        if scam_detection_response:
            return scam_detection_response

        # Continue with normal response processing
        try:
            # Add context about who is sending the message
            sender_context = f"Message from {from_agent}: " if from_agent else ""

            # Perform research if needed
            research_context = ""
            lowered = user_input.lower()
            if "research" in lowered or "search" in lowered:
                research_results = google_search(user_input)
                research_context = "\n\nResearch results:\n" + "\n".join(research_results)

            # Prepare messages including conversation history
            messages = [{"role": "system", "content": self.system_prompt}]
            messages.extend(self.conversation_history)
            messages.append({
                "role": "user",
                "content": f"{sender_context}{user_input}{research_context}"
            })

            # Get response from Groq
            chat_completion = client.chat.completions.create(
                messages=messages,
                model=LLM_MODEL,
                temperature=0.7,
                max_tokens=1000
            )
            response = chat_completion.choices[0].message.content

            # Log the response
            self.conversation_manager.log_conversation(
                response, self.name, is_task=False
            )

            # Update conversation history (raw input, without sender/research
            # context, exactly as before).
            self.conversation_history.append({"role": "user", "content": user_input})
            self.conversation_history.append({"role": "assistant", "content": response})

            # Keep only last 10 messages to prevent context length issues
            if len(self.conversation_history) > 10:
                self.conversation_history = self.conversation_history[-10:]

            return f"{self.name}: {response}"
        except Exception as e:
            logging.error(f"Error getting response from {self.name}: {str(e)}")
            return f"Error: Could not get response from {self.name}. Please try again."

    def assign_task(self, task: str, from_agent: str):
        """Add a task to the agent's queue and log the assignment."""
        self.task_queue.put((task, from_agent))
        self.conversation_manager.log_conversation(
            task, self.name, is_task=True
        )

    def process_task(self) -> Optional[str]:
        """Process the next task in the queue; return None when it is empty."""
        if self.task_queue.empty():
            return None
        task, from_agent = self.task_queue.get()
        return self.get_response(task, from_agent)

    def log_communication(self, message: str, from_agent: Optional[str] = None,
                          to_agent: Optional[str] = None):
        """Print a timestamped, colorized communication line to the console."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if from_agent and to_agent:
            print(f"{Fore.YELLOW}[{timestamp}] {Fore.GREEN}{from_agent} → {to_agent}{Fore.WHITE}: {message}{Style.RESET_ALL}")
        elif from_agent:
            print(f"{Fore.YELLOW}[{timestamp}] {Fore.BLUE}{from_agent}{Fore.WHITE}: {message}{Style.RESET_ALL}")
        else:
            print(f"{Fore.YELLOW}[{timestamp}]{Fore.WHITE}: {message}{Style.RESET_ALL}")

    # ------------------------------------------------------------------
    # Scam detection commands
    # ------------------------------------------------------------------

    def handle_scam_detection(self, message: str) -> Optional[str]:
        """Handle scam detection commands.

        Recognized commands: ``scam_status``, ``scam_detect status``,
        ``scam_detect`` and ``stop_detect``.

        Returns:
            The command's reply, or None when ``message`` is not a command.
        """
        command = message.lower()
        exact = command.strip()

        if exact == "scam_status":
            return self._read_cumulative_report()

        # Must be tested BEFORE the generic "scam_detect" check: the status
        # command contains "scam_detect" and used to be unreachable.
        if "scam_detect status" in command:
            return self.get_scam_detection_status()

        if "scam_detect" in command:
            return self._start_scam_detection()

        if exact == "stop_detect":
            if not self.detection_running:
                return f"{self.name}: No scam detection process is currently running."
            self.stop_requested = True
            return f"{self.name}: Stopping scam detection process... This may take a moment to clean up."

        return None

    def _read_cumulative_report(self) -> str:
        """Return the cumulative analysis file's content as a chat reply."""
        cumulative_path = os.path.join('data', 'reports', 'cumulative_analysis.txt')
        try:
            if not os.path.exists(cumulative_path):
                return f"{self.name}: No analysis data available yet. Start detection with 'scam_detect'."
            with open(cumulative_path, 'r', encoding='utf-8') as f:
                analysis = f.read()
            return f"{self.name}: Current Analysis Report:\n\n{analysis}"
        except Exception as e:
            return f"{self.name}: Error reading analysis data: {str(e)}"

    def _start_scam_detection(self) -> str:
        """Create the data directories and start detection in a background thread."""
        if self.detection_running:
            return f"{self.name}: Scam detection is already running. Use 'stop_detect' to stop it."

        try:
            self.detection_running = True
            self.stop_requested = False
            self.log_communication("Initializing scam detection process...", self.name)

            # Create necessary directories
            for directory in ('data/images', 'data/texts', 'data/reports'):
                os.makedirs(directory, exist_ok=True)

            # Start detection in background thread
            threading.Thread(target=self._run_detection).start()
            return f"{self.name}: I've initiated the scam detection process. Use 'stop_detect' to stop or 'scam_detect status' to check status."
        except Exception as e:
            self.detection_running = False
            return f"{self.name}: Error starting scam detection: {str(e)}"

    def _run_detection(self) -> None:
        """Background worker: scrape, process, clean up, then clear the running flag."""
        try:
            self.log_communication("Starting image scraping...", self.name)
            image_urls = self.scrape_scam_images()

            if image_urls and not self.stop_requested:
                self.log_communication(f"Found {len(image_urls)} images. Processing...", self.name)
                self.process_scam_images(image_urls)

            # Clean up images (also covers the case where nothing was processed)
            self._reset_images_dir()

            if self.stop_requested:
                self.log_communication("Scam detection stopped by user.", self.name)
            else:
                self.log_communication("Scam detection completed and images cleaned up.", self.name)
        except Exception as e:
            self.log_communication(f"Error in scam detection: {str(e)}", self.name)
        finally:
            # Always release the flag so a new run can be started.
            self.detection_running = False

    @staticmethod
    def _reset_images_dir() -> None:
        """Delete ``data/images`` with its content and recreate it empty."""
        images_dir = os.path.join('data', 'images')
        if os.path.exists(images_dir):
            shutil.rmtree(images_dir)
        os.makedirs(images_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Scraping and processing
    # ------------------------------------------------------------------

    def scrape_scam_images(self, search_query: str = "indian scam sms"):
        """Scrape image URLs from a Bing image search.

        Args:
            search_query: Search phrase; defaults to the original fixed query.

        Returns:
            A list of unique ``http(s)`` image URLs, in discovery order.
        """
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')

        driver = webdriver.Chrome(options=chrome_options)
        image_urls = []
        seen = set()  # O(1) duplicate check; image_urls keeps the order

        try:
            encoded_query = quote_plus(search_query)
            driver.get(f"https://www.bing.com/images/search?q={encoded_query}")

            self.log_communication("Loading images...", self.name)
            time.sleep(3)

            # Scroll to trigger lazy loading of more results.
            for i in range(5):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)
                self.log_communication(f"Scroll {i+1}/5 completed", self.name)

            for selector in (".mimg", ".iusc"):
                for element in driver.find_elements(By.CSS_SELECTOR, selector):
                    try:
                        if selector == ".mimg":
                            url = element.get_attribute('src')
                        else:
                            # ".iusc" elements carry a JSON blob in attribute
                            # "m"; the image URL is read from its "murl" key.
                            m = element.get_attribute('m')
                            if not m:
                                continue
                            url = json.loads(m).get('murl')

                        if url and url.startswith('http') and url not in seen:
                            seen.add(url)
                            image_urls.append(url)
                    except Exception as e:
                        self.log_communication(f"Error getting URL: {str(e)}", self.name)

            return image_urls
        finally:
            driver.quit()

    def process_scam_images(self, image_urls):
        """Process scraped images with OCR, Groq formatting, prediction, and storage.

        Writes an interim analysis report roughly every 30 seconds and a final
        one when the run was not stopped by the user. The images directory is
        always reset at the end.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        last_report_time = datetime.now()
        stats = dict.fromkeys(STAT_LABELS, 0)
        # Snapshot of what has already been folded into the cumulative file,
        # so periodic reports do not count the same messages twice.
        reported_stats = dict(stats)

        try:
            total = len(image_urls)
            self.log_communication(f"Starting to process {total} images...", self.name)

            for i, url in enumerate(image_urls, 1):
                if self.stop_requested:
                    self.log_communication("Stopping image processing as requested...", self.name)
                    break

                self._process_single_image(url, i, total, timestamp, stats)

                # Generate analysis report every 30 seconds
                if (datetime.now() - last_report_time).total_seconds() >= 30:
                    if self.generate_analysis_report(stats, is_final=False,
                                                     previous_stats=reported_stats):
                        reported_stats = dict(stats)
                    last_report_time = datetime.now()

            # Generate final analysis report
            if not self.stop_requested:
                self.generate_analysis_report(stats, is_final=True,
                                              previous_stats=reported_stats)
        except Exception as e:
            self.log_communication(f"Critical error in image processing: {str(e)}", self.name)
        finally:
            # Clean up images directory
            try:
                self._reset_images_dir()
                self.log_communication("Images directory cleaned successfully", self.name)
            except Exception as e:
                self.log_communication(f"Error cleaning images directory: {str(e)}", self.name)

    def _process_single_image(self, url, index, total, timestamp, stats) -> None:
        """Download one image, OCR it, classify its text and delete the temp file."""
        # Computed before the try block so the cleanup below can never hit an
        # unbound name when the download itself fails.
        img_filename = f"image_{timestamp}_{index}.png"
        img_path = os.path.join('data', 'images', img_filename)

        try:
            self.log_communication(f"Processing image {index}/{total}", self.name)

            # Download and process image
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # do not feed HTTP error pages to PIL
            img = Image.open(BytesIO(response.content))

            # Save image temporarily with proper path tracking
            img.save(img_path)

            # Extract text using OCR
            text = pytesseract.image_to_string(img).strip()
            if text:
                self._classify_text(text, index, stats)
        except Exception as e:
            stats['failed_count'] += 1
            self.log_communication(f"Error processing image {index}: {str(e)}", self.name)
        finally:
            # Delete processed image
            try:
                if os.path.exists(img_path):
                    os.remove(img_path)
                    self.log_communication(f"Deleted image: {img_filename}", self.name)
            except OSError as e:
                self.log_communication(f"Error deleting image {img_filename}: {str(e)}", self.name)

    def _classify_text(self, text, index, stats) -> None:
        """Clean OCR text with Groq, classify it and store the message."""
        # Format text using Groq
        prompt = f"""
        Format the following extracted text from an SMS image.
        Keep the original content intact but improve the formatting and remove any OCR artifacts:

        {text}
        """
        try:
            completion = client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=LLM_MODEL,
                temperature=0.3,
                max_tokens=1024
            )
            formatted_text = completion.choices[0].message.content.strip()
        except Exception as e:
            self.log_communication(f"Error formatting text with Groq for image {index}: {str(e)}", self.name)
            stats['failed_count'] += 1
            return

        if not formatted_text:
            return

        # Send formatted text to prediction API
        try:
            prediction_response = requests.post(
                PREDICTION_API_URL,
                json={"message": formatted_text},
                timeout=30
            )
            prediction_response.raise_for_status()
            prediction = prediction_response.json().get("predicted_result", "unknown")
        except Exception as e:
            self.log_communication(f"Prediction API error for image {index}: {str(e)}", self.name)
            stats['failed_count'] += 1
            return

        # Update stats
        stats['total_processed'] += 1
        if prediction == "scam":
            stats['scam_count'] += 1
        elif prediction == "ham":
            stats['ham_count'] += 1

        # Store message with prediction
        message_data = [{
            'message': formatted_text,
            'prediction': prediction,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }]
        self.update_scam_csv(message_data)
        self.log_communication(
            f"Message from image {index} processed and stored (Prediction: {prediction})",
            self.name
        )

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    @staticmethod
    def _write_message_csv(path, messages) -> None:
        """Write ``messages`` to ``path`` as a one-column ('message') CSV."""
        with open(path, 'w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=['message'])
            writer.writeheader()
            writer.writerows({'message': message} for message in messages)

    def update_scam_csv(self, new_data):
        """Update scam123.csv immediately with new messages.

        Only the ``'message'`` value of each item is stored; duplicates are
        skipped. Existing rows keep their order and new messages are appended.
        A timestamped backup copy is written whenever the file changes.

        Args:
            new_data: Iterable of dicts that have a ``'message'`` key.
        """
        csv_path = os.path.join('data', 'scam123.csv')
        try:
            # Read existing messages (dict keys: de-duplicated, order kept)
            known = {}
            file_exists = os.path.exists(csv_path)
            if file_exists:
                with open(csv_path, 'r', encoding='utf-8', newline='') as f:
                    known = dict.fromkeys(row['message'] for row in csv.DictReader(f))

            # Add new messages
            messages_added = 0
            for item in new_data:
                message = item.get('message', '').strip()
                if message and message not in known:
                    known[message] = None
                    messages_added += 1

            # Nothing new: avoid rewriting the file and piling up identical backups.
            if messages_added == 0 and file_exists:
                return

            # Write all messages to CSV
            os.makedirs(os.path.dirname(csv_path), exist_ok=True)
            self._write_message_csv(csv_path, known)

            # Create backup
            backup_dir = os.path.join('data', 'backups')
            os.makedirs(backup_dir, exist_ok=True)
            backup_path = os.path.join(
                backup_dir,
                f'scam123_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
            )
            self._write_message_csv(backup_path, known)

            if messages_added > 0:
                self.log_communication(f"Added {messages_added} new messages to scam123.csv", self.name)
        except Exception as e:
            self.log_communication(f"Error updating CSV: {str(e)}", self.name)

    @staticmethod
    def _read_cumulative_stats(cumulative_path) -> Dict[str, int]:
        """Parse the counters stored in the cumulative file (zeros if absent)."""
        totals = dict.fromkeys(STAT_LABELS, 0)
        if os.path.exists(cumulative_path):
            with open(cumulative_path, 'r', encoding='utf-8') as f:
                for line in f:
                    for key, label in STAT_LABELS.items():
                        if f'{label}:' in line:
                            totals[key] = int(line.split(':')[1].strip())
                            break
        return totals

    @staticmethod
    def _format_stat_lines(stats) -> str:
        """Render the four counters, one per line, followed by a blank line."""
        return "".join(
            f"{label}: {stats[key]}\n" for key, label in STAT_LABELS.items()
        ) + "\n"

    def generate_analysis_report(self, stats, is_final=False, previous_stats=None):
        """Generate cumulative analysis report.

        Writes a timestamped report for the current session and rewrites
        ``cumulative_analysis.txt`` with the updated all-time counters.

        Args:
            stats: Running counters of the current session.
            is_final: Append a "Final Status" section to the session report.
            previous_stats: Session counters that an earlier call already
                added to the cumulative file. Only the difference to ``stats``
                is added now, so repeated interim reports do not double count.
                ``None`` adds ``stats`` in full (the original behavior).

        Returns:
            True when both files were written, False when an error occurred.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            report_path = os.path.join('data', 'reports', f'analysis_report_{timestamp}.txt')
            cumulative_path = os.path.join('data', 'reports', 'cumulative_analysis.txt')

            # Read previous cumulative stats if exists
            cumulative_stats = self._read_cumulative_stats(cumulative_path)

            # Update cumulative stats with what is new since the last report
            baseline = previous_stats or {}
            for key in STAT_LABELS:
                cumulative_stats[key] += stats[key] - baseline.get(key, 0)
            last_update = datetime.now()

            total = cumulative_stats['total_processed']
            if total > 0:
                scam_percentage = (cumulative_stats['scam_count'] / total) * 100
                ham_percentage = (cumulative_stats['ham_count'] / total) * 100

            # Write current analysis report
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write(f"Scam Detection Analysis Report\n")
                f.write(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"{'=' * 50}\n\n")

                f.write("Current Session Statistics:\n")
                f.write(self._format_stat_lines(stats))

                f.write("Cumulative Statistics:\n")
                f.write(self._format_stat_lines(cumulative_stats))

                if total > 0:
                    f.write("Analysis:\n")
                    f.write(f"Scam Percentage: {scam_percentage:.2f}%\n")
                    f.write(f"Ham Percentage: {ham_percentage:.2f}%\n\n")

                if is_final:
                    f.write("\nFinal Status:\n")
                    f.write("Processing completed successfully\n")

            # Update cumulative analysis file
            with open(cumulative_path, 'w', encoding='utf-8') as f:
                f.write(f"Cumulative Scam Detection Analysis\n")
                f.write(f"Last Updated: {last_update.strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"{'=' * 50}\n\n")
                f.write(self._format_stat_lines(cumulative_stats))

                if total > 0:
                    f.write("Overall Analysis:\n")
                    f.write(f"Scam Percentage: {scam_percentage:.2f}%\n")
                    f.write(f"Ham Percentage: {ham_percentage:.2f}%\n")

            self.log_communication(
                f"Analysis reports generated:\n"
                f"Current: {report_path}\n"
                f"Cumulative: {cumulative_path}",
                self.name
            )
            return True
        except Exception as e:
            self.log_communication(f"Error generating analysis report: {str(e)}", self.name)
            return False

    def get_scam_detection_status(self) -> str:
        """Get current status of scam detection as a chat reply."""
        try:
            # Check scam123.csv
            csv_file = os.path.join('data', 'scam123.csv')
            if not os.path.exists(csv_file):
                return f"{self.name}: No scam detection data available yet. Process hasn't started or no messages detected."

            try:
                # Read CSV and get statistics
                with open(csv_file, 'r', encoding='utf-8') as f:
                    total_messages = sum(1 for _ in csv.DictReader(f))

                status = (
                    f"{self.name}: Current Scam Detection Status:\n"
                    f"Total Messages Collected: {total_messages}\n"
                )

                # Check if process is currently running. Temp images are
                # deleted right after each one is processed, so this agent's
                # own flag is checked too; the directory check still covers a
                # run started through another agent.
                images_dir = os.path.join('data', 'images')
                images_pending = os.path.exists(images_dir) and len(os.listdir(images_dir)) > 0
                if self.detection_running or images_pending:
                    status += "\nStatus: RUNNING - Currently processing images..."
                else:
                    status += "\nStatus: IDLE - Waiting for new detection run"

                # Add last update time if file exists
                if total_messages > 0:
                    file_modified_time = datetime.fromtimestamp(os.path.getmtime(csv_file))
                    status += f"\nLast Updated: {file_modified_time.strftime('%Y-%m-%d %H:%M:%S')}"

                return status
            except Exception as e:
                return f"{self.name}: Error reading scam detection data: {str(e)}"
        except Exception as e:
            return f"{self.name}: Error checking status: {str(e)}"


# Initialize agents with their roles and prompts
ceo_agent = Agent(
    name="CEO",
    role="Executive",
    system_prompt="""You are the CEO of Scamrakshak, a company dedicated to protecting users from digital scams.
You can delegate tasks to the Tech Support and Research teams.

When given a task about technical implementation or research:
1. Break it down into specific sub-tasks
2. Assign appropriate tasks to Tech Support and Research teams
3. Synthesize their responses into a comprehensive plan
4. Provide strategic oversight and direction

Format task assignments as:
"TASK FOR [AGENT]: [specific task description]"
""",
    conversation_manager=ConversationManager()
)

tech_support_agent = Agent(
    name="Tech Support",
    role="Support",
    system_prompt="""You are Scamrakshak's Technical Support specialist.
When assigned tasks by the CEO:
1. Analyze technical requirements
2. Provide detailed implementation steps
3. Consider security implications
4. Suggest best practices and potential challenges
5. Research technical solutions using available resources

Focus on practical, secure, and efficient solutions.
Always consider Android best practices and security guidelines.
""",
    conversation_manager=ConversationManager()
)

researcher_agent = Agent(
    name="Researcher",
    role="Analyst",
    system_prompt="""You are Scamrakshak's Research Analyst specializing in scam trends and prevention.
When assigned tasks by the CEO:
1. Research current trends and solutions
2. Analyze market data and competitor approaches
3. Provide data-backed recommendations
4. Consider regulatory and compliance aspects
5. Identify potential risks and opportunities

Use research results to provide comprehensive analysis.
Focus on actionable insights and industry best practices.
""",
    conversation_manager=ConversationManager()
)


class AgentSystem:
    """Route user input to the active agent or through a CEO-led task chain."""

    # Team names the CEO tends to use, mapped to the registered agent names.
    _AGENT_ALIASES = {
        "RESEARCH TEAM": "Researcher",
        "TECH SUPPORT TEAM": "Tech Support",
    }

    def __init__(self):
        self.agents = {
            "CEO": ceo_agent,
            "Tech Support": tech_support_agent,
            "Researcher": researcher_agent
        }
        self.current_agent = "CEO"
        self.conversation_log = []

    def switch_agent(self, agent_name: str) -> str:
        """Make ``agent_name`` the active agent; return a status message."""
        if agent_name in self.agents:
            self.current_agent = agent_name
            return f"Switched to {agent_name}"
        return f"Invalid agent name. Available agents: {', '.join(self.agents.keys())}"

    def _parse_task_line(self, line: str) -> Optional[Tuple[str, str]]:
        """Extract ``(agent name, task)`` from a "TASK FOR <agent>: <task>" line.

        Returns None when the line holds no well-formed assignment. The agent
        name is resolved case-insensitively; surrounding brackets or markdown
        emphasis (``[Tech Support]``, ``**Researcher**``) are ignored.
        """
        _, marker, remainder = line.partition("TASK FOR")
        if not marker:
            return None
        # Split at the first colon AFTER the marker, so a prefix such as
        # "CEO: TASK FOR ..." no longer corrupts the task text.
        target, colon, task = remainder.partition(":")
        if not colon:
            return None
        target = target.strip().strip("[]*").strip()
        canonical = {name.upper(): name for name in self.agents}
        canonical.update(self._AGENT_ALIASES)
        return canonical.get(target.upper(), target), task.strip()

    def process_task_chain(self, initial_input: str) -> List[str]:
        """Process a task through multiple agents.

        The CEO answers first; every "TASK FOR ..." line in that answer is
        executed by the named agent, then the CEO synthesizes the results.

        Returns:
            All responses in the order they were produced.
        """
        responses = []

        # Log initial request
        print(f"\n{Fore.CYAN}=== Starting New Task Chain ==={Style.RESET_ALL}")
        self.agents["CEO"].log_communication(initial_input, "User")

        # CEO processes initial request
        print(f"\n{Fore.CYAN}=== CEO Analyzing Request ==={Style.RESET_ALL}")
        ceo_response = self.agents["CEO"].get_response(initial_input)
        responses.append(ceo_response)

        # Extract and process tasks immediately
        tasks_found = False
        team_responses = []
        for line in ceo_response.split('\n'):
            if "TASK FOR" not in line:
                continue
            tasks_found = True
            parsed = self._parse_task_line(line)
            if parsed is None:
                continue
            target_agent, task = parsed

            if target_agent in self.agents:
                print(f"\n{Fore.CYAN}=== {target_agent} Processing Task ==={Style.RESET_ALL}")
                # Assign and process task immediately
                self.agents[target_agent].assign_task(task, "CEO")
                response = self.agents[target_agent].process_task()
                if response:
                    responses.append(response)
                    team_responses.append(response)
                print(f"\n{Fore.GREEN}=== {target_agent} Task Complete ==={Style.RESET_ALL}")

        if tasks_found:
            # CEO synthesizes all responses
            print(f"\n{Fore.CYAN}=== CEO Synthesizing All Responses ==={Style.RESET_ALL}")
            synthesis_prompt = (
                "Based on the research team and tech support findings above, "
                "provide a comprehensive summary and strategic recommendations. "
                "Include specific action items and next steps."
            )
            # The CEO's history never contains the other agents' replies, so
            # the findings are put in front of the prompt that refers to them.
            if team_responses:
                synthesis_prompt = "\n\n".join(team_responses) + "\n\n" + synthesis_prompt
            final_response = self.agents["CEO"].get_response(synthesis_prompt)
            responses.append(final_response)
        else:
            print(f"{Fore.RED}No tasks were delegated in the CEO's response{Style.RESET_ALL}")

        print(f"\n{Fore.CYAN}=== Task Chain Complete ==={Style.RESET_ALL}\n")
        return responses

    def get_response(self, user_input: str) -> str:
        """Answer ``user_input`` via a task chain or the active agent."""
        lowered = user_input.lower()
        if "implement" in lowered or "research" in lowered:
            # Process as a task chain
            return "\n\n".join(self.process_task_chain(user_input))

        # Normal single-agent response
        active = self.agents[self.current_agent]
        active.log_communication(user_input, "User")
        return active.get_response(user_input)


# Initialize agent system
agent_system = AgentSystem()


def chat_interface(message: str, history: List[List[str]]) -> str:
    """Handle chat interactions and agent responses.

    ``/switch <agent>`` changes the active agent; anything else is answered
    by the agent system. ``history`` is accepted for the chat callback
    signature and is not used.
    """
    print(f"\n{Fore.CYAN}=== New User Message ==={Style.RESET_ALL}")

    # Check for agent switch command
    if message.startswith("/switch"):
        try:
            _, agent_name = message.split(" ", 1)
        except ValueError:
            error_msg = "Invalid switch command. Use: /switch [CEO|Tech Support|Researcher]"
            print(f"{Fore.RED}[ERROR] {error_msg}{Style.RESET_ALL}")
            return error_msg
        response = agent_system.switch_agent(agent_name.strip())
        print(f"{Fore.YELLOW}[SYSTEM] {response}{Style.RESET_ALL}")
        return response

    # Get response from current agent
    return agent_system.get_response(message)


class TeamChat:
    """Short, multi-agent "team chat" exchanges on top of the shared agents."""

    def __init__(self, agents: Dict[str, Agent]):
        self.agents = agents
        self.is_active = False
        self.conversation_manager = ConversationManager()

    def process_team_message(self, message: str, from_role: str = "Founder") -> List[str]:
        """Process a message in team chat mode.

        Args:
            message: The chat message.
            from_role: ``"Founder"`` (the CEO answers and may delegate) or the
                name of an agent, which then replies itself.

        Returns:
            The agents' responses in order.
        """
        responses = []

        if from_role == "Founder":
            # CEO responds to founder's task
            ceo_prompt = f"As CEO, respond briefly to the founder's request: {message}. Keep it under 50 words and professional."
            responses.append(self.agents["CEO"].get_response(ceo_prompt))

            # CEO delegates if needed
            lowered = message.lower()
            if "implement" in lowered or "research" in lowered:
                delegation_prompt = f"Delegate this task briefly to team members: {message}. Keep each delegation under 30 words."
                responses.append(self.agents["CEO"].get_response(delegation_prompt))

                # Team members acknowledge
                for agent_name in ["Tech Support", "Researcher"]:
                    ack_prompt = f"Acknowledge the task briefly and professionally. Keep it under 20 words."
                    responses.append(self.agents[agent_name].get_response(ack_prompt))
        else:
            # Normal team member response
            response_prompt = f"Respond briefly to the team chat message: {message}. Keep it under 30 words and professional."
            responses.append(self.agents[from_role].get_response(response_prompt))

        return responses

    def start_team_discussion(self) -> List[str]:
        """Run one short discussion round on a randomly chosen topic.

        The "Team Chat" tab calls this method, which was previously missing
        (the button raised AttributeError).

        Returns:
            One response per agent, CEO first.
        """
        topic = random.choice(TEAM_DISCUSSION_TOPICS)
        discussion = [
            self.agents["CEO"].get_response(
                f"Start a brief team discussion about {topic} at Scamrakshak. "
                "Keep it under 50 words and professional."
            )
        ]
        for agent_name in ("Tech Support", "Researcher"):
            discussion.append(
                self.agents[agent_name].get_response(
                    f"The team is discussing {topic} at Scamrakshak. "
                    "Share your perspective briefly. Keep it under 30 words and professional.",
                    from_agent="CEO",
                )
            )
        return discussion


APP_CSS = """
.gradio-container {
    font-family: 'Arial', sans-serif;
    max-width: 1000px;
    margin: auto;
}
.agent-status {
    padding: 1rem;
    margin: 1rem 0;
    border-radius: 0.5rem;
    background-color: #f8f9fa;
    border: 1px solid #dee2e6;
}
.agent-indicator {
    display: inline-block;
    padding: 0.25rem 0.5rem;
    border-radius: 0.25rem;
    margin-right: 0.5rem;
    font-weight: bold;
}
.ceo-color { background-color: #e3f2fd; color: #1565c0; }
.tech-color { background-color: #f3e5f5; color: #7b1fa2; }
.research-color { background-color: #e8f5e9; color: #2e7d32; }
.chat-message {
    padding: 1rem;
    margin: 0.5rem;
    border-radius: 0.5rem;
    border-left: 4px solid;
}
.ceo-message { border-left-color: #1565c0; }
.tech-message { border-left-color: #7b1fa2; }
.research-message { border-left-color: #2e7d32; }
.user-message { border-left-color: #ff9800; }
.task-delegation {
    background-color: #fff3e0;
    border: 1px solid #ffe0b2;
    padding: 0.5rem;
    margin: 0.5rem 0;
    border-radius: 0.25rem;
}
"""

HEADER_MD = """
# 🤖 Scamrakshak AI Assistant
An advanced AI system with three specialized agents working together to protect you from scams.
"""

ACTIVE_AGENTS_MD = """
### Active Agents
"""

AVAILABLE_AGENTS_MD = """
#### Available Agents:
- 👔 **CEO** - Strategic oversight and task delegation
- 🛠️ **Tech Support** - Technical implementation and security
- 🔍 **Researcher** - Trend analysis and market research

Use `/switch [agent]` to change agents
"""

TEAM_CHAT_INTRO_MD = """
# 👥 Team Chat Room
Watch the Scamrakshak team have spontaneous work discussions!
"""

TEAM_CHAT_ABOUT_MD = """
### About Team Chat
- Team members spontaneously discuss work-related topics
- Discussions are focused on improving Scamrakshak's services
- Watch how different team members contribute their expertise
- Topics include security, features, market trends, and more
"""


def _format_agent_message(response: str) -> str:
    """Turn an ``"<agent>: <text>"`` response into a chat line.

    A response without a colon is returned unchanged instead of raising.
    """
    agent, colon, content = response.partition(":")
    if not colon:
        return response
    return f"**{agent.strip()}:** {content.strip()}"


def create_interface():
    """Create and configure the Gradio interface."""
    with gr.Blocks(
        title="Scamrakshak AI Assistant",
        theme=gr.themes.Soft(),
        css=APP_CSS
    ) as interface:
        with gr.Row():
            gr.Markdown(HEADER_MD)

        # Agent Status Panel
        with gr.Row() as agent_status:
            with gr.Column(scale=1):
                gr.Markdown(ACTIVE_AGENTS_MD)
                with gr.Group(elem_classes="agent-status"):
                    current_agent = gr.Textbox(
                        label="Current Active Agent",
                        value="CEO",
                        interactive=False,
                        elem_classes="agent-indicator ceo-color"
                    )
                gr.Markdown(AVAILABLE_AGENTS_MD)

        # Main Chat Interface
        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=600,
                    container=True,
                    show_label=True,
                    elem_id="chatbot"
                )

                with gr.Row():
                    with gr.Column(scale=4):
                        msg = gr.Textbox(
                            label="Your message",
                            placeholder="Ask a question or use /team to start team chat...",
                            lines=2,
                            show_label=True,
                            container=True
                        )
                    with gr.Column(scale=1):
                        with gr.Row():
                            send = gr.Button("Send", variant="primary")
                            clear = gr.Button("Clear", variant="stop")

        # Add team chat controls
        with gr.Row():
            team_chat_active = gr.Checkbox(
                label="Team Chat Mode",
                value=False,
                interactive=True
            )
            current_role = gr.Dropdown(
                choices=["Founder", "CEO", "Tech Support", "Researcher"],
                value="Founder",
                label="Speaking As",
                interactive=True
            )

        def user_message(message: str, history: List[List[str]],
                         is_team_chat: bool, role: str) -> tuple[List[List[str]], str, bool]:
            """Handle one submitted message.

            Returns the updated history, an empty string that clears the
            input box, and the (possibly changed) team-chat flag.
            """
            history = history or []

            if message.strip() == "":
                return history, "", is_team_chat

            if message.startswith("/team"):
                # Returned to the checkbox so the command actually enables
                # team chat (it used to change a local variable only).
                return history, "", True

            if is_team_chat:
                # Process team chat message
                team_chat = TeamChat(agent_system.agents)
                responses = team_chat.process_team_message(message, role)

                # Format team chat messages: the speaker's line first, then
                # one bot bubble per agent response.
                history.append([f"**{role}:** {message}", ""])
                for response in responses:
                    history.append(["", _format_agent_message(response)])
            else:
                # Normal chat processing
                response = chat_interface(message, history)
                history.append([message, response])

            return history, "", is_team_chat

        # Connect interface elements
        msg.submit(
            user_message,
            [msg, chatbot, team_chat_active, current_role],
            [chatbot, msg, team_chat_active]
        )
        send.click(
            user_message,
            [msg, chatbot, team_chat_active, current_role],
            [chatbot, msg, team_chat_active]
        )
        clear.click(lambda: ([], ""), None, [chatbot, msg])

        # Update current agent display
        def update_current_agent(message: str) -> str:
            """Return the agent name the status box should show."""
            if message.startswith("/switch"):
                try:
                    _, agent_name = message.split(" ", 1)
                except ValueError:
                    agent_name = ""
                agent_name = agent_name.strip()
                if agent_name in ["CEO", "Tech Support", "Researcher"]:
                    return agent_name
            # Fall back to the live state; the component's initial value
            # would reset the display to "CEO" after every normal message.
            return agent_system.current_agent

        msg.submit(update_current_agent, [msg], [current_agent])
        send.click(update_current_agent, [msg], [current_agent])

        # Add Team Chat section
        with gr.Tab("Team Chat"):
            with gr.Column():
                gr.Markdown(TEAM_CHAT_INTRO_MD)

                team_chat_box = gr.Chatbot(
                    label="Team Discussion",
                    height=400
                )

                start_discussion = gr.Button("Start New Team Discussion", variant="primary")

                def trigger_team_discussion() -> List[List[str]]:
                    """Run a team discussion and format it for the chat box."""
                    team_chat = TeamChat(agent_system.agents)
                    return [
                        ["", _format_agent_message(line)]
                        for line in team_chat.start_team_discussion()
                    ]

                start_discussion.click(
                    trigger_team_discussion,
                    outputs=[team_chat_box]
                )

                gr.Markdown(TEAM_CHAT_ABOUT_MD)

    return interface


if __name__ == "__main__":
    # Create and launch the interface
    demo = create_interface()
    demo.queue()  # Enable queuing for better handling of multiple requests
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
        show_api=False
    )