Omkar008's picture
Update test.py
e715b7e
raw
history blame
No virus
9.31 kB
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
import requests
from jose import jwt
import webbrowser
import base64
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Replace these with your own values from the Google Developer Console
# GOOGLE_CLIENT_ID = ""
# GOOGLE_CLIENT_SECRET = ""
# GOOGLE_REDIRECT_URI = ""
GOOGLE_CLIENT_ID = "485753721652-5uta3e18va2g6cnkldib2d68q39t4vod.apps.googleusercontent.com"
GOOGLE_CLIENT_SECRET = "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"
GOOGLE_REDIRECT_URI = "https://omkar008-receipt-radar-test.hf.space/auth/google"
GOOGLE_REDIRECT_URI_hr = "https://receiptradar-0bb387d81174.herokuapp.com/auth1/google"
@app.get("/")
async def login_google():
# oauth_url = f"https://accounts.google.com/o/oauth2/auth?response_type=code&client_id={GOOGLE_CLIENT_ID}&redirect_uri={GOOGLE_REDIRECT_URI}&scope=openid%20profile%20email&access_type=offline"
#Below is the URL to prompt the user to login to his specified gmail account and also give a readonly access to his gmail
oauth_url = f"https://accounts.google.com/o/oauth2/auth?response_type=code&client_id={GOOGLE_CLIENT_ID}&redirect_uri={GOOGLE_REDIRECT_URI}&scope=openid%20profile%20email%20https://www.googleapis.com/auth/gmail.readonly&access_type=offline"
oauth_url_hr = f"https://accounts.google.com/o/oauth2/auth?response_type=code&client_id={GOOGLE_CLIENT_ID}&redirect_uri={GOOGLE_REDIRECT_URI_hr}&scope=openid%20profile%20email&access_type=offline"
webbrowser.open(oauth_url)
return {
"url":f"https://accounts.google.com/o/oauth2/auth?response_type=code&client_id={GOOGLE_CLIENT_ID}&redirect_uri={GOOGLE_REDIRECT_URI}&scope=openid%20profile%20email&access_type=offline",
"url_hr": oauth_url_hr
}
@app.get("/auth/google")
async def auth_google(code: str):
#This code is basically the authorization code and this authorization code helps us to get the access token with the required scopes that we have set .
#We require the gmail.readonly scopes that requires verification of our application and all.
token_url = "https://accounts.google.com/o/oauth2/token"
print(code)
data = {
"code": code,
"client_id": GOOGLE_CLIENT_ID,
"client_secret": GOOGLE_CLIENT_SECRET,
"redirect_uri": GOOGLE_REDIRECT_URI,
"grant_type": "authorization_code",
}
response = requests.post(token_url, data=data)
access_token = response.json().get("access_token")
print(response.json())
user_info = requests.get("https://www.googleapis.com/oauth2/v1/userinfo", headers={"Authorization": f"Bearer {access_token}"})
jobs_query = "subject:New application:ios"
gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={jobs_query}&maxResults=110"
gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
messages = gmail_response.json().get("messages", [])
print(messages)
print("Printing gmail response")
print(gmail_response.json())
# Fetch attachments from the first message
attachments = []
for i,message in enumerate(messages) :
print(i)
print(message)
if message:
message_id = message.get("id")
print(message_id)
if message_id:
message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
message_data = message_response.json()
# Check for parts in the message payload
if "payload" in message_data and "parts" in message_data["payload"]:
for part in message_data["payload"]["parts"]:
if "body" in part and "attachmentId" in part["body"]:
attachment_id = part["body"]["attachmentId"]
attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"})
attachment_data = attachment_response.json()
data = attachment_data.get("data")
filename = part.get("filename", "untitled.txt")
if data:
# Decode base64-encoded attachment data
attachment_content = base64.urlsafe_b64decode(data.encode("UTF-8"))
# Save the attachment to a file
save_path = f"/Users/omkarmalpure/Documents/Gmail_API/attachments/{filename}"
with open(save_path, "wb") as file:
file.write(attachment_content)
attachments.append(save_path)
print(attachments)
print(len(attachments))
return user_info.json()
@app.get("/auth1/google")
async def auth_google(request: Request,code: str,state: str = None):
#This code is basically the authorization code and this authorization code helps us to get the access token with the required scopes that we have set .
#We require the gmail.readonly scopes that requires verification of our application and all.
filter_query = None
if state:
state_params = state.split('&')
for param in state_params:
if param.startswith('filter_query='):
filter_query = param.split('=')[1]
token_url = "https://accounts.google.com/o/oauth2/token"
print(code)
data = {
"code": code,
"client_id": GOOGLE_CLIENT_ID,
"client_secret": GOOGLE_CLIENT_SECRET,
"redirect_uri": GOOGLE_REDIRECT_URI,
"grant_type": "authorization_code",
}
response = requests.post(token_url, data=data)
access_token = response.json().get("access_token")
print(response.json())
user_info = requests.get("https://www.googleapis.com/oauth2/v1/userinfo", headers={"Authorization": f"Bearer {access_token}"})
gmail_url = "https://www.googleapis.com/gmail/v1/users/me/messages?q=subject:receipt&maxResults=2"
gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"})
messages = gmail_response.json().get("messages", [])
print(messages)
print("Printing gmail response")
print(gmail_response.json())
attachment_no = 0
data_new = [[]]
for i,message in enumerate(messages) :
if message:
message_id = message.get("id")
print(message_id)
if message_id:
message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
message_response = requests.get(message_url, headers={"Authorization": f"Bearer {access_token}"})
message_data = message_response.json()
# Check for parts in the message payload
if "payload" in message_data and "parts" in message_data["payload"]:
for part in message_data["payload"]["parts"]:
if "body" in part and "attachmentId" in part["body"]:
attachment_id = part["body"]["attachmentId"]
attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
attachment_response = requests.get(attachment_url, headers={"Authorization": f"Bearer {access_token}"})
attachment_data = attachment_response.json()
data = attachment_data.get("data")
filename = part.get("filename", "untitled.txt")
if data:
data_new.append([data.encode("utf-8")])
# Decode base64-encoded attachment data
attachment_content = base64.urlsafe_b64decode(data.encode("UTF-8"))
# print(attachment_content)
print("yo")
print(data_new)
attachment_no+=1
# Save the attachment to a file
#Here add the function or class to save the attachments to Google Cloud
# save_path = f"/Users/omkarmalpure/Desktop/Current Work/fastapi_gmail/attachments/{filename}"
# with open(save_path, "wb") as file:
# file.write(attachment_content)
return {"user_info": user_info.json(), "query": filter_query , "data":attachment_no , "attachment_content":data_new , "attachment_count":attachment_no}
@app.get("/token")
async def get_token(token: str = Depends(oauth2_scheme)):
return jwt.decode(token, GOOGLE_CLIENT_SECRET, algorithms=["HS256"])
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)