# NOTE: scrape artifacts from the hosting page (Spaces status lines,
# "File size: 8,517 Bytes", commit d1a66a2, and a line-number gutter)
# were converted into this comment so the module parses as Python.
import logging
import base64
import requests
import asyncio
from fastapi import WebSocket
from services import utils as util
import re
from bs4 import BeautifulSoup
async def send_chunked_data(websocket: WebSocket, filename: str, data: str ,company_associated:str , message_id:str):
    """Stream an attachment's data to the client in fixed-size chunks.

    Emits {filename, data_chunk} JSON frames, then one trailing frame with
    the company/message identifiers, and finally the text sentinel
    "FinishedThisAttachment" that the client uses to detect end-of-attachment.
    """
    chunk = 2000  # payload size per websocket frame
    offset = 0
    while offset < len(data):
        await websocket.send_json({"filename": filename, "data_chunk": data[offset:offset + chunk]})
        await asyncio.sleep(0.4)  # pacing so the client can keep up
        offset += chunk
    await websocket.send_json({"company_associated": company_associated, "message_id": message_id})
    await websocket.send_text("FinishedThisAttachment")
async def send_chunked_data_without_attch(websocket: WebSocket,body_text:str,message_id:str , company_associated:str):
    """Stream a message body (no attachment) to the client in chunks.

    The text sentinels — including their original misspellings — are kept
    byte-for-byte because the client matches them exactly.
    """
    chunk = 2000  # payload size per websocket frame
    # NOTE(review): "does'nt" typo preserved for client-side compatibility.
    await websocket.send_text("This message does'nt contain an Attachment")
    for start in range(0, len(body_text), chunk):
        await websocket.send_json({"data_chunk": body_text[start:start + chunk]})
        await asyncio.sleep(0.4)  # pacing between frames
    await websocket.send_json({"company_associated": company_associated, "message_id": message_id})
    await websocket.send_text("FinishedThisAttachmentnotContainingAttachment")
async def process_messages(access_token: str, websocket: WebSocket):
    """Fetch all matching Gmail messages and stream each one to the client.

    First announces the total count, then processes every message that has
    an id, and ends with the "CompletedFetchingMessages" sentinel.
    """
    logging.info("Entered process_messages")
    messages = get_messages(access_token)
    # Tell the client how many messages to expect before streaming begins.
    await websocket.send_json({"total_messages": len(messages)})
    await websocket.send_text("CompletedSendingTotalMessagesLength")
    for message in messages:
        message_id = message.get("id")
        if not message_id:
            continue  # skip stubs with no id, as the original did
        message_data = fetch_message_data(access_token, message_id)
        await process_message_data(access_token, message_data, websocket, message_id)
    await websocket.send_text("CompletedFetchingMessages")
async def websocket_main(code: str, websocket: WebSocket):
    """Entry point for a websocket session: stream every message, then close.

    Args:
        code: OAuth access token for the Gmail API (forwarded as-is).
        websocket: The accepted client connection to stream results over.
    """
    logging.info("Entered websocket_main")  # fixed "mwebsocket_main" typo
    access_token = code
    await process_messages(access_token, websocket)
    logging.info("Completed Fetching all the messages")
    # Bug fix: WebSocket.close() is a coroutine — the original call was never
    # awaited, so the connection was never actually closed.
    await websocket.close()
def get_messages(code: str):
    """Return all Gmail message stubs matching the receipts/orders query.

    Pages through the users.messages.list endpoint until no nextPageToken
    remains.

    Args:
        code: OAuth access token.

    Returns:
        list[dict]: message stubs (id/threadId) accumulated across pages.
    """
    logging.info("Entered get_messages")
    access_token = code
    # Duplicate "subject:invoice" term removed from the original query.
    jobs_query = ('subject:"your order" OR subject:receipts OR subject:receipt '
                  'OR subject: aankoopbon OR subject:reçu OR subject:invoice '
                  'OR category:purchases')
    max_results = 10
    gmail_url = "https://www.googleapis.com/gmail/v1/users/me/messages"
    headers = {"Authorization": f"Bearer {access_token}"}
    page_token = None
    messages = []
    while True:
        # Bug fix: the query was interpolated raw into the URL; let requests
        # build and percent-encode the query string via `params`.
        params = {"q": jobs_query, "maxResults": max_results}
        if page_token:
            params["pageToken"] = page_token
        gmail_response = requests.get(gmail_url, headers=headers, params=params, timeout=30)
        logging.info(f"{gmail_response}")
        gmail_data = gmail_response.json()
        if "messages" in gmail_data:
            messages.extend(gmail_data["messages"])
        page_token = gmail_data.get("nextPageToken")
        if not page_token:
            break
    logging.info("Total Length:")
    logging.info(len(messages))
    return messages
def fetch_message_data(access_token: str, message_id: str):
    """Fetch the full Gmail message resource for *message_id* as a dict."""
    logging.info(f"Entered fetch_message_data for message_id: {message_id}")
    url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    auth_header = {"Authorization": f"Bearer {access_token}"}
    response = requests.get(url, headers=auth_header)
    return response.json()
async def process_message_data(access_token:str,message_data: dict, websocket: WebSocket,message_id:str):
    """Walk a message's MIME parts, streaming body text and attachments.

    For each part: text/plain and text/html bodies are base64-decoded and
    reduced to plain text; parts whose body has no attachmentId get the
    current body text streamed to the client, while attachment parts are
    fetched by id and streamed separately.

    Args:
        access_token: OAuth token used for the attachment fetch.
        message_data: Full Gmail message resource.
        websocket: Client connection to stream to.
        message_id: Gmail message id (echoed back to the client).
    """
    logging.info("Entered process_message_data")
    body_text = ''
    # Subject drives the brand-keyword fallback in extract_domain_name.
    subject = extract_subject_from_mail(message_data)
    # Robustness: the original indexed ['payload']['headers'] unguarded and
    # raised KeyError on partial resources.
    headers = message_data.get('payload', {}).get('headers', [])
    company_from_mail = extract_domain_name(headers, subject)
    # (Removed unused, misspelled `compnay_from_mail` variable.)
    payload = message_data.get("payload", {})
    if "parts" not in payload:
        return
    for part in payload["parts"]:
        if 'mimeType' not in part:
            continue
        mime_type = part['mimeType']
        if mime_type in ('text/plain', 'text/html'):
            body_data = part['body'].get('data', '')
            # Bug fix: the original decoded and called extract_text()
            # unconditionally; extract_text raises ValueError on empty
            # content, crashing on text parts with no inline data.
            if body_data:
                body_text = extract_text(base64.urlsafe_b64decode(body_data))
        if "body" in part and "attachmentId" not in part["body"]:
            await process_mail_body_data(websocket, body_text, message_id, company_from_mail)
        if "body" in part and "attachmentId" in part["body"]:
            attachment_id = part["body"]["attachmentId"]
            attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
            body_text = ''  # attachment parts must not re-send earlier body text
            await process_attachment_data(part, attachment_data, websocket, company_from_mail, message_id)
async def process_attachment_data(part: dict, attachment_data: dict, websocket: WebSocket,company_associated:str,message_id:str):
    """Decode an attachment, log its extracted text, and stream it.

    NOTE(review): the chunks sent to the client are the raw *base64* data,
    not the extracted text — presumably decoded client-side; confirm.
    """
    logging.info("Entered process_attachment_data")
    filename = part.get("filename", "untitled.txt")
    data = attachment_data.get("data", {})
    if not data:
        return  # nothing to decode or stream
    attachment_content = base64.urlsafe_b64decode(data)
    extracted_text = await util.extract_text_from_attachment(filename, attachment_content)
    logging.info(f"Extracted text from attachment (unknown): {extracted_text}")
    await send_chunked_data(websocket, filename, data, company_associated, message_id)
async def process_mail_body_data(websocket:WebSocket ,body_text : str, message_id:str,company_associated:str):
    """Forward a plain message body to the chunked no-attachment sender."""
    await send_chunked_data_without_attch(
        websocket,
        body_text,
        message_id,
        company_associated,
    )
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str):
    """Download one attachment resource from the Gmail API as a dict."""
    logging.info(f"Entered fetch_attachment_data for attachment_id: {attachment_id}")
    url = ("https://www.googleapis.com/gmail/v1/users/me/messages/"
           f"{message_id}/attachments/{attachment_id}")
    resp = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    return resp.json()
def extract_subject_from_mail(message_data: dict):
    """Return the Subject header of a Gmail message, or '' when absent.

    Handles partial resources (missing payload or headers) gracefully by
    returning the empty string rather than raising.
    """
    payload = message_data.get('payload')
    if not payload or 'headers' not in payload:
        return ""
    # First header named 'Subject' wins; '' when none is present.
    return next(
        (h['value'] for h in payload['headers'] if h['name'] == 'Subject'),
        "",
    )
def extract_domain_name(payload:dict,subject:str):
    """Infer the sender's company for a message.

    Brand keywords in the subject ('chanel', 'louis vuitton') take
    precedence; otherwise the domain of the first From header is used,
    defaulting to 'others' when no From header exists.
    """
    domain_name = 'others'
    for header in payload:
        if header['name'] == 'From':
            domain_name = extract_domain_from_email(header['value'])
            break  # only the first From header matters
    lowered = subject.lower()
    if 'chanel' in lowered:
        return 'chanel'
    if 'louis vuitton' in lowered:
        return 'Louis Vuitton'
    return domain_name
def extract_domain_from_email(email_string:str):
    """Extract the second-level domain of the first email address found.

    e.g. 'Shop <no-reply@amazon.com>' -> 'amazon'.

    Args:
        email_string: Raw From-header value (may include a display name).

    Returns:
        The label before the first dot of the address's host part, or None
        when no recognizable address (or an empty domain label) is found.
    """
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    # Bug fix: the original called .group() unconditionally, raising
    # AttributeError when the input contained no address; the intended
    # None return path was unreachable.
    if not match:
        return None
    email_address = match.group()
    domain = email_address.split('@')[-1].split('.')[0]
    return domain or None
def extract_text(html_content):
    """Extract plain text from HTML (or plain-text) content.

    Call sites pass base64-decoded bytes; BeautifulSoup accepts str or
    bytes alike.

    Args:
        html_content: The HTML content to process (str or bytes).

    Returns:
        str: the document text with runs of whitespace collapsed to
        single spaces.

    Raises:
        ValueError: If the input content is empty or None.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")
    soup = BeautifulSoup(html_content, 'html.parser')
    # Collapse the whitespace BeautifulSoup leaves at tag boundaries.
    text = soup.get_text(separator=' ')
    text = re.sub(r'\s+', ' ', text).strip()
    # Fixes vs original: docstring claimed a (text, links) tuple but only
    # text was ever returned; the unused link extraction and debug print()
    # calls were removed.
    return text