File size: 10,159 Bytes
e6e2b35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3566981
e6e2b35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3566981
e6e2b35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3566981
e6e2b35
 
 
41d97d6
03a6290
41d97d6
 
 
 
 
e6e2b35
 
 
 
 
 
0527f8f
 
 
 
 
 
 
 
 
 
e6e2b35
 
 
 
 
0527f8f
e6e2b35
 
 
 
 
 
 
 
 
 
 
 
 
0527f8f
 
 
e6e2b35
0527f8f
 
 
 
 
 
 
 
e6e2b35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0527f8f
e6e2b35
 
ccf236c
e6e2b35
0527f8f
e6e2b35
0527f8f
e6e2b35
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
import base64
import logging
import re
from email.utils import parseaddr

import jwt
import requests
from bs4 import BeautifulSoup
class GmailDataExtractor:
    """Fetch receipt/invoice e-mails from the Gmail REST API and extract their content.

    The extractor searches the authenticated user's mailbox for receipt-like
    messages (optionally narrowed by a subject keyword), downloads each
    message and returns its subject, body text, first attachment and the
    sender's e-mail domain.

    Attributes:
        error: ``"Error"`` when no token was supplied to the constructor,
            otherwise ``None``.
    """

    _GMAIL_MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"

    # Seconds to wait for any single Gmail API call; without a timeout
    # ``requests`` may block forever on a stalled connection.
    _REQUEST_TIMEOUT = 30

    # Gmail search expression matching receipt-like messages.
    _BASE_QUERY = (
        "(label:^smartlabel_receipt OR (subject:your AND subject:order) "
        "OR subject:receipts OR subject:receipt OR subject:invoice)"
    )

    _logger = logging.getLogger(__name__)

    def __init__(self, jwt: str, user_input: str = None) -> None:
        """Store the credentials and the optional subject filter.

        Args:
            jwt: Token sent as the ``Bearer`` credential to the Gmail API.
                JWT validation is currently not wired in, so this value is
                used directly as the access token. (The parameter shadows the
                ``jwt`` module inside this method only; the name is kept for
                backward compatibility.)
            user_input: Optional keyword that must appear in the subject.
        """
        # Always initialise every attribute so later calls fail with a clear
        # error instead of an AttributeError when no token was given.
        self.__jwt = jwt
        self.__user_input = user_input
        self.error = "Error" if jwt is None else None
        # NOTE(security): a signing secret hard-coded in source control is
        # compromised by definition. Kept byte-for-byte for backward
        # compatibility; it should be rotated and loaded from configuration.
        self.__secret_key = 'nkldjlncbamjlklwjeklwu24898h*&#Ujnfjf34893U5HSJFBSKFSHFNSK*$*W_ 3OWU'

    def __validate_jwt_token(self):
        """Decode the stored JWT and return the access token it carries.

        Returns:
            str: The ``access_token`` claim of the JWT payload.

        Raises:
            ValueError: If the token is expired, fails verification, or does
                not contain an ``access_token`` claim.
        """
        try:
            payload = jwt.decode(self.__jwt, self.__secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as exc:
            raise ValueError("Invalid JWT token: Expired token") from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError("Invalid JWT token: Token verification failed") from exc

        access_token = payload.get("access_token")
        if not access_token:
            raise ValueError("Invalid JWT token: Missing access token")
        return access_token

    def _build_receipt_query(self) -> str:
        """Return the Gmail search query, narrowed by the user keyword if any."""
        if self.__user_input:
            return f"{self._BASE_QUERY} AND subject:{self.__user_input}"
        return self._BASE_QUERY

    def _get_json(self, url: str, what: str, params: dict = None) -> dict:
        """GET ``url`` with the bearer token and return the decoded JSON body.

        Args:
            url: Gmail API endpoint.
            what: Human-readable name of the resource, used in error messages.
            params: Optional query-string parameters (URL-encoded by requests).

        Raises:
            RuntimeError: If the request fails or returns an error status.
        """
        try:
            response = requests.get(
                url,
                headers={"Authorization": f"Bearer {self.__jwt}"},
                params=params,
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            raise RuntimeError(f"Error fetching {what} from Gmail API: {exc}") from exc

    def __fetch_messages(self) -> list:
        """Fetch every message stub matching the receipt query.

        Follows ``nextPageToken`` until the result set is exhausted.

        Returns:
            list: Message stubs as returned under the ``messages`` key of the
            Gmail list response.

        Raises:
            RuntimeError: If no token was supplied or a Gmail API call fails.
        """
        if self.__jwt is None:
            raise RuntimeError(
                "Error fetching messages from Gmail API: no access token was provided"
            )

        # Passing the query through ``params`` lets requests URL-encode it, so
        # characters such as '&' or '#' in the user keyword cannot corrupt the URL.
        params = {"q": self._build_receipt_query()}
        messages = []
        while True:
            data = self._get_json(self._GMAIL_MESSAGES_URL, "messages", params)
            messages.extend(data.get("messages") or [])
            page_token = data.get("nextPageToken")
            if not page_token:
                return messages
            params["pageToken"] = page_token

    def __fetch_message_data(self, message_id: str) -> dict:
        """Fetch one full message resource from the Gmail API.

        Args:
            message_id (str): The ID of the message to fetch.

        Returns:
            dict: Message data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching the message.
        """
        self._logger.debug("Fetching message %s", message_id)
        return self._get_json(f"{self._GMAIL_MESSAGES_URL}/{message_id}", "message data")

    def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment resource from the Gmail API.

        Args:
            message_id (str): The ID of the message containing the attachment.
            attachment_id (str): The ID of the attachment to fetch.

        Returns:
            dict: Attachment data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching the attachment.
        """
        self._logger.debug("Fetching attachment %s of message %s", attachment_id, message_id)
        return self._get_json(
            f"{self._GMAIL_MESSAGES_URL}/{message_id}/attachments/{attachment_id}",
            "attachment data",
        )

    @staticmethod
    def _read_headers(headers: list) -> tuple:
        """Return ``(subject, sender)`` from a list of ``{"name", "value"}`` headers.

        Header names are compared case-insensitively. ``subject`` defaults to
        ``""`` and ``sender`` to ``None`` when the header is absent.
        """
        subject, sender = "", None
        for header in headers:
            name = (header.get("name") or "").lower()
            if name == "subject":
                subject = header.get("value", "")
            elif name == "from":
                sender = header.get("value")
        return subject, sender

    @classmethod
    def _iter_parts(cls, payload: dict):
        """Yield the leaf parts of a message payload, depth first.

        A payload without ``parts`` (single-part message) is yielded itself;
        multipart containers are expanded recursively.
        """
        parts = payload.get("parts")
        if not parts:
            yield payload
            return
        for part in parts:
            yield from cls._iter_parts(part)

    def _decode_text(self, data: str, mime_type: str) -> str:
        """Decode a base64url body; HTML is reduced to its visible text.

        Returns ``""`` when ``data`` is empty or cannot be decoded.
        """
        if not data:
            return ""
        try:
            # urlsafe_b64decode rejects input whose length is not a multiple
            # of four, so restore any stripped '=' padding first.
            padded = data + "=" * (-len(data) % 4)
            decoded = base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")
        except ValueError:  # binascii.Error is a ValueError subclass
            self._logger.warning("Skipping message part with undecodable body data")
            return ""
        if mime_type == "text/html" and decoded.strip():
            text, _links = self.extract_text_and_links(decoded)
            return text
        return decoded

    def __process_message(self, message: dict) -> tuple:
        """Download one message and extract the fields of interest.

        Args:
            message (dict): A message stub containing at least an ``id``.

        Returns:
            tuple: ``(subject, body, attachment, company)`` where

            * ``subject`` is the Subject header (``""`` if absent),
            * ``body`` is the text of the last non-empty ``text/plain`` or
              ``text/html`` part (HTML is converted to plain text),
            * ``attachment`` is ``{"filename": ..., "data": ...}`` for the
              first attachment with data (base64url as delivered by Gmail),
              or ``None``,
            * ``company`` is the domain of the From address, or ``None``.

            All four are ``None`` when the stub has no ``id``.

        Raises:
            RuntimeError: If a Gmail API call fails.
        """
        message_id = message.get("id")
        if not message_id:
            return None, None, None, None

        message_data = self.__fetch_message_data(message_id)
        payload = message_data.get("payload") or {}
        subject, sender = self._read_headers(payload.get("headers") or [])
        company = self.extract_domain_from_email(sender) if sender else None

        body = ""
        attachment = None
        for part in self._iter_parts(payload):
            mime_type = part.get("mimeType")
            if not mime_type:
                continue
            part_body = part.get("body") or {}

            if mime_type in ("text/plain", "text/html"):
                # Attached text files carry no inline data, so the guard
                # keeps them from wiping an already extracted body.
                text = self._decode_text(part_body.get("data", ""), mime_type)
                if text:
                    body = text

            attachment_id = part_body.get("attachmentId")
            if attachment is None and attachment_id:
                fetched = self.__fetch_attachment_data(message_id, attachment_id)
                data = fetched.get("data", "")
                if data:
                    # The complete attachment payload is returned, untouched.
                    attachment = {
                        "filename": part.get("filename") or "untitled.txt",
                        "data": data,
                    }

        return subject, body, attachment, company

    @staticmethod
    def extract_domain_from_email(email):
        """Return the domain part of an e-mail address or From header.

        Accepts both bare addresses (``user@example.com``) and display-name
        forms (``Shop <orders@example.com>``).

        Returns:
            str or None: The domain, or ``None`` when no address is found.
        """
        if not email:
            return None
        # parseaddr strips the display name and angle brackets; fall back to
        # the raw value if it cannot make sense of the input.
        address = parseaddr(email)[1] or email
        match = re.search(r"@([^@\s<>]+)>?\s*$", address)
        return match.group(1) if match else None

    @staticmethod
    def extract_text_and_links(html_content: str) -> tuple:
        """
        Extracts text and links from HTML content.

        Args:
            html_content (str): The HTML content to process.

        Returns:
            tuple: A tuple containing the extracted text (str), with runs of
            whitespace collapsed to single spaces, and the links as a list of
            ``(link text, href)`` tuples.

        Raises:
            ValueError: If the input HTML content is empty or None.
        """
        if not html_content:
            raise ValueError("HTML content is empty or None")

        soup = BeautifulSoup(html_content, 'html.parser')

        text = re.sub(r'\s+', ' ', soup.get_text(separator=' ')).strip()
        links = [(link.text, link['href']) for link in soup.find_all('a', href=True)]

        return text, links

    def extract_messages(self) -> dict:
        """
        Extracts all receipt-like messages matching the configured filter.

        Returns:
            dict: ``{"results": [...]}`` with one entry per message, each of
            the form ``{"subject": str, "body": str,
            "attachment_data": [{"filename": str, "data": base64url str}],
            "company_associated": str or None}``. ``attachment_data`` is
            ``[{}]`` when the message has no attachment.

        Raises:
            RuntimeError: If no token was supplied or a Gmail API call fails.
        """
        self._logger.debug("Extracting messages")
        results = []
        for message in self.__fetch_messages():
            subject, body, attachment_data, company_name = self.__process_message(message)
            results.append({
                "subject": subject,
                "body": body,
                "attachment_data": [attachment_data if attachment_data is not None else {}],
                "company_associated": company_name,
            })

        return {"results": results}

# obj = GmailDataExtractor("abcd","user_input")
# print(obj.error)