import os
import json
import random
import string
import sqlite3
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fastapi import FastAPI, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from huggingface_hub import AsyncInferenceClient

app = FastAPI()

# Configuration
models = [
    "meta-llama/Llama-3.1-8B-Instruct",
    "meta-llama/Llama-3.1-70B-Instruct",
    "meta-llama/Meta-Llama-3-8B-Instruct",
    "meta-llama/Meta-Llama-3-70B-Instruct",
    "meta-llama/Llama-Guard-3-8B",
    "meta-llama/Llama-2-7b-chat-hf",
    "meta-llama/Llama-2-13b-chat-hf",
    "deepseek-ai/DeepSeek-Coder-V2-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "mistralai/Mixtral-8x7B-Instruct-v0.1",
]
LOG_FILE = "/data/api_logs.json"
DB_FILE = "/data/api_logs.db"

client = AsyncInferenceClient(token=os.environ["HF_INFERENCE_API_TOKEN"])
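# NOTE: /data is assumed to be a persistent volume (on Hugging Face Spaces this
# is the persistent-storage mount), and HF_INFERENCE_API_TOKEN must be set as an
# environment variable / Space secret; os.environ raises KeyError otherwise.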

# Ensure the JSON log file exists
if not os.path.exists(LOG_FILE):
    with open(LOG_FILE, "w") as f:
        json.dump([], f)

# Initialize the SQLite database
def init_db():
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS api_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            model TEXT,
            success BOOLEAN,
            timestamp TEXT,
            failure_message TEXT,
            response_data TEXT
        )
    ''')
    conn.commit()
    conn.close()

init_db()
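# Optional (not part of the original schema): once the table grows, an index on
# timestamp would speed up the range-filtered queries below, e.g.
#   CREATE INDEX IF NOT EXISTS idx_api_logs_ts ON api_logs (timestamp)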


class LogEntry(BaseModel):
    model: str
    success: bool
    timestamp: str
    failure_message: str
    response_data: Optional[Dict[str, Any]] = None
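# A serialized entry looks like (values illustrative):
# {"model": "mistralai/Mistral-7B-Instruct-v0.3", "success": false,
#  "timestamp": "2024-06-01T12:00:00+00:00", "failure_message": "...",
#  "response_data": null}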


def random_string(length=10):
    # Random alphanumeric salt, prepended to each prompt so that repeated
    # health checks are not served an identical cached completion.
    characters = string.ascii_letters + string.digits
    return ''.join(random.choice(characters) for _ in range(length))


def log_to_sqlite(entry: LogEntry):
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO api_logs (model, success, timestamp, failure_message, response_data)
        VALUES (?, ?, ?, ?, ?)
    ''', (
        entry.model,
        entry.success,
        entry.timestamp,
        entry.failure_message,
        json.dumps(entry.response_data) if entry.response_data else None
    ))
    conn.commit()
    conn.close()


async def check_apis():
    results = []
    for model in models:
        success = False
        failure_message = ""
        response_data = None
        try:
            response = await client.chat_completion(
                model=model,
                messages=[{"role": "user", "content": f"{random_string()}\nWhat is the capital of France?"}],
                max_tokens=10,
            )
            success = True
            response_data = dict(response)
        except Exception as exc:
            print(exc)
            # `exc` is unbound once the except block exits, so capture it here.
            failure_message = str(exc)
        log_entry = LogEntry(
            model=model,
            success=success,
            timestamp=datetime.now(timezone.utc).isoformat(),
            failure_message=failure_message,
            response_data=response_data,
        )
        results.append(log_entry)
        log_to_sqlite(log_entry)
    # Mirror this run's results into the flat JSON log as well
    with open(LOG_FILE, "r+") as f:
        logs = json.load(f)
        logs.extend([result.dict() for result in results])
        f.seek(0)
        f.truncate()
        json.dump(logs, f, indent=2)
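# Design note: each run is recorded twice, in SQLite (queried by the endpoints
# below) and in a flat JSON file. Rewriting the whole JSON array every run gets
# slower as the file grows; SQLite alone would suffice if the mirror is unneeded.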


# Startup hook (decorator assumed; without it this function is never invoked
# and the scheduler never starts).
@app.on_event("startup")
async def start_scheduler():
    scheduler = AsyncIOScheduler()
    scheduler.add_job(check_apis, 'interval', minutes=10)
    scheduler.start()
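# With the 'interval' trigger the first run fires one full interval after
# startup; passing next_run_time=datetime.now(timezone.utc) to add_job would
# run an immediate first check (APScheduler 3.x).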


# NOTE: the route decorators below are assumed reconstructions (the original
# decorator lines appear to have been lost); adjust the paths to match the front-end.
@app.get("/models")
async def get_models():
    return models


@app.get("/")
async def index():
    return FileResponse("static/index.html")


@app.get("/logs")
async def get_logs(
    model: str = Query(None, description="Filter by model name"),
    start: str = Query(None, description="Start time for filtering (ISO format)"),
    end: str = Query(None, description="End time for filtering (ISO format)")
):
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    query = "SELECT * FROM api_logs"
    params = []
    # Build the WHERE clause from whichever filters were supplied
    if any([model, start, end]):
        query += " WHERE"
    if model:
        query += " model = ?"
        params.append(model)
    if start:
        if not query.endswith("WHERE"):
            query += " AND"
        query += " timestamp >= ?"
        params.append(start)
    if end:
        if not query.endswith("WHERE"):
            query += " AND"
        query += " timestamp <= ?"
        params.append(end)
    query += " ORDER BY timestamp DESC LIMIT 500"
    cursor.execute(query, params)
    logs = cursor.fetchall()
    conn.close()
    return [LogEntry(
        model=log[1],
        success=bool(log[2]),
        timestamp=log[3],
        failure_message=log[4],
        response_data=json.loads(log[5]) if log[5] else None
    ) for log in logs]


@app.get("/db-logs")
async def get_db_logs():
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM api_logs ORDER BY timestamp DESC LIMIT 100")
    logs = cursor.fetchall()
    conn.close()
    return [
        {
            "id": log[0],
            "model": log[1],
            "success": bool(log[2]),
            "timestamp": log[3],
            "failure_message": log[4],
            "response_data": json.loads(log[5]) if log[5] else None,
        }
        for log in logs
    ]


@app.get("/chart-data")
async def get_chart_data():
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute("SELECT model, success, timestamp FROM api_logs ORDER BY timestamp")
    logs = cursor.fetchall()
    conn.close()
    # Group results per model into success/failure time series for plotting
    chart_data = {}
    for model, success, timestamp in logs:
        if model not in chart_data:
            chart_data[model] = {
                'success': {'x': [], 'y': []},
                'failure': {'x': [], 'y': []}
            }
        status = 'success' if success else 'failure'
        chart_data[model][status]['x'].append(timestamp)
        chart_data[model][status]['y'].append(1)
    return chart_data
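# Example of the structure handed to the front-end chart code:
# {
#     "meta-llama/Llama-3.1-8B-Instruct": {
#         "success": {"x": ["2024-06-01T12:00:00+00:00", ...], "y": [1, ...]},
#         "failure": {"x": [], "y": []}
#     },
#     ...
# }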


# Mount the static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)
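
# Example requests against a local run (route paths are assumed above, so
# adjust them if the originals differed):
#   curl "http://localhost:7860/models"
#   curl "http://localhost:7860/logs?model=mistralai/Mistral-7B-Instruct-v0.3&start=2024-06-01T00:00:00"
#   curl "http://localhost:7860/chart-data"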