# hushh-valet-chat/controllers/websocket_controller.py
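"""WebSocket controller that streams Gmail receipt data to a connected client.

Searches the user's Gmail for receipt/invoice-related messages via the Gmail
REST API, extracts the message body text and any attachments, and streams the
content to the client in small JSON chunks. Plain-text markers such as
"FinishedThisAttachment" and "CompletedFetchingMessages" delimit protocol
steps for the client.
"""
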
import asyncio
import base64
import logging
import re

import requests
from bs4 import BeautifulSoup
from fastapi import WebSocket

from services import utils as util


async def send_chunked_data(websocket: WebSocket, filename: str, data: str, company_associated: str, message_id: str):
    chunk_size = 2000  # Set an appropriate chunk size
    for i in range(0, len(data), chunk_size):
        await websocket.send_json({"filename": filename, "data_chunk": data[i:i + chunk_size]})
        await asyncio.sleep(0.4)
    await websocket.send_json({"company_associated": company_associated, "message_id": message_id})
    await websocket.send_text("FinishedThisAttachment")


async def send_chunked_data_without_attch(websocket: WebSocket, body_text: str, message_id: str, company_associated: str):
    chunk_size = 2000  # Set an appropriate chunk size
    await websocket.send_text("This message doesn't contain an Attachment")
    for i in range(0, len(body_text), chunk_size):
        await websocket.send_json({"data_chunk": body_text[i:i + chunk_size]})
        await asyncio.sleep(0.4)
    await websocket.send_json({"company_associated": company_associated, "message_id": message_id})
    await websocket.send_text("FinishedThisAttachmentnotContainingAttachment")


async def process_messages(access_token: str, websocket: WebSocket):
    logging.info("Entered process_messages")
    messages = get_messages(access_token)
    await websocket.send_json({"total_messages": len(messages)})
    await websocket.send_text("CompletedSendingTotalMessagesLength")
    for message in messages:
        message_id = message.get("id")
        if message_id:
            message_data = fetch_message_data(access_token, message_id)
            await process_message_data(access_token, message_data, websocket, message_id)
    await websocket.send_text("CompletedFetchingMessages")


async def websocket_main(code: str, websocket: WebSocket):
    logging.info("Entered websocket_main")
    access_token = code
    await process_messages(access_token, websocket)
    logging.info("Completed fetching all the messages")
    await websocket.close()


def get_messages(code: str):
    logging.info("Entered get_messages")
    access_token = code
    page_token = None
    messages = []
    jobs_query = 'subject:"your order" OR subject:receipts OR subject:receipt OR subject:aankoopbon OR subject:reçu OR subject:invoice OR category:purchases'
    max_results = 10
    while True:
        params = {"q": jobs_query, "maxResults": max_results}
        if page_token:
            params["pageToken"] = page_token
        gmail_response = requests.get(
            "https://www.googleapis.com/gmail/v1/users/me/messages",
            headers={"Authorization": f"Bearer {access_token}"},
            params=params,
        )
        logging.info(f"{gmail_response}")
        gmail_data = gmail_response.json()
        if "messages" in gmail_data:
            messages.extend(gmail_data["messages"])
        # if len(messages) >= 10:
        #     break
        if "nextPageToken" in gmail_data:
            page_token = gmail_data["nextPageToken"]
        else:
            break
    logging.info(f"Total messages fetched: {len(messages)}")
    return messages
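

# For reference, the users.messages.list response consumed above looks roughly
# like the following (per the Gmail API); only "id" is used downstream here:
# {
#     "messages": [{"id": "...", "threadId": "..."}],
#     "nextPageToken": "...",
#     "resultSizeEstimate": ...
# }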


def fetch_message_data(access_token: str, message_id: str):
    logging.info(f"Entered fetch_message_data for message_id: {message_id}")
    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
    return message_response.json()


async def process_message_data(access_token: str, message_data: dict, websocket: WebSocket, message_id: str):
    logging.info("Entered process_message_data")
    body_text = ''
    # Extract the subject and the sender's domain (used as the associated company)
    subject = extract_subject_from_mail(message_data)
    company_from_mail = extract_domain_name(message_data.get('payload', {}).get('headers', []), subject)
    if "payload" in message_data and "parts" in message_data["payload"]:
        for part in message_data["payload"]["parts"]:
            if 'mimeType' not in part:
                continue
            mime_type = part['mimeType']
            if mime_type == 'text/plain' or mime_type == 'text/html':
                # Decode only when the part actually carries inline body data
                body_data = part['body'].get('data', '')
                if body_data:
                    body_text = extract_text(base64.urlsafe_b64decode(body_data))
            if "body" in part and "attachmentId" not in part["body"]:
                await process_mail_body_data(websocket, body_text, message_id, company_from_mail)
            if "body" in part and "attachmentId" in part["body"]:
                attachment_id = part["body"]["attachmentId"]
                attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
                body_text = ''
                await process_attachment_data(part, attachment_data, websocket, company_from_mail, message_id)


async def process_attachment_data(part: dict, attachment_data: dict, websocket: WebSocket, company_associated: str, message_id: str):
    logging.info("Entered process_attachment_data")
    filename = part.get("filename", "untitled.txt")
    data = attachment_data.get("data", "")
    if data:
        attachment_content = base64.urlsafe_b64decode(data)
        extracted_text = await util.extract_text_from_attachment(filename, attachment_content)
        logging.info(f"Extracted text from attachment {filename}: {extracted_text}")
        # Note: the raw base64url payload is what gets streamed to the client,
        # not the extracted text.
        await send_chunked_data(websocket, filename, data, company_associated, message_id)


async def process_mail_body_data(websocket: WebSocket, body_text: str, message_id: str, company_associated: str):
    await send_chunked_data_without_attch(websocket, body_text, message_id, company_associated)


def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str):
    logging.info(f"Entered fetch_attachment_data for attachment_id: {attachment_id}")
    attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"})
    return attachment_response.json()
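

# The users.messages.attachments.get response consumed above is roughly
# {"size": <int>, "data": "<base64url-encoded bytes>"} per the Gmail API;
# only the "data" field is used here.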


def extract_subject_from_mail(message_data: dict):
    if 'payload' in message_data and 'headers' in message_data['payload']:
        headers = message_data['payload']['headers']
        for header in headers:
            if header['name'] == 'Subject':
                return header['value']
        # If the 'Subject' header is not found, fall back to an empty string
        return ""
    else:
        # If 'payload' or 'headers' is missing, fall back to an empty string
        return ""


def extract_domain_name(headers: list, subject: str):
    # Default when no 'From' header (or recognisable brand) is found
    domain_name = 'others'
    for header in headers:
        if header['name'] == 'From':
            domain_name = extract_domain_from_email(header['value'])
            break
    if 'chanel' in subject.lower():
        return 'chanel'
    if 'louis vuitton' in subject.lower():
        return 'Louis Vuitton'
    return domain_name


def extract_domain_from_email(email_string: str):
    # Extract the email address with a regex; the header value may look like "Name <user@shop.com>"
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if not match:
        return None
    # Take the first label of the domain, e.g. "shop" from "user@shop.com"
    domain = match.group().split('@')[-1].split('.')[0]
    return domain if domain else None


def extract_text(html_content):
    """
    Extracts plain text from HTML (or plain-text) message content.

    Args:
        html_content (bytes | str): The decoded message body to process.

    Returns:
        str: The extracted text with whitespace collapsed.

    Raises:
        ValueError: If the input content is empty or None.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract the visible text and collapse runs of whitespace
    text = soup.get_text(separator=' ')
    text = re.sub(r'\s+', ' ', text).strip()
    logging.debug(f"Extracted text from the HTML body: {text}")
    return text
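

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of this controller): one way
# websocket_main might be mounted on a FastAPI app. The route path, the app
# instance, and passing the OAuth access token as a query parameter are
# assumptions for demonstration, not something defined in this file.
#
# from fastapi import FastAPI, WebSocket
#
# app = FastAPI()
#
# @app.websocket("/ws/receipts")
# async def receipts_ws(websocket: WebSocket, code: str):
#     await websocket.accept()
#     await websocket_main(code, websocket)
# ---------------------------------------------------------------------------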