"""Sepsis classification API: FastAPI service exposing XGBoost and random forest endpoints."""
import logging
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Dict, Tuple, Union
from urllib.request import urlopen

import joblib
import pandas as pd
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.coder import PickleCoder
from fastapi_cache.decorator import cache
from imblearn.pipeline import Pipeline as imbPipeline
from pydantic import BaseModel
from redis import asyncio as aioredis
from sklearn.preprocessing._label import LabelEncoder

from src.api.config import ONE_DAY_SEC, ONE_WEEK_SEC, XGBOOST_URL, RANDOM_FOREST_URL, ENCODER_URL, ENV_PATH
load_dotenv(ENV_PATH)
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Wire up the Redis-backed response cache for the app's lifetime.

    Credentials come from the environment (loaded from ENV_PATH above):
    REDIS_URL, REDIS_USERNAME and REDIS_PASSWORD.
    """
    redis_client = aioredis.from_url(
        url=os.getenv("REDIS_URL"),
        username=os.getenv("REDIS_USERNAME"),
        password=os.getenv("REDIS_PASSWORD"),
        encoding="utf8",
        decode_responses=True,
    )
    FastAPICache.init(RedisBackend(redis_client), prefix="fastapi-cache")
    yield
# FastAPI application object; `lifespan` initializes the Redis cache on startup.
app = FastAPI(
    title='Sepsis classification',
    version='0.0.1',
    description='Identify ICU patients at risk of developing sepsis',
    lifespan=lifespan,
)
# API input features
class SepsisFeatures(BaseModel):
    """Request body for the prediction endpoints: one ICU patient's measurements.

    NOTE(review): field names are dataset abbreviations; assumed to match the
    columns the pipelines were trained on — confirm against the training data.
    """
    PRG: float
    PL: float
    PR: float
    SK: float
    TS: float
    M11: float
    BD2: float
    Age: float
    Insurance: float
class Url(BaseModel):
    """Pair of artifact locations: a serialized model pipeline and its label encoder."""
    pipeline_url: str
    encoder_url: str
class ResultData(BaseModel):
    """Successful classification payload: decoded label and class probability (percent)."""
    prediction: str
    probability: float
class PredictionResponse(BaseModel):
    """Success envelope returned by the prediction endpoints."""
    execution_msg: str
    execution_code: int
    result: ResultData
class ErrorResponse(BaseModel):
    """Failure envelope returned when classification cannot be completed."""
    execution_msg: Union[str, None]
    execution_code: Union[int, None]
    # Flattened spelling of the original nested Union — typing collapses
    # Union[A, Union[B, None]] to Union[A, B, None], so this is the same type.
    result: Union[Dict[str, Union[str, int]], Dict[str, None], None]
# Load a model pipeline and the label encoder from remote artifacts.
# Cached for 1 day so repeated requests skip the network download.
@cache(expire=ONE_DAY_SEC, namespace='pipeline_resource', coder=PickleCoder)
async def load_pipeline(pipeline_url: str, encoder_url: str) -> Tuple[imbPipeline, LabelEncoder]:
    """Fetch and deserialize a fitted pipeline and its label encoder.

    Both parameters are plain URL strings (the previous `Url` annotations were
    wrong — callers pass string constants such as XGBOOST_URL / ENCODER_URL).

    Returns:
        (pipeline, encoder); either element is None when loading fails, so
        callers must treat this as best-effort.
    """
    pipeline = None
    encoder = None
    try:
        # SECURITY: joblib.load unpickles the downloaded bytes — only ever
        # point these URLs at trusted, project-controlled artifacts.
        pipeline = joblib.load(urlopen(pipeline_url))
        encoder = joblib.load(urlopen(encoder_url))
    except Exception:
        # Best-effort contract preserved, but no longer silently swallowed.
        logging.getLogger(__name__).exception(
            "Failed to load model artifacts from %s / %s", pipeline_url, encoder_url)
    return pipeline, encoder
# Endpoints
# Status endpoint: check if the API is online
@app.get('/')
@cache(expire=ONE_WEEK_SEC, namespace='status_check')  # Cache for 1 week
async def status_check():
    """Health-check endpoint confirming the service is reachable."""
    return {"Status": "API is online..."}
@cache(expire=ONE_DAY_SEC, namespace='pipeline_classifier')  # Cache for 1 day
async def pipeline_classifier(pipeline: imbPipeline, encoder: LabelEncoder, data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    """Run one sepsis prediction through *pipeline* and decode it with *encoder*.

    Returns:
        PredictionResponse on success; ErrorResponse (with the error text in
        ``result``) when the pipeline, encoder or input handling fails.
    """
    try:
        # Single-row dataframe built from the validated request payload.
        df = pd.DataFrame([data.model_dump()])
        pred_int = int(pipeline.predict(df)[0])
        # Separate name for the decoded label (the original reused `prediction`
        # for both the raw array and the decoded string).
        label = encoder.inverse_transform([pred_int])[0]
        # Probability of the predicted class, as a percentage.
        probability = round(
            float(pipeline.predict_proba(df)[0][pred_int] * 100), 2)
        return PredictionResponse(
            execution_msg='Execution was successful',
            execution_code=1,
            result={"prediction": label, "probability": probability},
        )
    except Exception as e:
        # Service boundary: report the failure to the client instead of raising.
        # (Fixes the garbled message "Omg, pipeline classsifier failure{e}" and
        # removes the `finally: return` that would have suppressed exceptions.)
        return ErrorResponse(
            execution_msg='Execution failed',
            execution_code=0,
            result={'error': f"Pipeline classifier failure: {e}"},
        )
# Xgboost endpoint: classify sepsis with xgboost
@app.post('/xgboost_prediction')
async def xgboost_classifier(data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    """Classify a patient using the (cached) XGBoost pipeline."""
    pipeline, encoder = await load_pipeline(XGBOOST_URL, ENCODER_URL)
    return await pipeline_classifier(pipeline, encoder, data)
# Random forest endpoint: classify sepsis with random forest
@app.post('/random_forest_prediction')
async def random_forest_classifier(data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    """Classify a patient using the (cached) random forest pipeline."""
    pipeline, encoder = await load_pipeline(RANDOM_FOREST_URL, ENCODER_URL)
    return await pipeline_classifier(pipeline, encoder, data)