Omkar008 commited on
Commit
fe219c3
1 Parent(s): cff8649

Create ws_controller_1.py

Browse files
Files changed (1) hide show
  1. controllers/ws_controller_1.py +264 -0
controllers/ws_controller_1.py ADDED
@@ -0,0 +1,264 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import base64
3
+ import json
4
+ import logging
5
+ import re
6
+ from typing import Optional, List, Dict
7
+ import aiohttp
8
+ from bs4 import BeautifulSoup
9
+ from fastapi import WebSocket
10
+ from services import utils as ut
11
+ from models.models import Message, Attachment
12
+ from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
13
+ import functools
14
+ thread_pool = ThreadPoolExecutor(max_workers=20) # Increased from 10
15
+ process_pool = ProcessPoolExecutor(max_workers=4)
16
+
17
+ def get_company_type(company_name:str)->str:
18
+ company_types_dict ={'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit', 'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit', 'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit', "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit', 'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit', 'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit', 'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit', 'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit', 'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit', 'krug': 'wines and spirit', 'mercier': 'wines and spirit', 'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit', 'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit', 'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit', 'woodinville': 'wines and spirit' , 'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods', 'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods', 'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods', 'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods', 'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods', 'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods', 'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods','acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics', 'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics', 'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics', 'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics', 'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics', 'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics', 'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics', 'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics','bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry', 'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry', 'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry', 'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry','24s': 'Selective retailing', 'dfs': 'Selective retailing', 'la grande epicerie de paris': 'Selective retailing', 'le bon marché rive gauche': 'Selective retailing', 'sephora': 'Selective retailing','belmond': 'Other activities', 'cheval blanc': 'Other activities', 'connaissance des arts': 'Other activities', 'cova': 'Other activities', 'investir': 'Other activities', "jardin d'acclimatation": 'Other activities', 'le parisien': 'Other activities', 'les echos': 'Other activities', 'radio classique': 'Other activities', 'royal van lent': 'Other activities'}
19
+ print(company_types_dict["louis vuitton"])
20
+ return company_types_dict.get(company_name.lower(), 'Others')
21
+
22
+ def extract_subject_from_mail(message_data: dict) -> str:
23
+ if 'payload' in message_data and 'headers' in message_data['payload']:
24
+ headers = message_data['payload']['headers']
25
+ for header in headers:
26
+ if header['name'] == 'Subject':
27
+ return header['value']
28
+ return ""
29
+
30
+ def extract_domain_name(payload: dict, subject: str) -> str:
31
+ domain_name = 'others'
32
+ for fromdata in payload:
33
+ if fromdata['name'] == 'From':
34
+ domain_name = extract_domain_from_email(fromdata['value'])
35
+ break
36
+ if 'chanel' in subject.lower():
37
+ return 'chanel'
38
+ if 'louis vuitton' in subject.lower():
39
+ return 'Louis Vuitton'
40
+ return domain_name
41
+
42
+ def extract_domain_from_email(email_string: str) -> Optional[str]:
43
+ email_match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
44
+ if email_match:
45
+ email_address = email_match.group()
46
+ domain = email_address.split('@')[-1].split('.')[0]
47
+ return domain
48
+ return None
49
+
50
+ def extract_body_from_mail(message_data: dict) -> str:
51
+ body = None
52
+ if "payload" in message_data:
53
+ payload = message_data["payload"]
54
+ if "parts" in payload:
55
+ for part in payload["parts"]:
56
+ if part.get('mimeType') in ['text/plain', 'text/html']:
57
+ body_data = part['body'].get('data', '')
58
+ if body_data:
59
+ body = extract_text(base64.urlsafe_b64decode(body_data))
60
+ break
61
+ elif 'body' in payload:
62
+ body_data = payload['body'].get('data', '')
63
+ if body_data:
64
+ body = extract_text(base64.urlsafe_b64decode(body_data))
65
+
66
+ if not body:
67
+ body = message_data.get('snippet', '')
68
+ return body
69
+
70
+ def extract_text(html_content: str) -> str:
71
+ if not html_content:
72
+ return ""
73
+ soup = BeautifulSoup(html_content, 'html.parser')
74
+ text = soup.get_text(separator=' ')
75
+ return re.sub(r'\s+', ' ', text).strip()
76
+
77
+ # Asynchronous functions
78
+
79
+ async def fetch_attachment_data(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str) -> Dict:
80
+ attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
81
+ async with session.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
82
+ return await response.json()
83
+
84
+ async def extract_attachments_from_mail(session: aiohttp.ClientSession, access_token: str, message_data: dict) -> tuple[List[Attachment], List[Dict]]:
85
+ attachments = []
86
+ structured_data = []
87
+ if "payload" in message_data and "parts" in message_data["payload"]:
88
+ tasks = []
89
+ for part in message_data["payload"]["parts"]:
90
+ if "body" in part and "attachmentId" in part["body"]:
91
+ attachment_id = part["body"]["attachmentId"]
92
+ filename = part.get("filename", "untitled.txt")
93
+ if not filename.endswith((".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")):
94
+ tasks.append(fetch_attachment_data(session, access_token, message_data["id"], attachment_id))
95
+
96
+ attachment_data_list = await asyncio.gather(*tasks)
97
+
98
+ for part, attachment_data in zip(message_data["payload"]["parts"], attachment_data_list):
99
+ filename = part.get("filename", "untitled.txt")
100
+ if filename.endswith((".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")):
101
+ continue
102
+ data = attachment_data.get("data", "")
103
+ raw_text = ut.extract_text_from_attachment(filename, data)
104
+ struct_data = ut.strcuture_document_data(raw_text)
105
+ if struct_data:
106
+ structured_data.append(struct_data)
107
+
108
+ attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
109
+
110
+ return attachments, structured_data
111
+
112
+ async def fetch_message_data(session: aiohttp.ClientSession, access_token: str, message_id: str) -> Message:
113
+ message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
114
+ async with session.get(message_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
115
+ message_data = await response.json()
116
+
117
+ subject = extract_subject_from_mail(message_data)
118
+ company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
119
+ body = extract_body_from_mail(message_data)
120
+ attachments, structured_attachment_data = await extract_attachments_from_mail(session, access_token, message_data)
121
+ high_level_company_type = get_company_type(company_from_mail)
122
+
123
+ body_len = len(body) if body is not None else 0
124
+
125
+ return Message(
126
+ message_id=message_id,
127
+ body_len=body_len,
128
+ body=body,
129
+ attachments=attachments,
130
+ company=company_from_mail,
131
+ high_level_company_type=high_level_company_type,
132
+ structured_data=structured_attachment_data
133
+ )
134
+
135
+ async def process_message(message: Message, websocket: WebSocket, chunk_size: int):
136
+ message_json = message.to_json()
137
+ await send_message_in_chunks(websocket, message_json, chunk_size)
138
+ await websocket.send_text("NEXT_MESSAGE")
139
+
140
+ async def get_messages(code: str, websocket: WebSocket, brand_name: Optional[str] = None):
141
+ access_token = code
142
+ g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
143
+ if brand_name:
144
+ g_query += f' AND from:{brand_name}'
145
+
146
+ page_token = None
147
+ processed_count = 0
148
+ batch_size = 100 # Increased from 50
149
+
150
+ async with aiohttp.ClientSession() as session:
151
+ while True:
152
+ gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults={batch_size}"
153
+ if page_token:
154
+ gmail_url += f"&pageToken={page_token}"
155
+
156
+ async with session.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
157
+ gmail_data = await response.json()
158
+
159
+ if "messages" not in gmail_data:
160
+ await websocket.send_text(json.dumps({"status": "No messages found"}))
161
+ break
162
+
163
+ message_ids = [message["id"] for message in gmail_data["messages"]]
164
+ full_messages = await fetch_messages_batch(session, access_token, message_ids)
165
+
166
+ processing_tasks = [process_message(session, access_token, message) for message in full_messages]
167
+
168
+ for future in asyncio.as_completed(processing_tasks):
169
+ processed_message = await future
170
+ await websocket.send_text(json.dumps({
171
+ "message": processed_message.to_json()
172
+ }))
173
+ processed_count += 1
174
+
175
+ if "nextPageToken" in gmail_data:
176
+ page_token = gmail_data["nextPageToken"]
177
+ # await websocket.send_text(json.dumps({"status": "next_page"}))
178
+ else:
179
+ # await websocket.send_text(json.dumps({"status": "completed", "total_processed": processed_count}))
180
+ break
181
+
182
+ async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
183
+ message_str = json.dumps(message_json)
184
+ for i in range(0, len(message_str), chunk_size):
185
+ await websocket.send_text(message_str[i:i + chunk_size])
186
+
187
+
188
+ async def websocket_main(code: str, websocket: WebSocket,start,brand_name: Optional[str] = None):
189
+ access_token = code
190
+ await get_messages(access_token,websocket,start,brand_name)
191
+ await websocket.close()
192
+
193
+
194
+ # Adjust the number of workers as needed
195
+
196
+ async def run_in_thread(func, *args, **kwargs):
197
+ return await asyncio.get_event_loop().run_in_executor(thread_pool, functools.partial(func, *args, **kwargs))
198
+
199
+ async def run_in_process(func, *args, **kwargs):
200
+ return await asyncio.get_event_loop().run_in_executor(process_pool, functools.partial(func, *args, **kwargs))
201
+
202
+ async def fetch_messages_batch(session: aiohttp.ClientSession, access_token: str, message_ids: List[str]):
203
+ tasks = [fetch_message(session, access_token, message_id) for message_id in message_ids]
204
+ return await asyncio.gather(*tasks)
205
+
206
+ async def fetch_message(session: aiohttp.ClientSession, access_token: str, message_id: str):
207
+ message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
208
+ async with session.get(message_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
209
+ return await response.json()
210
+
211
+ async def fetch_attachments_batch(session: aiohttp.ClientSession, access_token: str, attachments_info: List[tuple]):
212
+ tasks = [fetch_attachment(session, access_token, msg_id, att_id) for msg_id, att_id in attachments_info]
213
+ return await asyncio.gather(*tasks)
214
+
215
+ async def fetch_attachment(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str):
216
+ attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
217
+ async with session.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
218
+ return await response.json()
219
+
220
+ def process_attachment(filename: str, data: str):
221
+ raw_text = ut.extract_text_from_attachment(filename, data)
222
+ return ut.strcuture_document_data(raw_text)
223
+
224
+ async def process_message(session: aiohttp.ClientSession, access_token: str, message_data: dict):
225
+ subject = extract_subject_from_mail(message_data)
226
+ company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
227
+ body = extract_body_from_mail(message_data)
228
+
229
+ attachments = []
230
+ structured_data = []
231
+
232
+ if "payload" in message_data and "parts" in message_data["payload"]:
233
+ attachments_info = []
234
+ for part in message_data["payload"]["parts"]:
235
+ if "body" in part and "attachmentId" in part["body"]:
236
+ attachment_id = part["body"]["attachmentId"]
237
+ filename = part.get("filename", "untitled.txt")
238
+ if not filename.endswith((".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")):
239
+ attachments_info.append((message_data["id"], attachment_id))
240
+
241
+ attachment_data_list = await fetch_attachments_batch(session, access_token, attachments_info)
242
+
243
+ processing_tasks = []
244
+ for (_, _), attachment_data in zip(attachments_info, attachment_data_list):
245
+ data = attachment_data.get("data", "")
246
+ filename = next((part["filename"] for part in message_data["payload"]["parts"] if part["body"].get("attachmentId") == attachment_data["id"]), "untitled.txt")
247
+ processing_tasks.append(run_in_process(process_attachment, filename, data))
248
+ attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
249
+
250
+ structured_data = await asyncio.gather(*processing_tasks)
251
+ structured_data = [data for data in structured_data if data]
252
+
253
+ high_level_company_type = await run_in_thread(get_company_type, company_from_mail)
254
+ body_len = len(body) if body is not None else 0
255
+
256
+ return Message(
257
+ message_id=message_data["id"],
258
+ body_len=body_len,
259
+ body=body,
260
+ attachments=attachments,
261
+ company=company_from_mail,
262
+ high_level_company_type=high_level_company_type,
263
+ structured_data=structured_data
264
+ )