"""Gmail receipt-fetching service.

Searches a user's Gmail for receipt/invoice messages and streams each
message's body text and attachments to a client over a FastAPI WebSocket
in fixed-size chunks, tagging each message with the company inferred
from the sender's domain (or the subject line for known brands).
"""

import logging
import base64
import requests
import asyncio
from fastapi import WebSocket
from services import utils as util
import re
from bs4 import BeautifulSoup

# Size (in characters) of each WebSocket data chunk.
CHUNK_SIZE = 2000
# Pause between chunks so the client is not flooded.
CHUNK_DELAY_SECONDS = 0.4
# Timeout (seconds) for every outbound Gmail API request.
REQUEST_TIMEOUT_SECONDS = 30


async def send_chunked_data(websocket: WebSocket, filename: str, data: str,
                            company_associated: str, message_id: str) -> None:
    """Stream an attachment's base64 payload to the client in chunks.

    Protocol (must stay stable for the client): N JSON frames of
    ``{"filename", "data_chunk"}``, one JSON frame of
    ``{"company_associated", "message_id"}``, then the sentinel text
    ``"FinishedThisAttachment"``.
    """
    for start in range(0, len(data), CHUNK_SIZE):
        await websocket.send_json(
            {"filename": filename, "data_chunk": data[start:start + CHUNK_SIZE]}
        )
        await asyncio.sleep(CHUNK_DELAY_SECONDS)
    await websocket.send_json(
        {"company_associated": company_associated, "message_id": message_id}
    )
    await websocket.send_text("FinishedThisAttachment")


async def send_chunked_data_without_attch(websocket: WebSocket, body_text: str,
                                          message_id: str,
                                          company_associated: str) -> None:
    """Stream a message body (no attachment) to the client in chunks.

    NOTE: the sentinel strings below (including the "does'nt" typo) are
    part of the wire protocol the client matches on — do not "fix" them.
    """
    await websocket.send_text("This message does'nt contain an Attachment")
    for start in range(0, len(body_text), CHUNK_SIZE):
        await websocket.send_json(
            {"data_chunk": body_text[start:start + CHUNK_SIZE]}
        )
        await asyncio.sleep(CHUNK_DELAY_SECONDS)
    await websocket.send_json(
        {"company_associated": company_associated, "message_id": message_id}
    )
    await websocket.send_text("FinishedThisAttachmentnotContainingAttachment")


async def process_messages(access_token: str, websocket: WebSocket) -> None:
    """Fetch all matching messages and stream each one to the client."""
    logging.info("Entered process_messages")
    messages = get_messages(access_token)
    await websocket.send_json({"total_messages": len(messages)})
    await websocket.send_text("CompletedSendingTotalMessagesLength")
    for message in messages:
        message_id = message.get("id")
        if not message_id:
            continue
        message_data = fetch_message_data(access_token, message_id)
        await process_message_data(access_token, message_data, websocket,
                                   message_id)
    await websocket.send_text("CompletedFetchingMessages")


async def websocket_main(code: str, websocket: WebSocket) -> None:
    """Entry point for a WebSocket session; `code` is the OAuth access token."""
    logging.info("Entered websocket_main")
    access_token = code
    await process_messages(access_token, websocket)
    logging.info("Completed Fetching all the messages")
    # Bug fix: WebSocket.close() is a coroutine — the original called it
    # without awaiting, so the close frame was never actually sent.
    await websocket.close()


def get_messages(code: str) -> list:
    """Return all Gmail message stubs matching the receipt/purchase query.

    Paginates through the Gmail ``messages.list`` endpoint until no
    ``nextPageToken`` remains.
    """
    logging.info("Entered get_messages")
    access_token = code
    # Gmail search query for receipts/invoices in several languages.
    # (Original had "subject:invoice" twice; the duplicate is removed —
    # identical query semantics.)
    jobs_query = (
        'subject:"your order" OR subject:receipts OR subject:receipt '
        'OR subject: aankoopbon OR subject:re\u00e7u OR subject:invoice '
        'OR category:purchases'
    )
    max_results = 10
    messages = []
    page_token = None
    while True:
        # Bug fix: the query contains spaces, quotes and non-ASCII, so it
        # must be percent-encoded; `params=` lets requests do that instead
        # of interpolating the raw string into the URL.
        params = {"q": jobs_query, "maxResults": max_results}
        if page_token:
            params["pageToken"] = page_token
        gmail_response = requests.get(
            "https://www.googleapis.com/gmail/v1/users/me/messages",
            headers={"Authorization": f"Bearer {access_token}"},
            params=params,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        logging.info(f"{gmail_response}")
        gmail_data = gmail_response.json()
        if "messages" in gmail_data:
            messages.extend(gmail_data["messages"])
        page_token = gmail_data.get("nextPageToken")
        if not page_token:
            break
    logging.info("Total Length:")
    logging.info(len(messages))
    return messages


def fetch_message_data(access_token: str, message_id: str) -> dict:
    """Fetch the full Gmail message resource for `message_id`."""
    logging.info(f"Entered fetch_message_data for message_id: {message_id}")
    message_url = (
        f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    )
    message_response = requests.get(
        message_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    return message_response.json()


async def process_message_data(access_token: str, message_data: dict,
                               websocket: WebSocket, message_id: str) -> None:
    """Walk a message's MIME parts, streaming body text and attachments.

    For each part: text/plain and text/html parts update the running
    body text; parts whose body has no attachmentId stream the current
    body text; parts with an attachmentId stream the attachment.
    """
    logging.info("Entered process_message_data")
    body_text = ''
    subject = extract_subject_from_mail(message_data)
    # Guarded lookups: a malformed message without payload/headers no
    # longer raises KeyError.
    headers = message_data.get('payload', {}).get('headers', [])
    company_from_mail = extract_domain_name(headers, subject)
    parts = message_data.get("payload", {}).get("parts", [])
    for part in parts:
        if 'mimeType' not in part:
            continue
        mime_type = part['mimeType']
        if mime_type in ('text/plain', 'text/html'):
            body_data = part['body'].get('data', '')
            # Guard: an empty data field would make extract_text raise.
            if body_data:
                body_text = extract_text(base64.urlsafe_b64decode(body_data))
        if "body" in part and "attachmentId" not in part["body"]:
            await process_mail_body_data(websocket, body_text, message_id,
                                         company_from_mail)
        if "body" in part and "attachmentId" in part["body"]:
            attachment_id = part["body"]["attachmentId"]
            attachment_data = fetch_attachment_data(
                access_token, message_data["id"], attachment_id
            )
            body_text = ''
            await process_attachment_data(part, attachment_data, websocket,
                                          company_from_mail, message_id)


async def process_attachment_data(part: dict, attachment_data: dict,
                                  websocket: WebSocket,
                                  company_associated: str,
                                  message_id: str) -> None:
    """Decode an attachment for logging and stream its base64 data.

    NOTE(review): the extracted text is only logged; the client receives
    the raw base64 `data` — confirm this is intentional.
    """
    logging.info("Entered process_attachment_data")
    filename = part.get("filename", "untitled.txt")
    data = attachment_data.get("data", {})
    if data:
        attachment_content = base64.urlsafe_b64decode(data)
        extracted_text = await util.extract_text_from_attachment(
            filename, attachment_content
        )
        logging.info(
            f"Extracted text from attachment (unknown): {extracted_text}"
        )
        await send_chunked_data(websocket, filename, data,
                                company_associated, message_id)


async def process_mail_body_data(websocket: WebSocket, body_text: str,
                                 message_id: str,
                                 company_associated: str) -> None:
    """Forward a message body to the chunked no-attachment sender."""
    await send_chunked_data_without_attch(websocket, body_text, message_id,
                                          company_associated)


def fetch_attachment_data(access_token: str, message_id: str,
                          attachment_id: str) -> dict:
    """Fetch a single attachment resource from the Gmail API."""
    logging.info(
        f"Entered fetch_attachment_data for attachment_id: {attachment_id}"
    )
    attachment_url = (
        f"https://www.googleapis.com/gmail/v1/users/me/messages/"
        f"{message_id}/attachments/{attachment_id}"
    )
    attachment_response = requests.get(
        attachment_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    return attachment_response.json()


def extract_subject_from_mail(message_data: dict) -> str:
    """Return the Subject header value, or '' if absent."""
    headers = message_data.get('payload', {}).get('headers', [])
    for header in headers:
        if header['name'] == 'Subject':
            return header['value']
    return ""


def extract_domain_name(payload: list, subject: str) -> str:
    """Infer a company label from the From header, with subject overrides.

    `payload` is the list of Gmail header dicts. Known brands mentioned
    in the subject take precedence over the sender's domain; falls back
    to 'others' when no From header is present.
    """
    domain_name = 'others'
    for fromdata in payload:
        if fromdata['name'] == 'From':
            domain_name = extract_domain_from_email(fromdata['value'])
            break
    lowered_subject = subject.lower()
    if 'chanel' in lowered_subject:
        return 'chanel'
    if 'louis vuitton' in lowered_subject:
        return 'Louis Vuitton'
    return domain_name


def extract_domain_from_email(email_string: str):
    """Return the second-level domain of the first email address found.

    E.g. 'Shop <orders@mail.example.com>' -> 'mail'. Returns None when
    no address (or an empty domain) is found.
    """
    # Bug fix: the original called .group() unconditionally, raising
    # AttributeError when no address matched; check the match first.
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if not match:
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain if domain else None


def extract_text(html_content) -> str:
    """Extract whitespace-normalized plain text from HTML content.

    Args:
        html_content: HTML markup as str or bytes (callers pass decoded
            base64 bytes; BeautifulSoup accepts both).

    Returns:
        str: the extracted text with runs of whitespace collapsed.

    Raises:
        ValueError: if the input HTML content is empty or None.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")
    soup = BeautifulSoup(html_content, 'html.parser')
    text = re.sub(r'\s+', ' ', soup.get_text(separator=' ')).strip()
    # Debug prints replaced with logging; the unused link extraction the
    # original computed (and discarded) has been removed.
    logging.debug("Extracted text from HTML: %s", text)
    return text