hushh-valet-chat / controllers /ws_controller.py
Omkar008's picture
Update controllers/ws_controller.py
985eaa2 verified
raw
history blame
No virus
5.9 kB
import base64
import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, List, Dict
import requests
from bs4 import BeautifulSoup
from models.models import Message, Attachment
from fastapi import WebSocket
def get_messages(code: str) -> List[Message]:
    """Fetch every Gmail message that matches the receipt/invoice search query.

    Pages through the Gmail ``users/me/messages`` listing, and for each page
    fetches the listed messages concurrently via ``fetch_message_data``.

    Args:
        code: OAuth access token; sent verbatim as the ``Bearer`` credential.

    Returns:
        The truthy results of ``fetch_message_data`` in listing order. A list
        response without a ``"messages"`` key contributes nothing, and paging
        stops as soon as a response carries no ``nextPageToken``.
    """
    access_token = code
    list_url = "https://www.googleapis.com/gmail/v1/users/me/messages"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    jobs_query = 'subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoice OR category:purchases'
    max_results = 10
    # Without a timeout a stalled connection would block this call forever.
    request_timeout = 30

    def fetch_message_wrapper(message_data):
        # Listing entries without an id cannot be fetched; skip them.
        message_id = message_data.get("id")
        if message_id:
            return fetch_message_data(access_token, message_id)
        return None

    messages = []
    # Let requests URL-encode the query (spaces, quotes, non-ASCII) and the
    # page token instead of splicing them into the URL by hand.
    query_params = {"q": jobs_query, "maxResults": max_results}
    while True:
        gmail_response = requests.get(
            list_url,
            headers=auth_headers,
            params=query_params,
            timeout=request_timeout,
        )
        gmail_data = gmail_response.json()

        if "messages" in gmail_data:
            with ThreadPoolExecutor(max_workers=15) as executor:
                # executor.map yields results in submission order and
                # re-raises any worker exception, like future.result() would.
                for message in executor.map(fetch_message_wrapper, gmail_data["messages"]):
                    if message:
                        messages.append(message)

        page_token = gmail_data.get("nextPageToken")
        if not page_token:
            break
        query_params["pageToken"] = page_token

    return messages
def fetch_message_data(access_token: str, message_id: str) -> Message:
    """Download one Gmail message and convert it into a ``Message``.

    Args:
        access_token: OAuth access token sent as the ``Bearer`` credential.
        message_id: Gmail id of the message to download.

    Returns:
        A ``Message`` carrying the id, the extracted plain-text body, the
        fetched attachments and the company name derived from the headers
        and subject.
    """
    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    # Without a timeout a stalled connection would block the worker forever.
    message_response = requests.get(
        message_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,
    )
    message_data = message_response.json()

    # A response without a payload (e.g. an API error object) must not raise
    # KeyError here: this function runs inside a thread pool and one failure
    # would abort the whole scan. The other extractors already tolerate it.
    payload = message_data.get('payload')
    if payload is None:
        logging.warning(
            "Gmail message %s has no payload; response keys: %s",
            message_id,
            list(message_data),
        )
        payload = {}
    headers = payload.get('headers', [])

    subject = extract_subject_from_mail(message_data)
    company_from_mail = extract_domain_name(headers, subject)
    body = extract_body_from_mail(message_data)
    attachments = extract_attachments_from_mail(access_token, message_data)
    return Message(message_id=message_id, body=body, attachments=attachments, company=company_from_mail)
def extract_subject_from_mail(message_data: dict) -> str:
    """Return the value of the message's Subject header.

    Args:
        message_data: Message dict; headers are read from
            ``message_data['payload']['headers']`` as ``name``/``value`` dicts.

    Returns:
        The value of the first Subject header, or ``""`` when the payload,
        the header list or the header itself is missing.
    """
    headers = message_data.get('payload', {}).get('headers', [])
    for header in headers:
        # Header field names are case-insensitive, so "subject" and "SUBJECT"
        # must match as well as "Subject".
        if header.get('name', '').lower() == 'subject':
            return header.get('value', '')
    return ""
def extract_domain_name(payload: List[Dict], subject: str) -> str:
    """Derive a company label from the From header and the subject line.

    Args:
        payload: The message's header list (``name``/``value`` dicts), not
            the whole payload object.
        subject: Subject line; checked case-insensitively for brand names.

    Returns:
        ``'chanel'`` or ``'Louis Vuitton'`` when the subject mentions that
        brand; otherwise the domain label extracted from the first From
        header; ``'others'`` when no usable From address is found.
    """
    domain_name = 'others'
    for header in payload:
        # Header field names are case-insensitive.
        if header.get('name', '').lower() == 'from':
            # The helper may yield None; keep the 'others' default so this
            # function always returns a string.
            domain_name = extract_domain_from_email(header.get('value', '')) or 'others'
            break

    # Brand names in the subject take precedence over the sender domain.
    subject_lower = subject.lower()
    if 'chanel' in subject_lower:
        return 'chanel'
    if 'louis vuitton' in subject_lower:
        return 'Louis Vuitton'
    return domain_name
def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Return the first label of the domain of the first e-mail address found.

    For example ``"Shop <orders@mail.shop.com>"`` yields ``"mail"``.

    Args:
        email_string: Text that may contain an e-mail address, such as the
            value of a From header.

    Returns:
        The text between ``@`` and the first following dot, or ``None`` when
        no address is present or that label is empty.
    """
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if match is None:
        # No address at all: report "unknown" instead of raising
        # AttributeError from calling .group() on None.
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain or None
def extract_body_from_mail(message_data: dict) -> Optional[str]:
    """Return the cleaned text of the first usable ``text/plain`` part.

    Only the top-level entries of ``message_data['payload']['parts']`` are
    inspected; nested parts are not searched.

    Args:
        message_data: Message dict whose parts carry base64url-encoded
            content in ``part['body']['data']``.

    Returns:
        The text produced by ``extract_text`` for the first ``text/plain``
        part that has inline data, or ``None`` when there is no such part.
    """
    parts = message_data.get("payload", {}).get("parts", [])
    for part in parts:
        if part.get('mimeType') != 'text/plain':
            continue
        body_data = part.get('body', {}).get('data', '')
        if not body_data:
            # A text/plain part without inline data would make extract_text
            # raise ValueError on the empty content; try the next part.
            continue
        # urlsafe_b64decode rejects input whose padding was stripped, so
        # restore it (this is a no-op for already padded data).
        padded = body_data + '=' * (-len(body_data) % 4)
        body = extract_text(base64.urlsafe_b64decode(padded))
        if body:
            return body
    return None
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Download one attachment of a Gmail message.

    Args:
        access_token: OAuth access token sent as the ``Bearer`` credential.
        message_id: Gmail id of the message that owns the attachment.
        attachment_id: Id of the attachment to download.

    Returns:
        The decoded JSON body of the attachments endpoint response.
    """
    attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    # Without a timeout a stalled connection would block the caller forever.
    attachment_response = requests.get(
        attachment_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,
    )
    return attachment_response.json()
def extract_attachments_from_mail(access_token: str, message_data: dict) -> List[Attachment]:
    """Collect the attachments referenced by a message's top-level parts.

    Each part whose ``body`` holds an ``attachmentId`` is downloaded with
    ``fetch_attachment_data`` and wrapped in an ``Attachment``.

    Args:
        access_token: OAuth access token forwarded to the download call.
        message_data: Message dict; its ``id`` and ``payload.parts`` are read.

    Returns:
        One ``Attachment`` per matching part, in part order. The filename
        defaults to ``"untitled.txt"`` when the part has no ``filename`` key
        and the data to ``""`` when the download has no ``data`` key. Empty
        when the message has no payload or no parts.
    """
    if "payload" not in message_data or "parts" not in message_data["payload"]:
        return []

    collected = []
    for part in message_data["payload"]["parts"]:
        # Parts that are not stored as attachments carry no attachmentId.
        if "body" not in part or "attachmentId" not in part["body"]:
            continue
        downloaded = fetch_attachment_data(
            access_token,
            message_data["id"],
            part["body"]["attachmentId"],
        )
        collected.append(
            Attachment(
                filename=part.get("filename", "untitled.txt"),
                data=downloaded.get("data", ""),
            )
        )
    return collected
def extract_text(html_content: str) -> str:
    """Strip markup from *html_content* and collapse its whitespace.

    Args:
        html_content: Markup to parse with BeautifulSoup's ``html.parser``.

    Returns:
        The document text with every whitespace run replaced by a single
        space and leading/trailing whitespace removed.

    Raises:
        ValueError: If *html_content* is empty or ``None``.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")
    raw_text = BeautifulSoup(html_content, 'html.parser').get_text(separator=' ')
    return re.sub(r'\s+', ' ', raw_text).strip()
async def websocket_main(code: str, websocket: WebSocket):
    """Fetch the matching Gmail messages and stream them over *websocket*.

    Sends ``{"total_messages": <count>}`` first, then each message as JSON
    text chunks followed by the literal text ``"NEXT_MESSAGE"``, and finally
    closes the socket.

    Args:
        code: OAuth access token passed on to ``get_messages``.
        websocket: Connection the messages are written to.
    """
    # Imported locally because the module does not import asyncio at top level.
    import asyncio

    access_token = code
    # get_messages performs blocking HTTP requests; run it in the default
    # executor so the event loop keeps serving other connections meanwhile.
    loop = asyncio.get_running_loop()
    messages = await loop.run_in_executor(None, get_messages, access_token)

    await websocket.send_json({"total_messages": len(messages)})
    chunk_size = 100000
    for message in messages:
        message_json = message.to_json()
        await send_message_in_chunks(websocket, message_json, chunk_size)
        # Delimiter sent after the last chunk of every message.
        await websocket.send_text("NEXT_MESSAGE")
    await websocket.close()
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
    """Serialize *message_json* and send it as consecutive text frames.

    Args:
        websocket: Connection whose ``send_text`` receives each chunk.
        message_json: Object passed through ``json.dumps`` before sending.
        chunk_size: Maximum number of characters per frame.
    """
    serialized = json.dumps(message_json)
    total_length = len(serialized)
    for start in range(0, total_length, chunk_size):
        piece = serialized[start:start + chunk_size]
        await websocket.send_text(piece)