# Page header residue ("Spaces: Running") left over from the web view of this file; not part of the program.
import base64
import logging
import re

import jwt
import requests
from bs4 import BeautifulSoup
class GmailDataExtractor:
    """Collect receipt/invoice e-mails from a Gmail mailbox via the Gmail REST API.

    The extractor searches the mailbox for receipt-like messages, downloads
    each hit, and returns its subject, text body, first attachment and the
    company name derived from the sender's address.

    NOTE: JWT validation is currently not wired in. The ``jwt`` constructor
    argument is sent to Gmail directly as the OAuth bearer token.
    ``__validate_jwt_token`` is available for when the caller starts passing
    a signed JWT that wraps the access token.
    """

    _MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"
    # Base Gmail search expression selecting receipt-like messages.
    _RECEIPT_QUERY = (
        "(label:^smartlabel_receipt OR (subject:your AND subject:order) "
        "OR subject:receipts OR subject:receipt OR subject:invoice)"
    )
    _EMAIL_PATTERN = re.compile(r'[\w\.-]+@[\w\.-]+')
    # Seconds to wait for Gmail before giving up; without a timeout a stalled
    # connection would block the caller forever.
    _REQUEST_TIMEOUT = 30
    _logger = logging.getLogger(__name__)

    def __init__(self, jwt: str, user_input: str = None) -> None:
        """Store the credentials and the optional subject filter.

        Args:
            jwt: Token used as the Gmail bearer token. When ``None`` the
                instance is created but ``self.error`` is set to ``"Error"``.
            user_input: Optional extra term that the message subject must
                match (for example a brand name).
        """
        # Always define every attribute so later method calls never fail with
        # AttributeError, even when no token was supplied.
        self.__jwt = jwt
        self.__user_input = user_input
        self.error = "Error" if jwt is None else None
        # SECURITY NOTE(review): hard-coded signing secret. It should be
        # loaded from configuration / a secret store instead of source code.
        self.__secret_key = 'nkldjlncbamjlklwjeklwu24898h*&#Ujnfjf34893U5HSJFBSKFSHFNSK*$*W_ 3OWU'

    def __validate_jwt_token(self):
        """Verify the stored JWT and return the access token it carries.

        Returns:
            str: The ``access_token`` claim of the verified JWT.

        Raises:
            ValueError: If the token is expired, fails verification, or has
                no ``access_token`` claim.
        """
        try:
            payload = jwt.decode(self.__jwt, self.__secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as e:
            raise ValueError("Invalid JWT token: Expired token") from e
        except jwt.InvalidTokenError as e:
            raise ValueError("Invalid JWT token: Token verification failed") from e

        access_token = payload.get("access_token")
        if not access_token:
            raise ValueError("Invalid JWT token: Missing access token")
        return access_token

    @classmethod
    def _build_query(cls, user_input=None) -> str:
        """Return the Gmail search expression, narrowed by ``user_input`` if given."""
        term = str(user_input).strip() if user_input is not None else ''
        if term:
            return f"{cls._RECEIPT_QUERY} AND subject:{term}"
        return cls._RECEIPT_QUERY

    @staticmethod
    def _header_value(headers, name):
        """Return the value of the first header called ``name`` (case-insensitive), or None."""
        wanted = name.lower()
        for header in headers:
            if str(header.get("name", "")).lower() == wanted:
                return header.get("value")
        return None

    @staticmethod
    def _iter_parts(payload):
        """Yield the leaf MIME parts of ``payload`` in document order.

        A payload without ``parts`` is itself yielded, so single-part
        messages are handled the same way as multipart ones.
        """
        pending = [payload]
        while pending:
            part = pending.pop()
            if not isinstance(part, dict) or not part:
                continue
            children = part.get("parts")
            if children:
                # Reverse so that popping from the end keeps document order.
                pending.extend(reversed(children))
            else:
                yield part

    @classmethod
    def _decode_body_data(cls, data) -> str:
        """Decode a base64url body payload into text; return '' if it cannot be decoded."""
        if not data:
            return ''
        # Restore any '=' padding that was stripped from the encoded string.
        padded = data + "=" * (-len(data) % 4)
        try:
            raw = base64.urlsafe_b64decode(padded)
        except ValueError:
            cls._logger.warning("Skipping message body with invalid base64url data")
            return ''
        return raw.decode("utf-8", errors="replace")

    def _get_json(self, url: str, what: str, params: dict = None) -> dict:
        """GET ``url`` with the bearer token and return the decoded JSON body.

        Args:
            url: Gmail API endpoint.
            what: Short description used in the error message.
            params: Optional query-string parameters (URL-encoded by requests).

        Raises:
            RuntimeError: If the request fails or returns an error status.
        """
        try:
            response = requests.get(
                url,
                headers={"Authorization": f"Bearer {self.__jwt}"},
                params=params,
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()  # Raise error if the request fails
            return response.json()
        except requests.RequestException as e:
            raise RuntimeError(f"Error fetching {what} from Gmail API: {str(e)}") from e

    def __fetch_messages(self) -> list:
        """Fetch every message stub matching the receipt query.

        Follows ``nextPageToken`` until the listing is exhausted.

        Returns:
            list: The message objects from the ``messages`` field of each page.

        Raises:
            RuntimeError: If there is an issue while fetching messages from the Gmail API.
        """
        # The query is passed via ``params`` so that spaces, parentheses and
        # user-supplied text are URL-encoded instead of spliced into the URL.
        params = {"q": self._build_query(self.__user_input)}
        messages = []
        while True:
            page = self._get_json(self._MESSAGES_URL, "messages", params=params)
            messages.extend(page.get("messages", []))
            page_token = page.get("nextPageToken")
            if not page_token:
                break
            params["pageToken"] = page_token
        return messages

    def __fetch_message_data(self, message_id: str) -> dict:
        """Fetch one full message from the Gmail API.

        Args:
            message_id (str): The ID of the message to fetch.

        Returns:
            dict: Message data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching message data from the Gmail API.
        """
        self._logger.debug("fetch_message_data %s", message_id)
        return self._get_json(f"{self._MESSAGES_URL}/{message_id}", "message data")

    def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment from the Gmail API.

        Args:
            message_id (str): The ID of the message containing the attachment.
            attachment_id (str): The ID of the attachment to fetch.

        Returns:
            dict: Attachment data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching attachment data from the Gmail API.
        """
        self._logger.debug("fetch_attachment_data %s/%s", message_id, attachment_id)
        return self._get_json(
            f"{self._MESSAGES_URL}/{message_id}/attachments/{attachment_id}",
            "attachment data",
        )

    def __process_message(self, message: dict) -> tuple:
        """Download one message and pull out the fields of interest.

        Args:
            message (dict): A message stub containing at least ``id``.

        Returns:
            tuple: ``(subject, body, attachment, company)`` where ``subject``
            and ``body`` are text, ``attachment`` is
            ``{"filename": ..., "data": ...}`` for the first attachment that
            has data (``data`` is passed through as returned by the API) or
            ``None``, and ``company`` is the first label of the sender's
            e-mail domain or ``None``. All four are ``None`` when the stub has
            no ``id``.

        Raises:
            RuntimeError: If there is an issue while fetching data from the Gmail API.
        """
        self._logger.debug("process_message")
        message_id = message.get("id")
        if not message_id:
            return None, None, None, None

        message_data = self.__fetch_message_data(message_id)
        payload = message_data.get("payload") or {}
        headers = payload.get("headers") or []

        subject = self._header_value(headers, "Subject") or ''
        sender = self._header_value(headers, "From")
        company_from_gmail = self.extract_domain_from_email(sender) if sender else None

        body = ''
        attachment = None
        for part in self._iter_parts(payload):
            part_body = part.get("body") or {}
            attachment_id = part_body.get("attachmentId")
            if attachment_id:
                # Only the first attachment that actually carries data is kept.
                if attachment is None:
                    data = self.__fetch_attachment_data(message_id, attachment_id).get("data", "")
                    if data:
                        attachment = {
                            "filename": part.get("filename") or "untitled.txt",
                            "data": data,
                        }
                continue

            if part.get("mimeType") in ("text/plain", "text/html"):
                decoded = self._decode_body_data(part_body.get("data", ""))
                # extract_text rejects empty input, so skip blank parts.
                if decoded.strip():
                    body, _links = self.extract_text(decoded)

        return subject, body, attachment, company_from_gmail

    def extract_domain_from_email(self, email):
        """Return the first label of the domain of the address found in ``email``.

        Args:
            email: Text containing an e-mail address, e.g. a ``From`` header
                value such as ``"Shop <orders@shop.com>"``.

        Returns:
            The domain label (``"shop"`` in the example above), or ``None``
            when no address or domain label can be found.
        """
        if not email:
            return None
        match = self._EMAIL_PATTERN.search(email)
        if match is None:
            return None
        # Keep only the part between '@' and the first dot of the domain.
        domain = match.group().split('@')[-1].split('.')[0]
        return domain or None

    @staticmethod
    def extract_text(html_content: str) -> tuple:
        """Extract text and links from HTML content.

        Declared as a static method so it can be called both on the class and
        on an instance.

        Args:
            html_content (str): The HTML content to process.

        Returns:
            tuple: A tuple containing the extracted text (str) and links (list of tuples).

        Raises:
            ValueError: If the input HTML content is empty or None.
        """
        if not html_content:
            raise ValueError("HTML content is empty or None")
        soup = BeautifulSoup(html_content, 'html.parser')
        # Collapse all runs of whitespace so the text is a single clean line.
        text = re.sub(r'\s+', ' ', soup.get_text(separator=' ')).strip()
        links = [(link.text, link['href']) for link in soup.find_all('a', href=True)]
        return text, links

    def extract_messages(self) -> dict:
        """Fetch and process every receipt-like message in the mailbox.

        Returns:
            dict: ``{"results": [...]}`` with one entry per message:
            ``{"subject": str, "body": str, "attachment_data": [dict],
            "company_associated": str | None}``. ``attachment_data`` holds a
            single dict, ``{"filename": ..., "data": ...}`` or ``{}`` when the
            message has no attachment.

        Raises:
            ValueError: If the extractor was created without a token.
            RuntimeError: If a Gmail API request fails.
        """
        self._logger.debug("entered the extract messages")
        if self.error is not None:
            raise ValueError("Cannot extract messages: no JWT/access token was provided")

        results = []
        for message in self.__fetch_messages():
            subject, body, attachment_data, company_name = self.__process_message(message)
            results.append({
                "subject": subject,
                "body": body,
                "attachment_data": [attachment_data if attachment_data is not None else {}],
                'company_associated': company_name,
            })
        return {"results": results}
# obj = GmailDataExtractor("abcd","user_input")
# print(obj.error)