import base64
import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, List, Dict

import requests
from bs4 import BeautifulSoup
from fastapi import WebSocket

from models.models import Message, Attachment

logger = logging.getLogger(__name__)

_GMAIL_MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"
_REQUEST_TIMEOUT_SECONDS = 30
_MAX_FETCH_WORKERS = 15

# Gmail search terms that identify receipt / invoice style mails.
_RECEIPT_QUERY_TERMS = (
    'subject:"your order"',
    "subject:receipts",
    "subject:receipt",
    "subject:aankoopbon",
    "subject:reçu",
    "subject:invoice",
    "subject:invoices",
    "category:purchases",
)

_EMAIL_PATTERN = re.compile(r'[\w\.-]+@[\w\.-]+')


def _quote_search_value(value: str) -> str:
    """Return *value* usable after a Gmail search operator such as ``from:``.

    Values containing whitespace are wrapped in double quotes so that the
    operator applies to the whole phrase instead of only its first word.
    Single-word values are returned unchanged (apart from stripping).
    """
    cleaned = value.replace('"', "").strip()
    if re.search(r"\s", cleaned):
        return f'"{cleaned}"'
    return cleaned


def _build_search_query(brand_name: Optional[str] = None) -> str:
    """Build the Gmail search query, optionally narrowed to *brand_name*."""
    terms = " OR ".join(_RECEIPT_QUERY_TERMS)
    brand = _quote_search_value(brand_name) if brand_name is not None else ""
    if not brand:
        return f"({terms})"
    return f"({terms} OR from:{brand}) AND subject:{brand}"


def get_messages(code: str, brand_name: Optional[str] = None) -> List[Message]:
    """Fetch every receipt-like Gmail message visible to the given token.

    The message listing is paged through with ``nextPageToken``; the
    individual messages of each page are downloaded concurrently.

    Args:
        code: OAuth access token, sent as a ``Bearer`` token.
        brand_name: Optional brand used to narrow the search to mails sent
            by, or mentioning in the subject, that brand.

    Returns:
        The successfully fetched messages, in listing order. If a listing
        request fails, paging stops and the messages collected so far are
        returned. Individual messages that fail to download are skipped.
    """
    access_token = code
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    query = _build_search_query(brand_name)
    messages: List[Message] = []
    page_token = None

    with ThreadPoolExecutor(max_workers=_MAX_FETCH_WORKERS) as executor:
        while True:
            # Let requests URL-encode the query instead of splicing it
            # into the URL by hand.
            params = {"q": query}
            if page_token:
                params["pageToken"] = page_token
            response = requests.get(
                _GMAIL_MESSAGES_URL,
                headers=auth_headers,
                params=params,
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            if not response.ok:
                logger.error(
                    "Gmail message listing failed with HTTP %s",
                    response.status_code,
                )
                break
            page = response.json()

            message_ids = [
                ref.get("id")
                for ref in page.get("messages", [])
                if ref.get("id")
            ]
            futures = [
                executor.submit(fetch_message_data, access_token, message_id)
                for message_id in message_ids
            ]
            for message_id, future in zip(message_ids, futures):
                try:
                    message = future.result()
                except requests.RequestException:
                    # One unavailable message must not discard the rest.
                    logger.warning(
                        "Skipping message %s: download failed",
                        message_id,
                        exc_info=True,
                    )
                    continue
                if message:
                    messages.append(message)

            page_token = page.get("nextPageToken")
            if not page_token:
                break

    logger.debug("Fetched %d messages", len(messages))
    return messages


def fetch_message_data(access_token: str, message_id: str) -> Message:
    """Download one Gmail message and convert it into a ``Message``.

    Args:
        access_token: OAuth access token, sent as a ``Bearer`` token.
        message_id: Gmail id of the message to download.

    Returns:
        A ``Message`` carrying the extracted body text, its length, the
        attachments and the company derived from the sender or subject.

    Raises:
        requests.HTTPError: If Gmail answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    response = requests.get(
        f"{_GMAIL_MESSAGES_URL}/{message_id}",
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    message_data = response.json()

    subject = extract_subject_from_mail(message_data)
    headers = (message_data.get("payload") or {}).get("headers") or []
    company_from_mail = extract_domain_name(headers, subject)
    body = extract_body_from_mail(message_data)
    attachments = extract_attachments_from_mail(access_token, message_data)
    body_len = len(body) if body is not None else 0

    return Message(
        message_id=message_id,
        body_len=body_len,
        body=body,
        attachments=attachments,
        company=company_from_mail,
    )


def extract_subject_from_mail(message_data: dict) -> str:
    """Return the value of the ``Subject`` header, or ``""`` if absent.

    The header name is compared case-insensitively.
    """
    headers = (message_data.get("payload") or {}).get("headers") or []
    for header in headers:
        if header.get("name", "").lower() == "subject":
            return header.get("value", "")
    return ""


def extract_domain_name(payload: List[Dict[str, str]], subject: str) -> str:
    """Derive a company label from the message headers and subject.

    Args:
        payload: The list of header dicts (``{"name": ..., "value": ...}``)
            of the message.
        subject: Subject line of the message.

    Returns:
        ``'chanel'`` or ``'Louis Vuitton'`` when the subject mentions that
        brand; otherwise the first label of the sender's mail domain, or
        ``'others'`` when no sender domain can be determined.
    """
    domain_name = 'others'
    for header in payload:
        if header.get('name', '').lower() == 'from':
            domain_name = (
                extract_domain_from_email(header.get('value', '')) or 'others'
            )
            break

    lowered_subject = (subject or '').lower()
    if 'chanel' in lowered_subject:
        return 'chanel'
    if 'louis vuitton' in lowered_subject:
        return 'Louis Vuitton'
    return domain_name


def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Return the first label of the domain of the first address found.

    For ``"Shop <orders@chanel.com>"`` this yields ``"chanel"``. Note that
    for a sub-domain address such as ``a@mail.shop.com`` the first label
    (``"mail"``) is returned.

    Returns:
        The label, or ``None`` when *email_string* contains no address or
        the domain's first label is empty.
    """
    match = _EMAIL_PATTERN.search(email_string or "")
    if match is None:
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain or None
_BODY_MIME_TYPES = ("text/plain", "text/html")
_ATTACHMENT_TIMEOUT_SECONDS = 30


def _decode_base64url(data: str) -> bytes:
    """Decode URL-safe base64, restoring any ``=`` padding that is missing."""
    return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))


def _walk_parts(parts):
    """Yield every part of *parts* depth-first, including nested ``parts``."""
    for part in parts or []:
        yield part
        yield from _walk_parts(part.get("parts"))


def _last_text_body(parts) -> Optional[str]:
    """Return the text of the last text/plain or text/html part with data.

    Returns ``None`` when no such part exists. Only the selected part is
    parsed, since earlier candidates would be overwritten anyway.
    """
    raw = None
    for part in _walk_parts(parts):
        if part.get("mimeType") not in _BODY_MIME_TYPES:
            continue
        data = (part.get("body") or {}).get("data", "")
        if not data:
            continue
        decoded = _decode_base64url(data)
        if decoded:
            raw = decoded
    return extract_text(raw) if raw else None


def extract_body_from_mail(message_data: dict) -> str:
    """Extract the readable body text of a Gmail message.

    For multipart messages the last ``text/plain`` / ``text/html`` part that
    carries data wins; nested multipart containers are searched as well.
    For single-part messages the payload body itself is decoded. When no
    text can be extracted, the message ``snippet`` is returned instead
    (``""`` if there is none).

    Args:
        message_data: Message resource dict as returned by the Gmail API.

    Returns:
        Whitespace-normalised body text, or the snippet as a fallback.
    """
    body = None
    payload = message_data.get("payload")
    if payload:
        if "parts" in payload:
            body = _last_text_body(payload["parts"])
        elif "body" in payload:
            payload_body = payload["body"] or {}
            inline_data = payload_body.get("data", "")
            if inline_data:
                decoded = _decode_base64url(inline_data)
                if decoded:
                    body = extract_text(decoded)
            else:
                body = _last_text_body(payload_body.get("parts"))
    return body or message_data.get("snippet", "")


def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Download one attachment resource from the Gmail API.

    Args:
        access_token: OAuth access token, sent as a ``Bearer`` token.
        message_id: Id of the message owning the attachment.
        attachment_id: Id of the attachment to download.

    Returns:
        The decoded JSON response. The caller in this file reads its
        ``data`` key.
    """
    attachment_url = (
        "https://www.googleapis.com/gmail/v1/users/me/messages/"
        f"{message_id}/attachments/{attachment_id}"
    )
    attachment_response = requests.get(
        attachment_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=_ATTACHMENT_TIMEOUT_SECONDS,
    )
    return attachment_response.json()


def extract_attachments_from_mail(access_token: str, message_data: dict) -> "List[Attachment]":
    """Download the attachments referenced by the top-level message parts.

    Args:
        access_token: OAuth access token used for the attachment downloads.
        message_data: Message resource dict; its ``id`` is used to address
            the attachments.

    Returns:
        One ``Attachment`` per part that carries an ``attachmentId``.
        Parts without a usable filename are named ``untitled.txt``.
    """
    attachments = []
    parts = (message_data.get("payload") or {}).get("parts") or []
    for part in parts:
        attachment_id = (part.get("body") or {}).get("attachmentId")
        if attachment_id is None:
            continue
        attachment_data = fetch_attachment_data(
            access_token, message_data["id"], attachment_id
        )
        data = attachment_data.get("data", "")
        # An empty filename string should also fall back to the default.
        filename = part.get("filename") or "untitled.txt"
        attachments.append(
            Attachment(attachment_len=len(data), filename=filename, data=data)
        )
    return attachments


def extract_text(html_content: str) -> str:
    """Strip markup from *html_content* and collapse runs of whitespace.

    Callers in this file pass the base64-decoded bytes of a message part,
    which are handed to BeautifulSoup as-is.

    Raises:
        ValueError: If *html_content* is empty or ``None``.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")

    soup = BeautifulSoup(html_content, 'html.parser')
    text = soup.get_text(separator=' ')
    return re.sub(r'\s+', ' ', text).strip()


async def websocket_main(code: str, websocket: "WebSocket", brand_name: Optional[str] = None):
    """Fetch the matching messages and stream them over *websocket*.

    Protocol: first a JSON object ``{"total_messages": <count>}``; then, for
    each message, its JSON text in chunks followed by the literal text
    frame ``NEXT_MESSAGE``; finally the socket is closed.

    Args:
        code: OAuth access token for the Gmail API.
        websocket: Connection to stream the results to.
        brand_name: Optional brand filter forwarded to ``get_messages``.
    """
    import asyncio
    import functools

    access_token = code
    # get_messages performs blocking HTTP calls; run it in a worker thread
    # so the event loop stays responsive while it downloads.
    loop = asyncio.get_running_loop()
    messages = await loop.run_in_executor(
        None, functools.partial(get_messages, access_token, brand_name)
    )
    logging.getLogger(__name__).debug(
        "websocket_main: streaming %d messages", len(messages)
    )

    await websocket.send_json({"total_messages": len(messages)})
    chunk_size = 100000
    for message in messages:
        message_json = message.to_json()
        await send_message_in_chunks(websocket, message_json, chunk_size)
        await websocket.send_text("NEXT_MESSAGE")

    await websocket.close()


async def send_message_in_chunks(websocket: "WebSocket", message_json: dict, chunk_size: int):
    """Serialise *message_json* and send it as consecutive text frames.

    Args:
        websocket: Connection providing an awaitable ``send_text``.
        message_json: JSON-serialisable payload.
        chunk_size: Maximum number of characters per text frame.

    Raises:
        ValueError: If *chunk_size* is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    message_str = json.dumps(message_json)
    for start in range(0, len(message_str), chunk_size):
        await websocket.send_text(message_str[start:start + chunk_size])