File size: 5,946 Bytes
2fee4ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a712034
 
 
5175b84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc54a77
5175b84
 
 
 
 
 
 
 
2fee4ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from fastapi import FastAPI, HTTPException, Query
from supabase import create_client, Client
from typing import List
import uvicorn
from datetime import datetime, timedelta
from collections import Counter
from dotenv import load_dotenv
import os

load_dotenv()
app = FastAPI()

# Initialize Supabase client
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(url, key)

table = "receipt_radar_structured_data_duplicate"

@app.get("/")
def test():
    return {"message":"Application is working !!"}

@app.get("/test")
def test1():
    return {"Api response !!"}

@app.get("/user-analytics")
async def get_user_analytics(
    hush_id: str = Query(..., description="User's hush ID"),
    limit: int = 5
):
    # Fetch all necessary data in one query
    response = supabase.table(table).select("*").eq("user_id", hush_id).execute()
    
    # Process the data for different analytics
    total_amount = sum(float(item['total_cost']) for item in response.data if item['total_cost'] is not None)
    
    dates = [datetime.strptime(item['Date'], "%d-%m-%Y") for item in response.data if item['Date'] is not None]
    date_counts = Counter(dates)
    
    avg_transaction = total_amount / len(response.data) if response.data else 0
    
    item_counts = Counter(item['brand'] for item in response.data)
    
    category_counts = Counter(item['purchase_category'] for item in response.data)
    
    category_spending = {}
    brand_spending = {}
    for item in response.data:
        category = item['purchase_category']
        brand = item['brand_category']
        cost = float(item['total_cost'])
        category_spending[category] = category_spending.get(category, 0) + cost
        brand_spending[brand] = brand_spending.get(brand, 0) + cost
    
    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)

    return {
        "total_amount_spent": total_amount,
        # "frequency_of_purchases": dict(date_counts),
        "average_transaction_value": avg_transaction,
        "most_purchased_items": dict(item_counts.most_common(limit)),
        "purchase_categories": dict(category_counts),
        "categories_with_highest_spending": dict(sorted_categories[:limit]),
        "brand_preferences": dict(sorted_brands[:limit])
    }


@app.get("/total-amount-spent")
async def get_total_amount_spent(hush_id: str = Query(..., description="User's hush ID")):
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    print(response)
    total_amount = sum(float(item['total_cost']) for item in response.data if item['Total_cost'] is not None)
    return {"total_amount_spent": total_amount}

@app.get("/frequency-of-purchases")
async def get_frequency_of_purchases(hush_id: str = Query(..., description="User's hush ID")):
    response = supabase.table(table).select("Date").eq("user_id", hush_id).execute()
    print(response)
    dates = [datetime.strptime(item['Date'], "%d-%m-%Y") for item in response.data if item['Date'] is not None]
    print(dates)
    # dates = []
    # for item in response.data:
    #     if item['Date']:
    #         date = datetime.fromisoformat(item['Date'].replace('Z', '+00:00'))
    #         dates.append(date.strftime("%Y-%m-%d"))  # Format as YYYY-MM-DD
    date_counts = Counter(dates)
    return {"frequency_of_purchases": dict(date_counts)}

@app.get("/average-transaction-value")
async def get_average_transaction_value(hush_id: str = Query(..., description="User's hush ID")):
    response = supabase.table(table).select("total_cost").eq("user_id", hush_id).execute()
    total_amount = sum(float(item['total_cost']) for item in response.data)
    avg_transaction = total_amount / len(response.data) if response.data else 0
    return {"average_transaction_value": avg_transaction}

@app.get("/most-purchased-items")
async def get_most_purchased_items(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    response = supabase.table(table).select("brand").eq("user_id", hush_id).execute()
    item_counts = Counter(item['brand'] for item in response.data)
    return {"most_purchased_items": dict(item_counts.most_common(limit))}

@app.get("/purchase-categories")
async def get_purchase_categories(hush_id: str = Query(..., description="User's hush ID")):
    response = supabase.table(table).select("purchase_category").eq("user_id", hush_id).execute()
    category_counts = Counter(item['purchase_category'] for item in response.data)
    return {"purchase_categories": dict(category_counts)}

@app.get("/categories-with-highest-spending")
async def get_categories_with_highest_spending(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    response = supabase.table(table).select("purchase_category,total_cost").eq("user_id", hush_id).execute()
    category_spending = {}
    for item in response.data:
        category = item['purchase_category']
        cost = float(item['total_cost'])
        category_spending[category] = category_spending.get(category, 0) + cost
    sorted_categories = sorted(category_spending.items(), key=lambda x: x[1], reverse=True)
    return {"categories_with_highest_spending": dict(sorted_categories[:limit])}

@app.get("/brand-preferences")
async def get_brand_preferences(hush_id: str = Query(..., description="User's hush ID"), limit: int = 5):
    response = supabase.table(table).select("brand_category,total_cost").eq("user_id", hush_id).execute()
    brand_spending = {}
    for item in response.data:
        brand = item['brand_category']
        cost = float(item['total_cost'])
        brand_spending[brand] = brand_spending.get(brand, 0) + cost
    sorted_brands = sorted(brand_spending.items(), key=lambda x: x[1], reverse=True)
    return {"brand_preferences": dict(sorted_brands[:limit])}