# Page-header residue captured with this file ("Spaces: Running"); not part of the program.
import base64
import os
import webbrowser

import requests
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Google OAuth client configuration (values issued by the Google Developer
# Console). Every setting can be overridden with an environment variable of
# the same name; the literals are kept only as fallbacks so that existing
# deployments keep working without any configuration change.
GOOGLE_CLIENT_ID = os.environ.get(
    "GOOGLE_CLIENT_ID",
    "485753721652-5uta3e18va2g6cnkldib2d68q39t4vod.apps.googleusercontent.com",
)
# SECURITY: this client secret is committed to source control and must be
# treated as compromised. Rotate it in the Google Developer Console, supply
# the new value through the GOOGLE_CLIENT_SECRET environment variable, and
# then delete this fallback literal.
GOOGLE_CLIENT_SECRET = os.environ.get(
    "GOOGLE_CLIENT_SECRET",
    "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",
)
# Redirect target registered for the hf.space deployment.
GOOGLE_REDIRECT_URI = os.environ.get(
    "GOOGLE_REDIRECT_URI",
    "https://omkar008-receipt-radar-test.hf.space/auth/google",
)
# Redirect target registered for the herokuapp.com deployment.
GOOGLE_REDIRECT_URI_hr = os.environ.get(
    "GOOGLE_REDIRECT_URI_hr",
    "https://receiptradar-0bb387d81174.herokuapp.com/auth/google",
)
async def login_google():
    """Return the Google OAuth 2.0 consent URLs the client should visit.

    Returns:
        dict: ``"url"`` redirects to ``GOOGLE_REDIRECT_URI`` and requests the
        ``openid profile email`` scopes. ``"url_hr"`` redirects to
        ``GOOGLE_REDIRECT_URI_hr``, additionally requests read-only Gmail
        access and carries ``state=receipts``.
    """
    # This handler used to call webbrowser.open() with a Gmail-scoped URL.
    # That launches a browser on the *server* host rather than on the user's
    # machine, and on a headless host it can start a blocking console browser
    # inside this async handler, so the call was removed. Clients are
    # expected to navigate to one of the returned URLs themselves.
    # NOTE(review): the returned "url" does not request the gmail.readonly
    # scope (only "url_hr" does) - confirm that this is intended.
    base = (
        "https://accounts.google.com/o/oauth2/auth"
        f"?response_type=code&client_id={GOOGLE_CLIENT_ID}"
    )
    basic_scope = "openid%20profile%20email"
    gmail_scope = (
        f"{basic_scope}%20https://www.googleapis.com/auth/gmail.readonly"
    )
    return {
        "url": (
            f"{base}&redirect_uri={GOOGLE_REDIRECT_URI}"
            f"&scope={basic_scope}&access_type=offline"
        ),
        "url_hr": (
            f"{base}&redirect_uri={GOOGLE_REDIRECT_URI_hr}"
            f"&scope={gmail_scope}&access_type=offline&state=receipts"
        ),
    }
async def auth_google(request: Request):
    """Download receipt attachments from Gmail with a client-supplied token.

    The JSON request body must contain an ``"access_token"`` field holding a
    Google OAuth access token; the Gmail calls below need the
    ``gmail.readonly`` scope. Up to two messages matching
    ``subject:receipts has:attachment`` are listed, and every attachment
    part found directly under each message payload is downloaded.

    NOTE(review): another function named ``auth_google`` is defined later in
    this file; at module level the later definition replaces this one.
    NOTE(review): ``requests`` is synchronous, so these calls block the event
    loop while they run.

    Returns:
        dict: ``"attachment_count"`` is the number of attachments whose data
        was retrieved. ``"attachment_content"`` maps each attachment filename
        to the base64url text returned by the Gmail API, encoded as UTF-8
        bytes. Attachments sharing a filename overwrite one another, so the
        mapping can hold fewer entries than the count.

    Raises:
        HTTPException: 400 when the body carries no ``access_token``.
    """
    body = await request.json()
    access_token = body.get("access_token")
    # The token is a credential and is deliberately never printed or logged.
    if not access_token:
        raise HTTPException(status_code=400, detail="Authorization code not provided")

    auth_headers = {"Authorization": f"Bearer {access_token}"}
    request_timeout = 30  # seconds; without it a stalled call never returns
    messages_url = "https://www.googleapis.com/gmail/v1/users/me/messages"

    # Passing the search through ``params`` lets requests URL-encode the
    # space and colons in the Gmail query.
    listing = requests.get(
        messages_url,
        params={"q": "subject:receipts has:attachment", "maxResults": 2},
        headers=auth_headers,
        timeout=request_timeout,
    )
    messages = listing.json().get("messages", [])

    attachment_count = 0
    attachment_content = {}
    for message in messages:
        message_id = message.get("id") if message else None
        if not message_id:
            continue

        message_data = requests.get(
            f"{messages_url}/{message_id}",
            headers=auth_headers,
            timeout=request_timeout,
        ).json()

        # Only parts directly under the payload are inspected.
        for part in message_data.get("payload", {}).get("parts", []):
            attachment_id = part.get("body", {}).get("attachmentId")
            if not attachment_id:
                continue

            attachment_data = requests.get(
                f"{messages_url}/{message_id}/attachments/{attachment_id}",
                headers=auth_headers,
                timeout=request_timeout,
            ).json()
            encoded = attachment_data.get("data")
            if not encoded:
                continue

            filename = part.get("filename", "untitled.txt")
            # The data is passed through still base64url-encoded; decoding is
            # left to the consumer.
            attachment_content[filename] = encoded.encode("UTF-8")
            attachment_count += 1

    return {
        "attachment_count": attachment_count,
        "attachment_content": attachment_content,
    }
async def auth_google(request: Request):
    """Handle the OAuth redirect: exchange the code and fetch receipt mail.

    Reads ``code`` (required) and ``state`` (optional) from the request's
    query string, exchanges the authorization code for an access token, then
    fetches the user's profile and up to two messages matching
    ``subject:receipt`` together with their top-level attachment parts.

    ``state`` may carry ``&``-separated ``key=value`` pairs; the value of a
    ``filter_query`` pair, if present, is echoed back in the response.

    NOTE(review): another function named ``auth_google`` is defined earlier
    in this file; at module level this definition replaces it.
    NOTE(review): ``requests`` is synchronous, so these calls block the event
    loop while they run.

    Returns:
        dict: ``"user_info"`` (userinfo endpoint JSON), ``"query"`` (the
        parsed ``filter_query`` or ``None``), ``"data"`` and
        ``"attachment_count"`` (number of attachments retrieved) and
        ``"attachment_content"`` (list of one-element lists holding the
        base64url attachment text as UTF-8 bytes).

    Raises:
        HTTPException: 400 when no ``code`` is supplied or when Google does
            not return an access token for it.
    """
    # ``code`` and ``state`` were previously referenced without ever being
    # defined, so every call failed with NameError.
    code = request.query_params.get("code")
    state = request.query_params.get("state")
    if not code:
        raise HTTPException(status_code=400, detail="Authorization code not provided")

    filter_query = None
    if state:
        for param in state.split("&"):
            if param.startswith("filter_query="):
                # partition keeps any further '=' characters in the value.
                filter_query = param.partition("=")[2]

    request_timeout = 30  # seconds; without it a stalled call never returns
    token_response = requests.post(
        "https://accounts.google.com/o/oauth2/token",
        data={
            "code": code,
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "redirect_uri": GOOGLE_REDIRECT_URI,
            "grant_type": "authorization_code",
        },
        timeout=request_timeout,
    )
    # The token response holds credentials and is deliberately not printed.
    access_token = token_response.json().get("access_token")
    if not access_token:
        raise HTTPException(
            status_code=400,
            detail="Could not obtain an access token from Google",
        )

    auth_headers = {"Authorization": f"Bearer {access_token}"}
    user_info = requests.get(
        "https://www.googleapis.com/oauth2/v1/userinfo",
        headers=auth_headers,
        timeout=request_timeout,
    )

    messages_url = "https://www.googleapis.com/gmail/v1/users/me/messages"
    listing = requests.get(
        messages_url,
        params={"q": "subject:receipt", "maxResults": 2},
        headers=auth_headers,
        timeout=request_timeout,
    )
    messages = listing.json().get("messages", [])

    attachment_count = 0
    # NOTE(review): the result list has always started with an empty inner
    # list; it is kept so existing consumers see the same shape.
    attachment_content = [[]]
    for message in messages:
        message_id = message.get("id") if message else None
        if not message_id:
            continue

        message_data = requests.get(
            f"{messages_url}/{message_id}",
            headers=auth_headers,
            timeout=request_timeout,
        ).json()

        # Only parts directly under the payload are inspected.
        for part in message_data.get("payload", {}).get("parts", []):
            attachment_id = part.get("body", {}).get("attachmentId")
            if not attachment_id:
                continue

            attachment_data = requests.get(
                f"{messages_url}/{message_id}/attachments/{attachment_id}",
                headers=auth_headers,
                timeout=request_timeout,
            ).json()
            encoded = attachment_data.get("data")
            if not encoded:
                continue

            # The data is passed through still base64url-encoded.
            attachment_content.append([encoded.encode("utf-8")])
            attachment_count += 1

    return {
        "user_info": user_info.json(),
        "query": filter_query,
        "data": attachment_count,
        "attachment_content": attachment_content,
        "attachment_count": attachment_count,
    }
async def get_token(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token extracted by ``oauth2_scheme``.

    The token is verified as an HS256 JWT keyed with
    ``GOOGLE_CLIENT_SECRET``.

    NOTE(review): only tokens signed with that shared secret will verify;
    confirm this matches the tokens clients actually send.

    Returns:
        dict: the decoded JWT claims.

    Raises:
        HTTPException: 401 when the token cannot be decoded or verified, so
            the client gets an authentication challenge instead of a 500.
    """
    try:
        return jwt.decode(token, GOOGLE_CLIENT_SECRET, algorithms=["HS256"])
    except JWTError as exc:
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
# if __name__ == "__main__": | |
# import uvicorn | |
# uvicorn.run(app, host="0.0.0.0", port=8000) |