import os
from dotenv import load_dotenv
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.coder import PickleCoder
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
from pydantic import BaseModel
from typing import Tuple, Dict, Union
from imblearn.pipeline import Pipeline as imbPipeline
from sklearn.preprocessing import LabelEncoder
import joblib
import pandas as pd
from urllib.request import urlopen
from src.api.config import ONE_DAY_SEC, ONE_WEEK_SEC, XGBOOST_URL, RANDOM_FOREST_URL, ENCODER_URL, ENV_PATH

load_dotenv(ENV_PATH)


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    # Connect to Redis on startup and initialise the fastapi-cache backend
    url = os.getenv("REDIS_URL")
    username = os.getenv("REDIS_USERNAME")
    password = os.getenv("REDIS_PASSWORD")
    redis = aioredis.from_url(url=url, username=username,
                              password=password, encoding="utf8", decode_responses=True)
    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
    yield
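
# The lifespan above reads Redis credentials from the environment file loaded via
# ENV_PATH. A minimal sketch of the expected variables (placeholder values; the
# real values live in the deployment's secrets/.env, not here):
#
#   REDIS_URL=redis://<host>:<port>
#   REDIS_USERNAME=default
#   REDIS_PASSWORD=<password>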


# FastAPI object
app = FastAPI(
    title='Sepsis classification',
    version='0.0.1',
    description='Identify ICU patients at risk of developing sepsis',
    lifespan=lifespan,
)


# API input features
class SepsisFeatures(BaseModel):
    PRG: float
    PL: float
    PR: float
    SK: float
    TS: float
    M11: float
    BD2: float
    Age: float
    Insurance: float


class Url(BaseModel):
    pipeline_url: str
    encoder_url: str


class ResultData(BaseModel):
    prediction: str
    probability: float


class PredictionResponse(BaseModel):
    execution_msg: str
    execution_code: int
    result: ResultData


class ErrorResponse(BaseModel):
    execution_msg: Union[str, None]
    execution_code: Union[int, None]
    result: Union[Dict[str, Union[str, int]], Dict[str, None], None]
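
# Illustrative response shapes for the models above (values are placeholders; the
# prediction label string depends on the fitted LabelEncoder):
#   success -> {"execution_msg": "Execution was successful", "execution_code": 1,
#               "result": {"prediction": "<label>", "probability": 87.5}}
#   failure -> {"execution_msg": "Execution failed", "execution_code": 0,
#               "result": {"error": "<details>"}}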


# Load a model pipeline and the label encoder from their URLs
# Cache for 1 day
@cache(expire=ONE_DAY_SEC, namespace='pipeline_resource', coder=PickleCoder)
async def load_pipeline(pipeline_url: str, encoder_url: str) -> Tuple[imbPipeline, LabelEncoder]:
    pipeline, encoder = None, None
    try:
        pipeline: imbPipeline = joblib.load(urlopen(pipeline_url))
        encoder: LabelEncoder = joblib.load(urlopen(encoder_url))
    except Exception:
        # Log exception
        pass
    finally:
        return pipeline, encoder


# Endpoints

# Status endpoint: check if the API is online
@app.get('/')
@cache(expire=ONE_WEEK_SEC, namespace='status_check')  # Cache for 1 week
async def status_check():
    return {"Status": "API is online..."}


@cache(expire=ONE_DAY_SEC, namespace='pipeline_classifier')  # Cache for 1 day
async def pipeline_classifier(pipeline: imbPipeline, encoder: LabelEncoder, data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    output = ErrorResponse(**{'execution_msg': None,
                              'execution_code': None, 'result': None})
    try:
        # Create a single-row dataframe from the request features
        df = pd.DataFrame([data.model_dump()])

        # Make the prediction and decode the class label
        prediction = pipeline.predict(df)
        pred_int = int(prediction[0])
        prediction = encoder.inverse_transform([pred_int])[0]

        # Get the probability of the predicted class (as a percentage)
        probability = round(
            float(pipeline.predict_proba(df)[0][pred_int] * 100), 2)

        msg = 'Execution was successful'
        code = 1
        result = {"prediction": prediction, "probability": probability}
        output = PredictionResponse(
            **{'execution_msg': msg,
               'execution_code': code, 'result': result}
        )
    except Exception as e:
        msg = 'Execution failed'
        code = 0
        result = {'error': f"Pipeline classifier failure: {e}"}
        output = ErrorResponse(**{'execution_msg': msg,
                                  'execution_code': code, 'result': result})
    finally:
        return output


# XGBoost endpoint: classify sepsis with XGBoost
@app.post('/xgboost_prediction')
async def xgboost_classifier(data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    xgboost_pipeline, encoder = await load_pipeline(XGBOOST_URL, ENCODER_URL)
    output = await pipeline_classifier(xgboost_pipeline, encoder, data)
    return output


# Random forest endpoint: classify sepsis with random forest
@app.post('/random_forest_prediction')
async def random_forest_classifier(data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    random_forest_pipeline, encoder = await load_pipeline(RANDOM_FOREST_URL, ENCODER_URL)
    output = await pipeline_classifier(random_forest_pipeline, encoder, data)
    return output
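

# Local run sketch (assumption: uvicorn is installed and this module is importable
# as src.api.main; the actual deployment may start the server differently, e.g. via
# a Dockerfile or Space configuration):
#
#   uvicorn src.api.main:app --host 0.0.0.0 --port 8000
#
# Example request against the XGBoost endpoint (illustrative placeholder values):
#
#   curl -X POST http://localhost:8000/xgboost_prediction \
#        -H "Content-Type: application/json" \
#        -d '{"PRG": 2, "PL": 120, "PR": 70, "SK": 30, "TS": 80,
#             "M11": 32.0, "BD2": 0.5, "Age": 45, "Insurance": 1}'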