# Gmail purchase-email ingestion helpers: fetches messages and attachments
# from the Gmail API and streams structured results over a FastAPI websocket.
import asyncio | |
import base64 | |
import json | |
import logging | |
import re | |
from typing import Optional, List, Dict | |
import aiohttp | |
from bs4 import BeautifulSoup | |
from fastapi import WebSocket | |
from services import utils as ut | |
from models.models import Message, Attachment | |
from concurrent.futures import ThreadPoolExecutor | |
import functools | |
# Shared worker pool for CPU-bound work (attachment text extraction/structuring)
# so it does not block the asyncio event loop; 10 workers — tune to workload.
thread_pool = ThreadPoolExecutor(max_workers=10)
# Lowercase LVMH maison name -> business-group label. Module-level constant so
# the mapping is built once, not on every call.
_COMPANY_TYPES: Dict[str, str] = {
    # Wines & Spirits
    'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit',
    'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit',
    'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit',
    "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit',
    'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit',
    'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit',
    'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit',
    'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit',
    'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit',
    'krug': 'wines and spirit', 'mercier': 'wines and spirit',
    'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit',
    'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit',
    'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit',
    'woodinville': 'wines and spirit',
    # Fashion & Leather Goods
    'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods',
    'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods',
    'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods',
    'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods',
    'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods',
    'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods',
    'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods',
    # Perfumes & Cosmetics
    'acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics',
    'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics',
    'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics',
    'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics',
    'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics',
    'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics',
    'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics',
    'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics',
    # Watches & Jewelry
    'bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry',
    'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry',
    'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry',
    'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry',
    # Selective retailing
    '24s': 'Selective retailing', 'dfs': 'Selective retailing',
    'la grande epicerie de paris': 'Selective retailing',
    'le bon marché rive gauche': 'Selective retailing', 'sephora': 'Selective retailing',
    # Other activities
    'belmond': 'Other activities', 'cheval blanc': 'Other activities',
    'connaissance des arts': 'Other activities', 'cova': 'Other activities',
    'investir': 'Other activities', "jardin d'acclimatation": 'Other activities',
    'le parisien': 'Other activities', 'les echos': 'Other activities',
    'radio classique': 'Other activities', 'royal van lent': 'Other activities',
}

def get_company_type(company_name: str) -> str:
    """Return the business-group label for *company_name* (case-insensitive).

    Unknown companies map to 'Others'. The stray debug print of the
    'louis vuitton' entry that ran on every call has been removed.
    """
    return _COMPANY_TYPES.get(company_name.lower(), 'Others')
def extract_subject_from_mail(message_data: dict) -> str:
    """Return the 'Subject' header value of a Gmail message, or '' when absent."""
    payload = message_data.get('payload', {})
    if 'headers' not in payload:
        return ""
    # First header named 'Subject' wins; fall back to the empty string.
    return next(
        (header['value'] for header in payload['headers'] if header['name'] == 'Subject'),
        "",
    )
def extract_domain_name(payload: dict, subject: str) -> str:
    """Derive a company label from a message's headers and subject.

    Uses the domain of the first 'From' header; specific brand keywords in
    the subject override the header-derived value. Defaults to 'others'.
    """
    domain_name = 'others'
    for header in payload:
        if header['name'] != 'From':
            continue
        domain_name = extract_domain_from_email(header['value'])
        break
    subject_lower = subject.lower()
    if 'chanel' in subject_lower:
        return 'chanel'
    if 'louis vuitton' in subject_lower:
        return 'Louis Vuitton'
    return domain_name
def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Return the first domain label of the first email address found.

    'John <john@mail.example.com>' -> 'mail'; None when no address matches.
    """
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if match is None:
        return None
    address = match.group()
    full_domain = address.split('@')[-1]
    return full_domain.split('.')[0]
def extract_body_from_mail(message_data: dict) -> str:
    """Return the decoded text body of a Gmail message.

    Multipart messages: decode the first text/plain or text/html part that
    carries data. Single-part messages: decode payload.body.data. Falls back
    to the message snippet when nothing decodable is found.
    """
    body = None
    if "payload" in message_data:
        payload = message_data["payload"]
        if "parts" in payload:
            text_parts = (
                part for part in payload["parts"]
                if part.get('mimeType') in ('text/plain', 'text/html')
            )
            for part in text_parts:
                encoded = part['body'].get('data', '')
                if encoded:
                    body = extract_text(base64.urlsafe_b64decode(encoded))
                    break
        elif 'body' in payload:
            encoded = payload['body'].get('data', '')
            if encoded:
                body = extract_text(base64.urlsafe_b64decode(encoded))
    # An empty/missing body falls back to Gmail's snippet (default '').
    return body or message_data.get('snippet', '')
def extract_text(html_content: str) -> str:
    """Strip HTML markup and collapse runs of whitespace to single spaces."""
    if not html_content:
        return ""
    # NOTE(review): callers pass raw bytes from base64 decoding; BeautifulSoup
    # accepts both str and bytes, so the str annotation is looser in practice.
    parsed = BeautifulSoup(html_content, 'html.parser')
    flattened = parsed.get_text(separator=' ')
    # ' '.join(split()) collapses all whitespace and trims the ends,
    # equivalent to re.sub(r'\s+', ' ', ...).strip().
    return ' '.join(flattened.split())
# Asynchronous functions | |
async def fetch_attachment_data(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Fetch one attachment resource from the Gmail API and return its JSON."""
    url = (
        "https://www.googleapis.com/gmail/v1/users/me/messages/"
        f"{message_id}/attachments/{attachment_id}"
    )
    auth = {"Authorization": f"Bearer {access_token}"}
    async with session.get(url, headers=auth) as resp:
        return await resp.json()
async def extract_attachments_from_mail(session: aiohttp.ClientSession, access_token: str, message_data: dict) -> tuple[List[Attachment], List[Dict]]:
    """Download and structure all non-media attachments of a Gmail message.

    Skips .zip/.txt and image attachments. Returns (attachments,
    structured_data) where structured_data holds the truthy results of
    ut.strcuture_document_data over each attachment's extracted text.

    Bug fix: the original zipped ALL payload parts against the download
    results, but downloads were only scheduled for the filtered parts — any
    part without an attachmentId (or with a skipped filename) shifted the
    pairing and attributed data to the wrong filename. We now keep the
    filename list and the task list in lockstep.
    """
    attachments: List[Attachment] = []
    structured_data: List[Dict] = []
    if "payload" in message_data and "parts" in message_data["payload"]:
        skip_suffixes = (".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")
        filenames: List[str] = []
        tasks = []
        for part in message_data["payload"]["parts"]:
            if "body" not in part or "attachmentId" not in part["body"]:
                continue
            filename = part.get("filename", "untitled.txt")
            if filename.endswith(skip_suffixes):
                continue
            filenames.append(filename)
            tasks.append(fetch_attachment_data(
                session, access_token, message_data["id"], part["body"]["attachmentId"]
            ))
        attachment_data_list = await asyncio.gather(*tasks)
        for filename, attachment_data in zip(filenames, attachment_data_list):
            data = attachment_data.get("data", "")
            raw_text = ut.extract_text_from_attachment(filename, data)
            struct_data = ut.strcuture_document_data(raw_text)
            if struct_data:
                structured_data.append(struct_data)
            attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
    return attachments, structured_data
async def fetch_message_data(session: aiohttp.ClientSession, access_token: str, message_id: str) -> Message:
    """Fetch one Gmail message by id and assemble a full Message model."""
    url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    async with session.get(url, headers={"Authorization": f"Bearer {access_token}"}) as response:
        message_data = await response.json()
    subject = extract_subject_from_mail(message_data)
    company = extract_domain_name(message_data['payload']['headers'], subject)
    body = extract_body_from_mail(message_data)
    attachments, structured = await extract_attachments_from_mail(session, access_token, message_data)
    return Message(
        message_id=message_id,
        body_len=len(body) if body is not None else 0,
        body=body,
        attachments=attachments,
        company=company,
        high_level_company_type=get_company_type(company),
        structured_data=structured,
    )
# async def process_message(message: Message, websocket: WebSocket, chunk_size: int): | |
# message_json = message.to_json() | |
# await send_message_in_chunks(websocket, message_json, chunk_size) | |
# await websocket.send_text("NEXT_MESSAGE") | |
async def get_messages(code: str, websocket: WebSocket, start, brand_name: Optional[str] = None):
    """Stream purchase-related Gmail messages to *websocket*, page by page.

    Args:
        code: OAuth access token for the Gmail API.
        websocket: open FastAPI websocket; each processed message is sent as
            JSON followed by the literal sentinel "NEXT_MESSAGE".
        start: unused here — kept for interface compatibility with callers.
        brand_name: optional sender filter appended to the Gmail query.

    Bug fix: the search query (spaces, quotes, colons) was interpolated raw
    into the URL; it is now passed via aiohttp's ``params=`` so it gets
    properly URL-encoded. Also dropped a placeholder-free ``f`` prefix.
    """
    access_token = code
    g_query = ('(subject:"your order" OR subject:receipts OR subject:receipt '
               'OR subject:aankoopbon OR subject:reçu OR subject:invoice '
               'OR subject:invoices OR category:purchases) has:attachment')
    if brand_name:
        # NOTE(review): Gmail search treats whitespace as an implicit AND;
        # the literal 'AND' token is preserved from the original — confirm
        # it behaves as intended against the Gmail search syntax docs.
        g_query += f' AND from:{brand_name}'
    page_token = None
    processed_count = 0
    skipped_count = 0
    async with aiohttp.ClientSession() as session:
        while True:
            params = {"q": g_query, "maxResults": "50"}
            if page_token:
                params["pageToken"] = page_token
            async with session.get(
                "https://www.googleapis.com/gmail/v1/users/me/messages",
                headers={"Authorization": f"Bearer {access_token}"},
                params=params,
            ) as response:
                gmail_data = await response.json()
            if "messages" not in gmail_data:
                await websocket.send_text(json.dumps({"status": "No messages found"}))
                break
            # Fetch the full message bodies concurrently, then process them.
            message_tasks = [fetch_message(session, access_token, message["id"]) for message in gmail_data["messages"]]
            full_messages = await asyncio.gather(*message_tasks)
            processing_tasks = [process_message(session, access_token, message) for message in full_messages]
            for future in asyncio.as_completed(processing_tasks):
                processed_message = await future
                if processed_message is not None:
                    await websocket.send_text(json.dumps(processed_message.to_json()))
                    await websocket.send_text("NEXT_MESSAGE")
                    processed_count += 1
                else:
                    skipped_count += 1
            if "nextPageToken" in gmail_data:
                page_token = gmail_data["nextPageToken"]
            else:
                break
    logging.info(f"Processed Message {processed_count}")
    logging.info(f"Skipped Message {skipped_count}")
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
    """Serialize *message_json* and send it over the socket in chunk_size pieces."""
    serialized = json.dumps(message_json)
    chunks = (
        serialized[offset:offset + chunk_size]
        for offset in range(0, len(serialized), chunk_size)
    )
    for chunk in chunks:
        await websocket.send_text(chunk)
async def websocket_main(code: str, websocket: WebSocket, start, brand_name: Optional[str] = None):
    """Entry point: stream matching purchase emails to the socket, then close it."""
    # `code` already is the OAuth access token; get_messages consumes it directly.
    await get_messages(code, websocket, start, brand_name)
    await websocket.close()
# Adjust the number of workers as needed | |
async def run_in_thread(func, *args, **kwargs):
    """Run blocking *func(*args, **kwargs)* on the shared thread pool.

    Keeps CPU-bound work off the event loop. Uses asyncio.get_running_loop()
    — get_event_loop() inside a coroutine is deprecated since Python 3.10
    and can bind the wrong loop.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(thread_pool, functools.partial(func, *args, **kwargs))
async def fetch_message(session: aiohttp.ClientSession, access_token: str, message_id: str):
    """Fetch one Gmail message resource by id and return the parsed JSON."""
    url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    auth = {"Authorization": f"Bearer {access_token}"}
    async with session.get(url, headers=auth) as resp:
        return await resp.json()
async def fetch_attachment(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str):
    """Fetch one attachment resource by id and return the parsed JSON."""
    url = (
        "https://www.googleapis.com/gmail/v1/users/me/messages/"
        f"{message_id}/attachments/{attachment_id}"
    )
    auth = {"Authorization": f"Bearer {access_token}"}
    async with session.get(url, headers=auth) as resp:
        return await resp.json()
def process_attachment(filename: str, data: str):
    """Extract text from one attachment and structure it (runs in a worker thread)."""
    return ut.strcuture_document_data(ut.extract_text_from_attachment(filename, data))
async def process_message(session: aiohttp.ClientSession, access_token: str, message_data: dict):
    """Turn a raw Gmail message dict into a Message model, or None if malformed.

    Downloads non-media attachments concurrently and offloads their parsing
    to the shared thread pool via run_in_thread.

    Bug fix: the malformed-payload branch assigned a default company and then
    returned None, leaving the logging.warning call unreachable dead code —
    the warning now fires before the early return.
    """
    subject = extract_subject_from_mail(message_data)
    if 'payload' not in message_data or 'headers' not in message_data['payload']:
        # Malformed message: warn and skip (caller counts it as skipped).
        logging.warning(f"Payload or headers missing for message ID: {message_data.get('id', 'unknown')}")
        return None
    company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
    body = extract_body_from_mail(message_data)
    attachments = []
    structured_data = []
    if "parts" in message_data["payload"]:
        skip_suffixes = (".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")
        attachment_tasks = []
        for part in message_data["payload"]["parts"]:
            if "body" not in part or "attachmentId" not in part["body"]:
                continue
            filename = part.get("filename", "untitled.txt")
            if filename.endswith(skip_suffixes):
                continue
            task = fetch_attachment(session, access_token, message_data["id"], part["body"]["attachmentId"])
            attachment_tasks.append((filename, task))
        attachment_results = await asyncio.gather(*(task for _, task in attachment_tasks))
        processing_tasks = []
        for (filename, _), attachment_data in zip(attachment_tasks, attachment_results):
            data = attachment_data.get("data", "")
            # Text extraction/structuring is CPU-bound; run it off the event loop.
            processing_tasks.append(run_in_thread(process_attachment, filename, data))
            attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
        structured_data = [result for result in await asyncio.gather(*processing_tasks) if result]
    high_level_company_type = await run_in_thread(get_company_type, company_from_mail)
    body_len = len(body) if body is not None else 0
    return Message(
        message_id=message_data["id"],
        body_len=body_len,
        body=body,
        attachments=attachments,
        company=company_from_mail,
        high_level_company_type=high_level_company_type,
        structured_data=structured_data,
    )