Omkar008 commited on
Commit
21791d0
1 Parent(s): 154a342

Update controllers/ws_controller.py

Browse files
Files changed (1) hide show
  1. controllers/ws_controller.py +30 -12
controllers/ws_controller.py CHANGED
@@ -2,6 +2,7 @@ import base64
2
  import json
3
  import logging
4
  import re
 
5
  from concurrent.futures import ThreadPoolExecutor
6
  from typing import Optional, List, Dict
7
  import requests
@@ -9,7 +10,7 @@ from bs4 import BeautifulSoup
9
  from models.models import Message, Attachment
10
  from fastapi import WebSocket
11
  from services import utils as ut
12
- from models import supabase_models as sp
13
  import asyncio
14
  def get_company_type(company_name:str)->str:
15
  company_types_dict ={'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit', 'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit', 'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit', "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit', 'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit', 'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit', 'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit', 'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit', 'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit', 'krug': 'wines and spirit', 'mercier': 'wines and spirit', 'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit', 'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit', 'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit', 'woodinville': 'wines and spirit' , 'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods', 'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods', 'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods', 'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods', 'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods', 'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods', 'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods','acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics', 'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics', 'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics', 'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics', 'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics', 'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics', 'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics', 'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics','bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry', 'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry', 'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry', 'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry','24s': 'Selective retailing', 'dfs': 'Selective retailing', 'la grande epicerie de paris': 'Selective retailing', 'le bon marché rive gauche': 'Selective retailing', 'sephora': 'Selective retailing','belmond': 'Other activities', 'cheval blanc': 'Other activities', 'connaissance des arts': 'Other activities', 'cova': 'Other activities', 'investir': 'Other activities', "jardin d'acclimatation": 'Other activities', 'le parisien': 'Other activities', 'les echos': 'Other activities', 'radio classique': 'Other activities', 'royal van lent': 'Other activities'}
@@ -28,13 +29,21 @@ async def get_messages(code: str,websocket:WebSocket,brand_name: Optional[str] =
28
  # gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
29
  # gmail_data = gmail_response.json()
30
  # messages.append(gmail_data['messages'])
31
- def fetch_message_wrapper(message_data):
32
  message_id = message_data.get("id")
33
  if message_id:
34
  return fetch_message_data(access_token, message_id)
35
 
36
  return None
37
 
 
 
 
 
 
 
 
 
38
  while True:
39
  gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}"
40
  if page_token:
@@ -42,22 +51,30 @@ async def get_messages(code: str,websocket:WebSocket,brand_name: Optional[str] =
42
 
43
  gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
44
  gmail_data = gmail_response.json()
45
- print(len(gmail_data))
46
  print(gmail_data)
47
-
48
  if "messages" in gmail_data:
49
  with ThreadPoolExecutor(max_workers=15) as executor:
50
-
51
-
52
-
53
- futures=[executor.submit(fetch_message_wrapper, message_data) for message_data in
54
- gmail_data["messages"]]
55
  for future in futures:
56
  message = future.result()
 
57
  if message:
58
- messages.append(message)
59
- for message_data in messages:
60
- await process_message(message_data,websocket,10000)
 
 
 
 
 
 
 
61
 
62
  if "nextPageToken" in gmail_data:
63
  page_token = gmail_data["nextPageToken"]
@@ -69,6 +86,7 @@ async def get_messages(code: str,websocket:WebSocket,brand_name: Optional[str] =
69
 
70
  async def process_message(message:Message, websocket:WebSocket, chunk_size:int):
71
  logging.info("process_message")
 
72
  if message:
73
  message_json = message.to_json()
74
  logging.info(f"{message_json}")
 
2
  import json
3
  import logging
4
  import re
5
+ from concurrent.futures import ProcessPoolExecutor
6
  from concurrent.futures import ThreadPoolExecutor
7
  from typing import Optional, List, Dict
8
  import requests
 
10
  from models.models import Message, Attachment
11
  from fastapi import WebSocket
12
  from services import utils as ut
13
+ # from models import supabase_models as sp
14
  import asyncio
15
  def get_company_type(company_name:str)->str:
16
  company_types_dict ={'ao yun': 'wines and spirit', 'ardbeg': 'wines and spirit', 'belvedere': 'wines and spirit', 'bodega numanthia': 'wines and spirit', 'chandon': 'wines and spirit', 'château cheval blanc': 'wines and spirit', "château d'yquem": 'wines and spirit', 'château galoupet': 'wines and spirit', 'cheval des andes': 'wines and spirit', 'clos19': 'wines and spirit', 'cloudy bay': 'wines and spirit', 'colgin cellars': 'wines and spirit', 'dom pérignon': 'wines and spirit', 'domaine des lambrays': 'wines and spirit', 'eminente': 'wines and spirit', 'glenmorangie': 'wines and spirit', 'hennessy': 'wines and spirit', 'joseph phelps': 'wines and spirit', 'krug': 'wines and spirit', 'mercier': 'wines and spirit', 'moët & chandon': 'wines and spirit', 'newton vineyard': 'wines and spirit', 'ruinart': 'wines and spirit', 'terrazas de los andes': 'wines and spirit', 'veuve clicquot': 'wines and spirit', 'volcan de mi tierra': 'wines and spirit', 'woodinville': 'wines and spirit' , 'berluti': 'Fashion & Leather Goods', 'celine': 'Fashion & Leather Goods', 'christian dior': 'Fashion & Leather Goods', 'emilio pucci': 'Fashion & Leather Goods', 'fendi': 'Fashion & Leather Goods', 'givenchy': 'Fashion & Leather Goods', 'kenzo': 'Fashion & Leather Goods', 'loewe': 'Fashion & Leather Goods', 'loro piana': 'Fashion & Leather Goods', 'louis vuitton': 'Fashion & Leather Goods', 'marc jacobs': 'Fashion & Leather Goods', 'moynat': 'Fashion & Leather Goods', 'patou': 'Fashion & Leather Goods', 'rimowa': 'Fashion & Leather Goods','acqua di parma': 'Perfumes & Cosmetics', 'benefit cosmetics': 'Perfumes & Cosmetics', 'cha ling': 'Perfumes & Cosmetics', 'fenty beauty by rihanna': 'Perfumes & Cosmetics', 'fresh': 'Perfumes & Cosmetics', 'givenchy parfums': 'Perfumes & Cosmetics', 'guerlain': 'Perfumes & Cosmetics', 'kenzo parfums': 'Perfumes & Cosmetics', 'kvd beauty': 'Perfumes & Cosmetics', 'loewe perfumes': 'Perfumes & Cosmetics', 'maison francis kurkdjian': 'Perfumes & Cosmetics', 'make up for ever': 'Perfumes & Cosmetics', 'officine universelle buly': 'Perfumes & Cosmetics', 'olehenriksen': 'Perfumes & Cosmetics', 'parfums christian dior': 'Perfumes & Cosmetics', 'stella by stella mccartney': 'Perfumes & Cosmetics','bulgari': 'Watches & Jewelry', 'chaumet': 'Watches & Jewelry', 'fred': 'Watches & Jewelry', 'hublot': 'Watches & Jewelry', 'repossi': 'Watches & Jewelry', 'tag heuer': 'Watches & Jewelry', 'tiffany & co.': 'Watches & Jewelry', 'zenith': 'Watches & Jewelry','24s': 'Selective retailing', 'dfs': 'Selective retailing', 'la grande epicerie de paris': 'Selective retailing', 'le bon marché rive gauche': 'Selective retailing', 'sephora': 'Selective retailing','belmond': 'Other activities', 'cheval blanc': 'Other activities', 'connaissance des arts': 'Other activities', 'cova': 'Other activities', 'investir': 'Other activities', "jardin d'acclimatation": 'Other activities', 'le parisien': 'Other activities', 'les echos': 'Other activities', 'radio classique': 'Other activities', 'royal van lent': 'Other activities'}
 
29
  # gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
30
  # gmail_data = gmail_response.json()
31
  # messages.append(gmail_data['messages'])
32
+ def fetch_message_wrapper(message_data,websocket:WebSocket):
33
  message_id = message_data.get("id")
34
  if message_id:
35
  return fetch_message_data(access_token, message_id)
36
 
37
  return None
38
 
39
+ # if message_id :
40
+ # message_json = fetch_message_data(access_token,message_id)
41
+ # # await process_message(message_json,websocket,20000)
42
+
43
+ # if message_id:
44
+ # return await asyncio.to_thread(fetch_message_data, access_token, message_id)
45
+ # return None
46
+ # with ProcessPoolExecutor(max_workers=4) as executor:
47
  while True:
48
  gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}"
49
  if page_token:
 
51
 
52
  gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
53
  gmail_data = gmail_response.json()
54
+ print(len(gmail_data['messages']))
55
  print(gmail_data)
56
+
57
  if "messages" in gmail_data:
58
  with ThreadPoolExecutor(max_workers=15) as executor:
59
+
60
+
61
+
62
+ futures=[executor.submit(fetch_message_wrapper, message_data,websocket) for message_data in
63
+ gmail_data["messages"]]
64
  for future in futures:
65
  message = future.result()
66
+ print(message)
67
  if message:
68
+ # Process and send the message immediately
69
+ await process_message(message, websocket, 20000)
70
+ # if message:
71
+ # messages.append(message)
72
+ print("Messages to be sent")
73
+ print(messages)
74
+ print(len(messages))
75
+ # for message_data in messages:
76
+ # await process_message(message_data,websocket,10000)
77
+
78
 
79
  if "nextPageToken" in gmail_data:
80
  page_token = gmail_data["nextPageToken"]
 
86
 
87
  async def process_message(message:Message, websocket:WebSocket, chunk_size:int):
88
  logging.info("process_message")
89
+ print(message)
90
  if message:
91
  message_json = message.to_json()
92
  logging.info(f"{message_json}")