from fastapi import FastAPI, HTTPException, Query from supabase import create_client, Client from typing import List import uvicorn from datetime import datetime, timedelta from collections import Counter from dotenv import load_dotenv import os load_dotenv() app = FastAPI() # Initialize Supabase client url: str = os.getenv("SUPABASE_URL") key: str = os.getenv("SUPABASE_KEY") supabase: Client = create_client(url, key) table = "receipt_radar_structured_data_duplicate" @app.get("/") def test(): return {"message":"Application is working !!"} @app.get("/test") def test1(): return {"Api response !!"} @app.get("/user-analytics") async def get_user_analytics( hush_id: str = Query(..., description="User's hush ID"), limit: int = 5 ): # Fetch all necessary data in one query response = supabase.table(table).select("*").eq("user_id", hush_id).execute() # Process the data for different analytics total_amount = sum(float(item['total_cost']) for item in response.data if item['total_cost'] is not None) dates = [datetime.strptime(item['Date'], "%d-%m-%Y") for item in response.data if item['Date'] is not None] date_counts = Counter(dates) avg_transaction = total_amount / len(response.data) if response.data else 0 item_counts = Counter(item['brand'] for item in response.data) category_counts = Counter(item['purchase_category'] for item in response.data) category_spending = {} brand_spending = {} for item in response.data: category = item['purchase_category'] brand = item['brand_category'] cost = float(item['total_cost']) category_spending[category] = category_spending.get(category, 0) + cost brand_spending[brand] = brand_spending.get(brand, 0) + cost sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True) sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True) return { "total_amount_spent": total_amount, # "frequency_of_purchases": dict(date_counts), "average_transaction_value": avg_transaction, "most_purchased_items": dict(item_counts.most_common(limit)), "purchase_categories": dict(category_counts), "categories_with_highest_spending": dict(sorted_categories[:limit]), "brand_preferences": dict(sorted_brands[:limit]) } @app.get("/total-amount-spent") async def get_total_amount_spent(hush_id: str = Query(..., description="User's hush ID")): response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute() print(response) total_amount = sum(float(item['total_cost']) for item in response.data if item['Total_cost'] is not None) return {"total_amount_spent": total_amount} @app.get("/frequency-of-purchases") async def get_frequency_of_purchases(hush_id: str = Query(..., description="User's hush ID")): response = supabase.table(table).select("Date").eq("user_id", hush_id).execute() print(response) dates = [datetime.strptime(item['Date'], "%d-%m-%Y") for item in response.data if item['Date'] is not None] print(dates) # dates = [] # for item in response.data: # if item['Date']: # date = datetime.fromisoformat(item['Date'].replace('Z', '+00:00')) # dates.append(date.strftime("%Y-%m-%d")) # Format as YYYY-MM-DD date_counts = Counter(dates) return {"frequency_of_purchases": dict(date_counts)} @app.get("/average-transaction-value") async def get_average_transaction_value(hush_id: str = Query(..., description="User's hush ID")): response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute() total_amount = sum(float(item['total_cost']) for item in response.data) avg_transaction = total_amount / len(response.data) if response.data else 0 return {"average_transaction_value": avg_transaction} @app.get("/most-purchased-items") async def get_most_purchased_items(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5): response = supabase.table(table).select("brand").eq("user_id", hush_id).execute() item_counts = Counter(item['brand'] for item in response.data) return {"most_purchased_items": dict(item_counts.most_common(limit))} @app.get("/purchase-categories") async def get_purchase_categories(hush_id: str = Query(..., description="User's hush ID")): response = supabase.table(table).select("purchase_category").eq("user_id", hush_id).execute() category_counts = Counter(item['purchase_category'] for item in response.data) return {"purchase_categories": dict(category_counts)} @app.get("/categories-with-highest-spending") async def get_categories_with_highest_spending(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5): response = supabase.table(table).select("purchase_category,total_cost").eq("user_id", hush_id).execute() category_spending = {} for item in response.data: category = item['purchase_category'] cost = float(item['total_cost']) category_spending[category] = category_spending.get(category, 0) + cost sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True) return {"categories_with_highest_spending": dict(sorted_categories[:limit])} @app.get("/brand-preferences") async def get_brand_preferences(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5): response = supabase.table(table).select("brand_category,total_cost").eq("user_id", hush_id).execute() brand_spending = {} for item in response.data: brand = item['brand_category'] cost = float(item['total_cost']) brand_spending[brand] = brand_spending.get(brand, 0) + cost sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True) return {"brand_preferences": dict(sorted_brands[:limit])}