Omkar008 commited on
Commit
c1eed6e
1 Parent(s): 9dc6b7f

Update controllers/ws_controller_1.py

Browse files
Files changed (1) hide show
  1. controllers/ws_controller_1.py +19 -28
controllers/ws_controller_1.py CHANGED
@@ -9,10 +9,9 @@ from bs4 import BeautifulSoup
9
  from fastapi import WebSocket
10
  from services import utils as ut
11
  from models.models import Message, Attachment
12
- from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
13
  import functools
14
- thread_pool = ThreadPoolExecutor(max_workers=20) # Increased from 10
15
- process_pool = ProcessPoolExecutor(max_workers=4)
16
 
17
  def get_company_type(company_name:str)->str:
18
  company_types_dict ={'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit', 'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit', 'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit', "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit', 'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit', 'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit', 'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit', 'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit', 'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit', 'krug': 'wines and spirit', 'mercier': 'wines and spirit', 'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit', 'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit', 'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit', 'woodinville': 'wines and spirit' , 'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods', 'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods', 'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods', 'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods', 'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods', 'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods', 'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods','acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics', 'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics', 'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics', 'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics', 'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics', 'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics', 'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics', 'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics','bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry', 'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry', 'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry', 'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry','24s': 'Selective retailing', 'dfs': 'Selective retailing', 'la grande epicerie de paris': 'Selective retailing', 'le bon marché rive gauche': 'Selective retailing', 'sephora': 'Selective retailing','belmond': 'Other activities', 'cheval blanc': 'Other activities', 'connaissance des arts': 'Other activities', 'cova': 'Other activities', 'investir': 'Other activities', "jardin d'acclimatation": 'Other activities', 'le parisien': 'Other activities', 'les echos': 'Other activities', 'radio classique': 'Other activities', 'royal van lent': 'Other activities'}
@@ -145,11 +144,10 @@ async def get_messages(code: str, websocket: WebSocket,start, brand_name: Option
145
 
146
  page_token = None
147
  processed_count = 0
148
- batch_size = 100 # Increased from 50
149
 
150
  async with aiohttp.ClientSession() as session:
151
  while True:
152
- gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults={batch_size}"
153
  if page_token:
154
  gmail_url += f"&pageToken={page_token}"
155
 
@@ -160,23 +158,25 @@ async def get_messages(code: str, websocket: WebSocket,start, brand_name: Option
160
  await websocket.send_text(json.dumps({"status": "No messages found"}))
161
  break
162
 
163
- message_ids = [message["id"] for message in gmail_data["messages"]]
164
- full_messages = await fetch_messages_batch(session, access_token, message_ids)
165
 
166
  processing_tasks = [process_message(session, access_token, message) for message in full_messages]
167
 
168
  for future in asyncio.as_completed(processing_tasks):
169
  processed_message = await future
170
  await websocket.send_text(json.dumps({
171
- "message": processed_message.to_json()
 
 
172
  }))
173
  processed_count += 1
174
 
175
  if "nextPageToken" in gmail_data:
176
  page_token = gmail_data["nextPageToken"]
177
- # await websocket.send_text(json.dumps({"status": "next_page"}))
178
  else:
179
- # await websocket.send_text(json.dumps({"status": "completed", "total_processed": processed_count}))
180
  break
181
 
182
  async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
@@ -196,22 +196,11 @@ async def websocket_main(code: str, websocket: WebSocket,start,brand_name: Opti
196
  async def run_in_thread(func, *args, **kwargs):
197
  return await asyncio.get_event_loop().run_in_executor(thread_pool, functools.partial(func, *args, **kwargs))
198
 
199
- async def run_in_process(func, *args, **kwargs):
200
- return await asyncio.get_event_loop().run_in_executor(process_pool, functools.partial(func, *args, **kwargs))
201
-
202
- async def fetch_messages_batch(session: aiohttp.ClientSession, access_token: str, message_ids: List[str]):
203
- tasks = [fetch_message(session, access_token, message_id) for message_id in message_ids]
204
- return await asyncio.gather(*tasks)
205
-
206
  async def fetch_message(session: aiohttp.ClientSession, access_token: str, message_id: str):
207
  message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
208
  async with session.get(message_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
209
  return await response.json()
210
 
211
- async def fetch_attachments_batch(session: aiohttp.ClientSession, access_token: str, attachments_info: List[tuple]):
212
- tasks = [fetch_attachment(session, access_token, msg_id, att_id) for msg_id, att_id in attachments_info]
213
- return await asyncio.gather(*tasks)
214
-
215
  async def fetch_attachment(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str):
216
  attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
217
  async with session.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
@@ -230,21 +219,23 @@ async def process_message(session: aiohttp.ClientSession, access_token: str, mes
230
  structured_data = []
231
 
232
  if "payload" in message_data and "parts" in message_data["payload"]:
233
- attachments_info = []
234
  for part in message_data["payload"]["parts"]:
235
  if "body" in part and "attachmentId" in part["body"]:
236
  attachment_id = part["body"]["attachmentId"]
237
  filename = part.get("filename", "untitled.txt")
238
- if not filename.endswith((".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")):
239
- attachments_info.append((message_data["id"], attachment_id))
 
 
 
240
 
241
- attachment_data_list = await fetch_attachments_batch(session, access_token, attachments_info)
242
 
243
  processing_tasks = []
244
- for (_, _), attachment_data in zip(attachments_info, attachment_data_list):
245
  data = attachment_data.get("data", "")
246
- filename = next((part["filename"] for part in message_data["payload"]["parts"] if part["body"].get("attachmentId") == attachment_data["id"]), "untitled.txt")
247
- processing_tasks.append(run_in_process(process_attachment, filename, data))
248
  attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
249
 
250
  structured_data = await asyncio.gather(*processing_tasks)
 
9
  from fastapi import WebSocket
10
  from services import utils as ut
11
  from models.models import Message, Attachment
12
+ from concurrent.futures import ThreadPoolExecutor
13
  import functools
14
+ thread_pool = ThreadPoolExecutor(max_workers=10)
 
15
 
16
  def get_company_type(company_name:str)->str:
17
  company_types_dict ={'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit', 'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit', 'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit', "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit', 'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit', 'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit', 'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit', 'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit', 'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit', 'krug': 'wines and spirit', 'mercier': 'wines and spirit', 'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit', 'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit', 'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit', 'woodinville': 'wines and spirit' , 'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods', 'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods', 'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods', 'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods', 'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods', 'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods', 'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods','acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics', 'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics', 'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics', 'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics', 'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics', 'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics', 'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics', 'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics','bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry', 'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry', 'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry', 'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry','24s': 'Selective retailing', 'dfs': 'Selective retailing', 'la grande epicerie de paris': 'Selective retailing', 'le bon marché rive gauche': 'Selective retailing', 'sephora': 'Selective retailing','belmond': 'Other activities', 'cheval blanc': 'Other activities', 'connaissance des arts': 'Other activities', 'cova': 'Other activities', 'investir': 'Other activities', "jardin d'acclimatation": 'Other activities', 'le parisien': 'Other activities', 'les echos': 'Other activities', 'radio classique': 'Other activities', 'royal van lent': 'Other activities'}
 
144
 
145
  page_token = None
146
  processed_count = 0
 
147
 
148
  async with aiohttp.ClientSession() as session:
149
  while True:
150
+ gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults=50"
151
  if page_token:
152
  gmail_url += f"&pageToken={page_token}"
153
 
 
158
  await websocket.send_text(json.dumps({"status": "No messages found"}))
159
  break
160
 
161
+ message_tasks = [fetch_message(session, access_token, message["id"]) for message in gmail_data["messages"]]
162
+ full_messages = await asyncio.gather(*message_tasks)
163
 
164
  processing_tasks = [process_message(session, access_token, message) for message in full_messages]
165
 
166
  for future in asyncio.as_completed(processing_tasks):
167
  processed_message = await future
168
  await websocket.send_text(json.dumps({
169
+ "status": "processing",
170
+ "message": processed_message.to_json(),
171
+ "processed_count": processed_count + 1
172
  }))
173
  processed_count += 1
174
 
175
  if "nextPageToken" in gmail_data:
176
  page_token = gmail_data["nextPageToken"]
177
+ await websocket.send_text(json.dumps({"status": "next_page"}))
178
  else:
179
+ await websocket.send_text(json.dumps({"status": "completed", "total_processed": processed_count}))
180
  break
181
 
182
  async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
 
196
  async def run_in_thread(func, *args, **kwargs):
197
  return await asyncio.get_event_loop().run_in_executor(thread_pool, functools.partial(func, *args, **kwargs))
198
 
 
 
 
 
 
 
 
199
  async def fetch_message(session: aiohttp.ClientSession, access_token: str, message_id: str):
200
  message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
201
  async with session.get(message_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
202
  return await response.json()
203
 
 
 
 
 
204
  async def fetch_attachment(session: aiohttp.ClientSession, access_token: str, message_id: str, attachment_id: str):
205
  attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
206
  async with session.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"}) as response:
 
219
  structured_data = []
220
 
221
  if "payload" in message_data and "parts" in message_data["payload"]:
222
+ attachment_tasks = []
223
  for part in message_data["payload"]["parts"]:
224
  if "body" in part and "attachmentId" in part["body"]:
225
  attachment_id = part["body"]["attachmentId"]
226
  filename = part.get("filename", "untitled.txt")
227
+ if filename.endswith((".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")):
228
+ continue
229
+
230
+ task = fetch_attachment(session, access_token, message_data["id"], attachment_id)
231
+ attachment_tasks.append((filename, task))
232
 
233
+ attachment_results = await asyncio.gather(*(task for _, task in attachment_tasks))
234
 
235
  processing_tasks = []
236
+ for (filename, _), attachment_data in zip(attachment_tasks, attachment_results):
237
  data = attachment_data.get("data", "")
238
+ processing_tasks.append(run_in_thread(process_attachment, filename, data))
 
239
  attachments.append(Attachment(attachment_len=len(data), filename=filename, data=data))
240
 
241
  structured_data = await asyncio.gather(*processing_tasks)