Spaces:
Sleeping
Sleeping
File size: 17,141 Bytes
b665f30 21791d0 b665f30 2c065ea 985eaa2 1e272ca e01e08c 16b124e 9bf7c59 4d4f9d0 33b6dc3 1e272ca b665f30 e01e08c b665f30 9ebb81c 2a1db67 c0ca9ad 31b785c b665f30 602cbe6 f75908f 21791d0 b665f30 4b33dd3 3313a0d 4b33dd3 0db13ac b665f30 e01e08c b665f30 e01e08c 07b8e82 b665f30 e01e08c b665f30 e01e08c b665f30 8abd0d4 6f9036e e01e08c 21791d0 b665f30 9ebb81c 07b8e82 21791d0 9ebb81c 14bec9f 1b5a70b 1f2aaa9 8abd0d4 1f2aaa9 9ebb81c 21791d0 9ebb81c 3313a0d e01e08c 9ebb81c 14bec9f 9ebb81c 14bec9f 9ebb81c 21791d0 b665f30 f13d7b7 9ebb81c 14bec9f b665f30 409c72c 0db13ac 9ebb81c f13d7b7 8a73b97 9777d71 25e5107 9ebb81c 0db13ac b665f30 716077c b665f30 9ebb81c b665f30 3d6350d 1e272ca 8364347 1e272ca b665f30 9ebb81c b665f30 dc6c45b c788a70 b665f30 3d6350d b665f30 6953b8f 067db0f 3d6350d 07b8e82 03c7264 07b8e82 03c7264 8a73b97 0817311 8a73b97 78b319c 03c7264 3d6350d 55bac05 3d6350d b665f30 e01e08c b665f30 e01e08c b665f30 409c72c b665f30 9ebb81c b665f30 2f346b0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
import base64
import json
import logging
import re
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, List, Dict
import requests
from bs4 import BeautifulSoup
from models.models import Message, Attachment
from fastapi import WebSocket
from services import utils as ut
import time
import concurrent.futures
import asyncio
# Brand -> high-level business category, grouped by category for readability.
# Brand keys are lowercase so lookups can be case-insensitive.
_BRANDS_BY_COMPANY_TYPE = {
    'wines and spirit': (
        'ao yun', 'ardbeg', 'belvedere', 'bodega numanthia', 'chandon',
        'château cheval blanc', "château d'yquem", 'château galoupet',
        'cheval des andes', 'clos19', 'cloudy bay', 'colgin cellars',
        'dom pérignon', 'domaine des lambrays', 'eminente', 'glenmorangie',
        'hennessy', 'joseph phelps', 'krug', 'mercier', 'moët & chandon',
        'newton vineyard', 'ruinart', 'terrazas de los andes', 'veuve clicquot',
        'volcan de mi tierra', 'woodinville',
    ),
    'Fashion & Leather Goods': (
        'berluti', 'celine', 'christian dior', 'emilio pucci', 'fendi',
        'givenchy', 'kenzo', 'loewe', 'loro piana', 'louis vuitton',
        'marc jacobs', 'moynat', 'patou', 'rimowa',
    ),
    'Perfumes & Cosmetics': (
        'acqua di parma', 'benefit cosmetics', 'cha ling',
        'fenty beauty by rihanna', 'fresh', 'givenchy parfums', 'guerlain',
        'kenzo parfums', 'kvd beauty', 'loewe perfumes',
        'maison francis kurkdjian', 'make up for ever',
        'officine universelle buly', 'olehenriksen', 'parfums christian dior',
        'stella by stella mccartney',
    ),
    'Watches & Jewelry': (
        'bulgari', 'chaumet', 'fred', 'hublot', 'repossi', 'tag heuer',
        'tiffany & co.', 'zenith',
    ),
    'Selective retailing': (
        '24s', 'dfs', 'la grande epicerie de paris', 'le bon marché rive gauche',
        'sephora',
    ),
    'Other activities': (
        'belmond', 'cheval blanc', 'connaissance des arts', 'cova', 'investir',
        "jardin d'acclimatation", 'le parisien', 'les echos', 'radio classique',
        'royal van lent',
    ),
}


def _compact(name: str) -> str:
    """Lowercase ``name`` and keep only alphanumerics ("Louis Vuitton" -> "louisvuitton")."""
    return ''.join(ch for ch in name.lower() if ch.isalnum())


# Flat lookup table, built once at import time instead of on every call.
_COMPANY_TYPES = {
    brand: company_type
    for company_type, brands in _BRANDS_BY_COMPANY_TYPE.items()
    for brand in brands
}
# Same table keyed by the compacted brand name, so that a bare e-mail domain
# label such as "louisvuitton" or "makeupforever" still finds its brand.
_COMPACT_COMPANY_TYPES = {
    _compact(brand): company_type for brand, company_type in _COMPANY_TYPES.items()
}


def get_company_type(company_name: Optional[str]) -> str:
    """Return the high-level business category for a brand/company name.

    Matching is case-insensitive. An exact (lowercased, trimmed) match is tried
    first. After that comes a match ignoring spaces and punctuation, so
    "louisvuitton" maps like "louis vuitton".

    Args:
        company_name: Brand name or e-mail domain label; ``None``/empty allowed.

    Returns:
        The category string (e.g. ``'Fashion & Leather Goods'``), or ``'Others'``
        when the name is empty or not a known brand.
    """
    if not company_name:
        # extract_domain_* helpers can yield None; don't crash on .lower().
        return 'Others'
    lowered = company_name.strip().lower()
    if lowered in _COMPANY_TYPES:
        return _COMPANY_TYPES[lowered]
    return _COMPACT_COMPANY_TYPES.get(_compact(company_name), 'Others')
_GMAIL_MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"
# Gmail search filter for receipt-like mails (several languages + Purchases tab).
_RECEIPT_FILTER = (
    '(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon '
    'OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases)'
)
_PAGE_SIZE = 20             # message ids requested per list call
_FETCH_WORKERS = 20         # concurrent per-message downloads
_STREAM_CHUNK_SIZE = 100000  # characters per websocket frame when streaming a message
_GMAIL_TIMEOUT_SECONDS = 30


def _build_gmail_query(brand_name: Optional[str]) -> str:
    """Return the Gmail search query, optionally narrowed to ``brand_name``."""
    if brand_name is None:
        return f"{_RECEIPT_FILTER} has:attachment"
    return f"{_RECEIPT_FILTER} AND ({brand_name}) has:attachment"


async def get_messages(code: str, websocket: WebSocket, start, brand_name: Optional[str] = None):
    """Find receipt-like Gmail messages with attachments and stream each one to ``websocket``.

    Pages through the Gmail ``messages.list`` results. Each listed message is
    downloaded and parsed on a thread pool, so the event loop is never blocked by
    ``requests``. Every parsed message goes to ``process_message`` as soon as it
    is ready. A message that fails to download or parse is logged and skipped
    instead of aborting the whole scan. When done, a
    ``"Total messages processed: N"`` text frame is sent.

    Args:
        code: OAuth access token, sent as a ``Bearer`` token to the Gmail API.
        websocket: Client connection that receives the streamed messages.
        start: ``time.time()``-style timestamp; presumably taken when the request
            arrived. Used only for a debug timing log.
        brand_name: Optional extra Gmail search term restricting the results.

    Returns:
        An empty list. Messages are streamed rather than collected; the list is
        kept only so existing callers still receive one.
    """
    access_token = code
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    g_query = _build_gmail_query(brand_name)
    total_processed = 0
    messages = []
    logging.debug("get_messages setup took %.3fs", time.time() - start)

    def list_page(page_token):
        # ``params=`` lets requests URL-encode the query (quotes, '&', 'ç', ...).
        params = {"q": g_query, "maxResults": _PAGE_SIZE}
        if page_token:
            params["pageToken"] = page_token
        response = requests.get(
            _GMAIL_MESSAGES_URL, params=params, headers=auth_headers,
            timeout=_GMAIL_TIMEOUT_SECONDS,
        )
        if not response.ok:
            logging.error("Gmail message listing failed (HTTP %s): %s",
                          response.status_code, response.text)
            return {}  # treated as "no messages, no next page" -> scan ends
        return response.json()

    def fetch_one(message_ref):
        message_id = message_ref.get("id")
        return fetch_message_data(access_token, message_id) if message_id else None

    loop = asyncio.get_running_loop()
    page_token = None
    with ThreadPoolExecutor(max_workers=_FETCH_WORKERS) as executor:
        while True:
            gmail_data = await loop.run_in_executor(executor, list_page, page_token)
            pending = [
                loop.run_in_executor(executor, fetch_one, message_ref)
                for message_ref in gmail_data.get("messages", [])
            ]
            for next_done in asyncio.as_completed(pending):
                try:
                    message = await next_done
                except Exception:
                    logging.exception("Failed to fetch a Gmail message; skipping it")
                    continue
                if message:
                    total_processed += 1
                    await process_message(message, websocket, _STREAM_CHUNK_SIZE)
            page_token = gmail_data.get("nextPageToken")
            if not page_token:
                break

    await websocket.send_text(f"Total messages processed: {total_processed}")
    logging.info(f"Total Processed Messages : {total_processed}")
    return messages
async def process_message(message: Message, websocket: WebSocket, chunk_size: int):
    """Stream one parsed message to the client, followed by a ``NEXT_MESSAGE`` marker.

    Messages that are falsy or have no ``structured_data`` are skipped entirely.

    Args:
        message: Parsed message; serialised with ``message.to_json()``.
        websocket: Client connection.
        chunk_size: Maximum characters per text frame. This is now honoured;
            previously a hard-coded 50000 was used regardless of the argument.
    """
    logging.debug("process_message")
    if not (message and message.structured_data):
        return
    await send_message_in_chunks(websocket, message.to_json(), chunk_size)
    # NOTE(review): the original indentation was lost. This assumes the marker is
    # only sent after a streamed message; confirm against the client parser.
    await websocket.send_text("NEXT_MESSAGE")
def fetch_message_data(access_token: str, message_id: str) -> Message:
    """Download one Gmail message and parse it into a ``Message``.

    Pulls the subject, sender-derived company, plain-text body and attachments
    (plus their structured data) out of the Gmail message resource.

    Args:
        access_token: OAuth token sent as a ``Bearer`` header.
        message_id: Gmail message id.

    Raises:
        requests.HTTPError: If Gmail rejects the request (e.g. an expired token).
            Previously this surfaced as a confusing ``KeyError: 'payload'``.
    """
    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    message_response = requests.get(
        message_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,  # never hang a worker thread forever on a stalled connection
    )
    message_response.raise_for_status()
    message_data = message_response.json()
    # The raw message is deliberately not printed: it contains the user's e-mail content.

    subject = extract_subject_from_mail(message_data)
    headers = message_data.get('payload', {}).get('headers', [])
    company_from_mail = extract_domain_name(headers, subject)
    body = extract_body_from_mail(message_data)
    attachments, structured_attachment_data = extract_attachments_from_mail(access_token, message_data)
    return Message(
        message_id=message_id,
        body_len=len(body) if body is not None else 0,
        body=body,
        attachments=attachments,
        company=company_from_mail,
        high_level_company_type=get_company_type(company_from_mail),
        structured_data=structured_attachment_data,
    )
def extract_subject_from_mail(message_data: dict) -> str:
    """Return the value of the first ``Subject`` header of a Gmail message resource.

    Only ``message_data['payload']['headers']`` is consulted. The header name
    must match ``'Subject'`` exactly. An empty string is returned when the
    payload, its header list, or a ``Subject`` entry is missing.
    """
    has_headers = 'payload' in message_data and 'headers' in message_data['payload']
    if not has_headers:
        return ""
    subjects = (
        entry['value']
        for entry in message_data['payload']['headers']
        if entry['name'] == 'Subject'
    )
    return next(subjects, "")
def extract_domain_name(payload: List[dict], subject: str) -> str:
    """Guess the sending company from the ``From`` header, with subject overrides.

    Args:
        payload: The message's header list (``[{'name': ..., 'value': ...}, ...]``).
            The parameter name is kept for backward compatibility.
        subject: The message subject.

    Returns:
        ``'chanel'`` or ``'Louis Vuitton'`` when the subject mentions them
        (case-insensitively). Otherwise, the first label of the ``From``
        address's domain. ``'others'`` is returned when there is no ``From``
        header or no usable domain.
    """
    domain_name = 'others'
    for header in payload:
        if header['name'] == 'From':
            # Fall back to 'others' so callers never receive None
            # (get_company_type would fail on None.lower()).
            domain_name = extract_domain_from_email(header['value']) or 'others'
            break
    lowered_subject = subject.lower()
    if 'chanel' in lowered_subject:
        return 'chanel'
    if 'louis vuitton' in lowered_subject:
        return 'Louis Vuitton'
    return domain_name
# Loose e-mail address matcher; compiled once instead of per call.
_EMAIL_PATTERN = re.compile(r'[\w\.-]+@[\w\.-]+')


def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Return the first label of the domain of the first e-mail address in ``email_string``.

    Example: ``"Shop <orders@sephora.com>"`` -> ``"sephora"``. For a subdomain
    sender like ``news@mail.brand.com`` this yields ``"mail"``, not ``"brand"``.

    Returns:
        The label, or ``None`` if the text contains no address or the domain's
        first label is empty. Previously a missing address raised AttributeError.
    """
    match = _EMAIL_PATTERN.search(email_string or '')
    if match is None:
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain or None
_BODY_MIME_TYPES = ('text/plain', 'text/html')


def _decode_body_data(data: str) -> bytes:
    """Decode Gmail base64url body data, restoring any stripped ``=`` padding."""
    return base64.urlsafe_b64decode(data + '=' * (-len(data) % 4))


def _iter_parts(parts):
    """Yield every MIME part in ``parts`` depth-first, descending into nested multiparts."""
    for part in parts:
        yield part
        yield from _iter_parts(part.get('parts', []))


def _last_text_from_parts(parts):
    """Return the text of the last non-empty text/plain or text/html part, or ``None``."""
    body = None
    for part in _iter_parts(parts):
        if part.get('mimeType') not in _BODY_MIME_TYPES:
            continue
        data = part.get('body', {}).get('data', '')
        if data:
            text = extract_text(_decode_body_data(data))
            if text:
                body = text
    return body


def extract_body_from_mail(message_data: dict) -> str:
    """Return the message body as plain text, falling back to Gmail's ``snippet``.

    For multipart payloads, text/plain and text/html parts are searched
    recursively. Mails with attachments usually nest the text inside a
    ``multipart/alternative`` part, which the previous flat scan missed. When
    several text parts exist, the last one in document order wins. For
    single-part payloads the payload's own body data is used.

    Returns:
        The extracted text, or ``message_data['snippet']`` (default ``''``) when
        no text could be extracted.
    """
    body = None
    payload = message_data.get('payload') or {}
    if 'parts' in payload:
        body = _last_text_from_parts(payload['parts'])
    elif 'body' in payload:
        data = payload['body'].get('data', '')
        if data:
            body = extract_text(_decode_body_data(data))
        elif 'parts' in payload['body']:
            body = _last_text_from_parts(payload['body']['parts'])
    if not body:
        body = message_data.get('snippet', '')
    return body
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Fetch one attachment resource from the Gmail API and return its decoded JSON.

    On an HTTP error, Gmail's JSON error object is returned unchanged (and a
    warning is logged). It has no ``data`` key, so callers using
    ``.get("data", "")`` see an empty attachment.
    """
    attachment_url = (
        f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
        f"/attachments/{attachment_id}"
    )
    attachment_response = requests.get(
        attachment_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,  # a stalled download must not block its worker thread forever
    )
    if not attachment_response.ok:
        logging.warning("Attachment %s of message %s failed (HTTP %s)",
                        attachment_id, message_id, attachment_response.status_code)
    return attachment_response.json()
# Attachment types that are never parsed (archives, plain text, images).
# They are matched case-insensitively, and skipped BEFORE downloading.
_SKIPPED_ATTACHMENT_EXTENSIONS = ('.zip', '.txt', '.png', '.jpg', '.jpeg', '.gif')

# Placeholder text per structured-data field. A field whose value still contains
# its placeholder was not filled in, so it is reset to None.
# NOTE(review): presumably these mirror the template used by ut.strcuture_document_data.
_PLACEHOLDER_VALUES = {
    "brand": "INSERT BRAND NAME",
    "total_cost": "INSERT TOTAL COST",
    "location": "INSERT LOCATION FROM",
    "purchase_category": "INSERT PURCHASE CATEGORY",
    "brand_category": "INSERT BRAND CATEGORY",
    "Date": "INSERT RECEIPT DATE",
    "currency": "INSERT CURRENCY",
    "filename": "GENERATE A FILENAME",
    "payment_method": "INSERT PAYMENT METHOD",
}


def _clean_structured_data(struct_data):
    """Null out unfilled placeholder fields in place; return None if nothing useful remains."""
    if not struct_data:
        return struct_data
    for key, placeholder in _PLACEHOLDER_VALUES.items():
        value = struct_data.get(key)
        # Only strings can contain a placeholder; numeric values used to raise TypeError.
        if isinstance(value, str) and placeholder in value:
            struct_data[key] = None
    if all(value is None for value in struct_data.values()):
        return None
    return struct_data


def extract_attachments_from_mail(access_token: str, message_data: dict) -> "tuple[List[Attachment], List[Optional[dict]]]":
    """Download and parse the top-level attachments of a Gmail message.

    Only direct children of ``payload['parts']`` that carry an ``attachmentId``
    are considered. Files with a skipped extension are ignored without being
    downloaded. If text extraction fails for an attachment, that attachment is
    logged and left out of both result lists.

    Returns:
        Two parallel lists: the ``Attachment`` objects, and the cleaned
        structured data for each (a dict, or ``None``/empty when nothing was
        extracted).
    """
    attachments = []
    structured_data = []
    payload = message_data.get("payload") or {}
    for part in payload.get("parts", []):
        attachment_id = part.get("body", {}).get("attachmentId")
        if not attachment_id:
            continue
        filename = part.get("filename", "untitled.txt")
        if filename.lower().endswith(_SKIPPED_ATTACHMENT_EXTENSIONS):
            continue
        attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
        data = attachment_data.get("data", "")
        try:
            raw_text = ut.extract_text_from_attachment(filename, data)
        except Exception as e:
            # Best-effort: one unreadable attachment must not drop the whole message.
            logging.warning("Error processing attachment %s: %s", filename, e)
            continue
        structured_data.append(_clean_structured_data(ut.strcuture_document_data(raw_text)))
        attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
    return attachments, structured_data
def extract_text(html_content: str) -> str:
    """Strip markup from HTML (``str`` or raw ``bytes``) and normalise whitespace.

    Tag boundaries become spaces, every whitespace run collapses to a single
    space, and leading/trailing whitespace is removed.

    Raises:
        ValueError: If ``html_content`` is empty or ``None``.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")
    plain = BeautifulSoup(html_content, 'html.parser').get_text(separator=' ')
    # split()/join collapses Unicode whitespace runs exactly like re.sub(r'\s+', ' ').strip().
    return ' '.join(plain.split())
async def websocket_main(code: str, websocket: WebSocket, start, brand_name: Optional[str] = None):
    """Run a full receipt scan over ``websocket`` and always close the socket afterwards.

    ``code`` is forwarded to ``get_messages`` as the Gmail access token. The
    socket is closed even when the scan raises, so the client is not left
    waiting on a half-open connection.
    """
    try:
        await get_messages(code, websocket, start, brand_name)
    finally:
        try:
            await websocket.close()
        except RuntimeError:
            # NOTE(review): assumes Starlette raises RuntimeError when closing an
            # already-closed/disconnected socket; don't let it mask the scan's error.
            logging.debug("websocket was already closed", exc_info=True)
async def send_message_in_chunks(websocket: "WebSocket", message_json: dict, chunk_size: int):
    """Serialise ``message_json`` with ``json.dumps`` and send it as consecutive text frames.

    Each frame holds at most ``chunk_size`` characters. Concatenating the frames
    in order reproduces the JSON string. An empty serialisation sends nothing.

    Raises:
        ValueError: If ``chunk_size`` is not positive. A negative value used to
            silently send nothing, and zero failed inside ``range``.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    message_str = json.dumps(message_json)
    for offset in range(0, len(message_str), chunk_size):
        await websocket.send_text(message_str[offset:offset + chunk_size])