File size: 17,141 Bytes
b665f30
 
 
 
21791d0
b665f30
 
 
 
2c065ea
985eaa2
1e272ca
e01e08c
16b124e
9bf7c59
4d4f9d0
33b6dc3
1e272ca
 
 
b665f30
e01e08c
b665f30
9ebb81c
2a1db67
c0ca9ad
31b785c
b665f30
 
602cbe6
 
f75908f
 
 
21791d0
b665f30
 
4b33dd3
3313a0d
4b33dd3
0db13ac
b665f30
 
e01e08c
 
 
 
b665f30
e01e08c
07b8e82
b665f30
 
e01e08c
b665f30
 
e01e08c
 
b665f30
8abd0d4
6f9036e
e01e08c
21791d0
b665f30
9ebb81c
 
 
07b8e82
21791d0
 
 
 
 
9ebb81c
 
14bec9f
1b5a70b
1f2aaa9
8abd0d4
1f2aaa9
9ebb81c
21791d0
9ebb81c
 
 
3313a0d
 
e01e08c
 
9ebb81c
 
14bec9f
9ebb81c
14bec9f
9ebb81c
 
21791d0
b665f30
 
 
 
 
f13d7b7
9ebb81c
 
 
14bec9f
b665f30
 
409c72c
 
0db13ac
9ebb81c
f13d7b7
8a73b97
9777d71
25e5107
9ebb81c
 
0db13ac
 
b665f30
 
 
 
 
716077c
b665f30
9ebb81c
 
b665f30
 
 
3d6350d
1e272ca
 
 
8364347
 
 
 
1e272ca
 
b665f30
9ebb81c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b665f30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dc6c45b
c788a70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b665f30
 
 
 
 
 
 
 
3d6350d
b665f30
 
 
 
 
 
6953b8f
067db0f
3d6350d
07b8e82
 
03c7264
07b8e82
 
 
03c7264
8a73b97
 
 
 
 
 
 
 
 
 
0817311
8a73b97
 
 
 
 
 
 
 
 
 
 
78b319c
03c7264
3d6350d
55bac05
3d6350d
b665f30
 
 
 
 
 
 
 
 
 
 
 
e01e08c
b665f30
e01e08c
b665f30
 
409c72c
b665f30
9ebb81c
 
 
 
 
 
 
 
 
 
 
 
 
b665f30
 
2f346b0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import base64
import json
import logging
import re
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, List, Dict
import requests
from bs4 import BeautifulSoup
from models.models import Message, Attachment
from fastapi import WebSocket
from services import utils as ut
import time
import concurrent.futures

import asyncio
def get_company_type(company_name: str) -> str:
    """Map a brand name to its high-level business group.

    The lookup is case-insensitive; unknown brands map to 'Others'.

    Args:
        company_name: brand name as extracted from the email (any case).

    Returns:
        The business-group label, e.g. 'Fashion & Leather Goods', or 'Others'.
    """
    # Brands grouped by business category; flattened into a brand -> category
    # lookup below.  (Category labels intentionally keep their original,
    # inconsistent casing — downstream consumers may compare on them.)
    categories = {
        'wines and spirit': [
            'ao yun', 'ardbeg', 'belvedere', 'bodega numanthia', 'chandon',
            'château cheval blanc', "château d'yquem", 'château galoupet',
            'cheval des andes', 'clos19', 'cloudy bay', 'colgin cellars',
            'dom pérignon', 'domaine des lambrays', 'eminente', 'glenmorangie',
            'hennessy', 'joseph phelps', 'krug', 'mercier', 'moët & chandon',
            'newton vineyard', 'ruinart', 'terrazas de los andes',
            'veuve clicquot', 'volcan de mi tierra', 'woodinville',
        ],
        'Fashion & Leather Goods': [
            'berluti', 'celine', 'christian dior', 'emilio pucci', 'fendi',
            'givenchy', 'kenzo', 'loewe', 'loro piana', 'louis vuitton',
            'marc jacobs', 'moynat', 'patou', 'rimowa',
        ],
        'Perfumes & Cosmetics': [
            'acqua di parma', 'benefit cosmetics', 'cha ling',
            'fenty beauty by rihanna', 'fresh', 'givenchy parfums', 'guerlain',
            'kenzo parfums', 'kvd beauty', 'loewe perfumes',
            'maison francis kurkdjian', 'make up for ever',
            'officine universelle buly', 'olehenriksen',
            'parfums christian dior', 'stella by stella mccartney',
        ],
        'Watches & Jewelry': [
            'bulgari', 'chaumet', 'fred', 'hublot', 'repossi', 'tag heuer',
            'tiffany & co.', 'zenith',
        ],
        'Selective retailing': [
            '24s', 'dfs', 'la grande epicerie de paris',
            'le bon marché rive gauche', 'sephora',
        ],
        'Other activities': [
            'belmond', 'cheval blanc', 'connaissance des arts', 'cova',
            'investir', "jardin d'acclimatation", 'le parisien', 'les echos',
            'radio classique', 'royal van lent',
        ],
    }
    # BUG FIX: removed stray debug print of company_types_dict["louis vuitton"]
    # that ran on every call.
    company_types_dict = {
        brand: category
        for category, brands in categories.items()
        for brand in brands
    }
    return company_types_dict.get(company_name.lower(), 'Others')

async def get_messages(code: str,websocket:WebSocket,start,brand_name: Optional[str] = None):
    """Stream purchase-related Gmail messages to the client over a websocket.

    Pages through a Gmail search (receipts/invoices/purchases with
    attachments), fetches each message concurrently in a thread pool, and
    pushes every parsed message to the websocket as soon as its future
    completes.

    Args:
        code: OAuth2 access token for the Gmail API.
        websocket: open client connection used for streaming results.
        start: time.time() reference used only for timing printouts.
        brand_name: optional brand filter ANDed into the Gmail query.

    Returns:
        The `messages` list — NOTE(review): it is never appended to, so this
        always returns []; results are streamed, not accumulated.
    """
    access_token = code
    total_processed = 0
    # Gmail search expression: purchase-related subjects/category, attachments only.
    g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) has:attachment'
    if brand_name is not None:
        # Same query, additionally restricted to the requested brand.
        g_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR  subject: aankoopbon  OR subject:reçu OR subject:invoice OR subject:invoices OR category:purchases) AND ({brand_name}) has:attachment'
    page_token = None
    messages = []
    # max_results = 10
    # gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={jobs_query}&maxResults={max_results}"
    # gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
    # gmail_data = gmail_response.json()
    # messages.append(gmail_data['messages'])

    # Runs inside the thread pool: fetch and parse one message by id.
    # (The websocket argument is accepted but unused here.)
    def fetch_message_wrapper(message_data,websocket:WebSocket):
        message_id = message_data.get("id")
        if message_id:
            msg = fetch_message_data(access_token, message_id)

            return msg

        return None

    end = time.time()
    print("time 0")
    print(end - start)
    start1 = time.time()
    # One iteration per Gmail result page (20 message ids per page).
    while True:
        start2= time.time()
        gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults=20"
        if page_token:
            gmail_url += f"&pageToken={page_token}"
        # print(gmail_url)

        gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
        # print(gmail_response.text)
        end2 = time.time()
        gmail_data = gmail_response.json()
        # print(gmail_data)
        # print(len(gmail_data['messages']))


        if "messages" in gmail_data:
            # for message_data in gmail_data['messages']:
            #     message = fetch_message_wrapper(message_data,websocket)
            #     await process_message(message, websocket, 2000000)
            # Fetch the whole page concurrently; sending happens back on this
            # async task as each future completes, preserving no particular order.
            with ThreadPoolExecutor(max_workers=20) as executor:


                futures=[executor.submit(fetch_message_wrapper, message_data,websocket) for message_data in
                               gmail_data["messages"]]
                print("Futures length")
                print(len(futures))
                # print(futures)
                for future in concurrent.futures.as_completed(futures):
                    message = future.result()
                    # print(message)
                    if message:
                        total_processed += 1
                        # Process and send the message immediately
                        end1 = time.time()
                        print("time 1")
                        print("sending the message")

                        await process_message(message, websocket, 100000)


            #         # if message:
            #         #     messages.append(message)
            # print("Messages to be sent")
            # # print(messages)s
            # print(len(messages))
            # # for message_data in messages:
            # #     await process_message(message_data,websocket,10000)


        # Advance to the next result page, or stop when Gmail reports no more.
        if "nextPageToken" in gmail_data:
            page_token = gmail_data["nextPageToken"]
        else:
            break
    await websocket.send_text(f"Total messages processed: {total_processed}")
    print(f"Total messages processed: {total_processed}")
    logging.info(f"Total Processed Messages : {total_processed}")
    print("printing messages")
    # print(messages)
    return messages

#async def process_message(message:Message, websocket:WebSocket, chunk_size:int):

async def process_message(message:Message, websocket:WebSocket, chunk_size:int):
    """Send one parsed message to the client, chunked, then a completion marker.

    Messages that are falsy or have no structured data are skipped silently.

    Args:
        message: parsed Gmail message model.
        websocket: open client connection.
        chunk_size: maximum number of characters per websocket frame.
    """
    logging.info("process_message")
    if message and message.structured_data:
        message_json = message.to_json()
        # BUG FIX: the chunk size was hard-coded to 50000, silently ignoring
        # the chunk_size parameter every caller passes in.
        await send_message_in_chunks(websocket, message_json, chunk_size)
        # Sentinel so the client knows the chunked payload is complete.
        await websocket.send_text("NEXT_MESSAGE")


def fetch_message_data(access_token: str, message_id: str) -> Message:
    """Fetch one Gmail message and assemble it into a Message model.

    Args:
        access_token: OAuth2 bearer token for the Gmail API.
        message_id: Gmail message id to fetch.

    Returns:
        A Message populated with the decoded body, attachments, the company
        derived from the From header/subject, and its high-level company type.
    """
    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
    message_data = message_response.json()

    # BUG FIX: removed debug prints that dumped the entire raw message
    # (headers, body, attachment data) to stdout — noisy and leaks user PII.
    subject = extract_subject_from_mail(message_data)
    # NOTE(review): assumes the API call succeeded; an error response would
    # lack 'payload' and raise KeyError here — confirm upstream handling.
    company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)

    body = extract_body_from_mail(message_data)
    attachments, structed_attachment_data = extract_attachments_from_mail(access_token, message_data)
    high_level_company_type = get_company_type(company_from_mail)

    body_len = len(body) if body is not None else 0

    return Message(
        message_id=message_id,
        body_len=body_len,
        body=body,
        attachments=attachments,
        company=company_from_mail,
        high_level_company_type=high_level_company_type,
        structured_data=structed_attachment_data,
    )


# def fetch_message_data(access_token: str, message_id: str) -> Message:
#     message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
#     message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
#     message_data = message_response.json()

#     with ThreadPoolExecutor(max_workers=50) as executor:
#         # Submit tasks to executor
#         future_subject = executor.submit(extract_subject_from_mail, message_data)
#         subject = future_subject.result()
#         future_company_from_mail = executor.submit(extract_domain_name, message_data['payload']['headers'], future_subject.result())
#         company_from_mail = future_company_from_mail.result()
#         future_body = executor.submit(extract_body_from_mail, message_data)
#         body = future_body.result()
        
#         # Extract attachments and measure time
 
#         future_attachments = executor.submit(extract_attachments_from_mail, access_token, message_data)
#         attachments, structured_attachment_data = future_attachments.result()
  

#         future_high_level_company_type = executor.submit(get_company_type, future_company_from_mail.result())

#         high_level_company_type = future_high_level_company_type.result()

#     body_len = len(body) if body is not None else 0

#     return Message(
#         message_id=message_id,
#         body_len=body_len,
#         body=body,
#         attachments=attachments,
#         company=company_from_mail,
#         high_level_company_type=high_level_company_type,
#         structured_data=structured_attachment_data
#     )








def extract_subject_from_mail(message_data: dict) -> str:
    """Return the Subject header of a Gmail message, or "" when absent."""
    if 'payload' not in message_data or 'headers' not in message_data['payload']:
        return ""
    return next(
        (h['value'] for h in message_data['payload']['headers'] if h['name'] == 'Subject'),
        "",
    )


def extract_domain_name(payload: dict, subject: str) -> str:
    """Derive a brand/domain name from the From header, with subject overrides.

    Looks up the first 'From' header and extracts its email domain; mentions
    of 'chanel' or 'louis vuitton' in the subject take precedence over the
    header-derived name.  Defaults to 'others' when no From header exists.
    """
    brand = 'others'
    from_header = next((h for h in payload if h['name'] == 'From'), None)
    if from_header is not None:
        brand = extract_domain_from_email(from_header['value'])
    lowered = subject.lower()
    if 'chanel' in lowered:
        return 'chanel'
    if 'louis vuitton' in lowered:
        return 'Louis Vuitton'
    return brand


def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Extract the second-level domain from a From-header string.

    E.g. 'Nike <orders@mail.nike.com>' -> 'mail' ... the first label after
    the '@'.

    Args:
        email_string: raw header value that may contain an email address.

    Returns:
        The first domain label after '@', or None when no address is found
        or the label is empty.
    """
    # BUG FIX: the original called .group() directly on re.search's result,
    # raising AttributeError when the string contained no email address; the
    # truthiness guard ran only after the value had already been used.
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if match is None:
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain if domain else None



def extract_body_from_mail(message_data: dict) -> str:
    """Decode the text body of a Gmail API message resource.

    Multipart messages: decodes each text/plain or text/html part (the last
    decodable one wins, matching the original behavior).  Single-part
    messages: decodes payload['body'] regardless of MIME type.  Falls back to
    the message 'snippet' when nothing is decodable.

    Args:
        message_data: Gmail `users.messages.get` response dict.

    Returns:
        Plain text extracted from the body, or the snippet, or "".
    """
    body = None
    payload = message_data.get("payload")
    if payload is not None:
        if "parts" in payload:
            for part in payload["parts"]:
                if part.get('mimeType') in ('text/plain', 'text/html'):
                    body_data = part['body'].get('data', '')
                    if body_data:
                        body = extract_text(base64.urlsafe_b64decode(body_data))
        elif 'body' in payload:
            body_data = payload['body'].get('data', '')
            if body_data:
                body = extract_text(base64.urlsafe_b64decode(body_data))
        # BUG FIX: the original had a third branch `elif 'parts' in
        # payload['body']` that could only execute when 'body' was absent
        # from payload, so it always raised KeyError instead of decoding
        # anything; removed — such messages now fall back to the snippet.
    if not body:
        body = message_data.get('snippet', '')
    return body


def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Download one attachment body from the Gmail API and return the JSON dict."""
    url = (
        "https://www.googleapis.com/gmail/v1/users/me/messages/"
        f"{message_id}/attachments/{attachment_id}"
    )
    auth_header = {"Authorization": f"Bearer {access_token}"}
    return requests.get(url, headers=auth_header).json()


def extract_attachments_from_mail(access_token: str, message_data: dict) -> tuple:
    """Download and structure every usable attachment of a Gmail message.

    Skips archive/plain-text/image attachments, extracts raw text from the
    rest, and asks `ut.strcuture_document_data` to produce structured receipt
    fields.  Placeholder values the LLM echoed back (e.g. "INSERT BRAND
    NAME") are nulled out; a result with only nulls becomes None.

    Args:
        access_token: OAuth2 bearer token for the Gmail API.
        message_data: Gmail `users.messages.get` response dict.

    Returns:
        (attachments, structured_data): a list of Attachment models and a
        parallel list of structured-field dicts (entries may be None).
        NOTE: the original annotation claimed List[Attachment] but the
        function has always returned this two-tuple.
    """
    attachments = []
    structured_data = []
    # Expected placeholder values; if the structuring step echoes one of
    # these back, the field was not actually extracted.
    placeholders = {
        "brand": "INSERT BRAND NAME",
        "total_cost": "INSERT TOTAL COST",
        "location": "INSERT LOCATION FROM",
        "purchase_category": "INSERT PURCHASE CATEGORY",
        "brand_category": "INSERT BRAND CATEGORY",
        "Date": "INSERT RECEIPT DATE",
        "currency": "INSERT CURRENCY",
        "filename": "GENERATE A FILENAME",
        "payment_method": "INSERT PAYMENT METHOD",
    }
    skipped_suffixes = (".zip", ".txt", ".png", ".jpg", ".jpeg", ".gif")
    if "payload" in message_data and "parts" in message_data["payload"]:
        for part in message_data["payload"]["parts"]:
            if "body" not in part or "attachmentId" not in part["body"]:
                continue
            attachment_id = part["body"]["attachmentId"]
            attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
            filename = part.get("filename", "untitled.txt")
            # Archives, plain text and images are not parseable receipts.
            if filename.endswith(skipped_suffixes):
                continue
            data = attachment_data.get("data", "")
            try:
                raw_text = ut.extract_text_from_attachment(filename, data)
            except Exception as e:
                print(f"Error processing attachment (unknown): {str(e)}")
                continue
            struct_data = ut.strcuture_document_data(raw_text)
            # BUG FIX: the original called struct_data.values() even when
            # strcuture_document_data returned None (AttributeError), and
            # indexed struct_data[key] without guarding against missing keys.
            if struct_data:
                for key, placeholder in placeholders.items():
                    if struct_data.get(key) and placeholder in struct_data[key]:
                        struct_data[key] = None
                if all(value is None for value in struct_data.values()):
                    struct_data = None
            else:
                struct_data = None
            structured_data.append(struct_data)
            attachments.append(Attachment(
                attachment_len=len(attachment_data.get("data", "")),
                filename=filename,
                data=attachment_data.get("data", ""),
            ))
    return attachments, structured_data


def extract_text(html_content: str) -> str:
    """Strip HTML markup and collapse all whitespace runs to single spaces.

    Raises:
        ValueError: when html_content is empty or None.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")

    plain = BeautifulSoup(html_content, 'html.parser').get_text(separator=' ')
    return re.sub(r'\s+', ' ', plain).strip()


async def websocket_main(code: str,  websocket: WebSocket,start,brand_name: Optional[str] = None):
    """Entry point for one websocket session: stream Gmail results, then close.

    Args:
        code: OAuth2 access token, passed through to get_messages.
        websocket: accepted client connection; closed when streaming ends.
        start: timing reference forwarded to get_messages.
        brand_name: optional brand filter for the Gmail query.
    """
    access_token = code
    await get_messages(access_token,websocket,start,brand_name)
    await websocket.close()

#async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):

async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
    """Serialize a payload to JSON and send it over the websocket in chunks.

    Large messages (bodies plus base64 attachments) can exceed frame limits,
    so the serialized string is sliced into chunk_size-character frames.

    Args:
        websocket: open client connection.
        message_json: JSON-serializable payload (Message.to_json() output).
        chunk_size: maximum number of characters per frame.
    """
    # Removed a large block of commented-out dead code that previously
    # obscured this function's two working lines.
    message_str = json.dumps(message_json)
    for i in range(0, len(message_str), chunk_size):
        await websocket.send_text(message_str[i:i + chunk_size])