# Hugging Face Space status: Running on CPU Upgrade
import gradio as gr | |
import time | |
import os | |
from pathlib import Path | |
import subprocess | |
from concrete.ml.deployment import FHEModelClient | |
from requests import head | |
import numpy | |
import os | |
from pathlib import Path | |
import requests | |
import json | |
import base64 | |
import subprocess | |
import shutil | |
import time | |
import pandas as pd | |
import pickle | |
import numpy as np | |
import pdb | |
# This repository's directory
REPO_DIR = Path(__file__).parent

# Start the FHE inference server in the background. The client code in this
# file posts to http://localhost:3000/predict, so the port has to be passed
# explicitly: uvicorn listens on 8000 when no --port is given.
subprocess.Popen(["uvicorn", "server:app", "--port", "3000"], cwd=REPO_DIR)

# Working directories: one sub-directory of FHE keys per user, and temporary
# serialized artifacts (evaluation keys, encrypted inputs and predictions).
os.makedirs(".fhe_keys", exist_ok=True)
os.makedirs("tmp", exist_ok=True)

# Wait 4 sec for the server to start
time.sleep(4)

# Encrypted data limit for the browser to display
# (encrypted data is too large to display in the browser)
ENCRYPTED_DATA_BROWSER_LIMIT = 500

# Maximum number of per-user key directories kept in .fhe_keys
N_USER_KEY_STORED = 20
def clean_tmp_directory():
    """Evict the oldest user key directories and their temporary files.

    At most ``N_USER_KEY_STORED`` sub-directories are kept in ``.fhe_keys``.
    When there are more, the least recently modified ones are removed, and
    every ``tmp/*_<user_id>.npy`` file belonging to an evicted user is
    deleted as well.
    """
    key_dirs = sorted(
        (d for d in Path(".fhe_keys/").iterdir() if d.is_dir()),
        key=os.path.getmtime,
    )

    n_to_delete = len(key_dirs) - N_USER_KEY_STORED
    if n_to_delete <= 0:
        return

    evicted_dirs = key_dirs[:n_to_delete]
    for key_dir in evicted_dirs:
        shutil.rmtree(key_dir)

    # Match on "_<user_id>.npy" rather than "<user_id>.npy": a bare suffix
    # test would also hit files of other users whose id merely ends with the
    # same digits (evicting user 5 must not delete "..._15.npy").
    evicted_suffixes = tuple(f"_{key_dir.name}.npy" for key_dir in evicted_dirs)

    # A single tuple-suffix test per file guarantees each file is unlinked at
    # most once, even if several evicted ids match it.
    for tmp_file in Path("tmp/").iterdir():
        if tmp_file.name.endswith(evicted_suffixes):
            tmp_file.unlink()
def keygen():
    """Create a new user id and generate a fresh FHE key set for it.

    The keys are generated by an ``FHEModelClient`` whose key directory is
    ``.fhe_keys/<user_id>``, and the serialized evaluation key is saved to
    ``tmp/tmp_evaluation_key_<user_id>.npy``.

    Returns:
        A two-item list ``[key_preview, user_id]``, where ``key_preview`` is
        the first ``ENCRYPTED_DATA_BROWSER_LIMIT`` items of the serialized
        evaluation key as a list, and ``user_id`` is the random id drawn for
        this key set.
    """
    # Clean tmp directory if needed
    clean_tmp_directory()

    print("Initializing FHEModelClient...")

    # Let's create a user_id. The dtype is explicit because 2**32 does not
    # fit in a 32-bit integer, which is NumPy's default on some platforms.
    user_id = numpy.random.randint(0, 2**32, dtype=numpy.int64)

    fhe_api = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    fhe_api.load()

    # Generate a fresh key
    fhe_api.generate_private_and_evaluation_keys(force=True)
    evaluation_key = fhe_api.get_serialized_evaluation_keys()

    numpy.save(f"tmp/tmp_evaluation_key_{user_id}.npy", evaluation_key)

    # Slice before converting: only the preview is needed, so there is no
    # reason to turn the whole serialized key into a Python list first.
    return [list(evaluation_key[:ENCRYPTED_DATA_BROWSER_LIMIT]), user_id]
def encode_quantize_encrypt(test_file, eval_key):
    """Extract features from a file, then quantize and encrypt them.

    Args:
        test_file: The uploaded file, passed unchanged to
            ``PE_main.extract_infos``.
            NOTE(review): whether this is a path or a file object depends on
            the Gradio version and on ``extract_infos`` -- confirm.
        eval_key: The ``[key_preview, user_id]`` pair returned by ``keygen``.
            Only ``eval_key[1]`` (the user id) is used.

    Returns:
        A 3-tuple ``(features, quantized_features, encrypted_hex)``: the
        clear feature row, the same row after quantization (cast to uint8),
        and the first ``ENCRYPTED_DATA_BROWSER_LIMIT`` items of the
        serialized ciphertext formatted as a hex string.
    """
    # Features fed to the model, in this order.
    feature_names = [
        'Machine', 'SizeOfOptionalHeader', 'Characteristics',
        'MajorLinkerVersion', 'MinorLinkerVersion', 'SizeOfCode',
        'SizeOfInitializedData', 'SizeOfUninitializedData',
        'AddressOfEntryPoint', 'BaseOfCode', 'BaseOfData', 'ImageBase',
        'SectionAlignment', 'FileAlignment', 'MajorOperatingSystemVersion',
        'MinorOperatingSystemVersion', 'MajorImageVersion', 'MinorImageVersion',
        'MajorSubsystemVersion', 'MinorSubsystemVersion', 'SizeOfImage',
        'SizeOfHeaders', 'CheckSum', 'Subsystem', 'DllCharacteristics',
        'SizeOfStackReserve', 'SizeOfStackCommit', 'SizeOfHeapReserve',
        'SizeOfHeapCommit', 'LoaderFlags', 'NumberOfRvaAndSizes', 'SectionsNb',
        'SectionsMeanEntropy', 'SectionsMinEntropy', 'SectionsMaxEntropy',
        'SectionsMeanRawsize', 'SectionsMinRawsize',
        'SectionsMeanVirtualsize', 'SectionsMinVirtualsize',
        'SectionMaxVirtualsize', 'ImportsNbDLL', 'ImportsNb',
        'ImportsNbOrdinal', 'ExportNb', 'ResourcesNb', 'ResourcesMeanEntropy',
        'ResourcesMinEntropy', 'ResourcesMaxEntropy', 'ResourcesMeanSize',
        'ResourcesMinSize', 'ResourcesMaxSize', 'LoadConfigurationSize',
        'VersionInformationSize',
    ]

    # The key directory is keyed by the user id alone. Formatting the whole
    # eval_key list into the path would point the client at a directory that
    # keygen() never populated.
    user_id = eval_key[1]
    fhe_api = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    fhe_api.load()

    from PE_main import extract_infos

    extracted = extract_infos(test_file)
    feature_row = [extracted[name] for name in feature_names]

    # A single sample must be a one-row 2-D array, not a flat vector.
    encodings = numpy.array(feature_row).reshape(1, -1)

    quantized_encodings = fhe_api.model.quantize_input(encodings).astype(numpy.uint8)
    encrypted_quantized_encoding = fhe_api.quantize_encrypt_serialize(encodings)

    numpy.save(
        f"tmp/tmp_encrypted_quantized_encoding_{user_id}.npy",
        encrypted_quantized_encoding,
    )

    # Only a short prefix of the ciphertext is displayed; slice before
    # formatting so the full payload is never expanded.
    preview = encrypted_quantized_encoding[:ENCRYPTED_DATA_BROWSER_LIMIT]
    encrypted_quantized_encoding_shorten_hex = "".join(f"{i:02x}" for i in preview)

    return (
        encodings[0],
        quantized_encodings[0],
        encrypted_quantized_encoding_shorten_hex,
    )
def run_fhe(user_id):
    """Send a user's encrypted input to the server and store the result.

    Reads ``tmp/tmp_encrypted_quantized_encoding_<user_id>.npy`` and
    ``tmp/tmp_evaluation_key_<user_id>.npy``, posts both (base64-encoded) to
    the ``/predict`` endpoint, and saves the returned encrypted prediction to
    ``tmp/tmp_encrypted_prediction_<user_id>.npy``.

    Args:
        user_id: The id under which the key and encrypted input were saved.

    Returns:
        The first ``ENCRYPTED_DATA_BROWSER_LIMIT`` bytes of the encrypted
        prediction, as a hex string.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    encoded_data_path = Path(f"tmp/tmp_encrypted_quantized_encoding_{user_id}.npy")
    encrypted_quantized_encoding = numpy.load(encoded_data_path)

    # Read evaluation_key from the file
    evaluation_key = numpy.load(f"tmp/tmp_evaluation_key_{user_id}.npy")

    # Use base64 to encode the encodings and evaluation key
    query = {
        "evaluation_key": base64.b64encode(evaluation_key).decode(),
        "encrypted_encoding": base64.b64encode(encrypted_quantized_encoding).decode(),
    }
    headers = {"Content-type": "application/json"}

    # NOTE: the inference server must be listening on port 3000; uvicorn
    # uses 8000 unless it is started with an explicit --port.
    response = requests.post(
        "http://localhost:3000/predict",
        data=json.dumps(query),
        headers=headers,
    )
    # Fail with the HTTP error itself rather than a confusing JSON/KeyError
    # when the server rejects the request.
    response.raise_for_status()

    encrypted_prediction = base64.b64decode(response.json()["encrypted_prediction"])
    numpy.save(f"tmp/tmp_encrypted_prediction_{user_id}.npy", encrypted_prediction)

    # Only a short prefix is displayed in the browser.
    return encrypted_prediction[:ENCRYPTED_DATA_BROWSER_LIMIT].hex()
def decrypt_prediction(user_id):
    """Decrypt the stored encrypted prediction of a user.

    Loads ``tmp/tmp_encrypted_prediction_<user_id>.npy`` and hands its raw
    bytes to an ``FHEModelClient`` whose key directory is
    ``.fhe_keys/<user_id>``.

    Args:
        user_id: The id under which the prediction and keys were saved.

    Returns:
        Whatever ``FHEModelClient.deserialize_decrypt_dequantize`` returns
        for the stored prediction.
    """
    # Read the serialized prediction back as raw bytes
    prediction_file = Path(f"tmp/tmp_encrypted_prediction_{user_id}.npy")
    serialized_prediction = numpy.load(prediction_file).tobytes()

    client = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    client.load()

    # We need to retrieve the private key that matches the client specs
    # (see issue #18); force=False asks the client not to regenerate keys.
    client.generate_private_and_evaluation_keys(force=False)

    return client.deserialize_decrypt_dequantize(serialized_prediction)
def process_pipeline(test_file):
    """Run the whole demo on one uploaded file.

    The steps are: key generation, feature extraction and encryption,
    server-side inference, and decryption of the result.

    Args:
        test_file: The uploaded file, forwarded to ``encode_quantize_encrypt``.

    Returns:
        A 4-tuple with, in order, the results of ``keygen``,
        ``encode_quantize_encrypt``, ``run_fhe`` and ``decrypt_prediction``.
    """
    key_info = keygen()
    # keygen() returns [key_preview, user_id]
    user_id = key_info[1]

    encoding_views = encode_quantize_encrypt(test_file, key_info)
    encrypted_prediction_hex = run_fhe(user_id)
    decrypted_prediction = decrypt_prediction(user_id)

    return key_info, encoding_views, encrypted_prediction_hex, decrypted_prediction
if __name__ == "__main__":
    print("Starting the FHE Model")

    # Build the UI with gr.Interface so that process_pipeline is actually
    # wired to the file input and the four text outputs. Assigning fn/inputs/
    # outputs to local variables inside a gr.Blocks() context connects
    # nothing, and the upload would never trigger the pipeline.
    demo = gr.Interface(
        fn=process_pipeline,
        inputs=[
            gr.File(label="Test File"),
        ],
        # One output per value returned by process_pipeline, in order:
        # keygen result, encoding views, encrypted prediction (hex),
        # decrypted prediction.
        outputs=[
            gr.Textbox(label="Evaluation Key"),
            gr.Textbox(label="Encodings"),
            gr.Textbox(label="Encrypted Prediction"),
            gr.Textbox(label="Decrypted Prediction"),
        ],
        title="FHE Model",
        description="This is a FHE Model",
    )

    demo.launch()  # share=True)