Spaces:
Sleeping
Sleeping
File size: 6,673 Bytes
b665f30 2c065ea 985eaa2 b665f30 c0ca9ad b665f30 602cbe6 c0ca9ad 602cbe6 b665f30 602cbe6 b665f30 602cbe6 b665f30 c4ad005 b665f30 6a1361e c812a8f 6a1361e c812a8f 6a1361e 7c9c13a 6a1361e c812a8f 6a1361e b665f30 74fefe4 b665f30 7e7da4d b665f30 1aa0325 b665f30 6a1361e c4ad005 b665f30 6a1361e b665f30 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import asyncio
import base64
import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, List, Dict

import requests
from bs4 import BeautifulSoup
from fastapi import WebSocket

from models.models import Message, Attachment
def get_messages(code: str, brand_name: Optional[str] = None) -> List[Message]:
    """Fetch every Gmail message matching the receipt/order search query.

    The message list endpoint is paged through until no ``nextPageToken`` is
    returned; the messages listed on each page are fetched concurrently.

    Args:
        code: OAuth access token, sent as the ``Bearer`` credential.
        brand_name: When not ``None``, the search is additionally restricted
            with ``subject:<brand_name>``.

    Returns:
        The fetched messages, in the order the API listed them. Entries for
        which no message could be built (no id, falsy result) are skipped.
    """
    access_token = code
    list_url = "https://www.googleapis.com/gmail/v1/users/me/messages"
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    g_query = (
        'subject:"your order" OR subject:receipts OR subject:receipt '
        'OR subject: aankoopbon OR subject:reçu OR subject:invoice '
        'OR subject:invoice OR category:purchases'
    )
    if brand_name is not None:
        g_query = f'({g_query}) AND subject:{brand_name}'

    def fetch_message_wrapper(message_data):
        message_id = message_data.get("id")
        if message_id:
            return fetch_message_data(access_token, message_id)
        return None

    messages = []
    page_token = None
    while True:
        # Hand the query to requests as parameters so it is percent-encoded.
        # Pasting it into the URL by hand lets characters such as '&' or '#'
        # in a caller-supplied brand name truncate or corrupt the request.
        params = {"q": g_query}
        if page_token:
            params["pageToken"] = page_token
        gmail_response = requests.get(list_url, headers=auth_headers, params=params)
        gmail_data = gmail_response.json()

        listed = gmail_data.get("messages")
        if listed:
            with ThreadPoolExecutor(max_workers=15) as executor:
                # map() yields results in submission order, like the listing.
                for message in executor.map(fetch_message_wrapper, listed):
                    if message:
                        messages.append(message)

        page_token = gmail_data.get("nextPageToken")
        if not page_token:
            break
    return messages
def fetch_message_data(access_token: str, message_id: str) -> Message:
    """Download one Gmail message and build a ``Message`` from it.

    Args:
        access_token: OAuth access token, sent as the ``Bearer`` credential.
        message_id: Id of the message to fetch.

    Returns:
        A ``Message`` holding the id, the extracted body, the downloaded
        attachments and the company name derived from sender and subject.
    """
    logger = logging.getLogger(__name__)

    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
    message_data = message_response.json()

    subject = extract_subject_from_mail(message_data)
    # A response without a payload must not raise here: the other extractors
    # already tolerate a missing payload, so fall back to an empty header list.
    headers = (message_data.get('payload') or {}).get('headers') or []
    company_from_mail = extract_domain_name(headers, subject)
    body = extract_body_from_mail(message_data)
    attachments = extract_attachments_from_mail(access_token, message_data)

    # Log a short summary only; dumping whole mail bodies and attachment
    # payloads to stdout leaks private content and floods the output.
    logger.debug(
        "Fetched message %s: subject=%r company=%r attachments=%d",
        message_id, subject, company_from_mail, len(attachments),
    )
    return Message(message_id=message_id, body=body, attachments=attachments, company=company_from_mail)
def extract_subject_from_mail(message_data: dict) -> str:
    """Return the value of the message's ``Subject`` header.

    Args:
        message_data: Message dict; headers are read from
            ``message_data['payload']['headers']`` as ``name``/``value`` dicts.

    Returns:
        The first subject header's value, or ``""`` when the payload, the
        header list or the subject header is missing.
    """
    headers = (message_data.get('payload') or {}).get('headers') or []
    for header in headers:
        # Header field names are case-insensitive, so "subject" and "SUBJECT"
        # must match as well as "Subject".
        if header.get('name', '').lower() == 'subject':
            return header.get('value', '')
    return ""
def extract_domain_name(payload: List[dict], subject: str) -> str:
    """Derive a company label from the mail headers and the subject.

    Args:
        payload: The message's header list (``name``/``value`` dicts), not
            the whole payload object.
        subject: Subject line of the message.

    Returns:
        ``'chanel'`` or ``'Louis Vuitton'`` when the subject mentions that
        brand (case-insensitively); otherwise the domain label taken from the
        ``From`` header; ``'others'`` when no usable sender is available.
    """
    domain_name = 'others'
    for header in payload:
        # Header field names are case-insensitive.
        if header.get('name', '').lower() == 'from':
            # The helper may return None; keep the 'others' fallback so the
            # declared ``str`` return type holds.
            domain_name = extract_domain_from_email(header.get('value', '')) or 'others'
            break

    # A brand named in the subject overrides the sender's domain.
    subject_lower = subject.lower()
    if 'chanel' in subject_lower:
        return 'chanel'
    if 'louis vuitton' in subject_lower:
        return 'Louis Vuitton'
    return domain_name
def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Return the first label of the domain of the first address found.

    Args:
        email_string: Text containing an email address, for example a
            ``From`` header value such as ``"Shop <noreply@shop.com>"``.

    Returns:
        The part of the domain before its first dot (``"shop"`` for
        ``noreply@shop.com``), or ``None`` when the text contains no address
        or that label is empty.
    """
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if match is None:
        # No address in the text: report "not found" rather than raising
        # AttributeError on a missing match object.
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain or None
def extract_body_from_mail(message_data: dict) -> Optional[str]:
    """Extract the plain text of the mail body from its top-level parts.

    Only the direct children in ``payload['parts']`` whose ``mimeType`` is
    ``text/plain`` or ``text/html`` are considered; when several match, the
    last one wins.

    NOTE(review): a payload without ``parts`` and parts nested deeper than
    the top level are not inspected; confirm whether that is intended.

    Args:
        message_data: Message dict as returned by the message endpoint.

    Returns:
        The body text, or ``None`` when no text part carrying data exists.
    """
    body = None
    parts = (message_data.get("payload") or {}).get("parts") or []
    for part in parts:
        if part.get('mimeType') not in ('text/plain', 'text/html'):
            continue
        body_data = (part.get('body') or {}).get('data', '')
        if not body_data:
            # A text part without inline data would make extract_text raise
            # ValueError and abort the whole message; skip it instead.
            continue
        # Restore any stripped '=' padding so the decoder accepts the input.
        padded = body_data + '=' * (-len(body_data) % 4)
        decoded = base64.urlsafe_b64decode(padded)
        if decoded:
            body = extract_text(decoded)
    return body
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Fetch one attachment of a message and return the decoded JSON reply.

    Args:
        access_token: OAuth access token, sent as the ``Bearer`` credential.
        message_id: Id of the message that owns the attachment.
        attachment_id: Id of the attachment to download.

    Returns:
        The JSON body of the response as a dict.
    """
    auth_header = {"Authorization": f"Bearer {access_token}"}
    endpoint = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    return requests.get(endpoint, headers=auth_header).json()
def extract_attachments_from_mail(access_token: str, message_data: dict) -> List[Attachment]:
    """Download every attachment referenced by the message's top-level parts.

    Args:
        access_token: OAuth access token used for the attachment requests.
        message_data: Message dict; its ``id`` and ``payload['parts']`` are
            read.

    Returns:
        One ``Attachment`` per part whose body carries an ``attachmentId``,
        holding the part's filename and the ``data`` field of the download
        response (``""`` when absent). Empty when there are no such parts.
    """
    attachments = []
    parts = (message_data.get("payload") or {}).get("parts") or []
    for part in parts:
        attachment_id = (part.get("body") or {}).get("attachmentId")
        if not attachment_id:
            continue
        attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
        # `or` rather than a .get() default: the fallback must also apply
        # when the key is present but holds an empty string.
        filename = part.get("filename") or "untitled.txt"
        attachments.append(Attachment(filename=filename, data=attachment_data.get("data", "")))
    return attachments
def extract_text(html_content: str) -> str:
    """Strip markup from ``html_content`` and collapse its whitespace.

    Args:
        html_content: Markup to parse with the ``html.parser`` backend.

    Returns:
        The document text with text nodes joined by a space, every run of
        whitespace reduced to a single space, and both ends trimmed.

    Raises:
        ValueError: If ``html_content`` is empty or ``None``.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")
    raw_text = BeautifulSoup(html_content, 'html.parser').get_text(separator=' ')
    return re.sub(r'\s+', ' ', raw_text).strip()
async def websocket_main(code: str, websocket: WebSocket, brand_name: Optional[str] = None):
    """Fetch the matching mails and stream them to the client over a websocket.

    Protocol, as sent by this coroutine: first a JSON object
    ``{"total_messages": <count>}``; then, for each message, its JSON text
    split into chunks followed by the text frame ``"NEXT_MESSAGE"``; finally
    the socket is closed.

    Args:
        code: OAuth access token passed on to ``get_messages``.
        websocket: Connection the messages are written to.
        brand_name: Optional brand filter passed on to ``get_messages``.
    """
    access_token = code
    # get_messages performs blocking HTTP calls; run it in the default thread
    # pool so the event loop is not stalled while the mailbox is scanned.
    loop = asyncio.get_running_loop()
    messages = await loop.run_in_executor(None, get_messages, access_token, brand_name)

    await websocket.send_json({"total_messages": len(messages)})
    print("Total Length of messages")
    print(len(messages))

    chunk_size = 100000
    for message in messages:
        message_json = message.to_json()
        await send_message_in_chunks(websocket, message_json, chunk_size)
        # Marks the end of the current message for the receiver.
        await websocket.send_text("NEXT_MESSAGE")
    await websocket.close()
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
    """Serialize ``message_json`` and send it as consecutive text frames.

    Args:
        websocket: Connection to write to.
        message_json: Object to serialize with ``json.dumps``.
        chunk_size: Maximum number of characters per text frame.
    """
    payload = json.dumps(message_json)
    total = len(payload)
    for start in range(0, total, chunk_size):
        piece = payload[start:start + chunk_size]
        await websocket.send_text(piece)