Spaces:
Sleeping
Sleeping
File size: 6,449 Bytes
127130c 9a4ad03 127130c 9a4ad03 dcc364c 127130c 9a4ad03 127130c 684641d 326bc01 9a4ad03 127130c 9a4ad03 127130c 9a4ad03 7ea0738 9a4ad03 127130c 9a4ad03 6d96105 7ea0738 9a4ad03 7ea0738 9a4ad03 6d96105 326bc01 127130c dcc364c 9a4ad03 dcc364c 7ea0738 9a4ad03 dcc364c 9a4ad03 dcc364c 9a4ad03 dcc364c 9a4ad03 326bc01 dcc364c 326bc01 dcc364c 9a4ad03 326bc01 dcc364c 4d2771f dcc364c 9a4ad03 dcc364c 127130c 9a4ad03 127130c 9a4ad03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
"""Server that will listen for GET and POST requests from the client."""
import time
import logging
from pathlib import Path
from typing import List
from fastapi import FastAPI, File, Form, UploadFile, HTTPException
from fastapi.responses import JSONResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from concrete.ml.deployment import FHEModelServer
import gc
# Set up logging: INFO-level basic configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Application object on which the endpoints defined in this file are registered.
app = FastAPI()
# Add CORS middleware
# NOTE(review): every origin, method and header is allowed while credentials are
# also enabled -- confirm this permissive policy is intended for deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)
# Initialize the FHE server, pointing it at the "deployment_files" directory
# located next to this file.
DEPLOYMENT_DIR = Path(__file__).parent / "deployment_files"
FHE_SERVER = FHEModelServer(DEPLOYMENT_DIR)
def get_server_file_path(file_type: str, user_id: str) -> Path:
    """Get the path to a file on the server.

    The file lives in the ``server_tmp`` directory next to this module and is
    named ``<file_type>_<user_id>``. The aliases ``"encrypted_image"`` and
    ``"evaluation_key"`` are shortened to ``"encrypted"`` and ``"evaluation"``;
    any other ``file_type`` is used as given.

    Args:
        file_type: Kind of file (for example ``"encrypted"``, ``"evaluation"``
            or ``"encrypted_output"``).
        user_id: Identifier of the client the file belongs to.

    Returns:
        The path of the file inside ``server_tmp``. Nothing is created on disk.

    Raises:
        ValueError: If the resulting file name contains a path separator or a
            NUL byte. Both arguments can originate from the HTTP client, and
            such characters would let the path point outside ``server_tmp``.
    """
    aliases = {"encrypted_image": "encrypted", "evaluation_key": "evaluation"}
    file_type = aliases.get(file_type, file_type)

    file_name = f"{file_type}_{user_id}"
    # The name always contains "_", so it can never be "." or ".."; rejecting
    # separators is therefore enough to keep it a single path component.
    if any(forbidden in file_name for forbidden in ("/", "\\", "\0")):
        raise ValueError(f"Invalid characters in server file name: {file_name!r}")

    return Path(__file__).parent / "server_tmp" / file_name
@app.post("/send_input")
async def send_input(user_id: str = Form(), files: List[UploadFile] = File(...)):
    """Receive the encrypted input image and the evaluation key from the client.

    Each uploaded file is stored under the path returned by
    ``get_server_file_path``, using the part of its file name before the first
    underscore as the file type.

    Args:
        user_id: Identifier of the client, sent as a form field.
        files: Uploaded files, sent as multipart form data.

    Returns:
        A JSON response with a confirmation message.

    Raises:
        HTTPException: 400 if an upload carries no file name, 500 if saving
            a file fails for any other reason.
    """
    # Uploads are copied in fixed-size pieces so a large file is never held in
    # memory in one piece.
    chunk_size = 1024 * 1024
    try:
        for file in files:
            # The file name is supplied by the client: keep only its base name
            # so directory components cannot influence where the file lands.
            filename = Path(file.filename or "").name
            if not filename:
                raise HTTPException(status_code=400, detail="Uploaded file has no filename")

            file_path = get_server_file_path(filename.split("_")[0], user_id)
            logger.info(f"Saving file to: {file_path}")
            file_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists

            with file_path.open("wb") as buffer:
                while True:
                    chunk = await file.read(chunk_size)
                    if not chunk:
                        break
                    buffer.write(chunk)
            # Reaching this point means open/write/close all succeeded.
            logger.info(f"File saved successfully at {file_path}")
        return JSONResponse(content={"message": "Files received successfully"})
    except HTTPException:
        # Keep deliberately chosen status codes instead of rewrapping them as 500.
        raise
    except Exception as e:
        logger.error(f"Error in send_input: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
def get_memory_usage():
    """Get current memory usage in GB.

    Reads the ``VmRSS`` entry of ``/proc/self/status`` and converts the value
    it reports in kB to GB by dividing by 1024 twice.

    Returns:
        float: The reported value in GB, or ``0.0`` when it cannot be
        determined (the file is missing or unreadable, or has no parsable
        ``VmRSS`` line). The figure is only used for diagnostics, so failing
        to obtain it must not abort the caller.
    """
    try:
        with open("/proc/self/status") as status_file:
            for line in status_file:
                if line.startswith("VmRSS:"):
                    # Expected layout: "VmRSS:   123456 kB"
                    rss_kb = int(line.split()[1])
                    return rss_kb / 1024 / 1024  # Convert KB to GB
    except (OSError, ValueError, IndexError):
        # Best effort only: fall through to the "unknown" value below.
        return 0.0
    return 0.0
@app.post("/run_fhe")
def run_fhe(user_id: str = Form()):
    """Execute seizure detection on the encrypted input image using FHE.

    Loads the encrypted input and the evaluation key previously stored for
    ``user_id``, runs them through ``FHE_SERVER``, writes the encrypted result
    to the ``encrypted_output`` file of that user and reports the run time.

    Args:
        user_id: Identifier of the client, sent as a form field.

    Returns:
        A JSON response whose content is the execution time in seconds,
        rounded to two decimals.

    Raises:
        HTTPException: 503 if the FHE execution runs out of memory, 500 if
            the FHE execution fails for another reason, an input file is
            missing, or any other step fails.
    """
    logger.info(f"Starting FHE execution for user {user_id}")
    try:
        # Retrieve the encrypted input image and the evaluation key paths
        encrypted_image_path = get_server_file_path("encrypted", user_id)
        evaluation_key_path = get_server_file_path("evaluation", user_id)
        logger.info(f"Looking for encrypted_image at: {encrypted_image_path}")
        logger.info(f"Looking for evaluation_key at: {evaluation_key_path}")

        # Check if files exist
        if not encrypted_image_path.exists():
            raise FileNotFoundError(f"Encrypted image file not found at {encrypted_image_path}")
        if not evaluation_key_path.exists():
            raise FileNotFoundError(f"Evaluation key file not found at {evaluation_key_path}")

        # Read the files using the above paths
        with encrypted_image_path.open("rb") as encrypted_image_file, evaluation_key_path.open("rb") as evaluation_key_file:
            encrypted_image = encrypted_image_file.read()
            evaluation_key = evaluation_key_file.read()

        memory_before = get_memory_usage()
        logger.info(f"Memory usage before FHE execution: {memory_before:.2f} GB")

        # Force garbage collection before FHE execution
        gc.collect()

        # Run the FHE execution
        start = time.time()
        try:
            encrypted_output = FHE_SERVER.run(encrypted_image, evaluation_key)
        except MemoryError as e:
            logger.error("FHE execution failed due to insufficient memory")
            raise HTTPException(status_code=503, detail="Insufficient memory during FHE execution") from e
        except Exception as e:
            logger.error(f"FHE execution failed: {str(e)}")
            raise HTTPException(status_code=500, detail="FHE execution failed") from e
        finally:
            # Force garbage collection after FHE execution
            gc.collect()
        fhe_execution_time = round(time.time() - start, 2)

        # Check memory usage after FHE execution
        memory_after = get_memory_usage()
        logger.info(f"Memory usage after FHE execution: {memory_after:.2f} GB")
        logger.info(f"Memory increase during FHE execution: {memory_after - memory_before:.2f} GB")

        # Retrieve the encrypted output path
        encrypted_output_path = get_server_file_path("encrypted_output", user_id)

        # Write the file using the above path
        with encrypted_output_path.open("wb") as encrypted_output_file:
            encrypted_output_file.write(encrypted_output)

        logger.info(f"FHE execution completed for user {user_id} in {fhe_execution_time} seconds")
        return JSONResponse(content=fhe_execution_time)
    except HTTPException:
        # HTTPException is itself an Exception: without this clause the 503/500
        # raised above would be caught below and rewrapped as a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error in run_fhe for user {user_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
@app.post("/get_output")
def get_output(user_id: str = Form()):
    """Return the encrypted result stored on the server for ``user_id``.

    The bytes of the user's ``encrypted_output`` file are sent back as the
    response body. Any failure, including a missing file, is logged and
    reported to the client as an HTTP 500 error.
    """
    try:
        # Locate the file holding this user's encrypted result
        output_path = get_server_file_path("encrypted_output", user_id)

        if output_path.exists():
            # Hand the stored bytes back unchanged
            return Response(output_path.read_bytes())

        raise FileNotFoundError("Encrypted output file not found")
    except Exception as err:
        logger.error(f"Error in get_output for user {user_id}: {str(err)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(err)}")
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run as a script.
    import uvicorn

    # Serve the app on all network interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)