File size: 4,939 Bytes
15e3c34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import os
from dotenv import load_dotenv

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.coder import PickleCoder
from fastapi_cache.decorator import cache

from redis import asyncio as aioredis

from pydantic import BaseModel
from typing import Tuple, Dict, Union

from imblearn.pipeline import Pipeline as imbPipeline
from sklearn.preprocessing._label import LabelEncoder
import joblib
import pandas as pd
from urllib.request import urlopen


from src.api.config import ONE_DAY_SEC, ONE_WEEK_SEC, XGBOOST_URL, RANDOM_FOREST_URL, ENCODER_URL, ENV_PATH

load_dotenv(ENV_PATH)


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Application lifespan: wire the Redis-backed response cache.

    Reads connection settings from the environment (loaded from ENV_PATH
    above), initializes FastAPICache with a Redis backend before the app
    starts serving, and releases the Redis connection pool on shutdown.
    """
    url = os.getenv("REDIS_URL")
    username = os.getenv("REDIS_USERNAME")
    password = os.getenv("REDIS_PASSWORD")
    redis = aioredis.from_url(url=url, username=username,
                              password=password, encoding="utf8", decode_responses=True)
    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
    try:
        yield
    finally:
        # Bug fix: the original never closed the client, leaking the
        # connection pool on every shutdown/reload.
        await redis.close()


# FastAPI Object
# Application instance; `lifespan` (above) initializes the Redis cache
# backend before requests are served.
app = FastAPI(
    title='Sepsis classification',
    version='0.0.1',
    description='Identify ICU patients at risk of developing sepsis',
    lifespan=lifespan,
)


# API input features
class SepsisFeatures(BaseModel):
    """Request body for the prediction endpoints.

    One record of patient measurements; field names match the columns the
    model pipelines were trained on. Semantics of the abbreviated names
    (PRG, PL, PR, SK, TS, M11, BD2) are not documented here — presumably
    clinical measurements; confirm against the training dataset's schema.
    """
    PRG: float
    PL: float
    PR: float
    SK: float
    TS: float
    M11: float
    BD2: float
    Age: float
    Insurance: float


class Url(BaseModel):
    """Pair of artifact locations: a model pipeline URL and a label-encoder URL.

    NOTE(review): not referenced by any endpoint visible in this file —
    verify it is still needed before removing.
    """
    pipeline_url: str
    encoder_url: str


class ResultData(BaseModel):
    """Successful classification payload: decoded class label and its
    probability (as a percentage, rounded to 2 decimals)."""
    prediction: str
    probability: float


class PredictionResponse(BaseModel):
    """Envelope for a successful prediction (execution_code == 1)."""
    execution_msg: str
    execution_code: int
    result: ResultData


class ErrorResponse(BaseModel):
    """Envelope for a failed prediction (execution_code == 0).

    All fields are optional so an empty placeholder instance can be
    built before the outcome is known (see pipeline_classifier).
    """
    execution_msg: Union[str, None]
    execution_code: Union[int, None]
    # In practice: {'error': <message>} on failure, or None.
    result: Union[Dict[str, Union[str, int]], Union[Dict[str, None], None]]


# Load the model pipelines and encoder
# Cache for 1 day
@cache(expire=ONE_DAY_SEC, namespace='pipeline_resource', coder=PickleCoder)
async def load_pipeline(pipeline_url: str, encoder_url: str) -> Tuple[imbPipeline, LabelEncoder]:
    """Download and deserialize a model pipeline and its label encoder.

    Results are cached (pickled) in Redis for one day, so the artifacts
    are fetched at most once per day per URL pair.

    Args:
        pipeline_url: URL of the joblib-serialized classification pipeline.
            (Bug fix: previously annotated as the `Url` model, but callers
            pass plain string constants such as XGBOOST_URL.)
        encoder_url: URL of the joblib-serialized LabelEncoder.

    Returns:
        (pipeline, encoder); either element is None if its download or
        deserialization failed — callers must handle that (best-effort
        contract kept from the original implementation).
    """
    pipeline, encoder = None, None
    try:
        # Context managers close the HTTP responses instead of leaking them.
        with urlopen(pipeline_url) as response:
            pipeline = joblib.load(response)
        with urlopen(encoder_url) as response:
            encoder = joblib.load(response)
    except Exception:
        # TODO(review): log the failure; silently returning None surfaces
        # later as an opaque classifier error.
        pass
    # Plain return (the original returned from `finally`, which would have
    # swallowed even SystemExit/KeyboardInterrupt).
    return pipeline, encoder


# Endpoints

# Status endpoint: check if api is online
@app.get('/')
@cache(expire=ONE_WEEK_SEC, namespace='status_check')  # Cache for 1 week
async def status_check():
    """Health-check endpoint; confirms the API is reachable."""
    return {"Status": "API is online..."}


@cache(expire=ONE_DAY_SEC, namespace='pipeline_classifier')  # Cache for 1 day
async def pipeline_classifier(pipeline: imbPipeline, encoder: LabelEncoder, data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    """Run one sepsis classification through the given pipeline.

    Args:
        pipeline: Fitted classification pipeline (must support `predict`
            and `predict_proba`).
        encoder: Fitted LabelEncoder used to decode the integer class
            back to its string label.
        data: Patient features from the request body.

    Returns:
        PredictionResponse on success; ErrorResponse on any failure
        (this function never raises — errors are reported in-band).
    """
    try:
        # Single-row dataframe matching the training-time column layout.
        df = pd.DataFrame([data.model_dump()])

        pred_int = int(pipeline.predict(df)[0])
        label = encoder.inverse_transform([pred_int])[0]

        # Probability of the predicted class, as a percentage.
        probability = round(
            float(pipeline.predict_proba(df)[0][pred_int] * 100), 2)

        return PredictionResponse(
            execution_msg='Execution was successful',
            execution_code=1,
            result={"prediction": label, "probability": probability},
        )

    except Exception as e:
        # Bug fix: original message read "Omg, pipeline classsifier
        # failure<exc>" — typo and missing separator before the exception.
        return ErrorResponse(
            execution_msg='Execution failed',
            execution_code=0,
            result={'error': f"Pipeline classifier failure: {e}"},
        )


# Xgboost endpoint: classify sepsis with xgboost
@app.post('/xgboost_prediction')
async def xgboost_classifier(data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    """Classify a patient's sepsis risk with the XGBoost pipeline."""
    model_pipeline, label_encoder = await load_pipeline(XGBOOST_URL, ENCODER_URL)
    return await pipeline_classifier(model_pipeline, label_encoder, data)


# Random forest endpoint: classify sepsis with random forest
@app.post('/random_forest_prediction')
async def random_forest_classifier(data: SepsisFeatures) -> ErrorResponse | PredictionResponse:
    """Classify a patient's sepsis risk with the random-forest pipeline."""
    model_pipeline, label_encoder = await load_pipeline(RANDOM_FOREST_URL, ENCODER_URL)
    return await pipeline_classifier(model_pipeline, label_encoder, data)