File size: 8,889 Bytes
87b75a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1cb2fd6
87b75a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import requests
import jwt
import base64
from bs4 import BeautifulSoup
import re

class GmailDataExtractor:
    """Collects receipt/invoice e-mails and their attachments via the Gmail REST API.

    The token passed to the constructor is currently sent unchanged as the
    bearer token of every request. ``__validate_jwt_token`` can unwrap an
    access token from a signed JWT, but it is not wired into the request
    path yet.
    """

    # Endpoint listing / addressing the authenticated user's messages.
    _MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"
    # Gmail search expression that selects receipt-like mail.
    _RECEIPT_QUERY = (
        "(label:^smartlabel_receipt OR (subject:your AND subject:order) "
        "OR subject:receipts OR subject:receipt OR subject:invoice)"
    )
    # Seconds to wait for Gmail before giving up; requests has no default timeout.
    _REQUEST_TIMEOUT = 30

    def __init__(self, jwt: str, user_input: str = None) -> None:
        """
        Stores the credentials and the optional subject filter.

        Args:
            jwt (str): Token used to authenticate against the Gmail API.
            user_input (str): Optional term the e-mail subject must contain.

        ``error`` is set to "Error" when no token is given and to None otherwise.
        """
        # Always initialise every attribute so that a missing token surfaces
        # as a clear error later on instead of an AttributeError.
        self.__jwt = jwt
        self.__user_input = user_input
        self.__secret_key = ''
        self.error = "Error" if jwt is None else None

    def __validate_jwt_token(self):
        """
        Verifies the stored JWT and returns the access token it carries.

        Returns:
            The value of the "access_token" claim.

        Raises:
            ValueError: If the token is expired, fails verification, or has no
                "access_token" claim.
        """
        try:
            payload = jwt.decode(self.__jwt, self.__secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as e:
            raise ValueError("Invalid JWT token: Expired token") from e
        except jwt.InvalidTokenError as e:
            raise ValueError("Invalid JWT token: Token verification failed") from e

        access_token = payload.get("access_token")
        if not access_token:
            raise ValueError("Invalid JWT token: Missing access token")
        return access_token

    def __auth_headers(self) -> dict:
        """Builds the Authorization header; raises RuntimeError without a token."""
        if self.__jwt is None:
            raise RuntimeError("Error fetching data from Gmail API: no access token was provided")
        # NOTE: the raw token is used as the access token for now; switch to
        # __validate_jwt_token() once JWT handling is enabled.
        return {"Authorization": f"Bearer {self.__jwt}"}

    def __get_json(self, url: str, what: str, params: dict = None) -> dict:
        """
        Performs an authenticated GET request and returns the decoded JSON body.

        Args:
            url (str): The Gmail API URL to request.
            what (str): Short description of the resource, used in the error message.
            params (dict): Optional query parameters (URL-encoded by requests).

        Raises:
            RuntimeError: If the request fails or no token is available.
        """
        headers = self.__auth_headers()
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=self._REQUEST_TIMEOUT
            )
            response.raise_for_status()  # Raise error if the request fails
            return response.json()
        except requests.RequestException as e:
            raise RuntimeError(f"Error fetching {what} from Gmail API: {str(e)}") from e

    @classmethod
    def _build_query(cls, user_input: str = None) -> str:
        """Returns the Gmail search query, optionally narrowed to a subject term."""
        if user_input is None:
            return cls._RECEIPT_QUERY
        return f"{cls._RECEIPT_QUERY} AND subject:{user_input}"

    def __fetch_messages(self) -> list:
        """
        Fetches all messages matching the receipt query from the Gmail API.

        Follows ``nextPageToken`` until the listing is exhausted.

        Returns:
            list: The message stubs returned by the Gmail API (dicts that carry
                the message "id").

        Raises:
            RuntimeError: If there is an issue while fetching messages from the Gmail API.
        """
        query = self._build_query(self.__user_input)
        messages = []
        page_token = None
        while True:
            # Passing the query through `params` lets requests URL-encode it.
            params = {"q": query}
            if page_token:
                params["pageToken"] = page_token
            data = self.__get_json(self._MESSAGES_URL, "messages", params=params)
            messages.extend(data.get("messages", []))
            page_token = data.get("nextPageToken")
            if not page_token:
                break
        return messages

    def __fetch_message_data(self, message_id: str) -> dict:
        """
        Fetches message data from the Gmail API.

        Args:
            message_id (str): The ID of the message to fetch.

        Returns:
            dict: Message data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching message data from the Gmail API.
        """
        return self.__get_json(f"{self._MESSAGES_URL}/{message_id}", "message data")

    def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
        """
        Fetches attachment data from the Gmail API.

        Args:
            message_id (str): The ID of the message containing the attachment.
            attachment_id (str): The ID of the attachment to fetch.

        Returns:
            dict: Attachment data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching attachment data from the Gmail API.
        """
        return self.__get_json(
            f"{self._MESSAGES_URL}/{message_id}/attachments/{attachment_id}",
            "attachment data",
        )

    @staticmethod
    def _header_value(headers, name: str) -> str:
        """Returns the value of the first header called *name* (case-insensitive), or ''."""
        wanted = name.lower()
        # Gmail delivers headers as a list of {"name": ..., "value": ...} dicts.
        for header in headers or []:
            if header.get("name", "").lower() == wanted:
                return header.get("value", "")
        return ""

    @staticmethod
    def _decode_body(data: str) -> str:
        """Decodes a base64url-encoded body into text; empty input yields ''."""
        if not data:
            return ""
        # base64url payloads may arrive without '=' padding; restore it first.
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")

    @staticmethod
    def _iter_parts(payload: dict):
        """Yields the leaf MIME parts of *payload* in document order.

        Containers (parts that have child "parts") are descended into; a
        payload without children is yielded itself.
        """
        stack = [payload]
        while stack:
            current = stack.pop()
            children = current.get("parts")
            if children:
                stack.extend(reversed(children))
            else:
                yield current

    def _parse_message_data(self, message_id: str, message_data: dict) -> tuple:
        """
        Extracts subject, body text, links and attachments from fetched message data.

        Args:
            message_id (str): The ID of the message (needed to download attachments).
            message_data (dict): The message resource returned by the Gmail API.

        Returns:
            tuple: (subject, body, links, attachments) where ``body`` is the text of
                the HTML part if there is one and the plain-text part otherwise,
                ``links`` is a list of (text, href) tuples taken from the HTML part,
                and ``attachments`` maps filename to base64url data (None if the
                message has no attachment).

        Raises:
            RuntimeError: If an attachment cannot be fetched from the Gmail API.
        """
        payload = message_data.get("payload") or {}
        subject = self._header_value(payload.get("headers"), "Subject")

        plain_text = ""
        html_text = ""
        links = []
        attachments = {}

        for part in self._iter_parts(payload):
            mime_type = part.get("mimeType")
            if not mime_type:
                continue

            part_body = part.get("body") or {}
            attachment_id = part_body.get("attachmentId")
            if attachment_id:
                # Attachment content is not inlined; it has to be fetched separately.
                attachment = self.__fetch_attachment_data(message_id, attachment_id)
                data = attachment.get("data", "")
                if data:
                    filename = part.get("filename") or "untitled.txt"
                    attachments[filename] = data
                continue

            if mime_type == "text/plain":
                plain_text = plain_text or self._decode_body(part_body.get("data", ""))
            elif mime_type == "text/html" and not html_text:
                html = self._decode_body(part_body.get("data", ""))
                if html:  # extract_text_and_links rejects empty input
                    html_text, links = self.extract_text_and_links(html)

        return subject, html_text or plain_text, links, attachments or None

    def __process_message(self, message: dict) -> tuple:
        """
        Processes a single message.

        Args:
            message (dict): The message stub to process; must carry an "id".

        Returns:
            tuple: (subject, body, links, attachments) as produced by
                ``_parse_message_data``, or (None, None, [], None) when the
                stub has no id.

        Raises:
            RuntimeError: If there is an issue while fetching message data from the Gmail API.
        """
        message_id = message.get("id")
        if not message_id:
            return None, None, [], None

        message_data = self.__fetch_message_data(message_id)
        return self._parse_message_data(message_id, message_data)

    @staticmethod
    def extract_text_and_links(html_content: str) -> tuple:
        """
        Extracts text and links from HTML content.

        Args:
            html_content (str): The HTML content to process.

        Returns:
            tuple: A tuple containing the extracted text (str) with whitespace
                collapsed, and the links as a list of (link text, href) tuples.

        Raises:
            ValueError: If the input HTML content is empty or None.
        """
        if not html_content:
            raise ValueError("HTML content is empty or None")

        soup = BeautifulSoup(html_content, 'html.parser')

        # Extract text and collapse runs of whitespace into single spaces
        text = soup.get_text(separator=' ')
        text = re.sub(r'\s+', ' ', text).strip()

        # Extract links (anchors without an href are skipped)
        links = [(link.text, link['href']) for link in soup.find_all('a', href=True)]

        return text, links

    def extract_messages(self) -> dict:
        """
        Extracts all receipt-like messages, optionally filtered by the subject
        term given to the constructor.

        Returns:
            dict: A dictionary containing the extracted messages with their
                subjects, bodies, links, and attachments.
            format: {"results": [{"subject": "test subject", "body": "it would be text",
                "links": [("link text", "href")],
                "attachment_data": {"filename": base64URL data}}, ...]}

        Raises:
            RuntimeError: If a Gmail API request fails or no token was provided.
        """
        results = []
        for message in self.__fetch_messages():
            subject, body, links, attachment_data = self.__process_message(message)

            # Normalise None values so every result has the same shape
            results.append({
                "subject": subject if subject is not None else "",
                "body": body if body is not None else "",
                "links": links if links is not None else [],
                "attachment_data": attachment_data if attachment_data is not None else {},
            })

        return {"results": results}