hushh-valet-chat / controllers /ws_controller_1.py
Omkar008's picture
Update controllers/ws_controller_1.py
bc632de verified
raw
history blame
No virus
16.7 kB
import asyncio
import base64
import json
import logging
import re
from typing import Optional, List, Dict
import aiohttp
from bs4 import BeautifulSoup
from fastapi import WebSocket
from services import utils as ut
from models.models import Message, Attachment
from concurrent.futures import ThreadPoolExecutor
import functools
# Shared worker pool (at most 10 threads); run_in_thread submits blocking callables to it.
thread_pool = ThreadPoolExecutor(max_workers=10)
# Brand names (lower-case) grouped by their high-level business category.
_BRANDS_BY_CATEGORY = {
    'wines and spirit': (
        'ao yun', 'ardbeg', 'belvedere', 'bodega numanthia', 'chandon',
        'château cheval blanc', "château d'yquem", 'château galoupet',
        'cheval des andes', 'clos19', 'cloudy bay', 'colgin cellars',
        'dom pérignon', 'domaine des lambrays', 'eminente', 'glenmorangie',
        'hennessy', 'joseph phelps', 'krug', 'mercier', 'moët & chandon',
        'newton vineyard', 'ruinart', 'terrazas de los andes',
        'veuve clicquot', 'volcan de mi tierra', 'woodinville',
    ),
    'Fashion & Leather Goods': (
        'berluti', 'celine', 'christian dior', 'emilio pucci', 'fendi',
        'givenchy', 'kenzo', 'loewe', 'loro piana', 'louis vuitton',
        'marc jacobs', 'moynat', 'patou', 'rimowa',
    ),
    'Perfumes & Cosmetics': (
        'acqua di parma', 'benefit cosmetics', 'cha ling',
        'fenty beauty by rihanna', 'fresh', 'givenchy parfums', 'guerlain',
        'kenzo parfums', 'kvd beauty', 'loewe perfumes',
        'maison francis kurkdjian', 'make up for ever',
        'officine universelle buly', 'olehenriksen',
        'parfums christian dior', 'stella by stella mccartney',
    ),
    'Watches & Jewelry': (
        'bulgari', 'chaumet', 'fred', 'hublot', 'repossi', 'tag heuer',
        'tiffany & co.', 'zenith',
    ),
    'Selective retailing': (
        '24s', 'dfs', 'la grande epicerie de paris',
        'le bon marché rive gauche', 'sephora',
    ),
    'Other activities': (
        'belmond', 'cheval blanc', 'connaissance des arts', 'cova',
        'investir', "jardin d'acclimatation", 'le parisien', 'les echos',
        'radio classique', 'royal van lent',
    ),
}

# Flat brand -> category lookup, built once at import time instead of on every call.
_COMPANY_TYPES = {
    brand: category
    for category, brands in _BRANDS_BY_CATEGORY.items()
    for brand in brands
}


def get_company_type(company_name:str)->str:
    """Return the high-level category for a brand name.

    Args:
        company_name: Brand name; matched case-insensitively against the
            known brand table.

    Returns:
        The category string for the brand, or ``'Others'`` when the name is
        unknown, empty, or ``None``.
    """
    # Callers may hand over None when no sender domain could be extracted;
    # treat that like any other unknown brand instead of failing on .lower().
    if not company_name:
        return 'Others'
    return _COMPANY_TYPES.get(company_name.lower(), 'Others')
def extract_subject_from_mail(message_data: dict) -> str:
    """Return the value of the first ``Subject`` header of a message.

    Looks under ``message_data['payload']['headers']``; the header name is
    compared exactly (case-sensitive). Returns ``""`` when the payload, the
    header list, or a ``Subject`` entry is missing.
    """
    if 'payload' not in message_data or 'headers' not in message_data['payload']:
        return ""
    subject_values = (
        entry['value']
        for entry in message_data['payload']['headers']
        if entry['name'] == 'Subject'
    )
    return next(subject_values, "")
def extract_domain_name(payload: dict, subject: str) -> str:
    """Derive a company/brand label from the sender and the subject line.

    Args:
        payload: Iterable of header dicts with ``'name'`` and ``'value'``
            keys (callers in this file pass ``message['payload']['headers']``).
        subject: Subject line of the message.

    Returns:
        ``'chanel'`` or ``'Louis Vuitton'`` when the subject mentions that
        brand (case-insensitive); otherwise the first label of the sender's
        e-mail domain taken from the ``From`` header; ``'others'`` when no
        sender domain can be determined.
    """
    domain_name = 'others'
    for header in payload:
        if header['name'] == 'From':
            # extract_domain_from_email returns None when the header holds no
            # address; keep the 'others' default so this function always
            # returns a str as annotated.
            domain_name = extract_domain_from_email(header['value']) or domain_name
            break

    # Subject-based brand overrides take precedence over the sender domain.
    lowered_subject = subject.lower()
    if 'chanel' in lowered_subject:
        return 'chanel'
    if 'louis vuitton' in lowered_subject:
        return 'Louis Vuitton'
    return domain_name
def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Return the first dot-separated label of the domain of the first e-mail
    address found in ``email_string``, or ``None`` if no address is present.

    Example: ``"Shop <noreply@mail.example.com>"`` yields ``"mail"``.
    """
    found = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if found is None:
        return None
    # The pattern admits exactly one '@', so everything after it is the host.
    _, _, host = found.group().rpartition('@')
    first_label, _, _ = host.partition('.')
    return first_label
def extract_body_from_mail(message_data: dict) -> str:
    """Return the readable body text of a Gmail message resource.

    For a multipart payload, the first top-level ``text/plain`` or
    ``text/html`` part that carries data is base64url-decoded and passed
    through ``extract_text``. For a single-part payload, ``payload['body']``
    is used the same way. When no body text is obtained, the message's
    ``snippet`` (or ``''``) is returned instead.
    """
    text = None
    if "payload" in message_data:
        payload = message_data["payload"]
        if "parts" in payload:
            for part in payload["parts"]:
                if part.get('mimeType') not in ('text/plain', 'text/html'):
                    continue
                encoded = part['body'].get('data', '')
                if encoded:
                    text = extract_text(base64.urlsafe_b64decode(encoded))
                    break
        elif 'body' in payload:
            encoded = payload['body'].get('data', '')
            if encoded:
                text = extract_text(base64.urlsafe_b64decode(encoded))
    # Fall back to the short snippet when nothing usable was decoded.
    return text if text else message_data.get('snippet', '')
def extract_text(html_content: str) -> str:
    """Strip markup from ``html_content`` and return its text on one line.

    The content is parsed with BeautifulSoup's ``html.parser``; text nodes are
    joined with spaces, every whitespace run is collapsed to a single space
    and the result is trimmed. Falsy input yields ``""``.
    """
    if html_content:
        parsed = BeautifulSoup(html_content, 'html.parser')
        spaced_text = parsed.get_text(separator=' ')
        return re.sub(r'\s+', ' ', spaced_text).strip()
    return ""
# Asynchronous functions
async def fetch_attachment_data(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str) -> Dict:
    """GET one attachment of a message from the Gmail API and return the
    response body parsed as JSON.

    The request is authorised with ``access_token`` as a Bearer token.
    """
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    endpoint = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    async with session.get(endpoint, headers=auth_headers) as resp:
        attachment_json = await resp.json()
    return attachment_json
# async def extract_attachments_from_mail(session: aiohttp.ClientSession, access_token: str, message_data: dict) -> tuple[List[Attachment], List[Dict]]:
# attachments = []
# structured_data = []
# if "payload" in message_data and "parts" in message_data["payload"]:
# tasks = []
# for part in message_data["payload"]["parts"]:
# if "body" in part and "attachmentId" in part["body"]:
# attachment_id = part["body"]["attachmentId"]
# filename = part.get("filename", "untitled.txt")
# if not filename.endswith((".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")):
# tasks.append(fetch_attachment_data(session, access_token, message_data["id"], attachment_id))
# attachment_data_list = await asyncio.gather(*tasks)
# for part, attachment_data in zip(message_data["payload"]["parts"], attachment_data_list):
# filename = part.get("filename", "untitled.txt")
# if filename.endswith((".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")):
# continue
# data = attachment_data.get("data", "")
# raw_text = ut.extract_text_from_attachment(filename, data)
# if raw_text is None or len(raw_text)<10 or len(raw_text.splitlines()) <= 2:
# continue
# struct_data = ut.strcuture_document_data(raw_text)
# if struct_data:
# structured_data.append(struct_data)
# attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
# return attachments, structured_data
async def extract_attachments_from_mail(session: aiohttp.ClientSession, access_token: str, message_data: dict):
    """Download and parse the relevant attachments of one Gmail message.

    Parts without an ``attachmentId`` and files ending in .zip, .txt, .png,
    .jpg, .jpeg or .gif are skipped. The remaining attachments are fetched
    concurrently on the event loop, then parsed on the shared thread pool.

    Args:
        session: Open aiohttp session used for the Gmail requests.
        access_token: OAuth bearer token for the Gmail API.
        message_data: Gmail message resource (must contain ``"id"`` when it
            has attachments to fetch).

    Returns:
        ``(attachments, structured_data)``: a list of ``Attachment`` objects
        and a list of the non-empty results of ``process_attachment``.
    """
    skipped_suffixes = (".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")

    # Pair each filename with its pending download so results can be matched
    # back to the right file after gather().
    pending = []
    for part in message_data.get("payload", {}).get("parts", []):
        attachment_id = part.get("body", {}).get("attachmentId")
        if not attachment_id:
            continue
        filename = part.get("filename", "untitled.txt")
        if filename.endswith(skipped_suffixes):
            continue
        download = fetch_attachment_data(session, access_token, message_data["id"], attachment_id)
        pending.append((filename, download))

    if not pending:
        return [], []

    # Coroutines must be awaited on the running loop; handing asyncio.gather
    # to a worker thread (as the previous version did) never runs them.
    responses = await asyncio.gather(*(download for _, download in pending))

    attachments = []
    parse_jobs = []
    for (filename, _), response in zip(pending, responses):
        data = response.get("data", "")
        attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
        # NOTE(review): assumes ut.strcuture_document_data is the synchronous
        # raw-text parser used by process_attachment - confirm against services.utils.
        parse_jobs.append(run_in_thread(process_attachment, filename, data))

    parsed = await asyncio.gather(*parse_jobs)
    structured_data = [item for item in parsed if item]
    return attachments, structured_data
async def fetch_message_data(session: aiohttp.ClientSession, access_token: str, message_id: str) -> Message:
    """Fetch one Gmail message and convert it into a ``Message`` model.

    The message is downloaded with ``access_token`` as Bearer token; subject,
    sender company, body text, attachments and the company's high-level type
    are then derived with the helper functions of this module.
    """
    endpoint = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    async with session.get(endpoint, headers=auth_headers) as resp:
        message_data = await resp.json()

    mail_subject = extract_subject_from_mail(message_data)
    company_from_mail = extract_domain_name(message_data['payload']['headers'], mail_subject)
    body = extract_body_from_mail(message_data)
    attachments, structured_attachment_data = await extract_attachments_from_mail(
        session, access_token, message_data
    )
    high_level_company_type = get_company_type(company_from_mail)

    return Message(
        message_id=message_id,
        body_len=0 if body is None else len(body),
        body=body,
        attachments=attachments,
        company=company_from_mail,
        high_level_company_type=high_level_company_type,
        structured_data=structured_attachment_data,
    )
# async def process_message(message: Message, websocket: WebSocket, chunk_size: int):
# message_json = message.to_json()
# await send_message_in_chunks(websocket, message_json, chunk_size)
# await websocket.send_text("NEXT_MESSAGE")
async def get_messages(code: str, websocket: WebSocket,start, brand_name: Optional[str] = None):
    """Stream receipt/order e-mails with attachments to a WebSocket client.

    Pages through the Gmail message list (10 per page) for a fixed
    receipt/invoice search query, fetches and processes every hit, and sends
    each processed message as JSON text split into 50000-character frames,
    followed by a ``"NEXT_MESSAGE"`` frame. If a list response contains no
    ``"messages"`` key, a ``{"status": "No messages found"}`` frame is sent
    and paging stops.

    Args:
        code: OAuth access token used as Bearer token for the Gmail API.
        websocket: Client connection the results are written to.
        start: Accepted for interface compatibility; not used here.
        brand_name: Optional sender filter appended to the search query.
    """
    access_token = code
    g_query = '(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
    if brand_name:
        g_query += f' AND from:{brand_name}'

    list_url = "https://www.googleapis.com/gmail/v1/users/me/messages"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    chunk_size = 50000
    page_token = None
    processed_count = 0
    skipped_count = 0

    async with aiohttp.ClientSession() as session:
        while True:
            # Pass the query via params so aiohttp URL-encodes it; a brand
            # name containing '&' or '#' would otherwise corrupt the request.
            params = {"q": g_query, "maxResults": "10"}
            if page_token:
                params["pageToken"] = page_token
            async with session.get(list_url, headers=auth_headers, params=params) as response:
                gmail_data = await response.json()

            if "messages" not in gmail_data:
                await websocket.send_text(json.dumps({"status": "No messages found"}))
                break

            message_tasks = [fetch_message(session, access_token, message["id"]) for message in gmail_data["messages"]]
            full_messages = await asyncio.gather(*message_tasks)

            processing_tasks = [process_message(session, access_token, message) for message in full_messages]
            # Forward each message as soon as its processing finishes.
            for future in asyncio.as_completed(processing_tasks):
                processed_message = await future
                if processed_message is not None:
                    await send_message_in_chunks(websocket, processed_message.to_json(), chunk_size)
                    await websocket.send_text("NEXT_MESSAGE")
                    processed_count += 1
                else:
                    skipped_count += 1

            page_token = gmail_data.get("nextPageToken")
            if not page_token:
                break

    logging.info("Processed Message %s", processed_count)
    logging.info("Skipped Message %s", skipped_count)
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
    """Serialise ``message_json`` with ``json.dumps`` and send it over
    ``websocket`` as consecutive text frames of at most ``chunk_size``
    characters each.
    """
    serialized = json.dumps(message_json)
    total_length = len(serialized)
    for offset in range(0, total_length, chunk_size):
        frame = serialized[offset:offset + chunk_size]
        await websocket.send_text(frame)
async def websocket_main(code: str, websocket: WebSocket,start,brand_name: Optional[str] = None):
    """Entry point for one WebSocket session: stream the matching messages
    via ``get_messages`` and then close the socket.

    ``code`` is forwarded unchanged as the Gmail access token.
    """
    await get_messages(code, websocket, start, brand_name)
    await websocket.close()
# Thread-pool helper: runs blocking callables on the module-level thread_pool (adjust its max_workers where the pool is created).
async def run_in_thread(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` on the module-level ``thread_pool`` and
    return its result without blocking the event loop.
    """
    loop = asyncio.get_event_loop()
    # run_in_executor forwards positional arguments only, so bind everything up front.
    bound_call = functools.partial(func, *args, **kwargs)
    return await loop.run_in_executor(thread_pool, bound_call)
async def fetch_message(session: aiohttp.ClientSession, access_token: str, message_id: str):
    """GET a single Gmail message resource and return the response body
    parsed as JSON, authorising with ``access_token`` as Bearer token.
    """
    endpoint = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    async with session.get(endpoint, headers=auth_headers) as resp:
        message_json = await resp.json()
    return message_json
async def fetch_attachment(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str):
    """GET one attachment of a Gmail message and return the response body
    parsed as JSON, authorising with ``access_token`` as Bearer token.
    """
    endpoint = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    async with session.get(endpoint, headers=auth_headers) as resp:
        attachment_json = await resp.json()
    return attachment_json
def process_attachment(filename: str, data: str):
    """Extract the text of an attachment and return its structured form.

    Chains ``ut.extract_text_from_attachment(filename, data)`` into
    ``ut.strcuture_document_data`` and returns the latter's result.
    """
    return ut.strcuture_document_data(
        ut.extract_text_from_attachment(filename, data)
    )
async def process_message(session: aiohttp.ClientSession, access_token: str, message_data: dict):
    """Turn a fetched Gmail message resource into a ``Message`` model.

    Extracts subject, sender company and body, downloads every attachment
    whose filename does not end in .zip, .txt, .png, .jpg, .jpeg or .gif,
    and parses each downloaded attachment on the shared thread pool.

    Args:
        session: Open aiohttp session used for attachment downloads.
        access_token: OAuth bearer token for the Gmail API.
        message_data: JSON body returned for one message.

    Returns:
        The populated ``Message``, or ``None`` when ``message_data`` has no
        payload headers (the caller in ``get_messages`` counts ``None`` as a
        skipped message).
    """
    payload = message_data.get("payload")
    if not payload or "headers" not in payload:
        # Error or partial responses carry no payload; skip them rather than
        # letting a KeyError abort the whole batch.
        logging.warning(
            "Payload or headers missing for message ID: %s",
            message_data.get('id', 'unknown'),
        )
        return None

    subject = extract_subject_from_mail(message_data)
    # Default to 'others' so a sender without a parsable address cannot
    # propagate None into the company lookup below.
    company_from_mail = extract_domain_name(payload['headers'], subject) or 'others'
    body = extract_body_from_mail(message_data)

    skipped_suffixes = (".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")
    attachments = []
    structured_data = []

    # Pair each filename with its pending download so results can be matched
    # back to the right file after gather().
    pending = []
    for part in payload.get("parts", []):
        attachment_id = part.get("body", {}).get("attachmentId")
        if not attachment_id:
            continue
        filename = part.get("filename", "untitled.txt")
        if filename.endswith(skipped_suffixes):
            continue
        download = fetch_attachment(session, access_token, message_data["id"], attachment_id)
        pending.append((filename, download))

    if pending:
        attachment_results = await asyncio.gather(*(download for _, download in pending))

        parse_jobs = []
        for (filename, _), attachment_data in zip(pending, attachment_results):
            data = attachment_data.get("data", "")
            # Parsing is blocking work, so it runs on the thread pool.
            parse_jobs.append(run_in_thread(process_attachment, filename, data))
            attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))

        parsed = await asyncio.gather(*parse_jobs)
        structured_data = [item for item in parsed if item]

    # A plain dictionary lookup; no need to hop to a worker thread for it.
    high_level_company_type = get_company_type(company_from_mail)

    return Message(
        message_id=message_data["id"],
        body_len=len(body) if body is not None else 0,
        body=body,
        attachments=attachments,
        company=company_from_mail,
        high_level_company_type=high_level_company_type,
        structured_data=structured_data,
    )