Omkar008 commited on
Commit
3df0c32
1 Parent(s): 573f961

Delete get_gmail_data.py

Browse files
Files changed (1) hide show
  1. get_gmail_data.py +0 -221
get_gmail_data.py DELETED
@@ -1,221 +0,0 @@
1
- import requests
2
- import jwt
3
- import base64
4
- from bs4 import BeautifulSoup
5
- import re
6
-
7
- class GmailDataExtractor:
8
-
9
- def __init__(self,jwt:str , user_input: str = None) -> None:
10
- if jwt is None :
11
- self.error = "Error"
12
- else:
13
- self.__jwt = jwt
14
- self.__user_input = user_input
15
- self.error = None
16
- self.__secret_key = ''
17
-
18
- def __validate_jwt_token(self):
19
- try:
20
- payload = jwt.decode(self.jwt, self.secret_key, algorithms=["HS256"])
21
- access_token = payload.get("access_token")
22
- if access_token:
23
- return access_token
24
- else:
25
- raise ValueError("Invalid JWT token: Missing access token")
26
- except jwt.ExpiredSignatureError:
27
- raise ValueError("Invalid JWT token: Expired token")
28
- except jwt.InvalidTokenError:
29
- raise ValueError("Invalid JWT token: Token verification failed")
30
-
31
- def __fetch_messages(self) -> list:
32
- """
33
- Fetches messages from the Gmail API.
34
-
35
- Args:
36
- gmail_url (str): The URL for the Gmail API request.
37
- access_token (str): The access token for authenticating with Gmail API.
38
-
39
- Returns:
40
- list: A list of message objects retrieved from the Gmail API.
41
-
42
- Raises:
43
- RuntimeError: If there is an issue while fetching messages from the Gmail API.
44
-
45
- """
46
-
47
- """currently not implementing jwt for testing purposes
48
- replace every access_token with jwt function directly which returns the access token"""
49
- access_token = self.__jwt
50
- receipt_query = f'subject:"your order" OR subject:receipts OR subject:receipt OR subject:invoice OR subject:invoice OR category:purchases'
51
- if self.__user_input is not None:
52
- receipt_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject:invoice OR subject:invoice OR category:purchases) AND subject:{self.__user_input}'
53
- gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={receipt_query}"
54
- def __fetch_page(url):
55
- response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
56
- response.raise_for_status() # Raise error if the request fails
57
- data = response.json()
58
- return data.get("messages", []), data.get("nextPageToken")
59
-
60
- messages = []
61
- page_token = None
62
- try:
63
- while True:
64
- url = f"{gmail_url}&pageToken={page_token}" if page_token else gmail_url
65
- page_messages, page_token = __fetch_page(url)
66
- messages.extend(page_messages)
67
- if not page_token:
68
- break
69
- except requests.RequestException as e:
70
- raise RuntimeError(f"Error fetching messages from Gmail API: {str(e)}")
71
-
72
- return messages
73
-
74
- def __fetch_message_data(self, message_id: str) -> dict:
75
- """
76
- Fetches message data from the Gmail API.
77
-
78
- Args:
79
- message_id (str): The ID of the message to fetch.
80
-
81
- Returns:
82
- dict: Message data retrieved from the Gmail API.
83
-
84
- Raises:
85
- RuntimeError: If there is an issue while fetching message data from the Gmail API.
86
- """
87
- message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
88
- try:
89
- response = requests.get(message_url, headers={"Authorization": f"Bearer {self.__jwt}"})
90
- response.raise_for_status() # Raise error if the request fails
91
- return response.json()
92
- except requests.RequestException as e:
93
- raise RuntimeError(f"Error fetching message data from Gmail API: {str(e)}")
94
-
95
- def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
96
- """
97
- Fetches attachment data from the Gmail API.
98
-
99
- Args:
100
- message_id (str): The ID of the message containing the attachment.
101
- attachment_id (str): The ID of the attachment to fetch.
102
-
103
- Returns:
104
- dict: Attachment data retrieved from the Gmail API.
105
-
106
- Raises:
107
- RuntimeError: If there is an issue while fetching attachment data from the Gmail API.
108
- """
109
- attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
110
- try:
111
- response = requests.get(attachment_url, headers={"Authorization": f"Bearer {self.__jwt}"})
112
- response.raise_for_status() # Raise error if the request fails
113
- return response.json()
114
- except requests.RequestException as e:
115
- raise RuntimeError(f"Error fetching attachment data from Gmail API: {str(e)}")
116
-
117
- def __process_message(self, message: dict) -> tuple:
118
- """
119
- Processes a single message.
120
-
121
- Args:
122
- message (dict): The message to process.
123
-
124
- Returns:
125
- tuple: A tuple containing the subject (str), body (str), links (list of str),
126
- and base64 data if it contains an document attachment in the form of pdf, docx, ppt or any file format indicating whether the message contains an attachment.
127
-
128
- Raises:
129
- RuntimeError: If there is an issue while fetching message data from the Gmail API.
130
- """
131
- message_id = message.get("id")
132
- if not message_id:
133
- return None, None, [], False
134
-
135
- message_data = self.__fetch_message_data(message_id, self.__jwt)
136
- subject = message_data.get('payload', {}).get('headers', {}).get('value', '')
137
-
138
- body = ''
139
- links = []
140
- has_attachment = False
141
-
142
- if 'payload' in message_data and 'parts' in message_data['payload']:
143
- parts = message_data['payload']['parts']
144
- for part in parts:
145
- if 'mimeType' not in part:
146
- continue
147
-
148
- mime_type = part['mimeType']
149
- if mime_type == 'text/plain' or mime_type == 'text/html':
150
- body_data = part['body'].get('data', '')
151
- body = base64.urlsafe_b64decode(body_data).decode('utf-8')
152
- text= self._extract_text_and_links(body)
153
-
154
- if 'body' in part and 'attachmentId' in part['body']:
155
- attachment_id = part['body']['attachmentId']
156
- attachment_data = self.__fetch_attachment_data(message_id, attachment_id)
157
- data = attachment_data.get("data", "")
158
- filename = part.get("filename", "untitled.txt")
159
-
160
- if data:
161
- # Save only the first 10 characters of the attachment data
162
- return subject,body ,{"filename":filename , "base64URLData":data}
163
-
164
- return subject, body,None
165
-
166
- def extract_text_and_links(html_content: str) -> tuple:
167
- """
168
- Extracts text and links from HTML content.
169
-
170
- Args:
171
- html_content (str): The HTML content to process.
172
-
173
- Returns:
174
- tuple: A tuple containing the extracted text (str) and links (list of tuples).
175
-
176
- Raises:
177
- ValueError: If the input HTML content is empty or None.
178
- """
179
- if not html_content:
180
- raise ValueError("HTML content is empty or None")
181
-
182
- soup = BeautifulSoup(html_content, 'html.parser')
183
-
184
- # Extract text
185
- text = soup.get_text(separator=' ')
186
- text = re.sub(r'\s+', ' ', text).strip()
187
-
188
- # Extract links
189
- links = [(link.text, link['href']) for link in soup.find_all('a', href=True)]
190
-
191
- return text, links
192
-
193
- def extract_messages(self) -> dict:
194
- """
195
- Extracts messages based on the provided brand name.
196
-
197
- Args:
198
- brand_name (str): The brand name to search for in email subjects.
199
- jwt_token (str): The JWT token for authentication.
200
-
201
- Returns:
202
- dict: A dictionary containing the extracted messages with their subjects, bodies, links, and attachment statuses.
203
- format:{"results":[{"subjec":"test subject" , "body":"it would be text" , "attachment_data":{"filename":filename,"base64URLData":data}},{second message with same content of subject , body , attachment_data}]}
204
-
205
- """
206
- messages = self.__fetch_messages()
207
- results = []
208
- for message in messages:
209
- subject, body, attachment_data = self.__process_message(message)
210
-
211
- """ Handling None values """
212
- subject = subject if subject is not None else ""
213
- body = body if body is not None else ""
214
- attachment_data = attachment_data if attachment_data is not None else {}
215
-
216
- if attachment_data: # Check if attachment_data is present
217
- results.append({"attachment_data": attachment_data})
218
- elif body: # If attachment_data is absent, append body only if it's not empty
219
- results.append({"body": body})
220
-
221
- return {"results": results}