Spaces:
Sleeping
Sleeping
Update controllers/ws_controller_1.py
Browse files- controllers/ws_controller_1.py +10 -11
controllers/ws_controller_1.py
CHANGED
@@ -144,7 +144,7 @@ async def get_messages(code: str, websocket: WebSocket,start, brand_name: Option
|
|
144 |
|
145 |
page_token = None
|
146 |
processed_count = 0
|
147 |
-
|
148 |
async with aiohttp.ClientSession() as session:
|
149 |
while True:
|
150 |
gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults=50"
|
@@ -165,21 +165,19 @@ async def get_messages(code: str, websocket: WebSocket,start, brand_name: Option
|
|
165 |
|
166 |
for future in asyncio.as_completed(processing_tasks):
|
167 |
processed_message = await future
|
168 |
-
|
169 |
-
|
170 |
-
|
171 |
-
|
172 |
-
|
173 |
-
|
174 |
-
await websocket.send_text("NEXT_MESSAGE")
|
175 |
-
|
176 |
-
processed_count += 1
|
177 |
|
178 |
if "nextPageToken" in gmail_data:
|
179 |
page_token = gmail_data["nextPageToken"]
|
180 |
else:
|
181 |
break
|
182 |
-
|
|
|
183 |
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
|
184 |
message_str = json.dumps(message_json)
|
185 |
for i in range(0, len(message_str), chunk_size):
|
@@ -218,6 +216,7 @@ async def process_message(session: aiohttp.ClientSession, access_token: str, mes
|
|
218 |
else:
|
219 |
# If 'payload' or 'headers' is missing, use a default value or extract from another field
|
220 |
company_from_mail = "unknown" # or some other default value
|
|
|
221 |
logging.warning(f"Payload or headers missing for message ID: {message_data.get('id', 'unknown')}")
|
222 |
|
223 |
body = extract_body_from_mail(message_data)
|
|
|
144 |
|
145 |
page_token = None
|
146 |
processed_count = 0
|
147 |
+
skipped_count = 0
|
148 |
async with aiohttp.ClientSession() as session:
|
149 |
while True:
|
150 |
gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={g_query}&maxResults=50"
|
|
|
165 |
|
166 |
for future in asyncio.as_completed(processing_tasks):
|
167 |
processed_message = await future
|
168 |
+
if processed_message is not None:
|
169 |
+
await websocket.send_text(json.dumps(processed_message.to_json()))
|
170 |
+
await websocket.send_text("NEXT_MESSAGE")
|
171 |
+
processed_count += 1
|
172 |
+
else:
|
173 |
+
skipped_count += 1
|
|
|
|
|
|
|
174 |
|
175 |
if "nextPageToken" in gmail_data:
|
176 |
page_token = gmail_data["nextPageToken"]
|
177 |
else:
|
178 |
break
|
179 |
+
logging.info(f"Processed Message {processed_count}")
|
180 |
+
logging.info(f"Skipped Message {skipped_count}")
|
181 |
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
|
182 |
message_str = json.dumps(message_json)
|
183 |
for i in range(0, len(message_str), chunk_size):
|
|
|
216 |
else:
|
217 |
# If 'payload' or 'headers' is missing, use a default value or extract from another field
|
218 |
company_from_mail = "unknown" # or some other default value
|
219 |
+
return None
|
220 |
logging.warning(f"Payload or headers missing for message ID: {message_data.get('id', 'unknown')}")
|
221 |
|
222 |
body = extract_body_from_mail(message_data)
|