analytics_api / main.py
Omkar008's picture
Update main.py
a712034 verified
raw
history blame
No virus
5.95 kB
from fastapi import FastAPI, HTTPException, Query
from supabase import create_client, Client
from typing import List
import uvicorn
from datetime import datetime, timedelta
from collections import Counter
from dotenv import load_dotenv
import os
# Load variables from a local .env file (if any) into the process environment
# before they are read below.
load_dotenv()
app = FastAPI()
# Initialize the Supabase client from environment configuration.
# os.getenv returns None when a variable is unset, so both SUPABASE_URL and
# SUPABASE_KEY are expected to be present at import time.
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(url, key)
# Name of the table every endpoint in this module queries.
table = "receipt_radar_structured_data_duplicate"
@app.get("/")
def test():
    """Root endpoint: reply with a fixed message confirming the app responds."""
    payload = {"message":"Application is working !!"}
    return payload
@app.get("/test")
def test1():
    """Secondary smoke-test endpoint returning a fixed value."""
    # NOTE(review): braces around a single string build a *set*, not a dict,
    # so the response is presumably rendered as a one-element JSON array.
    # Confirm whether a {"message": ...} object was intended before changing
    # it, since clients may depend on the current shape.
    body = {"Api response !!"}
    return body
@app.get("/user-analytics")
async def get_user_analytics(
    hush_id: str = Query(..., description="User's hush ID"),
    limit: int = 5
):
    """Return a combined analytics summary for one user's receipts.

    All rows whose ``user_id`` equals ``hush_id`` are fetched in a single
    query and aggregated in memory.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of entries in the "top N" sections
            (most purchased items, highest-spending categories, brands).

    Returns:
        A dict with the keys ``total_amount_spent``,
        ``average_transaction_value``, ``most_purchased_items``,
        ``purchase_categories``, ``categories_with_highest_spending`` and
        ``brand_preferences``.

    Rows whose ``total_cost`` is None contribute nothing to any spending
    figure. The average still divides by the total number of rows returned,
    including those without a cost.
    """
    # Fetch all necessary data in one query
    response = supabase.table(table).select("*").eq("user_id", hush_id).execute()
    rows = response.data

    # Occurrence counts do not depend on cost, so every row participates.
    item_counts = Counter(item['brand'] for item in rows)
    category_counts = Counter(item['purchase_category'] for item in rows)

    # Convert each usable cost once; rows without a cost are skipped here so
    # the per-category and per-brand sums cannot fail on float(None).
    costed_rows = [
        (item, float(item['total_cost']))
        for item in rows
        if item['total_cost'] is not None
    ]

    total_amount = sum(cost for _, cost in costed_rows)
    avg_transaction = total_amount / len(rows) if rows else 0

    category_spending = {}
    brand_spending = {}
    for item, cost in costed_rows:
        category = item['purchase_category']
        brand = item['brand_category']
        category_spending[category] = category_spending.get(category, 0) + cost
        brand_spending[brand] = brand_spending.get(brand, 0) + cost

    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)

    # Purchase frequency by date is not part of this summary, so the Date
    # column is intentionally not parsed here.
    return {
        "total_amount_spent": total_amount,
        "average_transaction_value": avg_transaction,
        "most_purchased_items": dict(item_counts.most_common(limit)),
        "purchase_categories": dict(category_counts),
        "categories_with_highest_spending": dict(sorted_categories[:limit]),
        "brand_preferences": dict(sorted_brands[:limit])
    }
@app.get("/total-amount-spent")
async def get_total_amount_spent(hush_id: str = Query(..., description="User's hush ID")):
    """Return the sum of ``total_cost`` over one user's receipts.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"total_amount_spent": <sum>}``. Rows whose ``total_cost`` is None
        are ignored.
    """
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    # The filter must read the same key that was selected ('total_cost');
    # looking up a differently-cased key raises KeyError on every row.
    total_amount = sum(
        float(item['total_cost'])
        for item in response.data
        if item['total_cost'] is not None
    )
    return {"total_amount_spent": total_amount}
@app.get("/frequency-of-purchases")
async def get_frequency_of_purchases(hush_id: str = Query(..., description="User's hush ID")):
    """Count one user's receipts per purchase date.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"frequency_of_purchases": {<date>: <count>, ...}}`` where each key
        is the ``datetime`` parsed from the row's ``Date`` value using the
        ``%d-%m-%Y`` format. Rendering of those keys in the HTTP response is
        left to the framework.

    Raises:
        ValueError: If a non-empty ``Date`` value does not match ``%d-%m-%Y``.
    """
    response = supabase.table(table).select("Date").eq("user_id", hush_id).execute()
    # Truthiness (rather than `is not None`) also skips empty strings, which
    # strptime would reject with ValueError.
    dates = [
        datetime.strptime(item['Date'], "%d-%m-%Y")
        for item in response.data
        if item['Date']
    ]
    date_counts = Counter(dates)
    return {"frequency_of_purchases": dict(date_counts)}
@app.get("/average-transaction-value")
async def get_average_transaction_value(hush_id: str = Query(..., description="User's hush ID")):
    """Return the average ``total_cost`` across one user's receipts.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"average_transaction_value": <average>}``; 0 when the user has no
        rows. Rows whose ``total_cost`` is None add nothing to the sum but
        are still counted in the denominator, which matches how the
        ``/user-analytics`` endpoint computes the same figure.
    """
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    rows = response.data
    # Skip missing costs: float(None) raises TypeError.
    total_amount = sum(
        float(item['total_cost'])
        for item in rows
        if item['total_cost'] is not None
    )
    avg_transaction = total_amount / len(rows) if rows else 0
    return {"average_transaction_value": avg_transaction}
@app.get("/most-purchased-items")
async def get_most_purchased_items(
    hush_id: str = Query(..., description="User's hush ID"),
    limit: int = 5,
):
    """Return the `limit` most frequent ``brand`` values among a user's rows.

    The result maps each brand to the number of rows carrying it, wrapped
    under the ``most_purchased_items`` key.
    """
    query = supabase.table(table).select("brand").eq("user_id", hush_id)
    result = query.execute()

    # Tally one occurrence per row, in the order the rows were returned.
    tally = Counter()
    for row in result.data:
        tally[row['brand']] += 1

    top_brands = tally.most_common(limit)
    return {"most_purchased_items": dict(top_brands)}
@app.get("/purchase-categories")
async def get_purchase_categories(
    hush_id: str = Query(..., description="User's hush ID"),
):
    """Return how many of a user's rows fall into each ``purchase_category``.

    The counts are wrapped under the ``purchase_categories`` key.
    """
    query = supabase.table(table).select("purchase_category").eq("user_id", hush_id)
    result = query.execute()

    # Tally one occurrence per row, in the order the rows were returned.
    tally = Counter()
    for row in result.data:
        tally[row['purchase_category']] += 1

    return {"purchase_categories": dict(tally)}
@app.get("/categories-with-highest-spending")
async def get_categories_with_highest_spending(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Return the purchase categories on which a user spent the most.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of categories to return.

    Returns:
        ``{"categories_with_highest_spending": {<category>: <total>, ...}}``
        ordered from highest to lowest total. Rows whose ``total_cost`` is
        None are skipped.
    """
    response = supabase.table(table).select("purchase_category,total_cost").eq("user_id", hush_id).execute()
    category_spending = {}
    for item in response.data:
        # A missing cost cannot be summed; float(None) raises TypeError.
        if item['total_cost'] is None:
            continue
        category = item['purchase_category']
        cost = float(item['total_cost'])
        category_spending[category] = category_spending.get(category, 0) + cost
    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    return {"categories_with_highest_spending": dict(sorted_categories[:limit])}
@app.get("/brand-preferences")
async def get_brand_preferences(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Return the ``brand_category`` values on which a user spent the most.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of entries to return.

    Returns:
        ``{"brand_preferences": {<brand_category>: <total>, ...}}`` ordered
        from highest to lowest total. Rows whose ``total_cost`` is None are
        skipped.
    """
    response = supabase.table(table).select("brand_category,total_cost").eq("user_id", hush_id).execute()
    brand_spending = {}
    for item in response.data:
        # A missing cost cannot be summed; float(None) raises TypeError.
        if item['total_cost'] is None:
            continue
        brand = item['brand_category']
        cost = float(item['total_cost'])
        brand_spending[brand] = brand_spending.get(brand, 0) + cost
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)
    return {"brand_preferences": dict(sorted_brands[:limit])}