Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, HTTPException, Query | |
from supabase import create_client, Client | |
from typing import List | |
import uvicorn | |
from datetime import datetime, timedelta | |
from collections import Counter | |
from dotenv import load_dotenv | |
import os | |
load_dotenv() | |
app = FastAPI() | |
# Initialize Supabase client | |
url: str = os.getenv("SUPABASE_URL") | |
key: str = os.getenv("SUPABASE_KEY") | |
supabase: Client = create_client(url, key) | |
table = "receipt_radar_structured_data_duplicate" | |
def test(): | |
return {"message":"Application is working !!"} | |
async def get_user_analytics( | |
hush_id: str = Query(..., description="User's hush ID"), | |
limit: int = 5 | |
): | |
# Fetch all necessary data in one query | |
response = supabase.table(table).select("*").eq("user_id", hush_id).execute() | |
# Process the data for different analytics | |
total_amount = sum(float(item['total_cost']) for item in response.data if item['total_cost'] is not None) | |
dates = [datetime.strptime(item['Date'], "%d-%m-%Y") for item in response.data if item['Date'] is not None] | |
date_counts = Counter(dates) | |
avg_transaction = total_amount / len(response.data) if response.data else 0 | |
item_counts = Counter(item['brand'] for item in response.data) | |
category_counts = Counter(item['purchase_category'] for item in response.data) | |
category_spending = {} | |
brand_spending = {} | |
for item in response.data: | |
category = item['purchase_category'] | |
brand = item['brand_category'] | |
cost = float(item['total_cost']) | |
category_spending[category] = category_spending.get(category, 0) + cost | |
brand_spending[brand] = brand_spending.get(brand, 0) + cost | |
sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True) | |
sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True) | |
return { | |
"total_amount_spent": total_amount, | |
# "frequency_of_purchases": dict(date_counts), | |
"average_transaction_value": avg_transaction, | |
"most_purchased_items": dict(item_counts.most_common(limit)), | |
"purchase_categories": dict(category_counts), | |
"categories_with_highest_spending": dict(sorted_categories[:limit]), | |
"brand_preferences": dict(sorted_brands[:limit]) | |
} | |
async def get_total_amount_spent(hush_id: str = Query(..., description="User's hush ID")): | |
response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute() | |
print(response) | |
total_amount = sum(float(item['total_cost']) for item in response.data if item['Total_cost'] is not None) | |
return {"total_amount_spent": total_amount} | |
async def get_frequency_of_purchases(hush_id: str = Query(..., description="User's hush ID")): | |
response = supabase.table(table).select("Date").eq("user_id", hush_id).execute() | |
print(response) | |
dates = [datetime.strptime(item['Date'], "%d-%m-%Y") for item in response.data if item['Date'] is not None] | |
print(dates) | |
# dates = [] | |
# for item in response.data: | |
# if item['Date']: | |
# date = datetime.fromisoformat(item['Date'].replace('Z', '+00:00')) | |
# dates.append(date.strftime("%Y-%m-%d")) # Format as YYYY-MM-DD | |
date_counts = Counter(dates) | |
return {"frequency_of_purchases": dict(date_counts)} | |
async def get_average_transaction_value(hush_id: str = Query(..., description="User's hush ID")): | |
response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute() | |
total_amount = sum(float(item['total_cost']) for item in response.data) | |
avg_transaction = total_amount / len(response.data) if response.data else 0 | |
return {"average_transaction_value": avg_transaction} | |
async def get_most_purchased_items(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5): | |
response = supabase.table(table).select("brand").eq("user_id", hush_id).execute() | |
item_counts = Counter(item['brand'] for item in response.data) | |
return {"most_purchased_items": dict(item_counts.most_common(limit))} | |
async def get_purchase_categories(hush_id: str = Query(..., description="User's hush ID")): | |
response = supabase.table(table).select("purchase_category").eq("user_id", hush_id).execute() | |
category_counts = Counter(item['purchase_category'] for item in response.data) | |
return {"purchase_categories": dict(category_counts)} | |
async def get_categories_with_highest_spending(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5): | |
response = supabase.table(table).select("purchase_category,total_cost").eq("user_id", hush_id).execute() | |
category_spending = {} | |
for item in response.data: | |
category = item['purchase_category'] | |
cost = float(item['total_cost']) | |
category_spending[category] = category_spending.get(category, 0) + cost | |
sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True) | |
return {"categories_with_highest_spending": dict(sorted_categories[:limit])} | |
async def get_brand_preferences(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5): | |
response = supabase.table(table).select("brand_category,total_cost").eq("user_id", hush_id).execute() | |
brand_spending = {} | |
for item in response.data: | |
brand = item['brand_category'] | |
cost = float(item['total_cost']) | |
brand_spending[brand] = brand_spending.get(brand, 0) + cost | |
sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True) | |
return {"brand_preferences": dict(sorted_brands[:limit])} |