import base64 import json import logging import re from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor from typing import Optional, List, Dict import requests from bs4 import BeautifulSoup from models.models import Message, Attachment from fastapi import WebSocket from services import utils as ut import time import concurrent.futures # from models import supabase_models as sp import asyncio def get_company_type(company_name:str)->str: company_types_dict ={'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit', 'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit', 'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit', "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit', 'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit', 'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit', 'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit', 'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit', 'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit', 'krug': 'wines and spirit', 'mercier': 'wines and spirit', 'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit', 'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit', 'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit', 'woodinville': 'wines and spirit' , 'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods', 'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods', 'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods', 'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods', 'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods', 'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods', 'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods','acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics', 'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics', 'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics', 'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics', 'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics', 'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics', 'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics', 'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics','bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry', 'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry', 'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry', 'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry','24s': 'Selective retailing', 'dfs': 'Selective retailing', 'la grande epicerie de paris': 'Selective retailing', 'le bon marché rive gauche': 'Selective retailing', 'sephora': 'Selective retailing','belmond': 'Other activities', 'cheval blanc': 'Other activities', 'connaissance des arts': 'Other activities', 'cova': 'Other activities', 'investir': 'Other activities', "jardin d'acclimatation": 'Other activities', 'le parisien': 'Other activities', 'les echos': 'Other activities', 'radio classique': 'Other activities', 'royal van lent': 'Other activities'} print(company_types_dict["louis vuitton"]) return company_types_dict.get(company_name.lower(), 'Others') async def get_messages(code: str,websocket:WebSocket,start,brand_name: Optional[str] = None): access_token = code total_processed = 0 g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment' if brand_name is not None: g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases OR from:{brand_name}) AND subject:{brand_name} has:attachment' page_token = None messages = [] # max_results = 10 # gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={jobs_query}&maxResults={max_results}" # gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"}) # gmail_data = gmail_response.json() # messages.append(gmail_data['messages']) def fetch_message_wrapper(message_data,websocket:WebSocket): message_id = message_data.get("id") if message_id: return fetch_message_data(access_token, message_id) return None end = time.time() print("time 0") print(end - start) start1 = time.time() while True: start2= time.time() gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults=20" if page_token: gmail_url += f"&pageToken={page_token}" # print(gmail_url) gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"}) # print(gmail_response.text) end2 = time.time() print("End 2 ") print(end2-start2) print("response length") print(gmail_response.content.__len__()) gmail_data = gmail_response.json() # print(gmail_data) print(len(gmail_data['messages'])) if "messages" in gmail_data: # for message_data in gmail_data['messages']: # message = fetch_message_wrapper(message_data,websocket) # await process_message(message, websocket, 2000000) with ThreadPoolExecutor(max_workers=20) as executor: futures=[executor.submit(fetch_message_wrapper, message_data,websocket) for message_data in gmail_data["messages"]] print("Futures length") print(len(futures)) # print(futures) for future in concurrent.futures.as_completed(futures): message = future.result() # print(message) if message: total_processed += 1 # Process and send the message immediately end1 = time.time() print("time 1") print("sending the message") await process_message(message, websocket, 100000) # # if message: # # messages.append(message) # print("Messages to be sent") # # print(messages)s # print(len(messages)) # # for message_data in messages: # # await process_message(message_data,websocket,10000) if "nextPageToken" in gmail_data: page_token = gmail_data["nextPageToken"] else: break print(f"Total messages processed: {total_processed}") logging.info(f"Total Processed Messages : {total_processed}") print("printing messages") # print(messages) return messages async def process_message(message:Message, websocket:WebSocket, chunk_size:int): logging.info("process_message") print(message) if message: message_json = message.to_json() # logging.info(f"{message_json}") await send_message_in_chunks(websocket, message_json, 50000) # await websocket.send_text(str(message_json)) await websocket.send_text("NEXT_MESSAGE") def fetch_message_data(access_token: str, message_id: str) -> Message: message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}" message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"}) message_data = message_response.json() # print(message_data) subject = extract_subject_from_mail(message_data) print("printing message data") print(message_data) company_from_mail = extract_domain_name(message_data['payload']['headers'], subject) body = extract_body_from_mail(message_data) attachments,structed_attachment_data = extract_attachments_from_mail(access_token, message_data) high_level_company_type = get_company_type(company_from_mail) body_len = 0 if body is not None : body_len = len(body) return Message(message_id=message_id, body_len=body_len,body=body, attachments=attachments, company=company_from_mail,high_level_company_type=high_level_company_type,structured_data = structed_attachment_data) # def fetch_message_data(access_token: str, message_id: str) -> Message: # message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}" # message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"}) # message_data = message_response.json() # with ThreadPoolExecutor(max_workers=50) as executor: # # Submit tasks to executor # future_subject = executor.submit(extract_subject_from_mail, message_data) # subject = future_subject.result() # future_company_from_mail = executor.submit(extract_domain_name, message_data['payload']['headers'], future_subject.result()) # company_from_mail = future_company_from_mail.result() # future_body = executor.submit(extract_body_from_mail, message_data) # body = future_body.result() # # Extract attachments and measure time # future_attachments = executor.submit(extract_attachments_from_mail, access_token, message_data) # attachments, structured_attachment_data = future_attachments.result() # future_high_level_company_type = executor.submit(get_company_type, future_company_from_mail.result()) # high_level_company_type = future_high_level_company_type.result() # body_len = len(body) if body is not None else 0 # return Message( # message_id=message_id, # body_len=body_len, # body=body, # attachments=attachments, # company=company_from_mail, # high_level_company_type=high_level_company_type, # structured_data=structured_attachment_data # ) def extract_subject_from_mail(message_data: dict) -> str: if 'payload' in message_data and 'headers' in message_data['payload']: headers = message_data['payload']['headers'] for header in headers: if header['name'] == 'Subject': return header['value'] return "" else: return "" def extract_domain_name(payload: dict, subject: str) -> str: domain_name = 'others' for fromdata in payload: if fromdata['name'] == 'From': domain_name = extract_domain_from_email(fromdata['value']) break if 'chanel' in subject.lower(): return 'chanel' if 'louis vuitton' in subject.lower(): return 'Louis Vuitton' return domain_name def extract_domain_from_email(email_string: str) -> Optional[str]: email_address = re.search(r'[\w\.-]+@[\w\.-]+', email_string).group() domain = email_address.split('@')[-1].split('.')[0] if email_address and domain: return domain else: return None def extract_body_from_mail(message_data: dict) -> str: body = None if "payload" in message_data: payload = message_data["payload"] if "parts" in payload: for part in payload["parts"]: if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'): body_data = part['body'].get('data', '') if body_data: body_base64 = base64.urlsafe_b64decode(body_data) body = extract_text(body_base64) elif 'body' in payload: body_data = payload['body'].get('data', '') if body_data: body_base64 = base64.urlsafe_b64decode(body_data) body = extract_text(body_base64) elif 'parts' in payload['body']: for part in payload['body']['parts']: if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'): body_data = part['body'].get('data', '') if body_data: body_base64 = base64.urlsafe_b64decode(body_data) body = extract_text(body_base64) if not body: body = message_data.get('snippet', '') return body def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict: attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}" attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"}) return attachment_response.json() def extract_attachments_from_mail(access_token: str, message_data: dict) -> List[Attachment]: attachments = [] structured_data = [] if "payload" in message_data and "parts" in message_data["payload"]: for part in message_data["payload"]["parts"]: if "body" in part and "attachmentId" in part["body"]: attachment_id = part["body"]["attachmentId"] attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id) filename = part.get("filename", "untitled.txt") if filename.endswith(".zip") or filename.endswith(".txt") or filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".gif"): continue data = attachment_data.get("data", "") try: raw_text = ut.extract_text_from_attachment(filename, data) except Exception as e: print(f"Error processing attachment {filename}: {str(e)}") continue struct_data = ut.strcuture_document_data(raw_text) st_str = """ { "brand": "INSERT BRAND NAME FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null", "total_cost": "INSERT TOTAL COST FROM THE RECEIPT OCR TEXT. TOTAL AMOUNT IS MAXIMUM VALUE IN THE OCR TEXT. IF NOT PRESENT RETURN null", "location": "INSERT LOCATION FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null", "purchase_category": "INSERT PURCHASE CATEGORY FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null", "brand_category": "INSERT BRAND CATEGORY FROM THE RECEIPT OCR TEXT. CHOOSE CLOSEST BRAND CATEGORY BASED ON THE OCR FROM THIS ARRAY [\"Fashion and Apparel\",\"Jewelry and Watches\",\"Beauty and Personal Care\",\"Automobiles\",\"Real Estate\",\"Travel and Leisure\",\"Culinary Services\",\"Home and Lifestyle\",\"Technology and Electronics\",\"Sports and Leisure\",\"Art and Collectibles\",\"Health and Wellness\",\"Stationery and Writing Instruments\",\"Children and Baby\",\"Pet Accessories\",\"Financial Services\",\"Airline Services\",\"Accommodation Services\",\"Beverages Services\",\"Services\"] ELSE IF NOT PRESENT RETURN null", "Date": "INSERT RECEIPT DATE FROM THE RECEIPT OCR TEXT. IF NOT PRESENT RETURN null. FORMAT: dd-mm-yyyy", "currency": "INSERT CURRENCY FROM THE RECEIPT OCR TEXT. LOOK FOR CURRENCY SYMBOLS (e.g., $, \u20ac, \u00a3, \u00a5) OR CURRENCY CODES (e.g., USD, EUR, GBP, JPY).ALWAYS RETURN CURRENCY CODE.IF NOT FOUND RETURN null.", "filename": "GENERATE A FILENAME BASED ON THE RECEIPT OCR TEXT. USE THE FORMAT: 'PURCHASE_TYPE_BRAND_DATE' (e.g., 'clothing_gucci_20230715'). USE UNDERSCORES FOR SPACES.IF YOU CANNOT FIND THE COMPONENTS RETURN THIS FIELD AS NULL.", "payment_method": "INSERT PAYMENT METHOD FROM THE RECEIPT OCR TEXT. LOOK FOR KEYWORDS LIKE 'CASH', 'CARD', 'CREDIT', 'DEBIT', 'VISA', 'MASTERCARD', 'AMEX', 'PAYPAL', ETC. IF NOT FOUND RETURN null." } """ if struct_data is None or struct_data == st_str : struct_data = None else: structured_data.append(struct_data) attachments.append(Attachment(attachment_len = len(attachment_data.get("data", "")),filename=filename, data=attachment_data.get("data", ""))) return attachments,structured_data def extract_text(html_content: str) -> str: if not html_content: raise ValueError("HTML content is empty or None") soup = BeautifulSoup(html_content, 'html.parser') text = soup.get_text(separator=' ') text = re.sub(r'\s+', ' ', text).strip() return text async def websocket_main(code: str, websocket: WebSocket,start,brand_name: Optional[str] = None): access_token = code await get_messages(access_token,websocket,start,brand_name) await websocket.close() async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int): # if message_json['attachments'] is not None : # for attch in message_json['attachments']: # attachment_len = attch['attachment_len'] # print(body_len) # print(attachment_len) # if attachment_len == 0: # attachment_len = None # await websocket.send_json({"body_len":body_len ,"attachment_len":attachment_len}) message_str = json.dumps(message_json) for i in range(0, len(message_str), chunk_size): await websocket.send_text(message_str[i:i + chunk_size])