hushh-valet-chat / controllers /ws_controller.py
Omkar008's picture
Update controllers/ws_controller.py
2a1db67 verified
raw
history blame
No virus
9.25 kB
import base64
import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, List, Dict
import requests
from bs4 import BeautifulSoup
from models.models import Message, Attachment
from fastapi import WebSocket
def get_messages(code: str,brand_name: Optional[str] = None) -> List[Message]:
access_token = code
g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
if brand_name is not None:
g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases OR from:{brand_name}) AND subject:{brand_name} has:attachment'
page_token = None
messages = []
# max_results = 10
# gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={jobs_query}&maxResults={max_results}"
# gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
# gmail_data = gmail_response.json()
# messages.append(gmail_data['messages'])
def fetch_message_wrapper(message_data):
message_id = message_data.get("id")
if message_id:
return fetch_message_data(access_token, message_id)
return None
while True:
gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}"
if page_token:
gmail_url += f"&pageToken={page_token}"
gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
gmail_data = gmail_response.json()
print(len(gmail_data))
print(gmail_data)
if "messages" in gmail_data:
with ThreadPoolExecutor(max_workers=15) as executor:
futures = [executor.submit(fetch_message_wrapper, message_data) for message_data in
gmail_data["messages"]]
for future in futures:
message = future.result()
if message:
messages.append(message)
if "nextPageToken" in gmail_data:
page_token = gmail_data["nextPageToken"]
else:
break
print("printing messages")
print(messages)
return messages
def fetch_message_data(access_token: str, message_id: str) -> Message:
message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
message_data = message_response.json()
# print(message_data)
subject = extract_subject_from_mail(message_data)
company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
body = extract_body_from_mail(message_data)
attachments = extract_attachments_from_mail(access_token, message_data)
body_len = 0
if body is not None :
body_len = len(body)
# print("subject: ")
# print(subject)
# print("company name: ")
# print(company_from_mail)
# print("Printing the body of the mail: ")
# print(body)
# print("Printing attachment Data: ")
# print(attachments)
# print("Completed this mail.")
return Message(message_id=message_id, body_len=body_len,body=body, attachments=attachments, company=company_from_mail)
def extract_subject_from_mail(message_data: dict) -> str:
if 'payload' in message_data and 'headers' in message_data['payload']:
headers = message_data['payload']['headers']
for header in headers:
if header['name'] == 'Subject':
return header['value']
return ""
else:
return ""
def extract_domain_name(payload: dict, subject: str) -> str:
domain_name = 'others'
for fromdata in payload:
if fromdata['name'] == 'From':
domain_name = extract_domain_from_email(fromdata['value'])
break
if 'chanel' in subject.lower():
return 'chanel'
if 'louis vuitton' in subject.lower():
return 'Louis Vuitton'
return domain_name
def extract_domain_from_email(email_string: str) -> Optional[str]:
email_address = re.search(r'[\w\.-]+@[\w\.-]+', email_string).group()
domain = email_address.split('@')[-1].split('.')[0]
if email_address and domain:
return domain
else:
return None
# def extract_body_from_mail(message_data: dict) -> str:
# body = None
# if "payload" in message_data and "parts" in message_data["payload"]:
# for part in message_data["payload"]["parts"]:
# if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'):
# body_data = part['body'].get('data', '')
# body_base64 = base64.urlsafe_b64decode(body_data)
# body = extract_text(body_base64)
# return body
def extract_body_from_mail(message_data: dict) -> str:
body = None
if "payload" in message_data:
payload = message_data["payload"]
if "parts" in payload:
for part in payload["parts"]:
if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'):
body_data = part['body'].get('data', '')
if body_data:
body_base64 = base64.urlsafe_b64decode(body_data)
body = extract_text(body_base64)
elif 'body' in payload:
body_data = payload['body'].get('data', '')
if body_data:
body_base64 = base64.urlsafe_b64decode(body_data)
body = extract_text(body_base64)
elif 'parts' in payload['body']:
for part in payload['body']['parts']:
if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'):
body_data = part['body'].get('data', '')
if body_data:
body_base64 = base64.urlsafe_b64decode(body_data)
body = extract_text(body_base64)
if not body:
body = message_data.get('snippet', '')
return body
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"})
return attachment_response.json()
def extract_attachments_from_mail(access_token: str, message_data: dict) -> List[Attachment]:
attachments = []
if "payload" in message_data and "parts" in message_data["payload"]:
for part in message_data["payload"]["parts"]:
if "body" in part and "attachmentId" in part["body"]:
attachment_id = part["body"]["attachmentId"]
attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
filename = part.get("filename", "untitled.txt")
attachments.append(Attachment(attachment_len = len(attachment_data.get("data", "")),filename=filename, data=attachment_data.get("data", "")))
return attachments
def extract_text(html_content: str) -> str:
if not html_content:
raise ValueError("HTML content is empty or None")
soup = BeautifulSoup(html_content, 'html.parser')
text = soup.get_text(separator=' ')
text = re.sub(r'\s+', ' ', text).strip()
return text
async def websocket_main(code: str, websocket: WebSocket,brand_name: Optional[str] = None):
access_token = code
messages = get_messages(access_token,brand_name)
print("websocket_main")
print(messages)
# logging.info(f"brand_name:{brand_name}")
await websocket.send_json({"total_messages": len(messages)})
print("Total Length of messages")
print(len(messages))
chunk_size = 100000
for message in messages:
message_json = message.to_json()
await send_message_in_chunks(websocket, message_json, chunk_size)
await websocket.send_text("NEXT_MESSAGE")
await websocket.close()
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
attachment_len = 0
body_len = 0
body_len = message_json['body_len']
if message_json['attachments'] is not None :
for attch in message_json['attachments']:
attachment_len = attch['attachment_len']
print(body_len)
print(attachment_len)
if attachment_len == 0:
attachment_len = None
await websocket.send_json({"body_len":body_len ,"attachment_len":attachment_len})
message_str = json.dumps(message_json)
# print("Printing message_str")
# print(message_str)
# logging.info(message_str)
# await websocket.send_json({"file_len":len(file)})
for i in range(0, len(message_str), chunk_size):
await websocket.send_text(message_str[i:i + chunk_size])