Spaces:
Running
Running
import requests | |
import jwt | |
import base64 | |
from bs4 import BeautifulSoup | |
import re | |
class GmailDataExtractor:
    """Collect receipt / invoice e-mails from a Gmail mailbox via the Gmail REST API.

    The value passed as ``jwt`` is currently sent to Gmail directly as the
    OAuth bearer token; JWT validation exists (``__validate_jwt_token``) but
    is not wired into the request path yet.

    Attributes:
        error: ``"Error"`` when the instance was created without a token,
            otherwise ``None``.
    """

    # Endpoint for listing messages; single messages and attachments hang off it.
    _MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"
    # Gmail search expression selecting purchase-related mail.
    _RECEIPT_QUERY = (
        'subject:"your order" OR subject:receipts OR subject:receipt '
        "OR subject:invoice OR category:purchases"
    )
    # Seconds to wait on the network so a stalled connection cannot hang forever.
    _REQUEST_TIMEOUT = 30

    def __init__(self, jwt: str, user_input: str = None) -> None:
        """Store the token and the optional extra subject filter.

        Args:
            jwt: Token used to authenticate against the Gmail API. ``None``
                marks the instance as unusable (``error`` is set).
            user_input: Optional term that must additionally appear in the
                subject of matching messages.
        """
        # Every attribute is assigned unconditionally so that later attribute
        # access never fails, even on an instance created without a token.
        self.__jwt = jwt
        self.__user_input = user_input
        # Key used to verify HS256 tokens; left empty in this file.
        self.__secret_key = ''
        self.error = "Error" if jwt is None else None

    def __validate_jwt_token(self):
        """Decode the stored JWT and return the access token it carries.

        Returns:
            str: The ``access_token`` claim of the token payload.

        Raises:
            ValueError: If the token is expired, fails verification, or has
                no ``access_token`` claim.
        """
        try:
            payload = jwt.decode(self.__jwt, self.__secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as exc:
            raise ValueError("Invalid JWT token: Expired token") from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError("Invalid JWT token: Token verification failed") from exc

        access_token = payload.get("access_token")
        if not access_token:
            raise ValueError("Invalid JWT token: Missing access token")
        return access_token

    def __get_json(self, url: str, what: str, params: dict = None) -> dict:
        """Perform an authenticated GET and return the decoded JSON body.

        Args:
            url: Absolute URL to request.
            what: Short description of the resource, used in the error text.
            params: Optional query-string parameters; ``requests`` takes care
                of URL-encoding them.

        Raises:
            RuntimeError: If the request fails or returns an error status.
        """
        # NOTE: the stored token is used as the bearer token as-is; swap in
        # __validate_jwt_token() here once JWT handling is enabled.
        headers = {"Authorization": f"Bearer {self.__jwt}"}
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=self._REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            raise RuntimeError(f"Error fetching {what} from Gmail API: {exc}") from exc

    def __fetch_messages(self) -> list:
        """Fetch every message stub matching the receipt query.

        Follows ``nextPageToken`` until the listing is exhausted.

        Returns:
            list: The message objects returned by the list endpoint.

        Raises:
            RuntimeError: If there is an issue while fetching messages from
                the Gmail API.
        """
        query = self._RECEIPT_QUERY
        if self.__user_input is not None:
            query = f"({query}) AND subject:{self.__user_input}"

        # Passing the query through ``params`` keeps spaces, quotes and any
        # '&' / '#' in the user-supplied term from corrupting the URL.
        params = {"q": query}
        messages = []
        while True:
            page = self.__get_json(self._MESSAGES_URL, "messages", params)
            messages.extend(page.get("messages", []))
            next_token = page.get("nextPageToken")
            if not next_token:
                return messages
            params["pageToken"] = next_token

    def __fetch_message_data(self, message_id: str) -> dict:
        """Fetch the full resource of one message.

        Args:
            message_id (str): The ID of the message to fetch.

        Returns:
            dict: Message data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching message data
                from the Gmail API.
        """
        return self.__get_json(f"{self._MESSAGES_URL}/{message_id}", "message data")

    def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment of a message.

        Args:
            message_id (str): The ID of the message containing the attachment.
            attachment_id (str): The ID of the attachment to fetch.

        Returns:
            dict: Attachment data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching attachment data
                from the Gmail API.
        """
        url = f"{self._MESSAGES_URL}/{message_id}/attachments/{attachment_id}"
        return self.__get_json(url, "attachment data")

    @staticmethod
    def _header_value(headers, name: str) -> str:
        """Return the value of the first header called ``name`` (case-insensitive).

        ``headers`` is expected to be a list of ``{"name": ..., "value": ...}``
        dicts; ``None``, an empty list or malformed entries yield ``""``.
        """
        wanted = name.lower()
        for header in headers or []:
            if isinstance(header, dict) and str(header.get("name", "")).lower() == wanted:
                return header.get("value", "")
        return ""

    @staticmethod
    def _decode_body_data(data: str) -> str:
        """Decode a base64url string into text; empty input yields ``""``.

        Missing ``=`` padding is restored before decoding, and bytes that are
        not valid UTF-8 are replaced instead of aborting the extraction.
        """
        if not data:
            return ""
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")

    @classmethod
    def _iter_parts(cls, part):
        """Yield ``part`` and then all of its nested ``parts``, depth first."""
        if not isinstance(part, dict):
            return
        yield part
        for child in part.get("parts") or []:
            yield from cls._iter_parts(child)

    def __process_message(self, message: dict) -> tuple:
        """Download one message and pull out subject, body text and attachment.

        Args:
            message (dict): A message stub containing at least an ``id``.

        Returns:
            tuple: ``(subject, body, attachment)``. ``attachment`` is
            ``{"filename": ..., "base64URLData": ...}`` for the first
            attachment that has data, otherwise ``None``. A stub without an
            ``id`` yields ``(None, None, None)``.

        Raises:
            RuntimeError: If there is an issue while fetching data from the
                Gmail API.
        """
        message_id = message.get("id")
        if not message_id:
            # Same arity as the normal return so callers can always unpack.
            return None, None, None

        message_data = self.__fetch_message_data(message_id)
        payload = message_data.get("payload") or {}
        subject = self._header_value(payload.get("headers"), "Subject")
        body = ''

        # Walk the payload itself plus every nested part, so single-part mails
        # and multipart/alternative nested in multipart/mixed are both covered.
        for part in self._iter_parts(payload):
            mime_type = part.get("mimeType")
            if mime_type is None:
                continue
            part_body = part.get("body") or {}

            if mime_type in ("text/plain", "text/html"):
                content = self._decode_body_data(part_body.get("data", ""))
                if content and mime_type == "text/html":
                    # Reduce markup to its visible text; links are not returned.
                    content = self.extract_text_and_links(content)[0]
                if content:
                    # Later textual parts replace earlier ones.
                    body = content

            attachment_id = part_body.get("attachmentId")
            if attachment_id:
                attachment = self.__fetch_attachment_data(message_id, attachment_id)
                data = attachment.get("data", "")
                if data:
                    filename = part.get("filename") or "untitled.txt"
                    return subject, body, {"filename": filename, "base64URLData": data}

        return subject, body, None

    @staticmethod
    def extract_text_and_links(html_content: str) -> tuple:
        """Extract whitespace-normalised text and hyperlinks from HTML content.

        Args:
            html_content (str): The HTML content to process.

        Returns:
            tuple: The extracted text (str) and a list of
            ``(link text, href)`` tuples.

        Raises:
            ValueError: If the input HTML content is empty or None.
        """
        if not html_content:
            raise ValueError("HTML content is empty or None")

        soup = BeautifulSoup(html_content, 'html.parser')
        # Collapse every run of whitespace into a single space.
        text = re.sub(r'\s+', ' ', soup.get_text(separator=' ')).strip()
        links = [(anchor.text, anchor['href']) for anchor in soup.find_all('a', href=True)]
        return text, links

    def extract_messages(self) -> dict:
        """Fetch all matching messages and return their usable content.

        Returns:
            dict: ``{"results": [...]}`` with one entry per message that
            yielded content: ``{"attachment_data": {"filename": ...,
            "base64URLData": ...}}`` when the message has an attachment,
            otherwise ``{"body": text}`` when a non-empty body was found.
            Messages with neither are omitted.

        Raises:
            ValueError: If the instance was created without a token.
            RuntimeError: If a Gmail API request fails.
        """
        if self.__jwt is None:
            raise ValueError("Missing access token: cannot query the Gmail API")

        results = []
        for message in self.__fetch_messages():
            _subject, body, attachment_data = self.__process_message(message)
            if attachment_data:
                results.append({"attachment_data": attachment_data})
            elif body:
                results.append({"body": body})
        return {"results": results}