import pandas as pd
import time
from groq import Groq
import os
import csv
from tqdm import tqdm
import logging
from datetime import datetime
import json
import sys
import requests
import aiohttp
import asyncio
import google.auth
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import io

# OAuth 2.0 credentials
CLIENT_ID = "483287191355-udtleajik8ko1o2n03fqmimuu47n3hba.apps.googleusercontent.com"
CLIENT_SECRET = "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"
SCOPES = ['https://www.googleapis.com/auth/drive.file']
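
# Note: the drive.file scope only grants access to files that this app itself
# creates or that the user explicitly opens with it; broader read access would
# need the drive.readonly scope (as requested later in setup_drive()).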

def authenticate_google():
    """Authenticate with Google Drive using OAuth 2.0"""
    creds = None
    # Load credentials from client_secret.json if exists
    if os.path.exists('client_secret.json'):
        flow = InstalledAppFlow.from_client_secrets_file(
            'client_secret.json', SCOPES)
        creds = flow.run_local_server(port=0)
    else:
        # Create credentials manually if client_secret.json not found
        flow = InstalledAppFlow.from_client_config(
            {
                "installed": {
                    "client_id": CLIENT_ID,
                    "client_secret": CLIENT_SECRET,
                    "redirect_uris": ["urn:ietf:wg:oauth:2.0:oob"],
                    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                    "token_uri": "https://oauth2.googleapis.com/token"
                }
            },
            SCOPES
        )
        creds = flow.run_local_server(port=0)

    # Save credentials
    with open('token.json', 'w') as token:
        token.write(creds.to_json())
    return creds
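
# Note: authenticate_google() writes token.json but never reads it back, so every
# run re-opens the browser consent flow. A possible (untested) refinement would be
# to reuse the saved token first, e.g.:
#   if os.path.exists('token.json'):
#       creds = Credentials.from_authorized_user_file('token.json', SCOPES)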

def mount_drive():
    """Mount Google Drive with authentication"""
    try:
        # Authenticate
        creds = authenticate_google()
        # Build drive service
        service = build('drive', 'v3', credentials=creds)
        logging.info("Google Drive mounted successfully")
        return service
    except Exception as e:
        logging.error(f"Error mounting drive: {str(e)}")
        raise

def setup_logging():
    """Setup enhanced logging configuration"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_dir = 'logs'

    # Create logs directory structure
    os.makedirs(f"{log_dir}/api", exist_ok=True)
    os.makedirs(f"{log_dir}/process", exist_ok=True)
    os.makedirs(f"{log_dir}/error", exist_ok=True)

    # The error log only records ERROR and above; the other handlers receive
    # every record at INFO level and up.
    error_handler = logging.FileHandler(f'{log_dir}/error/error_{timestamp}.log')
    error_handler.setLevel(logging.ERROR)

    # Configure logging with multiple handlers
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s | %(levelname)s | %(message)s',
        handlers=[
            # Console handler
            logging.StreamHandler(sys.stdout),
            # Main process log
            logging.FileHandler(f'{log_dir}/process/process_{timestamp}.log'),
            # API interactions log
            logging.FileHandler(f'{log_dir}/api/api_{timestamp}.log'),
            # Error log
            error_handler
        ]
    )

    logging.info(f"""
    =================================================================
    Starting Message Processing System
    =================================================================
    Time: {timestamp}
    Log Directory: {log_dir}
    =================================================================
    """)
    return timestamp

def initialize_groq():
    """Initialize Groq API client"""
    try:
        groq_client = Groq(api_key="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")
        logging.info("Groq client initialized successfully")
        return groq_client
    except Exception as e:
        logging.error(f"Failed to initialize Groq client: {str(e)}")
        raise
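
# Note: the API key above is hardcoded; a safer, equivalent pattern would be to
# read it from the environment, e.g. Groq(api_key=os.environ["GROQ_API_KEY"]),
# so the key does not live in the source file.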

def log_api_details(message_id, original_message, converted_message, processing_time, status, batch_num):
    """Log detailed API interaction information"""
    api_log = {
        'batch_number': batch_num,
        'message_id': message_id,
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
        'original_message': original_message,
        'converted_message': converted_message,
        'processing_time_seconds': processing_time,
        'status': status
    }
    logging.debug(f"API Details: {json.dumps(api_log, indent=2)}")

def convert_to_ham_message(groq_client, scam_info, message_id, batch_num):
    """Convert scam message to legitimate message using Groq with detailed logging"""
    start_time = time.time()
    try:
        logging.info(f"[Batch {batch_num}][Message {message_id}] Starting processing")
        logging.debug(f"[Batch {batch_num}][Message {message_id}] Original message: {scam_info}")

        # Special handling for specific message IDs
        if message_id in [1589, 1597]:
            # Skip API call and return original message for these IDs
            processing_time = time.time() - start_time
            logging.info(f"[Batch {batch_num}][Message {message_id}] Using original message")
            log_api_details(
                message_id=message_id,
                original_message=scam_info,
                converted_message=scam_info,
                processing_time=processing_time,
                status='success',
                batch_num=batch_num
            )
            return scam_info, processing_time

        prompt = f"""
        Convert the following potential scam message into a legitimate, non-fraudulent message
        while maintaining similar context but removing any fraudulent elements:
        {scam_info}
        Generate only the converted message without any additional remarks or characters.
        """

        logging.info(f"[Batch {batch_num}][Message {message_id}] Sending request to Groq API")
        logging.debug(f"[Batch {batch_num}][Message {message_id}] Prompt: {prompt}")

        # List of models to try in order of preference
        models = [
            "llava-v1.5-7b-4096-preview",  # Default model
            "gemma-7b-it",
            "gemma2-9b-it",
            "llama-3.1-70b-versatile",
            "llama-3.1-8b-instant",
            "mixtral-8x7b-32768"
        ]

        for model in models:
            try:
                completion = groq_client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                    model=model,
                    temperature=0.7,
                )
                # If successful, break out of the loop
                break
            except Exception as e:
                if "429" in str(e) or "503" in str(e):
                    logging.warning(f"API error with model {model}, trying next model...")
                    continue
                else:
                    raise e
        else:
            # If we've exhausted all models
            raise Exception("All models failed with rate limit or service errors")

        processing_time = time.time() - start_time
        converted_message = completion.choices[0].message.content.strip()

        logging.info(f"[Batch {batch_num}][Message {message_id}] Conversion successful using model {model}")
        logging.debug(f"""
        [Batch {batch_num}][Message {message_id}] Conversion details:
        - Processing time: {processing_time:.2f} seconds
        - Original length: {len(scam_info)}
        - Converted length: {len(converted_message)}
        - Original message: {scam_info}
        - Converted message: {converted_message}
        - Model used: {model}
        """)

        log_api_details(
            message_id=message_id,
            original_message=scam_info,
            converted_message=converted_message,
            processing_time=processing_time,
            status='success',
            batch_num=batch_num
        )
        return converted_message, processing_time

    except Exception as e:
        error_msg = f"[Batch {batch_num}][Message {message_id}] Error in API call: {str(e)}"
        logging.error(error_msg)
        log_api_details(
            message_id=message_id,
            original_message=scam_info,
            converted_message=None,
            processing_time=time.time() - start_time,
            status=f'error: {str(e)}',
            batch_num=batch_num
        )
        return None, time.time() - start_time
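
# Hypothetical standalone usage of convert_to_ham_message (text and IDs are
# illustrative only):
#   client = initialize_groq()
#   converted, seconds = convert_to_ham_message(
#       client, "Your account is blocked, share your OTP to unlock it",
#       message_id=1, batch_num=1)
#   print(converted, seconds)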

def process_csv(input_file, output_file, batch_size=50):
    """Process CSV file in batches with enhanced logging"""
    try:
        # Mount Google Drive using OAuth
        drive_service = mount_drive()
        logging.info("Google Drive mounted successfully")

        logging.info(f"Reading input CSV file: {input_file}")
        df = pd.read_csv(input_file, encoding='latin-1')
        total_messages = len(df)
        logging.info(f"Loaded {total_messages:,} total messages from CSV")

        # Check and get the correct column name
        if ',crimeaditionalinfo' in df.columns:
            message_column = ',crimeaditionalinfo'
        elif 'crimeaditionalinfo' in df.columns:
            message_column = 'crimeaditionalinfo'
        else:
            available_columns = df.columns.tolist()
            logging.error(f"Required column not found. Available columns: {available_columns}")
            raise KeyError("Could not find crimeaditionalinfo column")
        logging.info(f"Using column: {message_column}")

        # Initialize counters for detailed logging
        messages_processed = 0
        current_batch = 0
        total_batches = (total_messages + batch_size - 1) // batch_size

        logging.info(f"""
        Processing Configuration:
        - Total messages to process: {total_messages:,}
        - Batch size: {batch_size}
        - Total batches: {total_batches}
        - Column being processed: {message_column}
        """)

        groq_client = initialize_groq()

        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)
            logging.info(f"Created output directory: {output_dir}")

        stats_dir = os.path.join(output_dir, 'statistics')
        os.makedirs(stats_dir, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        stats_file = os.path.join(stats_dir, f'processing_statistics_{timestamp}.csv')
        batch_stats_file = os.path.join(stats_dir, f'batch_statistics_{timestamp}.csv')

        stats_fieldnames = ['batch_num', 'message_id', 'processing_time', 'status', 'timestamp']
        batch_fieldnames = ['batch_num', 'start_time', 'end_time', 'total_time',
                            'messages_processed', 'successes', 'errors', 'avg_time_per_message']

        for file, fields in [(stats_file, stats_fieldnames),
                             (batch_stats_file, batch_fieldnames)]:
            with open(file, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fields)
                writer.writeheader()

        fieldnames = ['batch_num', 'message_id', 'original_message', 'converted_message',
                      'processing_time', 'processing_timestamp']

        processed_count = 0
        error_count = 0
        total_processing_time = 0

        # Open output file
        with open(output_file, 'w', newline='', encoding='utf-8') as f_main:
            writer_main = csv.DictWriter(f_main, fieldnames=fieldnames)
            writer_main.writeheader()

            # Add batch progress header
            logging.info("""
            =================================================================
            Starting Batch Processing
            =================================================================
            """)

            # Modified progress bar without eta reference
            progress_bar = tqdm(
                range(0, len(df), batch_size),
                desc="Processing batches",
                unit="batch",
                total=(len(df) + batch_size - 1) // batch_size,
                ncols=100,  # Fixed width for progress bar
                bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]'
            )

            for i in progress_bar:
                current_batch += 1
                batch = df.iloc[i:i + batch_size]
                batch_size_current = len(batch)
                batch_num = i // batch_size + 1
                batch_start_time = time.time()

                # Initialize batch counters
                batch_processed = 0
                batch_errors = 0

                # Modified batch start logging without eta
                logging.info(f"""
                =================================================================
                Starting Batch {batch_num}/{total_batches}
                =================================================================
                Batch Details:
                - Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
                - Messages in batch: {batch_size_current}
                - Message range: {i+1} to {min(i + batch_size, len(df))}
                Overall Progress:
                - Total processed: {messages_processed:,}/{total_messages:,}
                - Progress: {(messages_processed/total_messages)*100:.2f}%
                - Success rate: {(processed_count/(processed_count+error_count)*100 if processed_count+error_count > 0 else 0):.2f}%
                Performance:
                - Average processing time: {(total_processing_time/processed_count if processed_count > 0 else 0):.2f}s per message
                =================================================================
                """)

                # Update progress bar description with current stats
                progress_bar.set_description(
                    f"Batch {batch_num}/{total_batches} "
                    f"[{messages_processed}/{total_messages} msgs | "
                    f"Success: {processed_count:,} | "
                    f"Errors: {error_count:,}]"
                )

                for idx, row in batch.iterrows():
                    message_id = idx + 1
                    messages_processed += 1
                    try:
                        scam_info = row[message_column]

                        # Modified message processing status without eta
                        logging.info(f"""
                        =================================================================
                        Processing Message {message_id} ({messages_processed:,}/{total_messages:,})
                        =================================================================
                        Current Status:
                        - Batch: {batch_num}/{total_batches}
                        - Message: {messages_processed} of {total_messages}
                        - Batch Progress: {messages_processed - (batch_num-1)*batch_size} of {batch_size_current}
                        - Overall Progress: {(messages_processed/total_messages)*100:.2f}%
                        Message Details:
                        - Original Length: {len(scam_info)} characters
                        - Processing Time (so far): {total_processing_time:.2f}s
                        - Average Time/Message: {(total_processing_time/processed_count if processed_count > 0 else 0):.2f}s
                        Current Statistics:
                        - Successfully Processed: {processed_count:,}
                        - Errors: {error_count:,}
                        - Success Rate: {(processed_count/(processed_count+error_count)*100 if processed_count+error_count > 0 else 0):.2f}%
                        Estimated:
                        - Messages Remaining: {total_messages - messages_processed:,}
                        =================================================================
                        """)

                        # Update progress bar with current message
                        progress_bar.set_description(
                            f"Batch {batch_num}/{total_batches} "
                            f"[{messages_processed}/{total_messages} msgs | "
                            f"Success: {processed_count:,} | "
                            f"Errors: {error_count:,}]"
                        )

                        ham_message, proc_time = convert_to_ham_message(
                            groq_client, scam_info, message_id, batch_num
                        )

                        # After message completion status
                        status = 'success' if ham_message else 'error'
                        logging.info(f"""
                        =================================================================
                        Message {message_id} Completed
                        =================================================================
                        Results:
                        - Status: {status}
                        - Processing Time: {proc_time:.2f}s
                        - Message {messages_processed:,} of {total_messages:,}
                        Progress:
                        - Batch Progress: {messages_processed - (batch_num-1)*batch_size}/{batch_size_current}
                        - Overall Progress: {(messages_processed/total_messages)*100:.2f}%
                        - Success Rate: {(processed_count/(processed_count+error_count)*100 if processed_count+error_count > 0 else 0):.2f}%
                        Remaining:
                        - Messages: {total_messages - messages_processed:,}
                        =================================================================
                        """)

                        with open(stats_file, 'a', newline='') as sf:
                            stats_writer = csv.DictWriter(sf, fieldnames=stats_fieldnames)
                            stats_writer.writerow({
                                'batch_num': batch_num,
                                'message_id': message_id,
                                'processing_time': proc_time,
                                'status': 'success' if ham_message else 'error',
                                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            })

                        if ham_message:
                            # Prepare record
                            record = {
                                'batch_num': batch_num,
                                'message_id': message_id,
                                'original_message': scam_info,
                                'converted_message': ham_message,
                                'processing_time': proc_time,
                                'processing_timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            }
                            try:
                                writer_main.writerow(record)
                                processed_count += 1
                                batch_processed += 1
                                total_processing_time += proc_time
                            except Exception as e:
                                logging.error(f"""
                                =================================================================
                                Message {message_id} Storage Error [Batch {batch_num}]
                                =================================================================
                                Error: {str(e)}
                                =================================================================
                                """)
                                error_count += 1
                        else:
                            error_count += 1
                            batch_errors += 1

                        # After processing each message
                        status = 'success' if ham_message else 'error'
                        logging.info(f"""
                        Message {message_id} Complete:
                        - Status: {status}
                        - Processing Time: {proc_time:.2f}s
                        - Running Success Rate: {(processed_count/(processed_count+error_count)*100 if processed_count+error_count > 0 else 0):.2f}%
                        """)

                    except KeyError as e:
                        logging.error(f"Column not found: {str(e)}")
                        logging.error(f"Available columns: {row.index.tolist()}")
                        raise
                    except Exception as e:
                        logging.error(f"Error processing message {message_id}: {str(e)}")
                        error_count += 1
                        batch_errors += 1

                # Modified batch completion logging without eta
                batch_completion_time = time.time() - batch_start_time
                logging.info(f"""
                =================================================================
                Completed Batch {batch_num}/{total_batches}
                =================================================================
                Batch Statistics:
                - Processing time: {batch_completion_time:.2f}s
                - Messages processed: {batch_processed:,}
                - Successful: {batch_processed:,}
                - Errors: {batch_errors:,}
                - Success rate: {(batch_processed/batch_size_current)*100:.2f}%
                Performance Metrics:
                - Average time per message: {batch_completion_time/batch_size_current:.2f}s
                - Messages per second: {batch_size_current/batch_completion_time:.2f}
                Overall Progress:
                - Total processed: {messages_processed:,}/{total_messages:,}
                - Overall progress: {(messages_processed/total_messages)*100:.2f}%
                - Remaining messages: {total_messages - messages_processed:,}
                =================================================================
                """)

                # The progress bar is advanced automatically by iterating over it,
                # so no explicit progress_bar.update(1) is needed here.
                time.sleep(1)

        # Final summary with detailed statistics
        avg_processing_time = total_processing_time / processed_count if processed_count > 0 else 0
        logging.info(f"""
        Final Processing Summary:
        ----------------------
        Messages:
        - Total Messages: {total_messages:,}
        - Successfully Processed: {processed_count:,}
        - Errors: {error_count:,}
        - Success Rate: {((processed_count/(processed_count+error_count))*100 if processed_count+error_count > 0 else 0):.2f}%
        Timing:
        - Total Processing Time: {total_processing_time:.2f} seconds
        - Average Time per Message: {avg_processing_time:.2f} seconds
        - Average Time per Batch: {(total_processing_time/total_batches):.2f} seconds
        Performance:
        - Messages per Second: {(processed_count/total_processing_time if total_processing_time > 0 else 0):.2f}
        - Batches per Hour: {((total_batches/(total_processing_time/3600)) if total_processing_time > 0 else 0):.2f}
        Output:
        - Results file: {output_file}
        - Statistics directory: {stats_dir}
        ----------------------
        """)

        return processed_count, error_count, processed_count, 0

    except Exception as e:
        logging.error(f"Critical error in process_csv: {str(e)}", exc_info=True)
        raise
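
# Hypothetical direct invocation of process_csv (paths are illustrative only):
#   process_csv('scam_messages.csv', 'output/converted_messages.csv', batch_size=50)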

def generate_summary(category, stats, timestamp, summary_file):
    """Generate and append summary for each category"""
    summary = f"""
    =================================================================
    Category: {category}
    Processed at: {timestamp}
    =================================================================
    Processing Statistics:
    - Total Messages: {stats['total']}
    - Successfully Processed: {stats['processed']}
    - Errors: {stats['errors']}
    - Success Rate: {stats['success_rate']:.2f}%
    Processing Time:
    - Total Runtime: {stats['runtime']:.2f} seconds
    - Average Time per Message: {stats['avg_time']:.2f} seconds
    Files Generated:
    - Output File: {stats['output_file']}
    =================================================================
    """
    # Append to summary file
    with open(summary_file, 'a', encoding='utf-8') as f:
        f.write(summary)
    logging.info(f"Summary updated for category: {category}")

def setup_drive():
    """Setup Google Drive using OAuth2"""
    # Update scopes to match
    SCOPES = [
        'https://www.googleapis.com/auth/drive.readonly',
        'https://www.googleapis.com/auth/drive.file',
        'https://www.googleapis.com/auth/drive.install',
        'https://www.googleapis.com/auth/userinfo.email',
        'https://www.googleapis.com/auth/userinfo.profile',
        'https://www.googleapis.com/auth/gmail.readonly',
        'openid'
    ]
    try:
        # Delete existing token
        if os.path.exists('token.json'):
            os.remove('token.json')

        # Create credentials
        credentials = {
            "installed": {
                "client_id": "483287191355-udtleajik8ko1o2n03fqmimuu47n3hba.apps.googleusercontent.com",
                "client_secret": "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",
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                "redirect_uris": ["http://localhost:8080/"]
            }
        }

        # Create flow
        flow = InstalledAppFlow.from_client_config(
            credentials,
            SCOPES
        )

        # Get credentials
        creds = flow.run_local_server(port=8080)

        # Save credentials
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

        # Build service
        service = build('drive', 'v3', credentials=creds)
        logging.info("Google Drive service initialized successfully")
        return service
    except Exception as e:
        logging.error(f"Error setting up Drive service: {str(e)}")
        raise

def get_files_in_drive(service):
    """List all files in Google Drive"""
    try:
        results = service.files().list(
            pageSize=1000,
            fields="nextPageToken, files(id, name, mimeType)",
            q="mimeType='application/vnd.google-apps.spreadsheet' or mimeType='text/csv'"
        ).execute()
        files = results.get('files', [])
        logging.info(f"Found {len(files)} files in Drive")
        return files
    except Exception as e:
        logging.error(f"Error listing files: {str(e)}")
        raise
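
# Note: despite the docstring, the query above only returns Google Sheets and CSV
# files, and only the first page (up to 1000 entries); following nextPageToken
# would be needed to list everything.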

def download_file(service, file_id, file_name):
    """Download a file from Google Drive"""
    try:
        request = service.files().get_media(fileId=file_id)
        file = io.BytesIO()
        downloader = MediaIoBaseDownload(file, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            logging.info(f"Download {int(status.progress() * 100)}%")
        file.seek(0)
        with open(file_name, 'wb') as f:
            f.write(file.read())
        logging.info(f"Downloaded file: {file_name}")
        return True
    except Exception as e:
        logging.error(f"Error downloading file {file_name}: {str(e)}")
        return False
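
# Hypothetical usage (the file ID is illustrative only):
#   service = setup_drive()
#   download_file(service, '1AbCdEfGhIjKlMnOp', 'EWallet_Related_Fraud_messages.csv')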

def upload_file(service, file_path, file_name=None, parent_id=None):
    """Upload a file to Google Drive"""
    try:
        file_metadata = {
            'name': file_name or os.path.basename(file_path)
        }
        if parent_id:
            file_metadata['parents'] = [parent_id]
        media = MediaFileUpload(
            file_path,
            mimetype='text/csv',
            resumable=True
        )
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        logging.info(f"Uploaded file: {file_name or os.path.basename(file_path)}")
        return file.get('id')
    except Exception as e:
        logging.error(f"Error uploading file {file_path}: {str(e)}")
        return None
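
# Hypothetical usage (the folder ID is illustrative only):
#   new_id = upload_file(service, 'converted_messages.csv', parent_id='0BxFolderId')
#   print(new_id)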

def get_category_paths():
    """Get files from Google Drive"""
    try:
        # Setup drive service
        service = setup_drive()
        logging.info("Drive service setup complete")

        # Define complete file mappings with correct paths
        file_mappings = {
            # Online Financial Fraud
            "EWallet_Related_Fraud_messages.csv": "subcategory_messages/Online_Financial_Fraud",
            "Fraud_CallVishing_messages.csv": "subcategory_messages/Online_Financial_Fraud",
            "UPI_Related_Frauds_messages.csv": "subcategory_messages/Online_Financial_Fraud",
            "Internet_Banking_Related_Fraud_messages.csv": "subcategory_messages/Online_Financial_Fraud",
            "DebitCredit_Card_FraudSim_Swap_Fraud_messages.csv": "subcategory_messages/Online_Financial_Fraud",
            # Online and Social Media Related Crime
            "Cyber_Bullying__Stalking__Sexting_messages.csv": "subcategory_messages/Online_and_Social_Media_Related_Crime",
            "Profile_Hacking_Identity_Theft_messages.csv": "subcategory_messages/Online_and_Social_Media_Related_Crime",
            "Cheating_by_Impersonation_messages.csv": "subcategory_messages/Online_and_Social_Media_Related_Crime",
            "FakeImpersonating_Profile_messages.csv": "subcategory_messages/Online_and_Social_Media_Related_Crime",
            # Cyber Attack Dependent Crimes
            "Malware_Attack_messages.csv": "subcategory_messages/Cyber_Attack__Dependent_Crimes",
            "SQL_Injection_messages.csv": "subcategory_messages/Cyber_Attack__Dependent_Crimes",
            "Hacking_Defacement_messages.csv": "subcategory_messages/Cyber_Attack__Dependent_Crimes",
            "Ransomware_Attack_messages.csv": "subcategory_messages/Cyber_Attack__Dependent_Crimes",
            "Tampering_with_computer_source_documents_messages.csv": "subcategory_messages/Cyber_Attack__Dependent_Crimes",
            # Cryptocurrency Crime
            "Cryptocurrency_Fraud_messages.csv": "subcategory_messages/Cryptocurrency_Crime",
            # Hacking and System Damage
            "Email_Hacking_messages.csv": "subcategory_messages/Hacking__Damage_to_computercomputer_system_etc",
            "Website_DefacementHacking_messages.csv": "subcategory_messages/Hacking__Damage_to_computercomputer_system_etc",
            "Damage_to_computer_computer_systems_etc_messages.csv": "subcategory_messages/Hacking__Damage_to_computercomputer_system_etc",
            "Unauthorised_AccessData_Breach_messages.csv": "subcategory_messages/Hacking__Damage_to_computercomputer_system_etc",
            # Other Categories
            "Cyber_Terrorism_messages.csv": "subcategory_messages/Cyber_Terrorism",
            "Online_Trafficking_messages.csv": "subcategory_messages/Online_Cyber_Trafficking",
            "Ransomware_messages.csv": "subcategory_messages/Ransomware",
            "Against_Interest_of_sovereignty_or_integrity_of_India_messages.csv": "subcategory_messages/Report_Unlawful_Content"
        }

        logging.info(f"""
        =================================================================
        Starting File Processing
        =================================================================
        Total Files to Process: {len(file_mappings)}
        Categories:
        - Online Financial Fraud
        - Online and Social Media Related Crime
        - Cyber Attack Dependent Crimes
        - Cryptocurrency Crime
        - Hacking and System Damage
        - Other Categories
        =================================================================
        """)

        # Get all files from Drive
        all_files = get_files_in_drive(service)

        # Map files to their IDs with enhanced logging
        files = {}
        found_count = 0
        missing_count = 0

        for file_name, folder_path in file_mappings.items():
            try:
                query = f"name='{file_name}'"
                if folder_path:
                    # Drive stores only folder names, not paths, so look up the leaf
                    # folder (e.g. 'Online_Financial_Fraud') rather than the full path
                    folder_name = folder_path.split('/')[-1]
                    folder_results = service.files().list(
                        q=f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder'",
                        spaces='drive',
                        fields='files(id)'
                    ).execute()
                    folder_id = folder_results.get('files', [])[0]['id'] if folder_results.get('files') else None
                    if folder_id:
                        query += f" and '{folder_id}' in parents"

                results = service.files().list(
                    q=query,
                    spaces='drive',
                    fields='files(id, name)'
                ).execute()
                files_found = results.get('files', [])

                if files_found:
                    file_id = files_found[0]['id']
                    files[file_name] = {
                        'id': file_id,
                        'path': folder_path,
                        'name': file_name
                    }
                    found_count += 1
                    logging.info(f"""
                    Found File:
                    - Name: {file_name}
                    - Location: {folder_path}
                    - ID: {file_id}
                    """)
                else:
                    missing_count += 1
                    logging.warning(f"""
                    Missing File:
                    - Name: {file_name}
                    - Expected Location: {folder_path}
                    """)
                    files[file_name] = None
            except Exception as e:
                logging.error(f"Error processing {file_name}: {str(e)}")
                files[file_name] = None
                missing_count += 1
logging.info(f""" | |
================================================================= | |
File Discovery Complete | |
================================================================= | |
Total Files: {len(file_mappings)} | |
Found: {found_count} | |
Missing: {missing_count} | |
Success Rate: {(found_count/len(file_mappings))*100:.1f}% | |
================================================================= | |
""") | |
return files, service | |
except Exception as e: | |
logging.error(f"Error accessing Drive files: {str(e)}") | |
raise | |

def process_file(service, groq_client, file_info, timestamp):
    """Process a single file with detailed logging"""
    file_name = file_info['name']
    file_id = file_info['id']
    folder_path = file_info['path']

    logging.info(f"""
    =================================================================
    Starting File Processing
    =================================================================
    File: {file_name}
    Location: {folder_path}
    Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    =================================================================
    """)

    try:
        # Download file
        temp_input = f"temp_input_{file_name}"
        if not download_file(service, file_id, temp_input):
            raise Exception("Failed to download file")

        # Read CSV
        df = pd.read_csv(temp_input, encoding='latin-1')

        # Check for column name
        if 'crimeaditionalinfo' not in df.columns:
            logging.error(f"Column 'crimeaditionalinfo' not found. Available columns: {df.columns.tolist()}")
            raise KeyError("Required column 'crimeaditionalinfo' not found")

        message_column = 'crimeaditionalinfo'  # Using fixed column name
        logging.info(f"Using column: {message_column}")

        total_messages = len(df)
        logging.info(f"""
        File Statistics:
        - Total Messages: {total_messages:,}
        - Columns: {', '.join(df.columns)}
        """)

        # Setup output
        output_name = f"converted_{file_name}"
        stats = {
            'processed': 0,
            'errors': 0,
            'start_time': time.time(),
            'api_calls': 0,
            'api_errors': 0
        }

        # Process messages
        with tqdm(total=total_messages, desc=f"Processing {file_name}") as pbar:
            for idx, row in df.iterrows():
                try:
                    message_id = idx + 1
                    original_message = row[message_column]  # Using fixed column name

                    # Log message start
                    logging.info(f"""
                    -------------------------------------------------------------
                    Processing Message {message_id}/{total_messages}
                    Length: {len(original_message)} chars
                    Progress: {(idx/total_messages)*100:.1f}%
                    -------------------------------------------------------------
                    """)

                    # Convert message
                    converted_message, proc_time = convert_to_ham_message(
                        groq_client, original_message, message_id, 1
                    )

                    if converted_message:
                        stats['processed'] += 1
                        stats['api_calls'] += 1
                    else:
                        stats['errors'] += 1
                        stats['api_errors'] += 1

                    # Update progress
                    elapsed = time.time() - stats['start_time']
                    pbar.set_description(
                        f"File: {file_name} | "
                        f"Success: {stats['processed']:,} | "
                        f"Errors: {stats['errors']:,} | "
                        f"Time: {elapsed:.1f}s"
                    )
                    pbar.update(1)

                except Exception as e:
                    logging.error(f"Error processing message {message_id}: {str(e)}")
                    stats['errors'] += 1
                    continue

        # Generate summary (guard against division by zero when no API calls were made)
        runtime = time.time() - stats['start_time']
        success_rate = (stats['processed']/total_messages)*100
        api_success_rate = ((stats['api_calls']-stats['api_errors'])/stats['api_calls']*100
                            if stats['api_calls'] > 0 else 0)
        logging.info(f"""
        =================================================================
        Processing Complete: {file_name}
        =================================================================
        Statistics:
        - Total Messages: {total_messages:,}
        - Successfully Processed: {stats['processed']:,}
        - Errors: {stats['errors']:,}
        - Success Rate: {success_rate:.1f}%
        Performance:
        - Runtime: {runtime:.1f} seconds
        - Average Time/Message: {runtime/total_messages:.2f} seconds
        - Messages/Second: {total_messages/runtime:.1f}
        API Statistics:
        - Total API Calls: {stats['api_calls']:,}
        - Failed API Calls: {stats['api_errors']:,}
        - API Success Rate: {api_success_rate:.1f}%
        =================================================================
        """)
        return stats
    except Exception as e:
        logging.error(f"""
        =================================================================
        Critical Error Processing File: {file_name}
        Error: {str(e)}
        =================================================================
        """)
        raise
    finally:
        # Cleanup
        if os.path.exists(temp_input):
            os.remove(temp_input)

def get_and_process_files():
    """Get and process files from Google Drive one at a time"""
    try:
        # Setup drive service and Groq client
        service = setup_drive()
        groq_client = initialize_groq()
        logging.info("Drive service and Groq API initialized successfully")

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Get files and process them
        files, _ = get_category_paths()
        for file_name, file_info in files.items():
            if file_info is None:
                logging.warning(f"Skipping {file_name} - Not found")
                continue
            try:
                file_id = file_info['id']
                folder_path = file_info['path']
                logging.info(f"""
                =================================================================
                Starting Processing: {file_name}
                Folder: {folder_path}
                File ID: {file_id}
                =================================================================
                """)

                # Download file
                temp_input = f"temp_input_{file_name}"
                if download_file(service, file_id, temp_input):
                    try:
                        # Read CSV
                        df = pd.read_csv(temp_input, encoding='latin-1')
                        total_messages = len(df)

                        # Setup output files
                        output_name = f"converted_{file_name}"
                        output_path = os.path.join(folder_path, output_name)
                        processed_count = 0
                        error_count = 0

                        # Process in batches
                        batch_size = 50
                        progress_bar = tqdm(total=total_messages, desc=f"Processing {file_name}")

                        # Create output CSV
                        fieldnames = ['message_id', 'original_message', 'converted_message',
                                      'processing_time', 'model_used', 'timestamp']
                        # The writer must stay open while messages are converted, so the
                        # message loop below runs inside this with-block
                        with open(output_name, 'w', newline='', encoding='utf-8') as f:
                            writer = csv.DictWriter(f, fieldnames=fieldnames)
                            writer.writeheader()

                            # Process messages
                            for idx, row in df.iterrows():
                                try:
                                    message_id = idx + 1
                                    original_message = row['crimeaditionalinfo']  # Using fixed column name

                                    # Convert message using Groq
                                    converted_message, proc_time = convert_to_ham_message(
                                        groq_client,
                                        original_message,
                                        message_id,
                                        1  # Batch number
                                    )

                                    if converted_message:
                                        writer.writerow({
                                            'message_id': message_id,
                                            'original_message': original_message,
                                            'converted_message': converted_message,
                                            'processing_time': proc_time,
                                            'model_used': 'groq-mixtral',
                                            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                                        })
                                        processed_count += 1
                                    else:
                                        error_count += 1

                                    progress_bar.update(1)
                                    progress_bar.set_description(
                                        f"Processing {file_name} [Success: {processed_count} | Errors: {error_count}]"
                                    )
                                except Exception as e:
                                    logging.error(f"Error processing message {message_id}: {str(e)}")
                                    error_count += 1
                                    continue

                        progress_bar.close()

                        # Upload processed file back to Drive (after the with-block closes the CSV)
                        upload_file(service, output_name, output_name, file_info.get('folder_id'))

                        logging.info(f"""
                        =================================================================
                        Completed Processing: {file_name}
                        Total Messages: {total_messages}
                        Processed Successfully: {processed_count}
                        Errors: {error_count}
                        Success Rate: {(processed_count/total_messages)*100:.2f}%
                        Output File: {output_name}
                        =================================================================
                        """)
                    finally:
                        # Cleanup
                        if os.path.exists(temp_input):
                            os.remove(temp_input)
                        if os.path.exists(output_name):
                            os.remove(output_name)
            except Exception as e:
                logging.error(f"Error processing file {file_name}: {str(e)}")
                continue
    except Exception as e:
        logging.error(f"Error in get_and_process_files: {str(e)}")
        raise

def main():
    # Setup logging
    timestamp = setup_logging()
    try:
        # Process files
        get_and_process_files()
    except Exception as e:
        logging.error(f"Error in main: {str(e)}")
        raise

if __name__ == "__main__":
    main()