HF-API-monitor / app.py
nbroad's picture
nbroad HF staff
fix sql query
21d006e verified
raw
history blame
6.23 kB
import os
import json
import random
import string
import sqlite3
from datetime import datetime, timezone
from fastapi import Query
from typing import List, Dict, Any
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from huggingface_hub import AsyncInferenceClient
app = FastAPI()
# Configuration
models = [
"meta-llama/Llama-3.1-8B-Instruct",
"meta-llama/Llama-3.1-70B-Instruct",
"meta-llama/Meta-Llama-3-8B-Instruct",
"meta-llama/Meta-Llama-3-70B-Instruct",
"meta-llama/Llama-Guard-3-8B",
"meta-llama/Llama-2-7b-chat-hf",
"meta-llama/Llama-2-13b-chat-hf",
"deepseek-ai/DeepSeek-Coder-V2-Instruct",
"mistralai/Mistral-7B-Instruct-v0.3",
"mistralai/Mixtral-8x7B-Instruct-v0.1",
]
LOG_FILE = "/data/api_logs.json"
DB_FILE = "/data/api_logs.db"
client = AsyncInferenceClient(token=os.environ["HF_INFERENCE_API_TOKEN"])
# Ensure log file exists
if not os.path.exists(LOG_FILE):
with open(LOG_FILE, "w") as f:
json.dump([], f)
# Initialize SQLite database
def init_db():
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS api_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
model TEXT,
success BOOLEAN,
timestamp TEXT,
failure_message TEXT,
response_data TEXT
)
''')
conn.commit()
conn.close()
init_db()
class LogEntry(BaseModel):
model: str
success: bool
timestamp: str
failure_message: str
response_data: Dict[str, Any] = None
def random_string(length=10):
characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(length))
def log_to_sqlite(entry: LogEntry):
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO api_logs (model, success, timestamp, failure_message, response_data)
VALUES (?, ?, ?, ?, ?)
''', (
entry.model,
entry.success,
entry.timestamp,
entry.failure_message,
json.dumps(entry.response_data) if entry.response_data else None
))
conn.commit()
conn.close()
async def check_apis():
results = []
for model in models:
try:
response = await client.chat_completion(
messages=[{"role": "user", "content": f"{random_string()}\nWhat is the capital of France?"}],
max_tokens=10,
)
success = True
response_data = response
e = 'success'
except Exception as e:
print(e)
success = False
response_data = None
log_entry = LogEntry(
model=model,
success=success,
timestamp=datetime.now(timezone.utc).isoformat(),
failure_message=str(e) if not success else "",
response_data=dict(response_data)
)
results.append(log_entry)
log_to_sqlite(log_entry)
with open(LOG_FILE, "r+") as f:
logs = json.load(f)
logs.extend([result.dict() for result in results])
f.seek(0)
f.truncate()
json.dump(logs, f, indent=2)
@app.on_event("startup")
async def start_scheduler():
scheduler = AsyncIOScheduler()
scheduler.add_job(check_apis, 'interval', minutes=10)
scheduler.start()
@app.get("/api/models")
async def get_models():
return models
@app.get("/")
async def index():
return FileResponse("static/index.html")
@app.get("/api/logs", response_model=List[LogEntry])
async def get_logs(
model: str = Query(None, description="Filter by model name"),
start: str = Query(None, description="Start time for filtering (ISO format)"),
end: str = Query(None, description="End time for filtering (ISO format)")
):
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
query = "SELECT * FROM api_logs"
params = []
if any([model, start, end]):
query += " WHERE"
if model:
query += " model = ?"
params.append(model)
if start:
if not query.endswith("WHERE"):
query += " AND"
query += " timestamp >= ?"
params.append(start)
if end:
if not query.endswith("WHERE"):
query += " AND"
query += " timestamp <= ?"
params.append(end)
query += " ORDER BY timestamp DESC LIMIT 500"
print(query, params)
cursor.execute(query, params)
logs = cursor.fetchall()
conn.close()
return [LogEntry(
model=log[1],
success=log[2],
timestamp=log[3],
failure_message=log[4],
response_data=json.loads(log[5]) if log[5] else None
) for log in logs]
@app.get("/api/db-logs")
async def get_db_logs():
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("SELECT * FROM api_logs ORDER BY timestamp DESC LIMIT 100")
logs = cursor.fetchall()
conn.close()
return [{"id": log[0], "model": log[1], "success": log[2], "timestamp": log[3], "failure_message": log[4], "response_data": json.loads(log[5]) if log[5] else None} for log in logs]
@app.get("/api/chart-data", response_model=Dict[str, Dict[str, Dict[str, List]]])
async def get_chart_data():
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("SELECT model, success, timestamp FROM api_logs ORDER BY timestamp")
logs = cursor.fetchall()
conn.close()
chart_data = {}
for log in logs:
model, success, timestamp = log
if model not in chart_data:
chart_data[model] = {
'success': {'x': [], 'y': []},
'failure': {'x': [], 'y': []}
}
status = 'success' if success else 'failure'
chart_data[model][status]['x'].append(timestamp)
chart_data[model][status]['y'].append(1)
return chart_data
# Mount the static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)