# Spaces: Running
import base64
import hashlib
import logging
import os
import re

import jwt
import requests
from bs4 import BeautifulSoup
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
logger = logging.getLogger(__name__)


class GmailDataExtractor:
    """Fetch receipt-like Gmail messages and extract their text and attachments.

    The instance is built from a token string that is sent as-is as the
    ``Bearer`` credential on every Gmail API request.  ``user_input`` is
    stored but not currently used by any query.

    Attributes:
        error: ``"Error"`` when no token was supplied, otherwise ``None``.
    """

    _GMAIL_MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"
    # Without a timeout ``requests`` can block forever on a stalled connection.
    _REQUEST_TIMEOUT_SECONDS = 30
    # NOTE(review): "subject: aankoopbon" contains a space after the colon and
    # "subject:invoice" appears twice; both are kept verbatim to avoid
    # changing which messages the search matches.
    _RECEIPT_QUERY = (
        '(subject:"your order" OR subject:receipts OR subject:receipt '
        'OR subject: aankoopbon OR subject:reçu OR subject:invoice '
        'OR subject:invoice OR category:purchases)'
    )

    def __init__(self, jwt: str, user_input: str = None) -> None:
        """Store the token and optional user input.

        Args:
            jwt: Token used as the Gmail API bearer credential.
            user_input: Optional free-form filter (currently unused).
        """
        # Every attribute is always defined so later calls fail with a clear
        # message instead of AttributeError when no token was supplied.
        self.__jwt = jwt
        self.__user_input = user_input
        self.error = "Error" if jwt is None else None
        # NOTE(review): hard-coded signing secret committed to source; it
        # should be loaded from configuration and rotated.
        self.__secret_key = 'nkldjlncbamjlklwjeklwu24898h*&#Ujnfjf34893U5HSJFBSKFSHFNSK*$*W_ 3OWU'

    def __validate_jwt_token(self):
        """Decode the stored JWT (HS256) and return its ``access_token`` claim.

        Returns:
            The value of the ``access_token`` claim.

        Raises:
            ValueError: If the token is expired, fails verification, or has
                no ``access_token`` claim.
        """
        try:
            payload = jwt.decode(self.__jwt, self.__secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as exc:
            raise ValueError("Invalid JWT token: Expired token") from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError("Invalid JWT token: Token verification failed") from exc

        access_token = payload.get("access_token")
        if not access_token:
            raise ValueError("Invalid JWT token: Missing access token")
        return access_token

    def __get_json(self, url: str, description: str, params: dict = None) -> dict:
        """Perform an authorised GET against the Gmail API and decode the JSON body.

        Args:
            url: Absolute request URL.
            description: Short noun phrase used in the error message.
            params: Optional query parameters, URL-encoded by ``requests``.

        Raises:
            RuntimeError: If the request fails, returns an error status, or
                the body is not valid JSON.
        """
        try:
            response = requests.get(
                url,
                params=params,
                headers={"Authorization": f"Bearer {self.__jwt}"},
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as exc:
            raise RuntimeError(
                f"Error fetching {description} from Gmail API: {exc}"
            ) from exc

    def __fetch_messages(self) -> list:
        """Search Gmail for receipt-like messages.

        Only the first page of results (at most 10 messages) is requested.
        JWT validation is not applied here: the stored token is used directly
        as the access token.

        Returns:
            list: Message stubs as returned in the ``messages`` field of the
            response; empty when the field is absent.

        Raises:
            RuntimeError: If the Gmail API request fails.
        """
        payload = self.__get_json(
            self._GMAIL_MESSAGES_URL,
            "messages",
            params={"q": self._RECEIPT_QUERY, "maxResults": 10},
        )
        messages = list(payload.get("messages") or [])
        # The token itself is deliberately never logged.
        logger.debug("Gmail search returned %d message(s)", len(messages))
        return messages

    def __fetch_message_data(self, message_id: str) -> dict:
        """Fetch one full message from the Gmail API.

        Args:
            message_id: The ID of the message to fetch.

        Returns:
            dict: The decoded JSON response.

        Raises:
            RuntimeError: If the Gmail API request fails.
        """
        return self.__get_json(
            f"{self._GMAIL_MESSAGES_URL}/{message_id}", "message data"
        )

    def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment body from the Gmail API.

        Args:
            message_id: The ID of the message containing the attachment.
            attachment_id: The ID of the attachment to fetch.

        Returns:
            dict: The decoded JSON response.

        Raises:
            RuntimeError: If the Gmail API request fails.
        """
        return self.__get_json(
            f"{self._GMAIL_MESSAGES_URL}/{message_id}/attachments/{attachment_id}",
            "attachment data",
        )

    @staticmethod
    def __header_value(headers: list, name: str):
        """Return the value of the first header called ``name`` (case-insensitive)."""
        wanted = name.lower()
        for header in headers:
            if str(header.get('name', '')).lower() == wanted:
                return header.get('value')
        return None

    @staticmethod
    def __decode_base64url(data: str) -> bytes:
        """Decode URL-safe base64, tolerating missing ``=`` padding."""
        return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))

    def __iter_leaf_parts(self, part: dict):
        """Yield the non-container parts under ``part`` in document order.

        A part without child ``parts`` is yielded itself, so a single-part
        message payload is handled the same way as a multipart one.
        """
        children = part.get('parts')
        if not children:
            yield part
            return
        for child in children:
            yield from self.__iter_leaf_parts(child)

    def __process_message(self, message: dict) -> tuple:
        """Download one message and pull out its text, attachment and sender.

        Args:
            message: A message stub containing at least an ``id`` key.

        Returns:
            tuple: ``(subject, text, attachment, company, message_id)``.
            ``text`` comes from the last ``text/plain`` / ``text/html`` part
            seen before the first attachment.  ``attachment`` is
            ``{"filename": ..., "data": ...}`` for the first attachment that
            has data, else ``None``.  ``company`` is derived from the
            ``From`` header (``'others'`` when unavailable) and overridden
            for known brand names in the subject.  ``message_id`` is the
            plain Gmail id.  All five items are ``None`` when the stub has
            no id.

        Raises:
            RuntimeError: If a Gmail API request fails.
        """
        message_id = message.get("id")
        if not message_id:
            # Same arity as the normal return so callers can always unpack.
            return None, None, None, None, None

        message_data = self.__fetch_message_data(message_id)
        payload = message_data.get('payload') or {}
        headers = payload.get('headers') or []

        subject = self.__header_value(headers, 'Subject') or ''

        company_from_gmail = 'others'
        sender = self.__header_value(headers, 'From')
        if sender:
            company_from_gmail = self.extract_domain_from_email(sender) or 'others'
        lowered_subject = subject.lower()
        if 'chanel' in lowered_subject:
            company_from_gmail = 'chanel'
        if 'louis vuitton' in lowered_subject:
            company_from_gmail = 'Louis Vuitton'

        text = ''
        for part in self.__iter_leaf_parts(payload):
            if 'mimeType' not in part:
                continue
            body = part.get('body') or {}

            if part['mimeType'] in ('text/plain', 'text/html'):
                encoded = body.get('data')
                # Text parts stored as attachments carry no inline data;
                # extract_text rejects empty input, so skip them here.
                if encoded:
                    text = self.extract_text(self.__decode_base64url(encoded))

            attachment_id = body.get('attachmentId')
            if attachment_id:
                attachment = self.__fetch_attachment_data(message_id, attachment_id)
                data = attachment.get("data", "")
                if data:
                    filename = part.get("filename") or "untitled.txt"
                    return (
                        subject,
                        text,
                        {"filename": filename, "data": data},
                        company_from_gmail,
                        message_id,
                    )

        return subject, text, None, company_from_gmail, message_id

    def encrypt_message_id(self, message_id: str):
        """Encrypt ``message_id`` with AES-CBC using the ``AES_KEY`` env variable.

        The key is the UTF-8 encoding of ``AES_KEY`` truncated to 32 bytes.
        The id is zero-padded to 32 bytes, or to the next multiple of the
        16-byte AES block size when it is longer.

        Returns:
            bytes: The ciphertext.

        Raises:
            RuntimeError: If ``AES_KEY`` is not set.
            ValueError: Propagated from ``cryptography`` when the truncated
                key is not a valid AES key length.
        """
        raw_key = os.getenv('AES_KEY')
        if not raw_key:
            raise RuntimeError("AES_KEY environment variable is not set")
        key = raw_key.encode('utf-8')[:32]
        message_id_bytes = message_id.encode('utf-8')

        # NOTE(review): the random IV is not returned or stored, so the
        # ciphertext cannot be decrypted later; confirm the intended format
        # with the consumers before changing the output.
        iv = os.urandom(16)
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()

        block_size = 16
        padded_length = max(32, -(-len(message_id_bytes) // block_size) * block_size)
        message_id_padded = message_id_bytes.ljust(padded_length, b'\0')
        return encryptor.update(message_id_padded) + encryptor.finalize()

    def extract_domain_from_email(self, email_string):
        """Return the first label of the domain of the first e-mail address found.

        Example: ``"Shop <noreply@mail.example.com>"`` gives ``"mail"``.

        Returns:
            The domain label, or ``None`` when no address (or an empty
            label) is found.
        """
        match = re.search(r'[\w\.-]+@[\w\.-]+', email_string or '')
        if match is None:
            return None
        domain = match.group().split('@')[-1].split('.')[0]
        return domain or None

    def extract_text(self, html_content):
        """Return the visible text of an HTML (or plain-text) document.

        Args:
            html_content: Markup as ``str`` or ``bytes``.

        Returns:
            str: The text with every run of whitespace collapsed to a single
            space and leading/trailing whitespace removed.

        Raises:
            ValueError: If the input is empty or ``None``.
        """
        if not html_content:
            raise ValueError("HTML content is empty or None")
        soup = BeautifulSoup(html_content, 'html.parser')
        text = re.sub(r'\s+', ' ', soup.get_text(separator=' ')).strip()
        # Only the length is logged; message bodies may contain personal data.
        logger.debug("Extracted %d character(s) of text", len(text))
        return text

    def extract_messages(self) -> dict:
        """Fetch receipt-like messages and return their extracted contents.

        Returns:
            dict: ``{"results": [...]}`` with one entry per message, each of
            the form ``{"body": str, "attachment_data": [dict],
            "company_associated": str, "message_id": str or None}``.
            ``attachment_data`` holds a single dict, which is empty when the
            message has no attachment.

        Raises:
            ValueError: If the extractor was created without a token.
            RuntimeError: If a Gmail API request fails.
        """
        if self.__jwt is None:
            raise ValueError("Cannot extract messages: no access token was provided")

        results = []
        for message in self.__fetch_messages():
            _subject, body, attachment_data, company_name, message_id = (
                self.__process_message(message)
            )
            results.append({
                "body": body if body is not None else '',
                "attachment_data": [attachment_data if attachment_data is not None else {}],
                'company_associated': company_name if company_name is not None else '',
                "message_id": message_id,
            })
        return {"results": results}
# obj = GmailDataExtractor("abcd","user_input")
# print(obj.error)