# Gmail receipt/invoice fetching service: streams message bodies and
# attachments to a client over a FastAPI WebSocket.
import logging | |
import base64 | |
import requests | |
import asyncio | |
from fastapi import WebSocket | |
from services import utils as util | |
import re | |
from bs4 import BeautifulSoup | |
async def send_chunked_data(websocket: WebSocket, filename: str, data: str, company_associated: str, message_id: str):
    """Stream an attachment payload to the client in fixed-size chunks.

    Each chunk is sent as JSON alongside the filename; a JSON trailer with
    the company/message identifiers and a text sentinel terminate the stream.
    """
    chunk_size = 2000  # Set an appropriate chunk size
    start = 0
    while start < len(data):
        chunk = data[start:start + chunk_size]
        await websocket.send_json({"filename": filename, "data_chunk": chunk})
        await asyncio.sleep(0.4)  # throttle so the client can keep up
        start += chunk_size
    await websocket.send_json({"company_associated": company_associated, "message_id": message_id})
    await websocket.send_text("FinishedThisAttachment")
async def send_chunked_data_without_attch(websocket: WebSocket, body_text: str, message_id: str, company_associated: str):
    """Stream an attachment-less message body to the client in chunks.

    Announces the absence of an attachment first, then sends the body in
    JSON chunks, a JSON identifier trailer, and a closing text sentinel.
    """
    chunk_size = 2000  # Set an appropriate chunk size
    # NOTE: sentinel strings below are part of the client protocol — do not edit.
    await websocket.send_text("This message does'nt contain an Attachment")
    offset = 0
    while offset < len(body_text):
        await websocket.send_json({"data_chunk": body_text[offset:offset + chunk_size]})
        await asyncio.sleep(0.4)  # throttle so the client can keep up
        offset += chunk_size
    await websocket.send_json({"company_associated": company_associated, "message_id": message_id})
    await websocket.send_text("FinishedThisAttachmentnotContainingAttachment")
async def process_messages(access_token: str, websocket: WebSocket):
    """Fetch all matching Gmail messages and stream each one to the client.

    Sends the total count first, then processes every message that has an
    id, and finishes with a completion sentinel.
    """
    logging.info("Entered process_messages")
    messages = get_messages(access_token)
    await websocket.send_json({"total_messages": len(messages)})
    await websocket.send_text("CompletedSendingTotalMessagesLength")
    for msg in messages:
        msg_id = msg.get("id")
        if not msg_id:
            continue
        details = fetch_message_data(access_token, msg_id)
        await process_message_data(access_token, details, websocket, msg_id)
    await websocket.send_text("CompletedFetchingMessages")
async def websocket_main(code: str, websocket: WebSocket):
    """Entry point: drive the fetch-and-stream cycle, then close the socket.

    Args:
        code: OAuth access token for the Gmail API.
        websocket: Accepted FastAPI WebSocket connection.
    """
    logging.info("Entered websocket_main")
    access_token = code
    await process_messages(access_token, websocket)
    logging.info("Completed Fetching all the messages")
    # Bug fix: WebSocket.close() is a coroutine; it was previously called
    # without await, so the close never actually happened.
    await websocket.close()
def get_messages(code: str):
    """Return all Gmail message stubs matching the receipt/invoice query.

    Pages through the Gmail REST API using nextPageToken until exhausted.

    Args:
        code: OAuth access token for the Gmail API.

    Returns:
        list[dict]: Message stubs, each containing at least an "id" key.
    """
    logging.info("Entered get_messages")
    access_token = code
    page_token = None
    messages = []
    jobs_query = f'subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoice OR category:purchases'
    max_results = 10
    while True:
        gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={jobs_query}&maxResults={max_results}"
        if page_token:
            gmail_url += f"&pageToken={page_token}"
        # Bug fix: requests.get without a timeout can block this worker forever
        # on a stalled connection.
        gmail_response = requests.get(
            gmail_url,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=30,
        )
        logging.info("%s", gmail_response)
        gmail_data = gmail_response.json()
        # Pages with no hits simply contribute nothing.
        messages.extend(gmail_data.get("messages", []))
        page_token = gmail_data.get("nextPageToken")
        if not page_token:
            break
    logging.info("Total Length:")
    logging.info(len(messages))
    return messages
def fetch_message_data(access_token: str, message_id: str):
    """Fetch the full Gmail message resource for *message_id*.

    Args:
        access_token: OAuth bearer token.
        message_id: Gmail message identifier.

    Returns:
        dict: Parsed JSON message resource from the Gmail API.
    """
    logging.info(f"Entered fetch_message_data for message_id: {message_id}")
    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    # Bug fix: add a timeout so a stalled connection cannot hang the service.
    message_response = requests.get(
        message_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,
    )
    return message_response.json()
async def process_message_data(access_token: str, message_data: dict, websocket: WebSocket, message_id: str):
    """Decode one Gmail message and stream its body and/or attachments.

    For each MIME part: text parts are base64-decoded and converted to plain
    text; parts without an attachmentId are streamed as body text, parts with
    one have their attachment fetched and streamed.

    Args:
        access_token: OAuth bearer token (needed to fetch attachments).
        message_data: Full Gmail message resource.
        websocket: Accepted client WebSocket.
        message_id: Gmail message identifier, echoed back to the client.
    """
    logging.info("Entered process_message_data")
    body_text = ''
    subject = extract_subject_from_mail(message_data)
    # Bug fix: a response lacking 'payload'/'headers' previously raised
    # KeyError here; fall back to an empty header list instead.
    headers = message_data.get('payload', {}).get('headers', [])
    company_from_mail = extract_domain_name(headers, subject)
    if "payload" in message_data and "parts" in message_data["payload"]:
        for part in message_data["payload"]["parts"]:
            if 'mimeType' not in part:
                continue
            mime_type = part['mimeType']
            if mime_type in ('text/plain', 'text/html'):
                body_data = part['body'].get('data', '')
                # Bug fix: only decode when inline data is present —
                # extract_text() raises ValueError on empty input.
                if body_data:
                    body_text = extract_text(base64.urlsafe_b64decode(body_data))
            if "body" in part and "attachmentId" not in part["body"]:
                await process_mail_body_data(websocket, body_text, message_id, company_from_mail)
            if "body" in part and "attachmentId" in part["body"]:
                attachment_id = part["body"]["attachmentId"]
                attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
                body_text = ''
                await process_attachment_data(part, attachment_data, websocket, company_from_mail, message_id)
async def process_attachment_data(part: dict, attachment_data: dict, websocket: WebSocket, company_associated: str, message_id: str):
    """Decode an attachment, log its extracted text, and stream it.

    The client receives the original base64 payload; the decoded text is
    only logged server-side.
    """
    logging.info("Entered process_attachment_data")
    filename = part.get("filename", "untitled.txt")
    data = attachment_data.get("data", {})
    if not data:
        return  # nothing to send for this part
    decoded_bytes = base64.urlsafe_b64decode(data)
    extracted_text = await util.extract_text_from_attachment(filename, decoded_bytes)
    logging.info(f"Extracted text from attachment (unknown): {extracted_text}")
    await send_chunked_data(websocket, filename, data, company_associated, message_id)
async def process_mail_body_data(websocket: WebSocket, body_text: str, message_id: str, company_associated: str):
    """Forward an attachment-less message body to the chunked sender."""
    await send_chunked_data_without_attch(
        websocket=websocket,
        body_text=body_text,
        message_id=message_id,
        company_associated=company_associated,
    )
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str):
    """Fetch one attachment resource for a Gmail message.

    Args:
        access_token: OAuth bearer token.
        message_id: Gmail message identifier.
        attachment_id: Attachment identifier from the message part.

    Returns:
        dict: Parsed JSON attachment resource (base64url payload under "data").
    """
    logging.info(f"Entered fetch_attachment_data for attachment_id: {attachment_id}")
    attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    # Bug fix: add a timeout so a stalled connection cannot hang the service.
    attachment_response = requests.get(
        attachment_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,
    )
    return attachment_response.json()
def extract_subject_from_mail(message_data: dict):
    """Return the Subject header value of a Gmail message, or "" if absent.

    Handles missing 'payload'/'headers' keys gracefully.
    """
    if 'payload' not in message_data or 'headers' not in message_data['payload']:
        # Malformed/partial message resource: no headers to scan.
        return ""
    subject = next(
        (h['value'] for h in message_data['payload']['headers'] if h['name'] == 'Subject'),
        "",
    )
    return subject
def extract_domain_name(payload: dict, subject: str):
    """Infer the company for a message.

    Derives a default from the From header's email domain, then applies
    brand overrides based on the subject line.
    """
    sender_domain = 'others'
    for header in payload:
        if header['name'] == 'From':
            sender_domain = extract_domain_from_email(header['value'])
            break
    lowered = subject.lower()
    if 'chanel' in lowered:
        return 'chanel'
    if 'louis vuitton' in lowered:
        return 'Louis Vuitton'
    return sender_domain
def extract_domain_from_email(email_string: str):
    """Return the second-level domain of the first email address found.

    Example: "Orders <no-reply@amazon.com>" -> "amazon".

    Args:
        email_string: Raw From-header value.

    Returns:
        str | None: The domain label before the first dot, or None when no
        email address (or no domain) can be extracted.
    """
    # Bug fix: re.search() returns None when nothing matches; the original
    # called .group() unconditionally and raised AttributeError, making its
    # trailing "if email_address" check unreachable dead code.
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if match is None:
        return None
    domain = match.group().split('@')[-1].split('.')[0]
    return domain if domain else None
def extract_text(html_content):
    """
    Extract plain text from HTML content.

    Args:
        html_content (str | bytes): The HTML content to process. (Callers in
            this module pass base64-decoded bytes; BeautifulSoup accepts both.)

    Returns:
        str: The extracted text with runs of whitespace collapsed to single
        spaces and surrounding whitespace stripped.

    Raises:
        ValueError: If the input HTML content is empty or None.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")
    soup = BeautifulSoup(html_content, 'html.parser')
    # Collapse all whitespace so chunked transmission stays compact.
    text = re.sub(r'\s+', ' ', soup.get_text(separator=' ')).strip()
    # Fix: use logging instead of bare print() so output honours the app's
    # logging configuration; drop the unused link extraction (the original
    # computed links but never returned them, contradicting its docstring).
    logging.debug("Extracted text from HTML: %s", text)
    return text