Spaces:
Running
Running
File size: 5,884 Bytes
2fee4ac 5175b84 fc54a77 5175b84 2fee4ac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
from fastapi import FastAPI, HTTPException, Query
from supabase import create_client, Client
from typing import List
import uvicorn
from datetime import datetime, timedelta
from collections import Counter
from dotenv import load_dotenv
import os
# Populate the process environment from a .env file (python-dotenv) before the
# os.getenv lookups below run.
load_dotenv()
# Application object that the route decorators below register handlers on.
app = FastAPI()
# Initialize Supabase client
# NOTE(review): os.getenv returns None when a variable is unset, so the `str`
# annotations are optimistic and create_client would receive None -- confirm
# that every deployment defines both SUPABASE_URL and SUPABASE_KEY.
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(url, key)
# Name of the table that the endpoints below query.
table = "receipt_radar_structured_data_duplicate"
@app.get("/")
def test():
    """Answer the root path with a fixed message showing the app is up."""
    payload = {"message": "Application is working !!"}
    return payload
@app.get("/user-analytics")
async def get_user_analytics(
    hush_id: str = Query(..., description="User's hush ID"),
    limit: int = 5
):
    """Compute all analytics metrics for one user from a single table read.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of entries kept in each "top N" section
            (most purchased items, highest-spending categories, brands).

    Returns:
        A dict with the keys ``total_amount_spent``,
        ``average_transaction_value``, ``most_purchased_items``,
        ``purchase_categories``, ``categories_with_highest_spending`` and
        ``brand_preferences``.

    Rows whose ``total_cost`` is null are ignored by every monetary figure
    (total, average and the two spending rankings) but still count towards
    the item and category frequencies.
    """
    # Fetch all necessary data in one query
    response = supabase.table(table).select("*").eq("user_id", hush_id).execute()
    rows = response.data

    # Frequencies do not depend on the cost, so every row participates.
    item_counts = Counter(row['brand'] for row in rows)
    category_counts = Counter(row['purchase_category'] for row in rows)

    costs = []
    category_spending = {}
    brand_spending = {}
    for row in rows:
        raw_cost = row['total_cost']
        if raw_cost is None:
            # float(None) raises TypeError; a row without a cost must not take
            # the whole endpoint down, so it is skipped for money figures.
            continue
        cost = float(raw_cost)
        costs.append(cost)
        category = row['purchase_category']
        brand = row['brand_category']
        category_spending[category] = category_spending.get(category, 0) + cost
        brand_spending[brand] = brand_spending.get(brand, 0) + cost

    total_amount = sum(costs)
    # Average over the rows that actually carry a cost; 0 when there are none.
    avg_transaction = total_amount / len(costs) if costs else 0

    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)

    return {
        "total_amount_spent": total_amount,
        "average_transaction_value": avg_transaction,
        "most_purchased_items": dict(item_counts.most_common(limit)),
        "purchase_categories": dict(category_counts),
        "categories_with_highest_spending": dict(sorted_categories[:limit]),
        "brand_preferences": dict(sorted_brands[:limit])
    }
@app.get("/total-amount-spent")
async def get_total_amount_spent(hush_id: str = Query(..., description="User's hush ID")):
    """Sum the ``total_cost`` of every row belonging to one user.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"total_amount_spent": <sum>}``. Rows whose ``total_cost`` is null
        are left out of the sum.
    """
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    # The selected column is the lower-case ``total_cost``; the None filter
    # must use the same key or every non-empty result raises KeyError.
    total_amount = sum(
        float(item['total_cost'])
        for item in response.data
        if item['total_cost'] is not None
    )
    return {"total_amount_spent": total_amount}
@app.get("/frequency-of-purchases")
async def get_frequency_of_purchases(hush_id: str = Query(..., description="User's hush ID")):
    """Count how many rows one user has for each purchase date.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"frequency_of_purchases": {<date>: <count>}}`` where each key is
        the ``datetime`` parsed from the row's ``Date`` column using the
        ``%d-%m-%Y`` format. Rows with a null or empty ``Date`` are ignored.

    Raises:
        ValueError: If a non-empty ``Date`` value does not match ``%d-%m-%Y``.
    """
    response = supabase.table(table).select("Date").eq("user_id", hush_id).execute()
    # Truthiness skips both None and "", since strptime cannot parse an empty
    # string and would otherwise fail the whole request.
    dates = [
        datetime.strptime(item['Date'], "%d-%m-%Y")
        for item in response.data
        if item['Date']
    ]
    date_counts = Counter(dates)
    return {"frequency_of_purchases": dict(date_counts)}
@app.get("/average-transaction-value")
async def get_average_transaction_value(hush_id: str = Query(..., description="User's hush ID")):
    """Compute the mean ``total_cost`` over one user's rows.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"average_transaction_value": <mean>}``. Rows whose ``total_cost``
        is null are excluded from both the sum and the divisor; the value is
        0 when no row carries a cost.
    """
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    # float(None) raises TypeError, so null costs are filtered out up front.
    costs = [
        float(item['total_cost'])
        for item in response.data
        if item['total_cost'] is not None
    ]
    avg_transaction = sum(costs) / len(costs) if costs else 0
    return {"average_transaction_value": avg_transaction}
@app.get("/most-purchased-items")
async def get_most_purchased_items(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Report the ``limit`` most frequent ``brand`` values among a user's rows.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: How many of the most common brands to return.

    Returns:
        ``{"most_purchased_items": {<brand>: <row count>}}``.
    """
    query = supabase.table(table).select("brand").eq("user_id", hush_id)
    rows = query.execute().data

    # Tally one occurrence per row, keyed by the row's brand.
    tally = Counter()
    for row in rows:
        tally[row['brand']] += 1

    top_brands = tally.most_common(limit)
    return {"most_purchased_items": dict(top_brands)}
@app.get("/purchase-categories")
async def get_purchase_categories(hush_id: str = Query(..., description="User's hush ID")):
    """Count a user's rows per ``purchase_category``.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"purchase_categories": {<category>: <row count>}}``.
    """
    query = supabase.table(table).select("purchase_category").eq("user_id", hush_id)
    rows = query.execute().data

    # Tally one occurrence per row, keyed by the row's category.
    tally = Counter()
    for row in rows:
        tally[row['purchase_category']] += 1

    return {"purchase_categories": dict(tally)}
@app.get("/categories-with-highest-spending")
async def get_categories_with_highest_spending(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Rank a user's purchase categories by the money spent in each.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of categories to return.

    Returns:
        ``{"categories_with_highest_spending": {<category>: <total>}}`` with
        entries ordered from highest to lowest total. Rows whose
        ``total_cost`` is null contribute nothing.
    """
    response = supabase.table(table).select("purchase_category,total_cost").eq("user_id", hush_id).execute()
    category_spending = {}
    for item in response.data:
        raw_cost = item['total_cost']
        if raw_cost is None:
            # float(None) raises TypeError; skip rows without a recorded cost.
            continue
        category = item['purchase_category']
        category_spending[category] = category_spending.get(category, 0) + float(raw_cost)
    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    return {"categories_with_highest_spending": dict(sorted_categories[:limit])}
@app.get("/brand-preferences")
async def get_brand_preferences(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Rank a user's ``brand_category`` values by the money spent on each.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of brand categories to return.

    Returns:
        ``{"brand_preferences": {<brand_category>: <total>}}`` with entries
        ordered from highest to lowest total. Rows whose ``total_cost`` is
        null contribute nothing.
    """
    response = supabase.table(table).select("brand_category,total_cost").eq("user_id", hush_id).execute()
    brand_spending = {}
    for item in response.data:
        raw_cost = item['total_cost']
        if raw_cost is None:
            # float(None) raises TypeError; skip rows without a recorded cost.
            continue
        brand = item['brand_category']
        brand_spending[brand] = brand_spending.get(brand, 0) + float(raw_cost)
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)
    return {"brand_preferences": dict(sorted_brands[:limit])}