Spaces:
Sleeping
Sleeping
File size: 5,946 Bytes
2fee4ac a712034 5175b84 fc54a77 5175b84 2fee4ac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
from fastapi import FastAPI, HTTPException, Query
from supabase import create_client, Client
from typing import List
import uvicorn
from datetime import datetime, timedelta
from collections import Counter
from dotenv import load_dotenv
import os
# Load environment variables (python-dotenv) before they are read below.
load_dotenv()
# FastAPI application object; the route decorators below register on it.
app = FastAPI()
# Initialize the Supabase client from environment configuration.
# NOTE(review): os.getenv returns None when a variable is unset, so the `str`
# annotations only hold if SUPABASE_URL and SUPABASE_KEY are always defined
# in the deployment environment — confirm.
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(url, key)
# Name of the Supabase table queried by the analytics endpoints below.
table = "receipt_radar_structured_data_duplicate"
@app.get("/")
def test():
    """Root route: return a fixed message showing the application responds."""
    status = {"message":"Application is working !!"}
    return status
@app.get("/test")
def test1():
    """Return a fixed one-element payload for the ``/test`` route.

    NOTE(review): the payload is a set literal (no key), not a dict. It is
    kept as-is to preserve the current response; confirm whether a
    ``{"message": ...}`` object was intended.
    """
    payload = {"Api response !!"}
    return payload
@app.get("/user-analytics")
async def get_user_analytics(
    hush_id: str = Query(..., description="User's hush ID"),
    limit: int = 5
):
    """Return the combined purchase analytics for one user.

    All rows whose ``user_id`` equals ``hush_id`` are fetched once and every
    metric is derived from that single result set.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of entries in each "top N" section.

    Returns:
        A dict with the keys ``total_amount_spent``,
        ``average_transaction_value``, ``most_purchased_items``,
        ``purchase_categories``, ``categories_with_highest_spending`` and
        ``brand_preferences``. Purchase-date frequency is not part of this
        response.
    """
    # Fetch all necessary data in one query
    response = supabase.table(table).select("*").eq("user_id", hush_id).execute()
    rows = response.data

    # Rows with a null total_cost contribute nothing to any monetary figure.
    total_amount = sum(
        float(item['total_cost'])
        for item in rows
        if item['total_cost'] is not None
    )
    # The average is taken over every fetched row, including rows whose cost
    # is null (they count as a transaction of value 0).
    avg_transaction = total_amount / len(rows) if rows else 0

    item_counts = Counter(item['brand'] for item in rows)
    category_counts = Counter(item['purchase_category'] for item in rows)

    category_spending = {}
    brand_spending = {}
    for item in rows:
        raw_cost = item['total_cost']
        if raw_cost is None:
            # float(None) would raise TypeError; skip the row, exactly as the
            # total above does.
            continue
        cost = float(raw_cost)
        category = item['purchase_category']
        brand = item['brand_category']
        category_spending[category] = category_spending.get(category, 0) + cost
        brand_spending[brand] = brand_spending.get(brand, 0) + cost

    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)

    return {
        "total_amount_spent": total_amount,
        "average_transaction_value": avg_transaction,
        "most_purchased_items": dict(item_counts.most_common(limit)),
        "purchase_categories": dict(category_counts),
        "categories_with_highest_spending": dict(sorted_categories[:limit]),
        "brand_preferences": dict(sorted_brands[:limit])
    }
@app.get("/total-amount-spent")
async def get_total_amount_spent(hush_id: str = Query(..., description="User's hush ID")):
    """Return the sum of ``total_cost`` over one user's rows.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"total_amount_spent": <sum>}``; rows with a null ``total_cost``
        are ignored.
    """
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    # The selected column is lower-case `total_cost`; the null check must read
    # the same key that is summed, otherwise every non-empty result raises
    # KeyError.
    total_amount = sum(
        float(item['total_cost'])
        for item in response.data
        if item['total_cost'] is not None
    )
    return {"total_amount_spent": total_amount}
@app.get("/frequency-of-purchases")
async def get_frequency_of_purchases(hush_id: str = Query(..., description="User's hush ID")):
    """Count how many of one user's rows fall on each purchase date.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"frequency_of_purchases": {<date>: <count>, ...}}`` where each key
        is the ``datetime`` parsed from the row's ``Date`` value.

    Raises:
        ValueError: If a non-empty ``Date`` value is not in ``DD-MM-YYYY``
            form.
    """
    response = supabase.table(table).select("Date").eq("user_id", hush_id).execute()
    # Skip rows with no usable date: null as before, and now also the empty
    # string, which strptime would reject.
    dates = [
        datetime.strptime(item['Date'], "%d-%m-%Y")
        for item in response.data
        if item['Date']
    ]
    date_counts = Counter(dates)
    return {"frequency_of_purchases": dict(date_counts)}
@app.get("/average-transaction-value")
async def get_average_transaction_value(hush_id: str = Query(..., description="User's hush ID")):
    """Return the mean ``total_cost`` across one user's rows.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"average_transaction_value": <mean>}``; the value is 0 when the
        user has no rows.
    """
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    rows = response.data
    # A null total_cost would make float() raise TypeError, so such rows add
    # nothing to the sum. They still count in the denominator, which matches
    # how /user-analytics computes the same figure.
    total_amount = sum(
        float(item['total_cost'])
        for item in rows
        if item['total_cost'] is not None
    )
    avg_transaction = total_amount / len(rows) if rows else 0
    return {"average_transaction_value": avg_transaction}
@app.get("/most-purchased-items")
async def get_most_purchased_items(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Return the ``limit`` most frequent ``brand`` values among a user's rows.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of brands to report.

    Returns:
        ``{"most_purchased_items": {<brand>: <row count>, ...}}``.
    """
    query = supabase.table(table).select("brand").eq("user_id", hush_id)
    result = query.execute()

    # Tally one occurrence per fetched row.
    brand_tally = Counter()
    for row in result.data:
        brand_tally[row['brand']] += 1

    top_brands = brand_tally.most_common(limit)
    return {"most_purchased_items": dict(top_brands)}
@app.get("/purchase-categories")
async def get_purchase_categories(hush_id: str = Query(..., description="User's hush ID")):
    """Count one user's rows per ``purchase_category`` value.

    Args:
        hush_id: Value matched against the ``user_id`` column.

    Returns:
        ``{"purchase_categories": {<category>: <row count>, ...}}``.
    """
    query = supabase.table(table).select("purchase_category").eq("user_id", hush_id)
    rows = query.execute().data
    categories = [row['purchase_category'] for row in rows]
    return {"purchase_categories": dict(Counter(categories))}
@app.get("/categories-with-highest-spending")
async def get_categories_with_highest_spending(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Return the ``limit`` purchase categories with the largest total spend.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of categories to report.

    Returns:
        ``{"categories_with_highest_spending": {<category>: <total>, ...}}``
        ordered from highest to lowest total.
    """
    response = supabase.table(table).select("purchase_category,total_cost").eq("user_id", hush_id).execute()
    category_spending = {}
    for item in response.data:
        raw_cost = item['total_cost']
        if raw_cost is None:
            # float(None) would raise TypeError; a row without a cost adds
            # nothing to any category's total.
            continue
        category = item['purchase_category']
        category_spending[category] = category_spending.get(category, 0) + float(raw_cost)
    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    return {"categories_with_highest_spending": dict(sorted_categories[:limit])}
@app.get("/brand-preferences")
async def get_brand_preferences(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    """Return the ``limit`` ``brand_category`` values with the largest total spend.

    Args:
        hush_id: Value matched against the ``user_id`` column.
        limit: Maximum number of entries to report.

    Returns:
        ``{"brand_preferences": {<brand_category>: <total>, ...}}`` ordered
        from highest to lowest total.
    """
    response = supabase.table(table).select("brand_category,total_cost").eq("user_id", hush_id).execute()
    brand_spending = {}
    for item in response.data:
        raw_cost = item['total_cost']
        if raw_cost is None:
            # float(None) would raise TypeError; a row without a cost adds
            # nothing to any brand's total.
            continue
        brand = item['brand_category']
        brand_spending[brand] = brand_spending.get(brand, 0) + float(raw_cost)
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)
    return {"brand_preferences": dict(sorted_brands[:limit])}