Omkar008 commited on
Commit
b665f30
1 Parent(s): b3e237c

Create ws_controller.py

Browse files
Files changed (1) hide show
  1. controllers/ws_controller.py +158 -0
controllers/ws_controller.py ADDED
@@ -0,0 +1,158 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import base64
2
+ import json
3
+ import logging
4
+ import re
5
+ from concurrent.futures import ThreadPoolExecutor
6
+ from typing import Optional, List, Dict
7
+
8
+ import requests
9
+ from bs4 import BeautifulSoup
10
+ from fastapi import FastAPI, WebSocket
11
+
12
+
13
+
14
+
15
+
16
+ def get_messages(code: str) -> List[Message]:
17
+ access_token = code
18
+ page_token = None
19
+ messages = []
20
+ jobs_query = f'subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoice OR category:purchases'
21
+ max_results = 10
22
+
23
+ def fetch_message_wrapper(message_data):
24
+ message_id = message_data.get("id")
25
+ if message_id:
26
+ return fetch_message_data(access_token, message_id)
27
+ return None
28
+
29
+ while True:
30
+ gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={jobs_query}&maxResults={max_results}"
31
+ if page_token:
32
+ gmail_url += f"&pageToken={page_token}"
33
+
34
+ gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
35
+ gmail_data = gmail_response.json()
36
+
37
+ if "messages" in gmail_data:
38
+ with ThreadPoolExecutor(max_workers=15) as executor:
39
+ futures = [executor.submit(fetch_message_wrapper, message_data) for message_data in
40
+ gmail_data["messages"]]
41
+ for future in futures:
42
+ message = future.result()
43
+ if message:
44
+ messages.append(message)
45
+
46
+ if "nextPageToken" in gmail_data:
47
+ page_token = gmail_data["nextPageToken"]
48
+ else:
49
+ break
50
+
51
+ return messages
52
+
53
+
54
+ def fetch_message_data(access_token: str, message_id: str) -> Message:
55
+ message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
56
+ message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
57
+ message_data = message_response.json()
58
+
59
+ subject = extract_subject_from_mail(message_data)
60
+ company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)
61
+
62
+ body = extract_body_from_mail(message_data)
63
+
64
+ attachments = extract_attachments_from_mail(access_token, message_data)
65
+
66
+ return Message(message_id=message_id, body=body, attachments=attachments, company=company_from_mail)
67
+
68
+
69
+ def extract_subject_from_mail(message_data: dict) -> str:
70
+ if 'payload' in message_data and 'headers' in message_data['payload']:
71
+ headers = message_data['payload']['headers']
72
+ for header in headers:
73
+ if header['name'] == 'Subject':
74
+ return header['value']
75
+ return ""
76
+ else:
77
+ return ""
78
+
79
+
80
+ def extract_domain_name(payload: dict, subject: str) -> str:
81
+ domain_name = 'others'
82
+ for fromdata in payload:
83
+ if fromdata['name'] == 'From':
84
+ domain_name = extract_domain_from_email(fromdata['value'])
85
+ break
86
+ if 'chanel' in subject.lower():
87
+ return 'chanel'
88
+ if 'louis vuitton' in subject.lower():
89
+ return 'Louis Vuitton'
90
+ return domain_name
91
+
92
+
93
+ def extract_domain_from_email(email_string: str) -> Optional[str]:
94
+ email_address = re.search(r'[\w\.-]+@[\w\.-]+', email_string).group()
95
+ domain = email_address.split('@')[-1].split('.')[0]
96
+ if email_address and domain:
97
+ return domain
98
+ else:
99
+ return None
100
+
101
+
102
+ def extract_body_from_mail(message_data: dict) -> str:
103
+ body = None
104
+ if "payload" in message_data and "parts" in message_data["payload"]:
105
+ for part in message_data["payload"]["parts"]:
106
+ if 'mimeType' in part and part['mimeType'] == 'text/plain':
107
+ body_data = part['body'].get('data', '')
108
+ body_base64 = base64.urlsafe_b64decode(body_data)
109
+ body = extract_text(body_base64)
110
+ break
111
+ return body
112
+
113
+
114
+ def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
115
+ attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
116
+ attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"})
117
+ return attachment_response.json()
118
+
119
+
120
+ def extract_attachments_from_mail(access_token: str, message_data: dict) -> List[Attachment]:
121
+ attachments = []
122
+ if "payload" in message_data and "parts" in message_data["payload"]:
123
+ for part in message_data["payload"]["parts"]:
124
+ if "body" in part and "attachmentId" in part["body"]:
125
+ attachment_id = part["body"]["attachmentId"]
126
+ attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
127
+ filename = part.get("filename", "untitled.txt")
128
+ attachments.append(Attachment(filename=filename, data=attachment_data.get("data", "")))
129
+ return attachments
130
+
131
+
132
+ def extract_text(html_content: str) -> str:
133
+ if not html_content:
134
+ raise ValueError("HTML content is empty or None")
135
+
136
+ soup = BeautifulSoup(html_content, 'html.parser')
137
+ text = soup.get_text(separator=' ')
138
+ text = re.sub(r'\s+', ' ', text).strip()
139
+ return text
140
+
141
+
142
+ async def websocket_main(code: str, websocket: WebSocket):
143
+ access_token = code
144
+ messages = get_messages(access_token)
145
+ await websocket.send_json({"total_messages": len(messages)})
146
+ chunk_size = 100000
147
+ for message in messages:
148
+ message_json = message.to_json()
149
+ await send_message_in_chunks(websocket, message_json, chunk_size)
150
+ await websocket.send_text("NEXT_MESSAGE")
151
+
152
+ await websocket.close()
153
+
154
+
155
+ async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
156
+ message_str = json.dumps(message_json)
157
+ for i in range(0, len(message_str), chunk_size):
158
+ await websocket.send_text(message_str[i:i + chunk_size])