Spaces:
Sleeping
Sleeping
Update controllers/ws_controller.py
Browse files- controllers/ws_controller.py +37 -37
controllers/ws_controller.py
CHANGED
@@ -10,6 +10,7 @@ from bs4 import BeautifulSoup
|
|
10 |
from models.models import Message, Attachment
|
11 |
from fastapi import WebSocket
|
12 |
from services import utils as ut
|
|
|
13 |
# from models import supabase_models as sp
|
14 |
import asyncio
|
15 |
def get_company_type(company_name:str)->str:
|
@@ -17,7 +18,8 @@ def get_company_type(company_name:str)->str:
|
|
17 |
print(company_types_dict["louis vuitton"])
|
18 |
return company_types_dict.get(company_name.lower(), 'Others')
|
19 |
|
20 |
-
async def get_messages(code: str,websocket:WebSocket,brand_name: Optional[str] = None):
|
|
|
21 |
access_token = code
|
22 |
g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
|
23 |
if brand_name is not None:
|
@@ -36,37 +38,50 @@ async def get_messages(code: str,websocket:WebSocket,brand_name: Optional[str] =
|
|
36 |
|
37 |
return None
|
38 |
|
39 |
-
|
40 |
-
|
41 |
-
|
42 |
-
|
43 |
-
# if message_id:
|
44 |
-
# return await asyncio.to_thread(fetch_message_data, access_token, message_id)
|
45 |
-
# return None
|
46 |
-
# with ProcessPoolExecutor(max_workers=4) as executor:
|
47 |
while True:
|
48 |
-
|
|
|
49 |
if page_token:
|
50 |
gmail_url += f"&pageToken={page_token}"
|
|
|
51 |
|
52 |
gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
53 |
gmail_data = gmail_response.json()
|
54 |
-
print(len(gmail_data['messages']))
|
55 |
print(gmail_data)
|
|
|
|
|
56 |
|
57 |
if "messages" in gmail_data:
|
58 |
-
with ThreadPoolExecutor(max_workers=
|
59 |
|
60 |
|
61 |
|
62 |
futures=[executor.submit(fetch_message_wrapper, message_data,websocket) for message_data in
|
63 |
gmail_data["messages"]]
|
|
|
|
|
64 |
for future in futures:
|
65 |
message = future.result()
|
66 |
print(message)
|
67 |
if message:
|
68 |
# Process and send the message immediately
|
69 |
-
|
|
|
|
|
|
|
|
|
|
|
70 |
# if message:
|
71 |
# messages.append(message)
|
72 |
print("Messages to be sent")
|
@@ -90,7 +105,8 @@ async def process_message(message:Message, websocket:WebSocket, chunk_size:int):
|
|
90 |
if message:
|
91 |
message_json = message.to_json()
|
92 |
logging.info(f"{message_json}")
|
93 |
-
await send_message_in_chunks(websocket, message_json,
|
|
|
94 |
await websocket.send_text("NEXT_MESSAGE")
|
95 |
|
96 |
|
@@ -103,25 +119,19 @@ def fetch_message_data(access_token: str, message_id: str) -> Message:
|
|
103 |
company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
|
104 |
|
105 |
body = extract_body_from_mail(message_data)
|
106 |
-
|
107 |
attachments,structed_attachment_data = extract_attachments_from_mail(access_token, message_data)
|
|
|
|
|
|
|
|
|
108 |
high_level_company_type = get_company_type(company_from_mail)
|
109 |
-
# structed_attachment_data = extract_json_from_attachments(access_token , message_data)
|
110 |
|
111 |
|
112 |
body_len = 0
|
113 |
if body is not None :
|
114 |
body_len = len(body)
|
115 |
|
116 |
-
# print("subject: ")
|
117 |
-
# print(subject)
|
118 |
-
# print("company name: ")
|
119 |
-
# print(company_from_mail)
|
120 |
-
# print("Printing the body of the mail: ")
|
121 |
-
# print(body)
|
122 |
-
# print("Printing attachment Data: ")
|
123 |
-
# print(attachments)
|
124 |
-
# print("Completed this mail.")
|
125 |
return Message(message_id=message_id, body_len=body_len,body=body, attachments=attachments, company=company_from_mail,high_level_company_type=high_level_company_type,structured_data = structed_attachment_data)
|
126 |
|
127 |
|
@@ -159,16 +169,6 @@ def extract_domain_from_email(email_string: str) -> Optional[str]:
|
|
159 |
return None
|
160 |
|
161 |
|
162 |
-
# def extract_body_from_mail(message_data: dict) -> str:
|
163 |
-
# body = None
|
164 |
-
# if "payload" in message_data and "parts" in message_data["payload"]:
|
165 |
-
# for part in message_data["payload"]["parts"]:
|
166 |
-
# if 'mimeType' in part and (part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html'):
|
167 |
-
# body_data = part['body'].get('data', '')
|
168 |
-
# body_base64 = base64.urlsafe_b64decode(body_data)
|
169 |
-
# body = extract_text(body_base64)
|
170 |
-
# return body
|
171 |
-
|
172 |
|
173 |
def extract_body_from_mail(message_data: dict) -> str:
|
174 |
body = None
|
@@ -237,10 +237,10 @@ def extract_text(html_content: str) -> str:
|
|
237 |
return text
|
238 |
|
239 |
|
240 |
-
async def websocket_main(code: str, websocket: WebSocket,brand_name: Optional[str] = None):
|
241 |
access_token = code
|
242 |
# messages = get_messages(access_token,websocket,brand_name)
|
243 |
-
await get_messages(access_token,websocket,brand_name)
|
244 |
# print("websocket_main")
|
245 |
# print(messages)
|
246 |
# # logging.info(f"brand_name:{brand_name}")
|
|
|
10 |
from models.models import Message, Attachment
|
11 |
from fastapi import WebSocket
|
12 |
from services import utils as ut
|
13 |
+
import time
|
14 |
# from models import supabase_models as sp
|
15 |
import asyncio
|
16 |
def get_company_type(company_name:str)->str:
|
|
|
18 |
print(company_types_dict["louis vuitton"])
|
19 |
return company_types_dict.get(company_name.lower(), 'Others')
|
20 |
|
21 |
+
async def get_messages(code: str,websocket:WebSocket,start,brand_name: Optional[str] = None):
|
22 |
+
await websocket.send_text("Test text!!")
|
23 |
access_token = code
|
24 |
g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
|
25 |
if brand_name is not None:
|
|
|
38 |
|
39 |
return None
|
40 |
|
41 |
+
end = time.time()
|
42 |
+
print("time 0")
|
43 |
+
print(end - start)
|
44 |
+
start1 = time.time()
|
|
|
|
|
|
|
|
|
45 |
while True:
|
46 |
+
start2= time.time()
|
47 |
+
gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults={30}"
|
48 |
if page_token:
|
49 |
gmail_url += f"&pageToken={page_token}"
|
50 |
+
# print(gmail_url)
|
51 |
|
52 |
gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
|
53 |
+
# print(gmail_response.text)
|
54 |
+
end2 = time.time()
|
55 |
+
|
56 |
+
print("End 2 ")
|
57 |
+
print(end2-start2)
|
58 |
+
print("response length")
|
59 |
+
print(gmail_response.content.__len__())
|
60 |
gmail_data = gmail_response.json()
|
|
|
61 |
print(gmail_data)
|
62 |
+
print(len(gmail_data['messages']))
|
63 |
+
|
64 |
|
65 |
if "messages" in gmail_data:
|
66 |
+
with ThreadPoolExecutor(max_workers=50) as executor:
|
67 |
|
68 |
|
69 |
|
70 |
futures=[executor.submit(fetch_message_wrapper, message_data,websocket) for message_data in
|
71 |
gmail_data["messages"]]
|
72 |
+
print(len(futures))
|
73 |
+
print(futures)
|
74 |
for future in futures:
|
75 |
message = future.result()
|
76 |
print(message)
|
77 |
if message:
|
78 |
# Process and send the message immediately
|
79 |
+
end1 = time.time()
|
80 |
+
print("time 1")
|
81 |
+
print(end1-start1)
|
82 |
+
await process_message(message, websocket, 100000)
|
83 |
+
|
84 |
+
|
85 |
# if message:
|
86 |
# messages.append(message)
|
87 |
print("Messages to be sent")
|
|
|
105 |
if message:
|
106 |
message_json = message.to_json()
|
107 |
logging.info(f"{message_json}")
|
108 |
+
await send_message_in_chunks(websocket, message_json, 50000)
|
109 |
+
# await websocket.send_text(str(message_json))
|
110 |
await websocket.send_text("NEXT_MESSAGE")
|
111 |
|
112 |
|
|
|
119 |
company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
|
120 |
|
121 |
body = extract_body_from_mail(message_data)
|
122 |
+
start3= time.time()
|
123 |
attachments,structed_attachment_data = extract_attachments_from_mail(access_token, message_data)
|
124 |
+
end3=time.time()
|
125 |
+
|
126 |
+
print("time 5")
|
127 |
+
print(end3 - start3)
|
128 |
high_level_company_type = get_company_type(company_from_mail)
|
|
|
129 |
|
130 |
|
131 |
body_len = 0
|
132 |
if body is not None :
|
133 |
body_len = len(body)
|
134 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
135 |
return Message(message_id=message_id, body_len=body_len,body=body, attachments=attachments, company=company_from_mail,high_level_company_type=high_level_company_type,structured_data = structed_attachment_data)
|
136 |
|
137 |
|
|
|
169 |
return None
|
170 |
|
171 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
172 |
|
173 |
def extract_body_from_mail(message_data: dict) -> str:
|
174 |
body = None
|
|
|
237 |
return text
|
238 |
|
239 |
|
240 |
+
async def websocket_main(code: str, websocket: WebSocket,start,brand_name: Optional[str] = None):
|
241 |
access_token = code
|
242 |
# messages = get_messages(access_token,websocket,brand_name)
|
243 |
+
await get_messages(access_token,websocket,start,brand_name)
|
244 |
# print("websocket_main")
|
245 |
# print(messages)
|
246 |
# # logging.info(f"brand_name:{brand_name}")
|