# NOTE(review): the three lines that were here ("Spaces:" / "Sleeping" /
# "Sleeping") are not Python — they look like residue from a document/log
# extraction and would raise a SyntaxError on import. Commented out.
# def get_emails(request: Request): | |
# access_token = request.session.get('access_token') | |
# if not access_token: | |
# return RedirectResponse(url="/") | |
# headers = { | |
# 'Authorization': f'Bearer {access_token}', | |
# 'Accept': 'application/json', | |
# } | |
# # Fetch messages | |
# response = requests.get('https://graph.microsoft.com/v1.0/me/messages', headers=headers) | |
# messages = response.json().get('value', []) | |
# if not messages: | |
# return {"request": request, "messages": [], "message": "No messages found."} | |
# emails = [] | |
# for message in messages: | |
# email_info = { | |
# "subject": message['subject'], | |
# "body_preview": message['bodyPreview'], | |
# "attachments": [] | |
# } | |
# # Fetch attachments | |
# message_id = message['id'] | |
# attachments_response = requests.get(f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments', headers=headers) | |
# attachments = attachments_response.json().get('value', []) | |
# for attachment in attachments: | |
# attachment_id = attachment['id'] | |
# attachment_name = attachment['name'] | |
# attachment_response = requests.get(f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments/{attachment_id}/$value', headers=headers) | |
# with open(f"static/{attachment_name}", 'wb') as file: | |
# file.write(attachment_response.content) | |
# email_info["attachments"].append(attachment_name) | |
# emails.append(email_info) | |
# return {"Successfully extracted the messages!!"} | |
import base64 | |
import json | |
import logging | |
import re | |
from concurrent.futures import ThreadPoolExecutor | |
from typing import Optional, List, Dict | |
import requests | |
from bs4 import BeautifulSoup | |
from models.models import Message, Attachment | |
from fastapi import WebSocket | |
from services import utils as ut | |
import asyncio | |
# Brand -> LVMH business-group lookup. Hoisted to module level so the table is
# built once at import time instead of on every call.
_COMPANY_TYPES: Dict[str, str] = {
    # Wines & Spirits
    'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit',
    'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit',
    'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit',
    "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit',
    'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit',
    'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit',
    'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit',
    'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit',
    'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit',
    'krug': 'wines and spirit', 'mercier': 'wines and spirit',
    'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit',
    'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit',
    'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit',
    'woodinville': 'wines and spirit',
    # Fashion & Leather Goods
    'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods',
    'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods',
    'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods',
    'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods',
    'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods',
    'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods',
    'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods',
    # Perfumes & Cosmetics
    'acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics',
    'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics',
    'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics',
    'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics',
    'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics',
    'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics',
    'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics',
    'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics',
    # Watches & Jewelry
    'bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry',
    'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry',
    'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry',
    'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry',
    # Selective retailing
    '24s': 'Selective retailing', 'dfs': 'Selective retailing',
    'la grande epicerie de paris': 'Selective retailing',
    'le bon marché rive gauche': 'Selective retailing',
    'sephora': 'Selective retailing',
    # Other activities
    'belmond': 'Other activities', 'cheval blanc': 'Other activities',
    'connaissance des arts': 'Other activities', 'cova': 'Other activities',
    'investir': 'Other activities', "jardin d'acclimatation": 'Other activities',
    'le parisien': 'Other activities', 'les echos': 'Other activities',
    'radio classique': 'Other activities', 'royal van lent': 'Other activities',
}


def get_company_type(company_name: str) -> str:
    """Return the LVMH business group for *company_name* (case-insensitive).

    Unknown brands map to 'Others'.

    Fixes: removed the stray debug ``print(company_types_dict["louis vuitton"])``
    that ran on every call, and stopped rebuilding the lookup dict per call.
    """
    return _COMPANY_TYPES.get(company_name.lower(), 'Others')
async def get_messages(code: str, websocket: WebSocket, brand_name: Optional[str] = None):
    """Fetch receipt/invoice-style messages from Microsoft Graph and stream them.

    Pages through ``/me/messages``, hydrates each message (attachments,
    company type) on a thread pool, streams every hydrated message over
    *websocket*, and returns the accumulated list of Message models.

    Fixes vs. the original:
    - ``outlook_url`` was rebuilt from ``g_query`` at the top of every loop
      iteration, so ``@odata.nextLink`` was never followed — when a nextLink
      existed the same first page was fetched forever. The URL is now seeded
      once and replaced by the nextLink (or None, ending the loop).
    - The *entire* accumulated ``messages`` list was re-sent over the
      websocket on every page, duplicating output; only the current page's
      messages are streamed now.
    - Removed debug ``print`` calls.
    """
    access_token = code
    g_query = 'subject:"your order" OR subject:"receipts" OR subject:"receipt" OR subject:"aankoopbon" OR subject:"reçu" OR subject:"invoice" OR subject:"invoices" hasattachment:yes'
    if brand_name is not None:
        g_query = f'(subject:"your order" OR subject:"receipts" OR subject:"receipt" OR subject: "aankoopbon" OR subject:"reçu" OR subject:"invoice" OR subject:"invoices" OR from:{brand_name}) AND subject:{brand_name} has:attachment'
    messages = []

    def fetch_message_wrapper(message_data):
        # Hydrate one raw Graph message dict into a Message model (runs on
        # the thread pool). Returns None when the message has no id.
        message_id = message_data.get("id")
        message_subject = message_data.get('subject')
        message_body_preview = message_data.get('bodyPreview')
        from_mail = message_data['from']['emailAddress']['address']
        if message_id:
            return fetch_message_data(access_token, message_id, message_data,
                                      message_subject, message_body_preview, from_mail)
        return None

    outlook_url = f"https://graph.microsoft.com/v1.0/me/messages?q={g_query}"
    while outlook_url:
        # NOTE(review): requests.get blocks the event loop inside this async
        # function — consider run_in_executor/httpx if that matters here.
        outlook_response = requests.get(outlook_url, headers={"Authorization": f"Bearer {access_token}"})
        outlook_data = outlook_response.json()
        page_messages = []
        if "value" in outlook_data:
            with ThreadPoolExecutor(max_workers=15) as executor:
                futures = [executor.submit(fetch_message_wrapper, message_data)
                           for message_data in outlook_data["value"]]
                for future in futures:
                    message = future.result()
                    if message:
                        page_messages.append(message)
            # Stream only this page; previously the whole accumulated list
            # was re-sent on every iteration.
            for message_data in page_messages:
                await process_message(message_data, websocket, 10000)
            messages.extend(page_messages)
        # Follow server-driven paging; None terminates the loop.
        outlook_url = outlook_data.get("@odata.nextLink")
    return messages
async def process_message(message: Message, websocket: WebSocket, chunk_size: int):
    """Serialize one Message and stream it over *websocket* in chunks,
    followed by a "NEXT_MESSAGE" sentinel frame. No-op for a falsy message."""
    logging.info("process_message")
    if not message:
        return
    payload = message.to_json()
    logging.info(f"{payload}")
    await send_message_in_chunks(websocket, payload, chunk_size)
    await websocket.send_text("NEXT_MESSAGE")
def fetch_message_data(access_token: str, message_id: str, message_data,
                       message_subject: str, message_body_preview, from_mail: str) -> Message:
    """Assemble a Message model from an already-fetched Graph message dict.

    Downloads and structures the message's attachments, classifies the sender
    into an LVMH business group, and records the body-preview length.
    """
    attachments, structured = extract_attachments_from_mail(access_token, message_data)
    company_type = get_company_type(from_mail)
    body_len = len(message_body_preview) if message_body_preview is not None else 0
    return Message(
        message_id=message_id,
        body_len=body_len,
        body=message_body_preview,
        attachments=attachments,
        company=from_mail,
        high_level_company_type=company_type,
        structured_data=structured,
    )
def extract_subject_from_mail(message_data: dict) -> str:
    """Return the 'Subject' header value from a Gmail-style message dict.

    Returns "" when the payload/headers structure is absent or no Subject
    header is present.
    """
    if 'payload' not in message_data or 'headers' not in message_data['payload']:
        return ""
    for header in message_data['payload']['headers']:
        if header['name'] == 'Subject':
            return header['value']
    return ""
def extract_domain_name(payload: dict, subject: str) -> str:
    """Derive a brand/domain name from the first 'From' header in *payload*.

    Subject-based overrides win: a subject mentioning "chanel" returns
    'chanel', one mentioning "louis vuitton" returns 'Louis Vuitton'.
    Falls back to 'others' when no From header is found.
    """
    sender_domain = next(
        (extract_domain_from_email(entry['value'])
         for entry in payload if entry['name'] == 'From'),
        'others',
    )
    lowered = subject.lower()
    if 'chanel' in lowered:
        return 'chanel'
    if 'louis vuitton' in lowered:
        return 'Louis Vuitton'
    return sender_domain
def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Extract the second-level domain from the first email address in
    *email_string* (e.g. "John <john@sephora.com>" -> "sephora").

    Returns None when no address is found or the domain is empty.

    Fix: the original called ``re.search(...).group()`` unconditionally, so a
    string with no address raised AttributeError instead of honoring the
    Optional[str] contract (the old ``if email_address`` check was dead code —
    it ran after .group() had already succeeded).
    """
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if match is None:
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain or None
# def extract_body_from_mail(message_data: dict) -> str: | |
# body = None | |
# if "payload" in message_data and "parts" in message_data["payload"]: | |
# for part in message_data["payload"]["parts"]: | |
# if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'): | |
# body_data = part['body'].get('data', '') | |
# body_base64 = base64.urlsafe_b64decode(body_data) | |
# body = extract_text(body_base64) | |
# return body | |
def extract_body_from_mail(message_data: dict) -> str:
    """Extract a plain-text body from a Gmail-style message dict.

    Search order: payload.parts (multipart messages), then payload.body.data
    (single-part), then payload.body.parts. Matching text/plain or text/html
    parts are base64url-decoded and stripped of markup via extract_text().
    Falls back to the message 'snippet' when nothing decodes.

    NOTE(review): when several parts match, the *last* one wins — the loop
    keeps overwriting ``body``. Presumably intentional (html usually follows
    plain text), but confirm against callers.
    """
    body = None
    if "payload" in message_data:
        payload = message_data["payload"]
        if "parts" in payload:
            # Multipart message: scan every part for text content.
            for part in payload["parts"]:
                if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'):
                    body_data = part['body'].get('data', '')
                    if body_data:
                        body_base64 = base64.urlsafe_b64decode(body_data)
                        body = extract_text(body_base64)
        elif 'body' in payload:
            # Single-part message: content lives directly in payload.body.
            body_data = payload['body'].get('data', '')
            if body_data:
                body_base64 = base64.urlsafe_b64decode(body_data)
                body = extract_text(body_base64)
            elif 'parts' in payload['body']:
                # Rare shape: parts nested inside body (only checked when
                # body.data was empty/absent).
                for part in payload['body']['parts']:
                    if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'):
                        body_data = part['body'].get('data', '')
                        if body_data:
                            body_base64 = base64.urlsafe_b64decode(body_data)
                            body = extract_text(body_base64)
    if not body:
        # Last resort: Gmail's short preview snippet ('' if even that is absent).
        body = message_data.get('snippet', '')
    return body
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Fetch one attachment resource from the Gmail REST API as parsed JSON."""
    url = (
        "https://www.googleapis.com/gmail/v1/users/me/"
        f"messages/{message_id}/attachments/{attachment_id}"
    )
    headers = {"Authorization": f"Bearer {access_token}"}
    return requests.get(url, headers=headers).json()
def extract_attachments_from_mail(access_token: str, message_data: dict) -> tuple:
    """Download an Outlook message's attachments and structure their text.

    Returns ``(attachments, structured_data)``: a list of Attachment models
    and a list of whatever ``ut.strcuture_document_data`` extracted from each
    attachment's raw text. Archive/image/plain-text attachments are skipped.

    Fix: the original rebound ``attachments`` to the raw JSON listing and then
    appended Attachment models to that same list *while iterating it* — the
    loop eventually reached its own appended Attachment objects and crashed on
    ``attachment['id']``. Raw listing and result list are now separate. Also
    removed a debug print and corrected the return annotation (the function
    has always returned a 2-tuple, not List[Attachment]).
    """
    attachments: List[Attachment] = []
    structured_data = []
    message_id = message_data.get('id')
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json',
    }
    listing_response = requests.get(
        f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments',
        headers=headers)
    raw_items = listing_response.json().get('value', [])
    skipped_suffixes = (".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")
    for item in raw_items:
        attachment_id = item['id']
        filename = item['name']
        if filename.endswith(skipped_suffixes):
            continue
        # $value returns the raw attachment bytes.
        attachment_response = requests.get(
            f'https://graph.microsoft.com/v1.0/me/messages/{message_id}/attachments/{attachment_id}/$value',
            headers=headers)
        data = attachment_response.content
        raw_text = ut.extract_text_from_attachment_outlook(filename, data)
        struct_data = ut.strcuture_document_data(raw_text)
        if struct_data:
            structured_data.append(struct_data)
        attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
    return attachments, structured_data
#Gmail data fetching below | |
# if "payload" in message_data and "parts" in message_data["payload"]: | |
# for part in message_data["payload"]["parts"]: | |
# if "body" in part and "attachmentId" in part["body"]: | |
# attachment_id = part["body"]["attachmentId"] | |
# attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id) | |
# filename = part.get("filename", "untitled.txt") | |
# if filename.endswith(".zip") or filename.endswith(".txt") or filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".gif"): | |
# continue | |
# data = attachment_data.get("data", "") | |
# raw_text=ut.extract_text_from_attachment(filename , data) | |
# struct_data = ut.strcuture_document_data(raw_text) | |
# if struct_data: | |
# structured_data.append(struct_data) | |
# attachments.append(Attachment(attachment_len = len(attachment_data.get("data", "")),filename=filename, data=attachment_data.get("data", ""))) | |
# return attachments,structured_data | |
def extract_text(html_content: str) -> str:
    """Strip markup from *html_content* and collapse all runs of whitespace
    into single spaces. Raises ValueError on empty/None input."""
    if not html_content:
        raise ValueError("HTML content is empty or None")
    parsed = BeautifulSoup(html_content, 'html.parser')
    flattened = parsed.get_text(separator=' ')
    return re.sub(r'\s+', ' ', flattened).strip()
async def websocket_main(code: str, websocket: WebSocket, brand_name: Optional[str] = None):
    """Drive one websocket session: stream all matching messages for the
    given access token (*code*), optionally filtered by *brand_name*, then
    close the socket."""
    await get_messages(code, websocket, brand_name)
    await websocket.close()
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
    """JSON-serialize *message_json* and send it over *websocket* as a series
    of text frames of at most *chunk_size* characters each."""
    serialized = json.dumps(message_json)
    total = len(serialized)
    offset = 0
    while offset < total:
        await websocket.send_text(serialized[offset:offset + chunk_size])
        offset += chunk_size