Omkar008 commited on
Commit
9ebb81c
1 Parent(s): f9ce227

Update controllers/ws_controller.py

Browse files
Files changed (1) hide show
  1. controllers/ws_controller.py +96 -34
controllers/ws_controller.py CHANGED
@@ -20,7 +20,9 @@ def get_company_type(company_name:str)->str:
20
  return company_types_dict.get(company_name.lower(), 'Others')
21
 
22
  async def get_messages(code: str,websocket:WebSocket,start,brand_name: Optional[str] = None):
 
23
  access_token = code
 
24
  g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
25
  if brand_name is not None:
26
  g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases OR from:{brand_name}) AND subject:{brand_name} has:attachment'
@@ -44,7 +46,7 @@ async def get_messages(code: str,websocket:WebSocket,start,brand_name: Optional[
44
  start1 = time.time()
45
  while True:
46
  start2= time.time()
47
- gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults=15"
48
  if page_token:
49
  gmail_url += f"&pageToken={page_token}"
50
  # print(gmail_url)
@@ -53,57 +55,67 @@ async def get_messages(code: str,websocket:WebSocket,start,brand_name: Optional[
53
  # print(gmail_response.text)
54
  end2 = time.time()
55
 
56
- # print("End 2 ")
57
- # print(end2-start2)
58
- # print("response length")
59
- # print(gmail_response.content.__len__())
60
  gmail_data = gmail_response.json()
61
  # print(gmail_data)
62
- # print(len(gmail_data['messages']))
63
 
64
 
65
  if "messages" in gmail_data:
66
- with ThreadPoolExecutor(max_workers=20) as executor:
 
 
 
67
 
68
 
69
 
70
  futures=[executor.submit(fetch_message_wrapper, message_data,websocket) for message_data in
71
  gmail_data["messages"]]
72
- # print(len(futures))
 
73
  # print(futures)
74
  for future in concurrent.futures.as_completed(futures):
75
  message = future.result()
76
  # print(message)
77
  if message:
 
78
  # Process and send the message immediately
79
- await process_message(message, websocket, 50000)
 
 
 
80
 
81
 
82
- # if message:
83
- # messages.append(message)
84
  # print("Messages to be sent")
85
- # print(messages)
86
  # print(len(messages))
87
- # for message_data in messages:
88
- # await process_message(message_data,websocket,10000)
89
 
90
 
91
  if "nextPageToken" in gmail_data:
92
  page_token = gmail_data["nextPageToken"]
93
  else:
94
  break
95
- # print("printing messages")
 
 
96
  # print(messages)
97
  return messages
98
 
99
-
100
  async def process_message(message:Message, websocket:WebSocket, chunk_size:int):
101
- # logging.info("process_message")
102
- # print(message)
103
  if message:
104
  message_json = message.to_json()
105
  # logging.info(f"{message_json}")
106
- await send_message_in_chunks(websocket, message_json, chunk_size)
 
107
  await websocket.send_text("NEXT_MESSAGE")
108
 
109
 
@@ -113,6 +125,8 @@ def fetch_message_data(access_token: str, message_id: str) -> Message:
113
  message_data = message_response.json()
114
  # print(message_data)
115
  subject = extract_subject_from_mail(message_data)
 
 
116
  company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
117
 
118
  body = extract_body_from_mail(message_data)
@@ -132,6 +146,48 @@ def fetch_message_data(access_token: str, message_id: str) -> Message:
132
  return Message(message_id=message_id, body_len=body_len,body=body, attachments=attachments, company=company_from_mail,high_level_company_type=high_level_company_type,structured_data = structed_attachment_data)
133
 
134
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135
 
136
  def extract_subject_from_mail(message_data: dict) -> str:
137
  if 'payload' in message_data and 'headers' in message_data['payload']:
@@ -215,20 +271,10 @@ def extract_attachments_from_mail(access_token: str, message_data: dict) -> List
215
  if filename.endswith(".zip") or filename.endswith(".txt") or filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".gif"):
216
  continue
217
  data = attachment_data.get("data", "")
218
-
219
-
220
- try:
221
- raw_text = ut.extract_text_from_attachment(filename, data)
222
- struct_data = ut.strcuture_document_data(raw_text)
223
- if struct_data:
224
- structured_data.append(struct_data)
225
- except Exception as e:
226
- print(f"Error processing attachment {filename}: {str(e)}")
227
- continue # Skip this attachment if there's an error
228
- # raw_text=ut.extract_text_from_attachment(filename , data)
229
- # struct_data = ut.strcuture_document_data(raw_text)
230
- # if struct_data:
231
- # structured_data.append(struct_data)
232
 
233
  attachments.append(Attachment(attachment_len = len(attachment_data.get("data", "")),filename=filename, data=attachment_data.get("data", "")))
234
  return attachments,structured_data
@@ -268,7 +314,23 @@ async def websocket_main(code: str, websocket: WebSocket,start,brand_name: Opti
268
  await websocket.close()
269
 
270
 
271
- async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
 
 
 
 
 
 
 
 
 
 
 
 
272
  message_str = json.dumps(message_json)
 
 
 
 
273
  for i in range(0, len(message_str), chunk_size):
274
  await websocket.send_text(message_str[i:i + chunk_size])
 
20
  return company_types_dict.get(company_name.lower(), 'Others')
21
 
22
  async def get_messages(code: str,websocket:WebSocket,start,brand_name: Optional[str] = None):
23
+ await websocket.send_text("Test text!!")
24
  access_token = code
25
+ total_processed = 0
26
  g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
27
  if brand_name is not None:
28
  g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases OR from:{brand_name}) AND subject:{brand_name} has:attachment'
 
46
  start1 = time.time()
47
  while True:
48
  start2= time.time()
49
+ gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults={30}"
50
  if page_token:
51
  gmail_url += f"&pageToken={page_token}"
52
  # print(gmail_url)
 
55
  # print(gmail_response.text)
56
  end2 = time.time()
57
 
58
+ print("End 2 ")
59
+ print(end2-start2)
60
+ print("response length")
61
+ print(gmail_response.content.__len__())
62
  gmail_data = gmail_response.json()
63
  # print(gmail_data)
64
+ print(len(gmail_data['messages']))
65
 
66
 
67
  if "messages" in gmail_data:
68
+ # for message_data in gmail_data['messages']:
69
+ # message = fetch_message_wrapper(message_data,websocket)
70
+ # await process_message(message, websocket, 2000000)
71
+ with ThreadPoolExecutor(max_workers=50) as executor:
72
 
73
 
74
 
75
  futures=[executor.submit(fetch_message_wrapper, message_data,websocket) for message_data in
76
  gmail_data["messages"]]
77
+ print("Futures length")
78
+ print(len(futures))
79
  # print(futures)
80
  for future in concurrent.futures.as_completed(futures):
81
  message = future.result()
82
  # print(message)
83
  if message:
84
+ total_processed += 1
85
  # Process and send the message immediately
86
+ end1 = time.time()
87
+ print("time 1")
88
+ print("sending the message")
89
+ await process_message(message, websocket, 100000)
90
 
91
 
92
+ # # if message:
93
+ # # messages.append(message)
94
  # print("Messages to be sent")
95
+ # # print(messages)s
96
  # print(len(messages))
97
+ # # for message_data in messages:
98
+ # # await process_message(message_data,websocket,10000)
99
 
100
 
101
  if "nextPageToken" in gmail_data:
102
  page_token = gmail_data["nextPageToken"]
103
  else:
104
  break
105
+ print(f"Total messages processed: {total_processed}")
106
+ logging.info(f"Total Processed Messages : {total_processed}")
107
+ print("printing messages")
108
  # print(messages)
109
  return messages
110
 
 
111
  async def process_message(message:Message, websocket:WebSocket, chunk_size:int):
112
+ logging.info("process_message")
113
+ print(message)
114
  if message:
115
  message_json = message.to_json()
116
  # logging.info(f"{message_json}")
117
+ await send_message_in_chunks(websocket, message_json, 50000)
118
+ # await websocket.send_text(str(message_json))
119
  await websocket.send_text("NEXT_MESSAGE")
120
 
121
 
 
125
  message_data = message_response.json()
126
  # print(message_data)
127
  subject = extract_subject_from_mail(message_data)
128
+ print("printing message data")
129
+ print(message_data)
130
  company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
131
 
132
  body = extract_body_from_mail(message_data)
 
146
  return Message(message_id=message_id, body_len=body_len,body=body, attachments=attachments, company=company_from_mail,high_level_company_type=high_level_company_type,structured_data = structed_attachment_data)
147
 
148
 
149
+ # def fetch_message_data(access_token: str, message_id: str) -> Message:
150
+ # message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
151
+ # message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
152
+ # message_data = message_response.json()
153
+
154
+ # with ThreadPoolExecutor(max_workers=50) as executor:
155
+ # # Submit tasks to executor
156
+ # future_subject = executor.submit(extract_subject_from_mail, message_data)
157
+ # subject = future_subject.result()
158
+ # future_company_from_mail = executor.submit(extract_domain_name, message_data['payload']['headers'], future_subject.result())
159
+ # company_from_mail = future_company_from_mail.result()
160
+ # future_body = executor.submit(extract_body_from_mail, message_data)
161
+ # body = future_body.result()
162
+
163
+ # # Extract attachments and measure time
164
+
165
+ # future_attachments = executor.submit(extract_attachments_from_mail, access_token, message_data)
166
+ # attachments, structured_attachment_data = future_attachments.result()
167
+
168
+
169
+ # future_high_level_company_type = executor.submit(get_company_type, future_company_from_mail.result())
170
+
171
+ # high_level_company_type = future_high_level_company_type.result()
172
+
173
+ # body_len = len(body) if body is not None else 0
174
+
175
+ # return Message(
176
+ # message_id=message_id,
177
+ # body_len=body_len,
178
+ # body=body,
179
+ # attachments=attachments,
180
+ # company=company_from_mail,
181
+ # high_level_company_type=high_level_company_type,
182
+ # structured_data=structured_attachment_data
183
+ # )
184
+
185
+
186
+
187
+
188
+
189
+
190
+
191
 
192
  def extract_subject_from_mail(message_data: dict) -> str:
193
  if 'payload' in message_data and 'headers' in message_data['payload']:
 
271
  if filename.endswith(".zip") or filename.endswith(".txt") or filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".gif"):
272
  continue
273
  data = attachment_data.get("data", "")
274
+ raw_text=ut.extract_text_from_attachment(filename , data)
275
+ struct_data = ut.strcuture_document_data(raw_text,filename)
276
+ if struct_data:
277
+ structured_data.append(struct_data)
 
 
 
 
 
 
 
 
 
 
278
 
279
  attachments.append(Attachment(attachment_len = len(attachment_data.get("data", "")),filename=filename, data=attachment_data.get("data", "")))
280
  return attachments,structured_data
 
314
  await websocket.close()
315
 
316
 
317
+ async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
318
+
319
+ # if message_json['attachments'] is not None :
320
+ # for attch in message_json['attachments']:
321
+ # attachment_len = attch['attachment_len']
322
+
323
+
324
+ # print(body_len)
325
+ # print(attachment_len)
326
+ # if attachment_len == 0:
327
+ # attachment_len = None
328
+ # await websocket.send_json({"body_len":body_len ,"attachment_len":attachment_len})
329
+
330
  message_str = json.dumps(message_json)
331
+ # print("Printing message_str")
332
+ # print(message_str)
333
+ # logging.info(message_str)
334
+ # await websocket.send_json({"file_len":len(file)})
335
  for i in range(0, len(message_str), chunk_size):
336
  await websocket.send_text(message_str[i:i + chunk_size])