import pandas as pd
import time
from groq import Groq
import os
import csv
import logging
from datetime import datetime
import json
import sys
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import io
# OAuth 2.0 client configuration.
# Loaded from the environment rather than hardcoded, so the client secret
# is never committed alongside the code.
CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "")
CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "")
SCOPES = ['https://www.googleapis.com/auth/drive.file']
def authenticate_google():
    """Authenticate with Google Drive using OAuth 2.0, reusing a saved token when possible"""
    creds = None
    # Reuse previously saved credentials if available
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            # Refresh an expired token instead of re-running the consent flow
            creds.refresh(Request())
        elif os.path.exists('client_secret.json'):
            # Load credentials from client_secret.json if it exists
            flow = InstalledAppFlow.from_client_secrets_file(
                'client_secret.json', SCOPES)
            creds = flow.run_local_server(port=0)
        else:
            # Build the client config in memory if client_secret.json is not found
            flow = InstalledAppFlow.from_client_config(
                {
                    "installed": {
                        "client_id": CLIENT_ID,
                        "client_secret": CLIENT_SECRET,
                        "redirect_uris": ["urn:ietf:wg:oauth:2.0:oob"],
                        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                        "token_uri": "https://oauth2.googleapis.com/token"
                    }
                },
                SCOPES
            )
            creds = flow.run_local_server(port=0)
        # Save credentials for the next run
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return creds
def mount_drive():
    """Mount Google Drive with authentication"""
    try:
        # Authenticate
        creds = authenticate_google()
        # Build drive service
        service = build('drive', 'v3', credentials=creds)
        logging.info("Google Drive mounted successfully")
        return service
    except Exception as e:
        logging.error(f"Error mounting drive: {str(e)}")
        raise
def setup_logging():
    """Setup enhanced logging configuration"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_dir = 'logs'
    # Create logs directory structure
    os.makedirs(f"{log_dir}/api", exist_ok=True)
    os.makedirs(f"{log_dir}/process", exist_ok=True)
    os.makedirs(f"{log_dir}/error", exist_ok=True)
    # Configure logging with multiple handlers.
    # Note: as configured, every handler receives every record at INFO and
    # above; the api/ and error/ logs are not filtered separately here.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s | %(levelname)s | %(message)s',
        handlers=[
            # Console handler
            logging.StreamHandler(sys.stdout),
            # Main process log
            logging.FileHandler(f'{log_dir}/process/process_{timestamp}.log'),
            # API interactions log
            logging.FileHandler(f'{log_dir}/api/api_{timestamp}.log'),
            # Error log
            logging.FileHandler(f'{log_dir}/error/error_{timestamp}.log')
        ]
    )
    logging.info(f"""
    =================================================================
    Starting Message Processing System
    =================================================================
    Time: {timestamp}
    Log Directory: {log_dir}
    =================================================================
    """)
    return timestamp
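# If the error log should hold only errors (an assumption about intent; as
# written above, every handler receives every record), a per-handler level
# does it. A minimal sketch:
#
#   error_handler = logging.FileHandler(f'{log_dir}/error/error_{timestamp}.log')
#   error_handler.setLevel(logging.ERROR)
#   logging.getLogger().addHandler(error_handler)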
def initialize_groq():
    """Initialize Groq API client"""
    try:
        # Read the key from the environment instead of hardcoding it
        groq_client = Groq(api_key=os.environ["GROQ_API_KEY"])
        logging.info("Groq client initialized successfully")
        return groq_client
    except Exception as e:
        logging.error(f"Failed to initialize Groq client: {str(e)}")
        raise
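# A minimal setup sketch (assumption: the credentials live in your shell
# environment; the variable names here match the os.environ lookups above):
#
#   export GROQ_API_KEY="gsk_..."
#   export GOOGLE_CLIENT_ID="....apps.googleusercontent.com"
#   export GOOGLE_CLIENT_SECRET="GOCSPX-..."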
def log_api_details(message_id, original_message, converted_message, processing_time, status):
    """Log detailed API interaction information"""
    api_log = {
        'message_id': message_id,
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
        'original_message': original_message,
        'converted_message': converted_message,
        'processing_time_seconds': processing_time,
        'status': status
    }
    # Emitted at DEBUG: these records are suppressed under the INFO-level
    # config in setup_logging() unless the root level is lowered.
    logging.debug(f"API Details: {json.dumps(api_log, indent=2)}")
def convert_to_ham_message(groq_client, scam_info, message_id):
    """Convert scam message to legitimate message using Groq with detailed logging"""
    start_time = time.time()
    try:
        logging.info(f"[Message {message_id}] Starting processing")
        logging.debug(f"[Message {message_id}] Original message: {scam_info}")
        # Special handling for specific message IDs
        if message_id in [1589, 1597]:
            # Skip the API call and return the original message for these IDs
            processing_time = time.time() - start_time
            logging.info(f"[Message {message_id}] Using original message")
            log_api_details(
                message_id=message_id,
                original_message=scam_info,
                converted_message=scam_info,
                processing_time=processing_time,
                status='success'
            )
            return scam_info, processing_time
        prompt = f"""
        Convert the following potential scam message into a legitimate, non-fraudulent message
        while maintaining similar context but removing any fraudulent elements:
        {scam_info}
        Generate only the converted message without any additional remarks or characters.
        """
        logging.info(f"[Message {message_id}] Sending request to Groq API")
        logging.debug(f"[Message {message_id}] Prompt: {prompt}")
        # Models to try, in order of preference. On rate-limit (429) or
        # service (503) errors the loop below falls through to the next entry.
        models = [
            "llama-3.2-90b-text-preview",
            "llama-3.2-11b-vision-preview",
            "llama-3.2-3b-preview",
            "llama-3.2-1b-preview",
            "llama3-8b-8192",
            "llama-3.1-70b-versatile",
            "llama-3.1-8b-instant",
            "llama3-70b-8192",
            "llama-3.2-90b-vision-preview",
            "llama3-groq-70b-8192-tool-use-preview",
            "llama3-groq-8b-8192-tool-use-preview",
            "llama-guard-3-8b",
            "gemma-7b-it",
            "gemma2-9b-it"
        ]
        for model in models:
            try:
                completion = groq_client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                    model=model,
                    temperature=0.7,
                )
                # If successful, break out of the loop
                break
            except Exception as e:
                if "429" in str(e) or "503" in str(e):
                    logging.warning(f"API error with model {model}, trying next model...")
                    continue
                else:
                    raise e
        else:
            # The for/else branch runs only if we've exhausted all models
            raise Exception("All models failed with rate limit or service errors")
        processing_time = time.time() - start_time
        converted_message = completion.choices[0].message.content.strip()
        logging.info(f"[Message {message_id}] Conversion successful using model {model}")
        logging.debug(f"""
        [Message {message_id}] Conversion details:
        - Processing time: {processing_time:.2f} seconds
        - Original length: {len(scam_info)}
        - Converted length: {len(converted_message)}
        - Original message: {scam_info}
        - Converted message: {converted_message}
        - Model used: {model}
        """)
        log_api_details(
            message_id=message_id,
            original_message=scam_info,
            converted_message=converted_message,
            processing_time=processing_time,
            status='success'
        )
        return converted_message, processing_time
    except Exception as e:
        error_msg = f"[Message {message_id}] Error in API call: {str(e)}"
        logging.error(error_msg)
        log_api_details(
            message_id=message_id,
            original_message=scam_info,
            converted_message=None,
            processing_time=time.time() - start_time,
            status=f'error: {str(e)}'
        )
        return None, time.time() - start_time
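# An alternative to falling through the model list is retrying a single model
# with exponential backoff. A minimal sketch, assuming the same string check
# on the error message suffices (this helper is illustrative, not part of the
# original pipeline):
def call_with_backoff(groq_client, prompt, model, retries=3):
    """Retry one model with exponential backoff on rate-limit/service errors."""
    for attempt in range(retries):
        try:
            return groq_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=model,
                temperature=0.7,
            )
        except Exception as e:
            if ("429" in str(e) or "503" in str(e)) and attempt < retries - 1:
                time.sleep(2 ** attempt)  # back off 1s, 2s, 4s before retrying
                continue
            raise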
def process_csv(input_file, output_file):
    """Process a slice of the CSV file (rows 4565-5802)"""
    # Note: output_file is currently unused; results are only logged.
    try:
        # Mount Google Drive using OAuth
        drive_service = mount_drive()
        logging.info("Google Drive mounted successfully")
        # Initialize the Groq client used below
        groq_client = initialize_groq()
        logging.info(f"Reading input CSV file: {input_file}")
        df = pd.read_csv(input_file, encoding='latin-1')
        # Keep only rows 4565-5802 of the dataset
        df = df.iloc[4565:5803]
        total_messages = len(df)
        logging.info(f"Processing {total_messages} rows (index 4565-5802)")
        # Check and get the correct column name
        if ',crimeaditionalinfo' in df.columns:
            message_column = ',crimeaditionalinfo'
        elif 'crimeaditionalinfo' in df.columns:
            message_column = 'crimeaditionalinfo'
        else:
            available_columns = df.columns.tolist()
            logging.error(f"Required column not found. Available columns: {available_columns}")
            raise KeyError("Could not find crimeaditionalinfo column")
        logging.info(f"Using column: {message_column}")
        # Process each message in the slice
        for i in range(total_messages):
            message_id = i + 1
            scam_info = df.iloc[i][message_column]
            try:
                # Process the message
                ham_message, proc_time = convert_to_ham_message(
                    groq_client, scam_info, message_id
                )
                # Log success
                if ham_message:
                    logging.info(f"Success for message ID {message_id}")
            except Exception as e:
                logging.error(f"Error processing message {message_id}: {str(e)}")
        logging.info(f"Processed {total_messages} messages.")
    except Exception as e:
        logging.error(f"Critical error in process_csv: {str(e)}", exc_info=True)
        raise
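# A hypothetical invocation (both paths are illustrative only):
# process_csv('Fraud_CallVishing_messages.csv', 'converted_output.csv')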
def generate_summary(category, stats, timestamp, summary_file):
    """Generate and append summary for each category"""
    summary = f"""
    =================================================================
    Category: {category}
    Processed at: {timestamp}
    =================================================================
    Processing Statistics:
    - Total Messages: {stats['total']}
    - Successfully Processed: {stats['processed']}
    - Errors: {stats['errors']}
    - Success Rate: {stats['success_rate']:.2f}%
    Processing Time:
    - Total Runtime: {stats['runtime']:.2f} seconds
    - Average Time per Message: {stats['avg_time']:.2f} seconds
    Files Generated:
    - Output File: {stats['output_file']}
    =================================================================
    """
    # Append to summary file
    with open(summary_file, 'a', encoding='utf-8') as f:
        f.write(summary)
    logging.info(f"Summary updated for category: {category}")
def setup_drive():
    """Setup Google Drive using OAuth2"""
    # Broader scopes than authenticate_google(); trim to what the app needs
    SCOPES = [
        'https://www.googleapis.com/auth/drive.readonly',
        'https://www.googleapis.com/auth/drive.file',
        'https://www.googleapis.com/auth/drive.install',
        'https://www.googleapis.com/auth/userinfo.email',
        'https://www.googleapis.com/auth/userinfo.profile',
        'https://www.googleapis.com/auth/gmail.readonly',
        'openid'
    ]
    try:
        # Delete the existing token so the flow re-runs with the new scopes
        if os.path.exists('token.json'):
            os.remove('token.json')
        # Build the client config from the environment-supplied credentials
        credentials = {
            "installed": {
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "redirect_uris": ["http://localhost:8080/"]
            }
        }
        # Create flow
        flow = InstalledAppFlow.from_client_config(
            credentials,
            SCOPES
        )
        # Get credentials
        creds = flow.run_local_server(port=8080)
        # Save credentials
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
        # Build service
        service = build('drive', 'v3', credentials=creds)
        logging.info("Google Drive service initialized successfully")
        return service
    except Exception as e:
        logging.error(f"Error setting up Drive service: {str(e)}")
        raise
def get_files_in_drive(service):
    """List all files in Google Drive"""
    try:
        results = service.files().list(
            pageSize=1000,
            fields="nextPageToken, files(id, name, mimeType)",
            q="mimeType='application/vnd.google-apps.spreadsheet' or mimeType='text/csv'"
        ).execute()
        files = results.get('files', [])
        logging.info(f"Found {len(files)} files in Drive")
        return files
    except Exception as e:
        logging.error(f"Error listing files: {str(e)}")
        raise
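# Note: the call above fetches a single page (up to 1000 files) and ignores
# nextPageToken. A minimal paging sketch, assuming the Drive may hold more
# files than one page returns (this helper is illustrative):
def get_all_files_in_drive(service):
    """Collect every matching file across all result pages."""
    files, page_token = [], None
    while True:
        results = service.files().list(
            pageSize=1000,
            pageToken=page_token,
            fields="nextPageToken, files(id, name, mimeType)",
            q="mimeType='application/vnd.google-apps.spreadsheet' or mimeType='text/csv'"
        ).execute()
        files.extend(results.get('files', []))
        page_token = results.get('nextPageToken')
        if not page_token:
            return files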
def download_file(service, file_id, file_name):
    """Download a file from Google Drive"""
    try:
        request = service.files().get_media(fileId=file_id)
        file = io.BytesIO()
        downloader = MediaIoBaseDownload(file, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            logging.info(f"Download {int(status.progress() * 100)}%")
        file.seek(0)
        with open(file_name, 'wb') as f:
            f.write(file.read())
        logging.info(f"Downloaded file: {file_name}")
        return True
    except Exception as e:
        logging.error(f"Error downloading file {file_name}: {str(e)}")
        return False
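# Note: files().get_media works only for binary content such as uploaded CSVs.
# Google-native Sheets ('application/vnd.google-apps.spreadsheet', which the
# listing query above also matches) must be exported instead. A minimal
# sketch (illustrative helper, not part of the original pipeline):
def export_sheet_as_csv(service, file_id, file_name):
    """Export a Google Sheet as CSV via files().export_media."""
    request = service.files().export_media(fileId=file_id, mimeType='text/csv')
    file = io.BytesIO()
    downloader = MediaIoBaseDownload(file, request)
    done = False
    while not done:
        _, done = downloader.next_chunk()
    with open(file_name, 'wb') as f:
        f.write(file.getvalue())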
def upload_file(service, file_path, file_name=None, parent_id=None):
    """Upload a file to Google Drive"""
    try:
        file_metadata = {
            'name': file_name or os.path.basename(file_path)
        }
        if parent_id:
            file_metadata['parents'] = [parent_id]
        media = MediaFileUpload(
            file_path,
            mimetype='text/csv',
            resumable=True
        )
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        logging.info(f"Uploaded file: {file_name or os.path.basename(file_path)}")
        return file.get('id')
    except Exception as e:
        logging.error(f"Error uploading file {file_path}: {str(e)}")
        return None
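# Example (hypothetical) usage, uploading into a known folder:
# folder_id = get_folder_id(service, "subcategory_messages")
# file_id = upload_file(service, 'converted4_output.csv', parent_id=folder_id)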
def get_category_paths():
    """Get files from Google Drive"""
    try:
        # Setup drive service
        service = setup_drive()
        logging.info("Drive service setup complete")
        # Map each input file to its folder under subcategory_messages
        file_mappings = {
            "Fraud_CallVishing_messages.csv": "subcategory_messages/Online_Financial_Fraud",
        }
        # Get all files from Drive
        all_files = get_files_in_drive(service)
        # Map files to their IDs
        files = {}
        for file_name, folder_path in file_mappings.items():
            query = f"name='{file_name}'"
            if folder_path:
                # Drive queries match folder names, not paths, so look up the
                # leaf component of the mapping path to get the folder ID
                leaf_folder = folder_path.split('/')[-1]
                folder_results = service.files().list(
                    q=f"name='{leaf_folder}' and mimeType='application/vnd.google-apps.folder'",
                    spaces='drive',
                    fields='files(id)'
                ).execute()
                folder_id = folder_results.get('files', [])[0]['id'] if folder_results.get('files') else None
                if folder_id:
                    query += f" and '{folder_id}' in parents"
            # Search for file
            results = service.files().list(
                q=query,
                spaces='drive',
                fields='files(id, name)'
            ).execute()
            files_found = results.get('files', [])
            if files_found:
                file_id = files_found[0]['id']
                files[file_name] = {
                    'id': file_id,
                    'name': file_name,
                    'path': folder_path
                }
                logging.info(f"Found file: {file_name} in {folder_path} (ID: {file_id})")
            else:
                logging.warning(f"File not found: {file_name} in {folder_path}")
                files[file_name] = None
        return files, service
    except Exception as e:
        logging.error(f"Error accessing Drive files: {str(e)}")
        raise
def process_file(service, groq_client, file_info):
    """Process a single file with detailed logging"""
    # Ensure file_info has the correct structure
    if not all(key in file_info for key in ['name', 'id', 'path']):
        logging.error(f"File info is missing required keys: {file_info}")
        return
    file_name = file_info['name']
    file_id = file_info['id']
    folder_path = file_info['path']
    # Define temp/output paths up front so the finally block can safely
    # reference them even if an early step fails
    temp_input = f"temp_input_{file_name}"
    output_name = f"converted4_{file_name}"
    logging.info(f"""
    =================================================================
    Starting File Processing
    =================================================================
    File: {file_name}
    Location: {folder_path}
    Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    =================================================================
    """)
    try:
        # Download file
        if not download_file(service, file_id, temp_input):
            raise Exception("Failed to download file")
        # Read CSV and keep rows 4565-5802
        df = pd.read_csv(temp_input, encoding='latin-1')
        df = df.iloc[4565:5803]
        # Check for column name
        if 'crimeaditionalinfo' not in df.columns:
            logging.error(f"Column 'crimeaditionalinfo' not found. Available columns: {df.columns.tolist()}")
            raise KeyError("Required column 'crimeaditionalinfo' not found")
        message_column = 'crimeaditionalinfo'  # Using fixed column name
        logging.info(f"Using column: {message_column}")
        total_messages = len(df)
        logging.info(f"""
        File Statistics:
        - Messages to process: {total_messages}
        - Columns: {', '.join(df.columns)}
        """)
        processed_count = 0
        error_count = 0
        # Create output CSV
        fieldnames = ['message_id', 'original_message', 'converted_message',
                      'processing_time', 'model_used', 'timestamp']
        with open(output_name, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            # Process messages
            for idx, row in df.iterrows():
                try:
                    message_id = idx + 1
                    original_message = row[message_column]
                    # Convert message using Groq
                    converted_message, proc_time = convert_to_ham_message(
                        groq_client,
                        original_message,
                        message_id
                    )
                    if converted_message:
                        writer.writerow({
                            'message_id': message_id,
                            'original_message': original_message,
                            'converted_message': converted_message,
                            'processing_time': proc_time,
                            # Placeholder label: convert_to_ham_message does not
                            # return which model in the fallback list succeeded
                            'model_used': 'groq',
                            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        })
                        processed_count += 1
                        logging.info(f"Processed message {message_id} successfully")
                    else:
                        error_count += 1
                        logging.error(f"Failed to process message {message_id}")
                except Exception as e:
                    logging.error(f"Error processing message {message_id}: {str(e)}")
                    error_count += 1
                    continue
        # Upload processed file back to Drive
        try:
            # Ensure the parent_id is a folder ID
            folder_id = get_folder_id(service, "subcategory_messages")
            file_id = upload_file(service, output_name, output_name, folder_id)
            logging.info(f"Uploaded converted file to Drive with ID: {file_id}")
        except Exception as e:
            logging.error(f"Error uploading converted file: {str(e)}")
        logging.info(f"""
        =================================================================
        Completed Processing: {file_name}
        Messages Processed Successfully: {processed_count}
        Errors: {error_count}
        Output File: {output_name}
        =================================================================
        """)
    except Exception as e:
        logging.error(f"Error processing file {file_name}: {str(e)}")
        raise
    finally:
        # Cleanup local temp files
        if os.path.exists(temp_input):
            os.remove(temp_input)
        if os.path.exists(output_name):
            os.remove(output_name)
def get_folder_id(service, folder_name):
    """Retrieve the folder ID for a given folder name"""
    folder_query = f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder'"
    folder_results = service.files().list(
        q=folder_query,
        spaces='drive',
        fields='files(id, name)'
    ).execute()
    folders = folder_results.get('files', [])
    if not folders:
        raise FileNotFoundError(f"Folder not found: {folder_name}")
    return folders[0]['id']
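# Note: folder names are not unique in Drive; if several folders share the
# name, the query above returns an arbitrary first match. Scoping the query
# with a parent constraint (e.g. "'<parent_id>' in parents") disambiguates.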
def get_and_process_files():
    """Get and process files from Google Drive one at a time"""
    try:
        # Initialize the Groq client; get_category_paths() sets up the Drive
        # service itself, so reuse the service it returns rather than
        # running the OAuth flow twice
        groq_client = initialize_groq()
        files, service = get_category_paths()
        logging.info("Drive service and Groq API initialized successfully")
        for file_name, file_info in files.items():
            if file_info is None:
                logging.warning(f"Skipping {file_name} - Not found")
                continue
            try:
                process_file(service, groq_client, file_info)
            except Exception as e:
                logging.error(f"Error processing file {file_name}: {str(e)}")
                continue
    except Exception as e:
        logging.error(f"Error in get_and_process_files: {str(e)}")
        raise
def main():
    # Setup logging
    timestamp = setup_logging()
    try:
        # Process files
        get_and_process_files()
    except Exception as e:
        logging.error(f"Error in main: {str(e)}")
        raise

if __name__ == "__main__":
    main()