Spaces:
Running
Running
import subprocess | |
import sys | |
def install_dependencies(): | |
"""Install required packages and system dependencies""" | |
print("Installing required packages...") | |
# List of required pip packages | |
pip_packages = [ | |
'gradio', | |
'groq', | |
'requests>=2.25.1', | |
'Pillow>=8.0.0', | |
'pytesseract>=0.3.8', | |
'pandas', | |
'colorama', | |
'python-dotenv' | |
] | |
# Install pip packages | |
for package in pip_packages: | |
try: | |
subprocess.check_call([sys.executable, '-m', 'pip', 'install', package]) | |
except subprocess.CalledProcessError as e: | |
print(f"Error installing {package}: {str(e)}") | |
continue | |
# Install system packages if running on Linux | |
if sys.platform.startswith('linux'): | |
try: | |
# Update package list | |
subprocess.check_call(['apt-get', 'update']) | |
# Install tesseract-ocr | |
subprocess.check_call(['apt-get', 'install', '-y', 'tesseract-ocr']) | |
except subprocess.CalledProcessError as e: | |
print(f"Error installing system packages: {str(e)}") | |
except FileNotFoundError: | |
print("Warning: apt-get not found. Skipping system package installation.") | |
# Run dependency installation | |
install_dependencies() | |
# Now import all required modules | |
import gradio as gr | |
from groq import Groq | |
import json | |
import requests | |
from datetime import datetime | |
import logging | |
import os | |
from typing import Dict, List, Optional | |
import time | |
import threading | |
import queue | |
import colorama | |
from colorama import Fore, Style | |
import random | |
import pandas as pd | |
import csv | |
from PIL import Image | |
from io import BytesIO | |
import pytesseract | |
# Initialize colorama for colored console output | |
colorama.init() | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s | %(levelname)s | %(message)s', | |
handlers=[ | |
logging.StreamHandler(), | |
logging.FileHandler('agent_chat.log') | |
] | |
) | |
# Initialize Groq client | |
GROQ_API_KEY = "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" | |
client = Groq(api_key=GROQ_API_KEY) | |
def google_search(query: str, num_results: int = 5) -> List[str]: | |
"""Perform a Google search and return results""" | |
try: | |
search_results = [] | |
for result in search(query, stop=num_results): | |
search_results.append(result) | |
return search_results | |
except Exception as e: | |
logging.error(f"Google search error: {str(e)}") | |
return [] | |
class ConversationManager: | |
def __init__(self): | |
self.markdown_file = "conversation_history.md" | |
self.text_file = "conversation_history.txt" | |
self.current_session = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
def log_conversation(self, message: str, agent: str, is_task: bool = False): | |
"""Log conversation to both markdown and text files""" | |
# Log to markdown file | |
with open(self.markdown_file, "a", encoding="utf-8") as f: | |
if not os.path.getsize(self.markdown_file): | |
f.write(f"# Scamrakshak Team Conversations\n\n") | |
if is_task: | |
f.write(f"\n### Task Assignment ({self.current_session})\n") | |
f.write(f"**From CEO to {agent}**:\n") | |
f.write(f"```\n{message}\n```\n") | |
else: | |
f.write(f"\n### {agent} Response ({self.current_session})\n") | |
f.write(f"{message}\n") | |
f.write("\n---\n") | |
# Log to text file | |
with open(self.text_file, "a", encoding="utf-8") as f: | |
if not os.path.getsize(self.text_file): | |
f.write("=== SCAMRAKSHAK TEAM CONVERSATIONS ===\n\n") | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
if is_task: | |
f.write(f"\n[{timestamp}] TASK ASSIGNMENT\n") | |
f.write(f"From: CEO\n") | |
f.write(f"To: {agent}\n") | |
f.write(f"Task: {message}\n") | |
else: | |
f.write(f"\n[{timestamp}] {agent} RESPONSE\n") | |
f.write(f"{message}\n") | |
f.write("\n" + "="*50 + "\n") | |
class Agent: | |
def __init__(self, name: str, role: str, system_prompt: str, conversation_manager: ConversationManager): | |
self.name = name | |
self.role = role | |
self.system_prompt = system_prompt | |
self.conversation_manager = conversation_manager | |
self.conversation_history: List[Dict] = [] | |
self.task_queue = queue.Queue() | |
self.research_results = {} | |
self.detection_running = False | |
self.stop_requested = False | |
def get_response(self, user_input: str, from_agent: str = None) -> str: | |
# First check for scam detection commands | |
scam_detection_response = self.handle_scam_detection(user_input) | |
if scam_detection_response: | |
return scam_detection_response | |
# Continue with normal response processing | |
try: | |
# Add context about who is sending the message | |
sender_context = f"Message from {from_agent}: " if from_agent else "" | |
# Perform research if needed | |
research_results = [] | |
if "research" in user_input.lower() or "search" in user_input.lower(): | |
research_results = google_search(user_input) | |
research_context = "\n\nResearch results:\n" + "\n".join(research_results) | |
else: | |
research_context = "" | |
# Prepare messages including conversation history | |
messages = [{"role": "system", "content": self.system_prompt}] | |
messages.extend(self.conversation_history) | |
messages.append({ | |
"role": "user", | |
"content": f"{sender_context}{user_input}{research_context}" | |
}) | |
# Get response from Groq | |
chat_completion = client.chat.completions.create( | |
messages=messages, | |
model="llama-3.2-90b-text-preview", | |
temperature=0.7, | |
max_tokens=1000 | |
) | |
response = chat_completion.choices[0].message.content | |
# Log the response | |
self.conversation_manager.log_conversation( | |
response, | |
self.name, | |
is_task=False | |
) | |
# Update conversation history | |
self.conversation_history.append({"role": "user", "content": user_input}) | |
self.conversation_history.append({"role": "assistant", "content": response}) | |
# Keep only last 10 messages to prevent context length issues | |
if len(self.conversation_history) > 10: | |
self.conversation_history = self.conversation_history[-10:] | |
return f"{self.name}: {response}" | |
except Exception as e: | |
logging.error(f"Error getting response from {self.name}: {str(e)}") | |
return f"Error: Could not get response from {self.name}. Please try again." | |
def assign_task(self, task: str, from_agent: str): | |
"""Add a task to the agent's queue""" | |
self.task_queue.put((task, from_agent)) | |
self.conversation_manager.log_conversation( | |
task, | |
self.name, | |
is_task=True | |
) | |
def process_task(self) -> Optional[str]: | |
"""Process the next task in the queue""" | |
if not self.task_queue.empty(): | |
task, from_agent = self.task_queue.get() | |
response = self.get_response(task, from_agent) | |
return response | |
return None | |
def log_communication(self, message: str, from_agent: str = None, to_agent: str = None): | |
"""Log communication between agents""" | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
if from_agent and to_agent: | |
print(f"{Fore.YELLOW}[{timestamp}] {Fore.GREEN}{from_agent} → {to_agent}{Fore.WHITE}: {message}{Style.RESET_ALL}") | |
elif from_agent: | |
print(f"{Fore.YELLOW}[{timestamp}] {Fore.BLUE}{from_agent}{Fore.WHITE}: {message}{Style.RESET_ALL}") | |
else: | |
print(f"{Fore.YELLOW}[{timestamp}]{Fore.WHITE}: {message}{Style.RESET_ALL}") | |
def handle_scam_detection(self, message: str) -> str: | |
"""Handle scam detection commands""" | |
if message.lower() == "scam_status": | |
try: | |
cumulative_path = os.path.join('data', 'reports', 'cumulative_analysis.txt') | |
if not os.path.exists(cumulative_path): | |
return f"{self.name}: No analysis data available yet. Start detection with 'scam_detect'." | |
with open(cumulative_path, 'r', encoding='utf-8') as f: | |
analysis = f.read() | |
return f"{self.name}: Current Analysis Report:\n\n{analysis}" | |
except Exception as e: | |
return f"{self.name}: Error reading analysis data: {str(e)}" | |
if "scam_detect" in message.lower(): | |
if self.detection_running: | |
return f"{self.name}: Scam detection is already running. Use 'stop_detect' to stop it." | |
try: | |
self.detection_running = True | |
self.stop_requested = False | |
self.log_communication("Initializing scam detection process...", self.name) | |
# Create necessary directories | |
directories = ['data/images', 'data/texts', 'data/reports'] | |
for directory in directories: | |
os.makedirs(directory, exist_ok=True) | |
# Start detection in background | |
def run_detection(): | |
try: | |
self.log_communication("Starting image scraping...", self.name) | |
image_urls = self.scrape_scam_images() | |
if image_urls and not self.stop_requested: | |
self.log_communication(f"Found {len(image_urls)} images. Processing...", self.name) | |
self.process_scam_images(image_urls) | |
# Clean up images | |
images_dir = os.path.join('data', 'images') | |
if os.path.exists(images_dir): | |
import shutil | |
shutil.rmtree(images_dir) | |
os.makedirs(images_dir) | |
self.detection_running = False | |
if self.stop_requested: | |
self.log_communication("Scam detection stopped by user.", self.name) | |
else: | |
self.log_communication("Scam detection completed and images cleaned up.", self.name) | |
except Exception as e: | |
self.detection_running = False | |
self.log_communication(f"Error in scam detection: {str(e)}", self.name) | |
# Start detection in background thread | |
import threading | |
detection_thread = threading.Thread(target=run_detection) | |
detection_thread.start() | |
return f"{self.name}: I've initiated the scam detection process. Use 'stop_detect' to stop or 'scam_detect status' to check status." | |
except Exception as e: | |
self.detection_running = False | |
return f"{self.name}: Error starting scam detection: {str(e)}" | |
elif message.lower() == "stop_detect": | |
if not self.detection_running: | |
return f"{self.name}: No scam detection process is currently running." | |
self.stop_requested = True | |
return f"{self.name}: Stopping scam detection process... This may take a moment to clean up." | |
elif "scam_detect status" in message.lower(): | |
return self.get_scam_detection_status() | |
return None | |
def scrape_scam_images(self): | |
"""Scrape images from Bing""" | |
# Read URLs from the text file instead of web scraping | |
try: | |
urls_file = os.path.join('data', 'texts', 'scam_urls_20241104_190804.txt') | |
if os.path.exists(urls_file): | |
with open(urls_file, 'r') as f: | |
image_urls = [line.strip() for line in f if line.strip()] | |
self.log_communication(f"Loaded {len(image_urls)} URLs from file", self.name) | |
return image_urls | |
else: | |
self.log_communication("URLs file not found, using fallback method", self.name) | |
except Exception as e: | |
self.log_communication(f"Error reading URLs file: {str(e)}", self.name) | |
# Fallback to direct URL list if file reading fails | |
image_urls = [ | |
"https://www.imda.gov.sg/-/media/Imda/Images/Programmes/Anti-Scam-Measures/Anti-scam-message-sms.png", | |
"https://www.westpac.com.au/content/dam/public/wbc/images/other/security/latest-scams/JUN2023_Tax_ATO_2_SMS_Scam_Example.png", | |
"https://www.ocbc.com/iwov-resources/sg/ocbc/personal/img/live/security/security-advisory/SMS-scam-may.png" | |
] | |
self.log_communication(f"Using {len(image_urls)} default URLs", self.name) | |
return image_urls | |
def process_scam_images(self, image_urls): | |
"""Process images with OCR and prediction""" | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
last_report_time = datetime.now() | |
stats = { | |
'total_processed': 0, | |
'scam_count': 0, | |
'ham_count': 0, | |
'failed_count': 0 | |
} | |
try: | |
self.log_communication(f"Starting to process {len(image_urls)} images...", self.name) | |
# Create necessary directories | |
for dir_path in ['data/images', 'data/texts', 'data/reports']: | |
os.makedirs(dir_path, exist_ok=True) | |
for i, url in enumerate(image_urls, 1): | |
if self.stop_requested: | |
self.log_communication("Stopping image processing as requested...", self.name) | |
break | |
try: | |
self.log_communication(f"Processing image {i}/{len(image_urls)}", self.name) | |
# Download image | |
try: | |
response = requests.get(url, timeout=10) | |
img = Image.open(BytesIO(response.content)) | |
except Exception as e: | |
self.log_communication(f"Error downloading image {i}: {str(e)}", self.name) | |
stats['failed_count'] += 1 | |
continue | |
# Save image temporarily | |
img_filename = f"image_{timestamp}_{i}.png" | |
img_path = os.path.join('data', 'images', img_filename) | |
try: | |
img.save(img_path) | |
except Exception as e: | |
self.log_communication(f"Error saving image {i}: {str(e)}", self.name) | |
stats['failed_count'] += 1 | |
continue | |
# Extract text using OCR | |
try: | |
text = pytesseract.image_to_string(img) | |
if not text.strip(): | |
stats['failed_count'] += 1 | |
continue | |
except Exception as e: | |
self.log_communication(f"OCR error for image {i}: {str(e)}", self.name) | |
stats['failed_count'] += 1 | |
continue | |
# Format text using Groq | |
try: | |
prompt = f""" | |
Format the following extracted text from an SMS image. | |
Keep the original content intact but improve the formatting and remove any OCR artifacts: | |
{text.strip()} | |
""" | |
completion = client.chat.completions.create( | |
messages=[{"role": "user", "content": prompt}], | |
model="llama-3.2-90b-text-preview", | |
temperature=0.3, | |
max_tokens=1024 | |
) | |
formatted_text = completion.choices[0].message.content.strip() | |
except Exception as e: | |
self.log_communication(f"Text formatting error for image {i}: {str(e)}", self.name) | |
stats['failed_count'] += 1 | |
continue | |
# Make prediction | |
if formatted_text: | |
try: | |
prediction_response = requests.post( | |
"https://varun324242-sssssss.hf.space/predict", | |
json={"message": formatted_text}, | |
timeout=30 | |
) | |
prediction_response.raise_for_status() | |
prediction = prediction_response.json().get("predicted_result", "unknown") | |
# Update stats | |
stats['total_processed'] += 1 | |
if prediction == "scam": | |
stats['scam_count'] += 1 | |
elif prediction == "ham": | |
stats['ham_count'] += 1 | |
# Store message | |
self.update_scam_csv([{ | |
'message': formatted_text, | |
'prediction': prediction, | |
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
}]) | |
except Exception as e: | |
self.log_communication(f"Prediction error for image {i}: {str(e)}", self.name) | |
stats['failed_count'] += 1 | |
continue | |
# Generate periodic reports | |
if (datetime.now() - last_report_time).total_seconds() >= 30: | |
self.generate_analysis_report(stats, is_final=False) | |
last_report_time = datetime.now() | |
# Clean up image | |
try: | |
if os.path.exists(img_path): | |
os.remove(img_path) | |
except Exception as e: | |
self.log_communication(f"Error cleaning up image {i}: {str(e)}", self.name) | |
except Exception as e: | |
stats['failed_count'] += 1 | |
self.log_communication(f"Error processing image {i}: {str(e)}", self.name) | |
continue | |
# Generate final report | |
if not self.stop_requested: | |
self.generate_analysis_report(stats, is_final=True) | |
except Exception as e: | |
self.log_communication(f"Critical error in image processing: {str(e)}", self.name) | |
finally: | |
# Clean up images directory | |
try: | |
images_dir = os.path.join('data', 'images') | |
if os.path.exists(images_dir): | |
for file in os.listdir(images_dir): | |
try: | |
os.remove(os.path.join(images_dir, file)) | |
except: | |
pass | |
except Exception as e: | |
self.log_communication(f"Error cleaning up images directory: {str(e)}", self.name) | |
def update_scam_csv(self, new_data): | |
"""Update scam123.csv in Hugging Face J.A.R.V.I.S space""" | |
# Hugging Face configuration | |
HF_TOKEN = "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" | |
REPO_ID = "varun324242/J.A.R.V.I.S" | |
try: | |
# Local file paths | |
local_csv_path = os.path.join('data', 'scam123.csv') | |
temp_csv_path = os.path.join('data', 'temp_scam123.csv') | |
# Headers for HF API | |
headers = { | |
"Authorization": f"Bearer {HF_TOKEN}", | |
"Content-Type": "application/octet-stream" | |
} | |
# First try to get existing data from Hugging Face | |
try: | |
# Use the correct URL format for the J.A.R.V.I.S space | |
response = requests.get( | |
f"https://huggingface.co/spaces/{REPO_ID}/raw/main/data/scam123.csv", | |
headers=headers, | |
allow_redirects=True | |
) | |
if response.status_code == 200: | |
# Save HF content to temporary file | |
with open(temp_csv_path, 'wb') as f: | |
f.write(response.content) | |
# Read messages from HF file | |
hf_messages = set() | |
with open(temp_csv_path, 'r', encoding='utf-8') as f: | |
reader = csv.DictReader(f) | |
hf_messages = {row['message'] for row in reader} | |
else: | |
self.log_communication(f"No existing file on HF (status code: {response.status_code})", self.name) | |
hf_messages = set() | |
except Exception as e: | |
self.log_communication(f"Error reading HF CSV: {str(e)}", self.name) | |
hf_messages = set() | |
# Read local messages if they exist | |
local_messages = set() | |
if os.path.exists(local_csv_path): | |
try: | |
with open(local_csv_path, 'r', encoding='utf-8') as f: | |
reader = csv.DictReader(f) | |
local_messages = {row['message'] for row in reader} | |
except Exception as e: | |
self.log_communication(f"Error reading local CSV: {str(e)}", self.name) | |
# Combine existing messages from both sources | |
existing_messages = hf_messages.union(local_messages) | |
# Add new messages | |
messages_added = 0 | |
for item in new_data: | |
message = item.get('message', '').strip() | |
if message and message not in existing_messages: | |
existing_messages.add(message) | |
messages_added += 1 | |
if messages_added > 0: | |
# Update local file | |
try: | |
# Create data directory if it doesn't exist | |
os.makedirs(os.path.dirname(local_csv_path), exist_ok=True) | |
# Write updated CSV file | |
with open(local_csv_path, 'w', encoding='utf-8', newline='') as f: | |
writer = csv.DictWriter(f, fieldnames=['message']) | |
writer.writeheader() | |
for message in existing_messages: | |
writer.writerow({'message': message}) | |
# Create backup | |
backup_dir = os.path.join('data', 'backups') | |
os.makedirs(backup_dir, exist_ok=True) | |
backup_path = os.path.join(backup_dir, | |
f'scam123_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv') | |
with open(backup_path, 'w', encoding='utf-8', newline='') as f: | |
writer = csv.DictWriter(f, fieldnames=['message']) | |
writer.writeheader() | |
for message in existing_messages: | |
writer.writerow({'message': message}) | |
# Update Hugging Face space using the correct API endpoint | |
try: | |
# Read file content | |
with open(local_csv_path, 'rb') as f: | |
file_content = f.read() | |
# Use the correct API endpoint for the J.A.R.V.I.S space | |
upload_url = f"https://huggingface.co/spaces/{REPO_ID}/blob/main/data/scam123.csv" | |
# Make the request | |
response = requests.put( | |
upload_url, | |
headers=headers, | |
data=file_content | |
) | |
if response.status_code in [200, 201]: | |
self.log_communication( | |
f"Successfully updated both local and HF files with {messages_added} new messages", | |
self.name | |
) | |
else: | |
self.log_communication( | |
f"Error updating HF file: {response.status_code} - {response.text}", | |
self.name | |
) | |
except Exception as e: | |
self.log_communication(f"Error uploading to HF: {str(e)}", self.name) | |
except Exception as e: | |
self.log_communication(f"Error updating local files: {str(e)}", self.name) | |
# Clean up temporary file | |
try: | |
if os.path.exists(temp_csv_path): | |
os.remove(temp_csv_path) | |
except: | |
pass | |
except Exception as e: | |
self.log_communication(f"Critical error in CSV update: {str(e)}", self.name) | |
def generate_analysis_report(self, stats, is_final=False): | |
"""Generate cumulative analysis report""" | |
try: | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
report_path = os.path.join('data', 'reports', f'analysis_report_{timestamp}.txt') | |
cumulative_path = os.path.join('data', 'reports', 'cumulative_analysis.txt') | |
# Read previous cumulative stats if exists | |
cumulative_stats = { | |
'total_processed': 0, | |
'scam_count': 0, | |
'ham_count': 0, | |
'failed_count': 0, | |
'last_update': None | |
} | |
if os.path.exists(cumulative_path): | |
with open(cumulative_path, 'r', encoding='utf-8') as f: | |
for line in f: | |
if 'Total Messages Processed:' in line: | |
cumulative_stats['total_processed'] = int(line.split(':')[1].strip()) | |
elif 'Scam Messages Detected:' in line: | |
cumulative_stats['scam_count'] = int(line.split(':')[1].strip()) | |
elif 'Ham Messages Detected:' in line: | |
cumulative_stats['ham_count'] = int(line.split(':')[1].strip()) | |
elif 'Failed Processing:' in line: | |
cumulative_stats['failed_count'] = int(line.split(':')[1].strip()) | |
# Update cumulative stats | |
cumulative_stats['total_processed'] += stats['total_processed'] | |
cumulative_stats['scam_count'] += stats['scam_count'] | |
cumulative_stats['ham_count'] += stats['ham_count'] | |
cumulative_stats['failed_count'] += stats['failed_count'] | |
cumulative_stats['last_update'] = datetime.now() | |
# Write current analysis report | |
with open(report_path, 'w', encoding='utf-8') as f: | |
f.write(f"Scam Detection Analysis Report\n") | |
f.write(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") | |
f.write(f"{'=' * 50}\n\n") | |
f.write("Current Session Statistics:\n") | |
f.write(f"Total Messages Processed: {stats['total_processed']}\n") | |
f.write(f"Scam Messages Detected: {stats['scam_count']}\n") | |
f.write(f"Ham Messages Detected: {stats['ham_count']}\n") | |
f.write(f"Failed Processing: {stats['failed_count']}\n\n") | |
f.write("Cumulative Statistics:\n") | |
f.write(f"Total Messages Processed: {cumulative_stats['total_processed']}\n") | |
f.write(f"Scam Messages Detected: {cumulative_stats['scam_count']}\n") | |
f.write(f"Ham Messages Detected: {cumulative_stats['ham_count']}\n") | |
f.write(f"Failed Processing: {cumulative_stats['failed_count']}\n\n") | |
if cumulative_stats['total_processed'] > 0: | |
scam_percentage = (cumulative_stats['scam_count'] / cumulative_stats['total_processed']) * 100 | |
ham_percentage = (cumulative_stats['ham_count'] / cumulative_stats['total_processed']) * 100 | |
f.write("Analysis:\n") | |
f.write(f"Scam Percentage: {scam_percentage:.2f}%\n") | |
f.write(f"Ham Percentage: {ham_percentage:.2f}%\n\n") | |
if is_final: | |
f.write("\nFinal Status:\n") | |
f.write("Processing completed successfully\n") | |
# Update cumulative analysis file | |
with open(cumulative_path, 'w', encoding='utf-8') as f: | |
f.write(f"Cumulative Scam Detection Analysis\n") | |
f.write(f"Last Updated: {cumulative_stats['last_update'].strftime('%Y-%m-%d %H:%M:%S')}\n") | |
f.write(f"{'=' * 50}\n\n") | |
f.write(f"Total Messages Processed: {cumulative_stats['total_processed']}\n") | |
f.write(f"Scam Messages Detected: {cumulative_stats['scam_count']}\n") | |
f.write(f"Ham Messages Detected: {cumulative_stats['ham_count']}\n") | |
f.write(f"Failed Processing: {cumulative_stats['failed_count']}\n\n") | |
if cumulative_stats['total_processed'] > 0: | |
scam_percentage = (cumulative_stats['scam_count'] / cumulative_stats['total_processed']) * 100 | |
ham_percentage = (cumulative_stats['ham_count'] / cumulative_stats['total_processed']) * 100 | |
f.write("Overall Analysis:\n") | |
f.write(f"Scam Percentage: {scam_percentage:.2f}%\n") | |
f.write(f"Ham Percentage: {ham_percentage:.2f}%\n") | |
self.log_communication( | |
f"Analysis reports generated:\n" | |
f"Current: {report_path}\n" | |
f"Cumulative: {cumulative_path}", | |
self.name | |
) | |
except Exception as e: | |
self.log_communication(f"Error generating analysis report: {str(e)}", self.name) | |
def get_scam_detection_status(self) -> str: | |
"""Get current status of scam detection""" | |
try: | |
# Check scam123.csv | |
csv_file = os.path.join('data', 'scam123.csv') | |
if not os.path.exists(csv_file): | |
return f"{self.name}: No scam detection data available yet. Process hasn't started or no messages detected." | |
try: | |
# Read CSV and get statistics | |
with open(csv_file, 'r', encoding='utf-8') as f: | |
reader = csv.DictReader(f) | |
messages = list(reader) | |
total_messages = len(messages) | |
status = ( | |
f"{self.name}: Current Scam Detection Status:\n" | |
f"Total Messages Collected: {total_messages}\n" | |
) | |
# Check if process is currently running | |
images_dir = os.path.join('data', 'images') | |
if os.path.exists(images_dir) and len(os.listdir(images_dir)) > 0: | |
status += "\nStatus: RUNNING - Currently processing images..." | |
else: | |
status += "\nStatus: IDLE - Waiting for new detection run" | |
# Add last update time if file exists | |
if total_messages > 0: | |
file_modified_time = datetime.fromtimestamp(os.path.getmtime(csv_file)) | |
status += f"\nLast Updated: {file_modified_time.strftime('%Y-%m-%d %H:%M:%S')}" | |
return status | |
except Exception as e: | |
return f"{self.name}: Error reading scam detection data: {str(e)}" | |
except Exception as e: | |
return f"{self.name}: Error checking status: {str(e)}" | |
# Initialize agents with their roles and prompts | |
ceo_agent = Agent( | |
name="CEO", | |
role="Executive", | |
system_prompt="""You are the CEO of Scamrakshak, a company dedicated to protecting users from digital scams. | |
You can delegate tasks to the Tech Support and Research teams. | |
When given a task about technical implementation or research: | |
1. Break it down into specific sub-tasks | |
2. Assign appropriate tasks to Tech Support and Research teams | |
3. Synthesize their responses into a comprehensive plan | |
4. Provide strategic oversight and direction | |
Format task assignments as: "TASK FOR [AGENT]: [specific task description]" | |
""", | |
conversation_manager=ConversationManager() | |
) | |
tech_support_agent = Agent( | |
name="Tech Support", | |
role="Support", | |
system_prompt="""You are Scamrakshak's Technical Support specialist. | |
When assigned tasks by the CEO: | |
1. Analyze technical requirements | |
2. Provide detailed implementation steps | |
3. Consider security implications | |
4. Suggest best practices and potential challenges | |
5. Research technical solutions using available resources | |
Focus on practical, secure, and efficient solutions. | |
Always consider Android best practices and security guidelines. | |
""", | |
conversation_manager=ConversationManager() | |
) | |
researcher_agent = Agent( | |
name="Researcher", | |
role="Analyst", | |
system_prompt="""You are Scamrakshak's Research Analyst specializing in scam trends and prevention. | |
When assigned tasks by the CEO: | |
1. Research current trends and solutions | |
2. Analyze market data and competitor approaches | |
3. Provide data-backed recommendations | |
4. Consider regulatory and compliance aspects | |
5. Identify potential risks and opportunities | |
Use research results to provide comprehensive analysis. | |
Focus on actionable insights and industry best practices. | |
""", | |
conversation_manager=ConversationManager() | |
) | |
class AgentSystem: | |
def __init__(self): | |
self.agents = { | |
"CEO": ceo_agent, | |
"Tech Support": tech_support_agent, | |
"Researcher": researcher_agent | |
} | |
self.current_agent = "CEO" | |
self.conversation_log = [] | |
def switch_agent(self, agent_name: str) -> str: | |
if agent_name in self.agents: | |
self.current_agent = agent_name | |
return f"Switched to {agent_name}" | |
return f"Invalid agent name. Available agents: {', '.join(self.agents.keys())}" | |
def process_task_chain(self, initial_input: str) -> List[str]: | |
"""Process a task through multiple agents""" | |
responses = [] | |
# Log initial request | |
print(f"\n{Fore.CYAN}=== Starting New Task Chain ==={Style.RESET_ALL}") | |
self.agents["CEO"].log_communication(initial_input, "User") | |
# CEO processes initial request | |
print(f"\n{Fore.CYAN}=== CEO Analyzing Request ==={Style.RESET_ALL}") | |
ceo_response = self.agents["CEO"].get_response(initial_input) | |
responses.append(ceo_response) | |
# Extract and process tasks immediately | |
tasks_found = False | |
for line in ceo_response.split('\n'): | |
if "TASK FOR" in line: | |
tasks_found = True | |
target_agent = line.split("TASK FOR")[1].split(":")[0].strip() | |
task = line.split(":", 1)[1].strip() | |
if target_agent.upper() == "RESEARCH TEAM": | |
target_agent = "Researcher" # Map to correct agent name | |
elif target_agent.upper() == "TECH SUPPORT TEAM": | |
target_agent = "Tech Support" # Map to correct agent name | |
if target_agent in self.agents: | |
print(f"\n{Fore.CYAN}=== {target_agent} Processing Task ==={Style.RESET_ALL}") | |
# Assign and process task immediately | |
self.agents[target_agent].assign_task(task, "CEO") | |
response = self.agents[target_agent].process_task() | |
if response: | |
responses.append(response) | |
print(f"\n{Fore.GREEN}=== {target_agent} Task Complete ==={Style.RESET_ALL}") | |
if tasks_found: | |
# CEO synthesizes all responses | |
print(f"\n{Fore.CYAN}=== CEO Synthesizing All Responses ==={Style.RESET_ALL}") | |
synthesis_prompt = ( | |
"Based on the research team and tech support findings above, " | |
"provide a comprehensive summary and strategic recommendations. " | |
"Include specific action items and next steps." | |
) | |
final_response = self.agents["CEO"].get_response(synthesis_prompt) | |
responses.append(final_response) | |
else: | |
print(f"{Fore.RED}No tasks were delegated in the CEO's response{Style.RESET_ALL}") | |
print(f"\n{Fore.CYAN}=== Task Chain Complete ==={Style.RESET_ALL}\n") | |
return responses | |
def get_response(self, user_input: str) -> str: | |
if "implement" in user_input.lower() or "research" in user_input.lower(): | |
# Process as a task chain | |
responses = self.process_task_chain(user_input) | |
return "\n\n".join(responses) | |
else: | |
# Normal single-agent response | |
self.agents[self.current_agent].log_communication(user_input, "User") | |
response = self.agents[self.current_agent].get_response(user_input) | |
return response | |
# Initialize agent system | |
agent_system = AgentSystem() | |
def chat_interface(message: str, history: List[List[str]]) -> str: | |
"""Handle chat interactions and agent responses""" | |
print(f"\n{Fore.CYAN}=== New User Message ==={Style.RESET_ALL}") | |
# Check for agent switch command | |
if message.startswith("/switch"): | |
try: | |
_, agent_name = message.split(" ", 1) | |
response = agent_system.switch_agent(agent_name) | |
print(f"{Fore.YELLOW}[SYSTEM] {response}{Style.RESET_ALL}") | |
return response | |
except ValueError: | |
error_msg = "Invalid switch command. Use: /switch [CEO|Tech Support|Researcher]" | |
print(f"{Fore.RED}[ERROR] {error_msg}{Style.RESET_ALL}") | |
return error_msg | |
else: | |
# Get response from current agent | |
return agent_system.get_response(message) | |
# Add this new class for team chat | |
class TeamChat: | |
def __init__(self, agents: Dict[str, Agent]): | |
self.agents = agents | |
self.is_active = False | |
self.conversation_manager = ConversationManager() | |
def process_team_message(self, message: str, from_role: str = "Founder") -> List[str]: | |
"""Process a message in team chat mode""" | |
responses = [] | |
if from_role == "Founder": | |
# CEO responds to founder's task | |
ceo_prompt = f"As CEO, respond briefly to the founder's request: {message}. Keep it under 50 words and professional." | |
ceo_response = self.agents["CEO"].get_response(ceo_prompt) | |
responses.append(ceo_response) | |
# CEO delegates if needed | |
if "implement" in message.lower() or "research" in message.lower(): | |
delegation_prompt = f"Delegate this task briefly to team members: {message}. Keep each delegation under 30 words." | |
delegation = self.agents["CEO"].get_response(delegation_prompt) | |
responses.append(delegation) | |
# Team members acknowledge | |
for agent_name in ["Tech Support", "Researcher"]: | |
ack_prompt = f"Acknowledge the task briefly and professionally. Keep it under 20 words." | |
ack = self.agents[agent_name].get_response(ack_prompt) | |
responses.append(ack) | |
else: | |
# Normal team member response | |
response_prompt = f"Respond briefly to the team chat message: {message}. Keep it under 30 words and professional." | |
response = self.agents[from_role].get_response(response_prompt) | |
responses.append(response) | |
return responses | |
def create_interface(): | |
"""Create and configure the Gradio interface""" | |
with gr.Blocks( | |
title="Scamrakshak AI Assistant", | |
theme=gr.themes.Soft(), | |
css=""" | |
.gradio-container { | |
font-family: 'Arial', sans-serif; | |
max-width: 1000px; | |
margin: auto; | |
} | |
.agent-status { | |
padding: 1rem; | |
margin: 1rem 0; | |
border-radius: 0.5rem; | |
background-color: #f8f9fa; | |
border: 1px solid #dee2e6; | |
} | |
.agent-indicator { | |
display: inline-block; | |
padding: 0.25rem 0.5rem; | |
border-radius: 0.25rem; | |
margin-right: 0.5rem; | |
font-weight: bold; | |
} | |
.ceo-color { background-color: #e3f2fd; color: #1565c0; } | |
.tech-color { background-color: #f3e5f5; color: #7b1fa2; } | |
.research-color { background-color: #e8f5e9; color: #2e7d32; } | |
.chat-message { | |
padding: 1rem; | |
margin: 0.5rem; | |
border-radius: 0.5rem; | |
border-left: 4px solid; | |
} | |
.ceo-message { border-left-color: #1565c0; } | |
.tech-message { border-left-color: #7b1fa2; } | |
.research-message { border-left-color: #2e7d32; } | |
.user-message { border-left-color: #ff9800; } | |
.task-delegation { | |
background-color: #fff3e0; | |
border: 1px solid #ffe0b2; | |
padding: 0.5rem; | |
margin: 0.5rem 0; | |
border-radius: 0.25rem; | |
} | |
""" | |
) as interface: | |
with gr.Row(): | |
gr.Markdown(""" | |
# 🤖 Scamrakshak AI Assistant | |
An advanced AI system with three specialized agents working together to protect you from scams. | |
""") | |
# Agent Status Panel | |
with gr.Row() as agent_status: | |
with gr.Column(scale=1): | |
gr.Markdown(""" | |
### Active Agents | |
""") | |
with gr.Group(elem_classes="agent-status"): | |
current_agent = gr.Textbox( | |
label="Current Active Agent", | |
value="CEO", | |
interactive=False, | |
elem_classes="agent-indicator ceo-color" | |
) | |
gr.Markdown(""" | |
#### Available Agents: | |
- 👔 **CEO** - Strategic oversight and task delegation | |
- 🛠 **Tech Support** - Technical implementation and security | |
- 🔍 **Researcher** - Trend analysis and market research | |
Use `/switch [agent]` to change agents | |
""") | |
# Main Chat Interface | |
with gr.Row(): | |
with gr.Column(scale=3): | |
chatbot = gr.Chatbot( | |
label="Conversation", | |
height=600, | |
container=True, | |
show_label=True, | |
elem_id="chatbot" | |
) | |
with gr.Row(): | |
with gr.Column(scale=4): | |
msg = gr.Textbox( | |
label="Your message", | |
placeholder="Ask a question or use /team to start team chat...", | |
lines=2, | |
show_label=True, | |
container=True | |
) | |
with gr.Column(scale=1): | |
with gr.Row(): | |
send = gr.Button("Send", variant="primary") | |
clear = gr.Button("Clear", variant="stop") | |
# Add team chat controls | |
with gr.Row(): | |
team_chat_active = gr.Checkbox( | |
label="Team Chat Mode", | |
value=False, | |
interactive=True | |
) | |
current_role = gr.Dropdown( | |
choices=["Founder", "CEO", "Tech Support", "Researcher"], | |
value="Founder", | |
label="Speaking As", | |
interactive=True | |
) | |
# Message handling functions remain the same | |
def user_message(message: str, history: List[List[str]], is_team_chat: bool, role: str) -> tuple[List[List[str]], str]: | |
if message.strip() == "": | |
return history, "" | |
if message.startswith("/team"): | |
is_team_chat = True | |
return history, "" | |
if is_team_chat: | |
# Process team chat message | |
team_chat = TeamChat(agent_system.agents) | |
responses = team_chat.process_team_message(message, role) | |
# Format team chat messages | |
history.append([ | |
f'<div class="team-chat-message {role.lower()}-message">{role}: {message}</div>', | |
"" | |
]) | |
for response in responses: | |
agent = response.split(":")[0] | |
content = response.split(":", 1)[1] | |
history.append([ | |
"", | |
f'<div class="team-chat-message {agent.lower()}-message">{response}</div>' | |
]) | |
else: | |
# Normal chat processing | |
response = chat_interface(message, history) | |
history.append([ | |
f'<div class="user-message">{message}</div>', | |
response | |
]) | |
return history, "" | |
# Connect interface elements | |
msg.submit( | |
user_message, | |
[msg, chatbot, team_chat_active, current_role], | |
[chatbot, msg] | |
) | |
send.click( | |
user_message, | |
[msg, chatbot, team_chat_active, current_role], | |
[chatbot, msg] | |
) | |
clear.click(lambda: ([], ""), None, [chatbot, msg]) | |
# Update current agent display | |
def update_current_agent(message: str) -> str: | |
if message.startswith("/switch"): | |
try: | |
_, agent_name = message.split(" ", 1) | |
if agent_name in ["CEO", "Tech Support", "Researcher"]: | |
return agent_name | |
except: | |
pass | |
return current_agent.value | |
msg.submit(update_current_agent, [msg], [current_agent]) | |
send.click(update_current_agent, [msg], [current_agent]) | |
# Add Team Chat section | |
with gr.Tab("Team Chat"): | |
with gr.Column(): | |
gr.Markdown(""" | |
# 👥 Team Chat Room | |
Watch the Scamrakshak team have spontaneous work discussions! | |
""") | |
team_chat_box = gr.Chatbot( | |
label="Team Discussion", | |
height=400 | |
) | |
start_discussion = gr.Button("Start New Team Discussion", variant="primary") | |
def trigger_team_discussion() -> List[List[str]]: | |
team_chat = TeamChat(agent_system.agents) | |
discussion = team_chat.start_team_discussion() | |
formatted_discussion = [] | |
for msg in discussion: | |
agent = msg.split(":")[0] | |
content = msg.split(":", 1)[1] | |
formatted_discussion.append([ | |
"", | |
f'<div class="{agent.lower()}-message">{msg}</div>' | |
]) | |
return formatted_discussion | |
start_discussion.click( | |
trigger_team_discussion, | |
outputs=[team_chat_box] | |
) | |
gr.Markdown(""" | |
### About Team Chat | |
- Team members spontaneously discuss work-related topics | |
- Discussions are focused on improving Scamrakshak's services | |
- Watch how different team members contribute their expertise | |
- Topics include security, features, market trends, and more | |
""") | |
return interface | |
if __name__ == "__main__": | |
# Create and launch the interface | |
demo = create_interface() | |
demo.queue() # Enable queuing for better handling of multiple requests | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=True, | |
show_error=True, | |
show_api=False | |
) | |