# ClairVault / app.py
# Page-header residue from the hosting site, kept as comments so the file parses:
#   VaultChem's picture
#   ...
#   a072763
import gradio as gr
import subprocess
from concrete.ml.deployment import FHEModelClient
from requests import head
import numpy
import os
from pathlib import Path
import requests
import json
import base64
import shutil
import time
import pandas as pd
import pickle
import numpy as np
import pdb
import json
def load_data_from_file(filename):
    """
    Load data from a JSON file.

    Parameters:
        filename (str): Path of the file to load. A relative path is resolved
            against the current working directory.

    Returns:
        data: The Python object decoded from the file's JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # JSON interchange text is UTF-8; do not depend on the platform's
    # default encoding, which differs between operating systems.
    with open(filename, "r", encoding="utf-8") as file:
        return json.load(file)
# This repository's directory
REPO_DIR = Path(__file__).parent

# Start the inference server (server:app) in a background process. No port is
# given, so uvicorn uses its default; run_fhe posts to http://localhost:8000.
subprocess.Popen(["uvicorn", "server:app"], cwd=REPO_DIR)
# subprocess.Popen(["uvicorn", "server:app", "--port", "3000"], cwd=REPO_DIR)

# Working directories, relative to the current working directory:
#   .fhe_keys/  one sub-directory of FHE keys per user id
#   tmp/        serialized evaluation keys, encrypted inputs and predictions
# exist_ok=True replaces the os.path.exists + os.mkdir pair, which could fail
# if the directory appeared between the check and the creation.
os.makedirs(".fhe_keys", exist_ok=True)
os.makedirs("tmp", exist_ok=True)

# Wait 5 sec for the server to start
time.sleep(5)

# Encrypted data limit for the browser to display
# (encrypted data is too large to display in the browser)
ENCRYPTED_DATA_BROWSER_LIMIT = 500

# Number of per-user key directories kept before the oldest are deleted
N_USER_KEY_STORED = 20

# Initial values of the hidden Gradio inputs created in the UI section.
# Evaluation key
eval_key = []
# Encodings vector
encodings = []
# User ID
user_id = []
def clean_tmp_directory(max_keys=None):
    """
    Delete the oldest user key directories and their temporary files.

    The sub-directories of ``.fhe_keys/`` are ordered by modification time.
    When there are more than ``max_keys`` of them, the oldest ones are removed
    together with every ``tmp/*_<user_id>.npy`` file belonging to those users.

    Parameters:
        max_keys (int | None): Number of key directories to keep. ``None``
            (the default) uses the module constant ``N_USER_KEY_STORED``.

    Returns:
        None
    """
    if max_keys is None:
        max_keys = N_USER_KEY_STORED

    # Oldest first, so the head of the list is what gets evicted.
    key_dirs = sorted(
        (d for d in Path(".fhe_keys/").iterdir() if d.is_dir()),
        key=os.path.getmtime,
    )

    n_dirs_to_delete = len(key_dirs) - max_keys
    if n_dirs_to_delete <= 0:
        return

    evicted = key_dirs[:n_dirs_to_delete]
    for key_dir in evicted:
        shutil.rmtree(key_dir)

    # Temporary files are named "tmp_<kind>_<user_id>.npy". Including the "_"
    # separator in the suffix keeps user "5" from matching the files of user
    # "15", and a single endswith() call unlinks each file at most once.
    evicted_suffixes = tuple(f"_{key_dir.name}.npy" for key_dir in evicted)
    for file in Path("tmp/").iterdir():
        if file.name.endswith(evicted_suffixes):
            file.unlink()
def keygen(eval_key, user_id):
    """
    Create a new user id with a fresh FHE key pair and store its evaluation key.

    Parameters:
        eval_key: Ignored; the value is replaced inside the function.
        user_id: Ignored; a new random id is drawn on every call.

    Returns:
        tuple: ``(eval_key, user_id)`` where ``eval_key`` is a list holding the
        first ``ENCRYPTED_DATA_BROWSER_LIMIT`` items of the serialized
        evaluation key and ``user_id`` is the newly drawn id.
    """
    # Evict old users first so the new key directory fits within the limit.
    clean_tmp_directory()

    print("Initializing FHEModelClient...")

    # The id names both the key directory and the tmp files of this user.
    user_id = numpy.random.randint(0, 2**32)
    client = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    client.load()

    # force=True: always generate a fresh key pair.
    client.generate_private_and_evaluation_keys(force=True)
    serialized_key = client.get_serialized_evaluation_keys()

    # Persist the full key; only a truncated view is returned for display.
    numpy.save(f"tmp/tmp_evaluation_key_{user_id}.npy", serialized_key)

    eval_key = list(serialized_key)[:ENCRYPTED_DATA_BROWSER_LIMIT]
    return eval_key, user_id
def encode_quantize(test_file, eval_key, encodings):
    """
    Extract the model's input feature vector from an uploaded executable.

    Parameters:
        test_file: The uploaded file, passed unchanged to
            ``PE_main.extract_infos``.
        eval_key: Unused; kept so the Gradio callback signature is unchanged.
        encodings: Unused; kept so the Gradio callback signature is unchanged.

    Returns:
        numpy.ndarray: A single-row array holding the extracted values in the
        order of ``feature_names`` below (53 names, so the expected shape is
        (1, 53) -- assuming every extracted value is a scalar).

    Raises:
        KeyError: If ``extract_infos`` does not provide one of the features
            (assuming it returns a dict-like mapping).
    """
    # Project-local helper; imported here as in the original code.
    from PE_main import extract_infos

    # Feature names, in the column order the values are assembled in.
    feature_names = [
        'Machine', 'SizeOfOptionalHeader', 'Characteristics',
        'MajorLinkerVersion', 'MinorLinkerVersion', 'SizeOfCode',
        'SizeOfInitializedData', 'SizeOfUninitializedData',
        'AddressOfEntryPoint', 'BaseOfCode', 'BaseOfData', 'ImageBase',
        'SectionAlignment', 'FileAlignment', 'MajorOperatingSystemVersion',
        'MinorOperatingSystemVersion', 'MajorImageVersion', 'MinorImageVersion',
        'MajorSubsystemVersion', 'MinorSubsystemVersion', 'SizeOfImage',
        'SizeOfHeaders', 'CheckSum', 'Subsystem', 'DllCharacteristics',
        'SizeOfStackReserve', 'SizeOfStackCommit', 'SizeOfHeapReserve',
        'SizeOfHeapCommit', 'LoaderFlags', 'NumberOfRvaAndSizes', 'SectionsNb',
        'SectionsMeanEntropy', 'SectionsMinEntropy', 'SectionsMaxEntropy',
        'SectionsMeanRawsize', 'SectionsMinRawsize',
        'SectionsMeanVirtualsize', 'SectionsMinVirtualsize',
        'SectionMaxVirtualsize', 'ImportsNbDLL', 'ImportsNb',
        'ImportsNbOrdinal', 'ExportNb', 'ResourcesNb', 'ResourcesMeanEntropy',
        'ResourcesMinEntropy', 'ResourcesMaxEntropy', 'ResourcesMeanSize',
        'ResourcesMinSize', 'ResourcesMaxSize', 'LoadConfigurationSize',
        'VersionInformationSize',
    ]

    # The previous version also built and loaded an FHEModelClient whose key
    # directory was derived from eval_key (not a user id) and never used it;
    # feature extraction does not need the FHE client at all.
    infos = extract_infos(test_file)
    row = [infos[name] for name in feature_names]

    # Flat list of values -> one row, one column per feature.
    return np.array(row).reshape(1, -1)
def encrypt_encoded_quantize(encodings, user_id, eval_key):
    """
    Quantize, encrypt and serialize a feature vector with the user's FHE keys.

    The full ciphertext is saved to
    ``tmp/tmp_encrypted_quantized_encoding_<user_id>.npy``; only a truncated
    hexadecimal preview is returned for display.

    Parameters:
        encodings: Feature values; converted with ``np.array`` before use.
        user_id: Id naming the key directory ``.fhe_keys/<user_id>`` and the
            output file.
        eval_key: Unused; kept so the Gradio callback signature is unchanged.

    Returns:
        str: Two lowercase hex digits per item for the first
        ``ENCRYPTED_DATA_BROWSER_LIMIT`` items of the serialized ciphertext.
    """
    client = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    client.load()

    # The earlier debug code quantized the input a second time just to print
    # it, and printed the plaintext features and the whole ciphertext to
    # stdout on every call; both were dropped.
    encrypted = client.quantize_encrypt_serialize(np.array(encodings))

    numpy.save(
        f"tmp/tmp_encrypted_quantized_encoding_{user_id}.npy",
        encrypted,
    )

    # Slice first so only the preview, not the entire ciphertext, is expanded.
    preview = encrypted[:ENCRYPTED_DATA_BROWSER_LIMIT]
    return "".join(f"{byte:02x}" for byte in preview)
def run_fhe(user_id):
    """
    Send the user's encrypted input and evaluation key to the prediction server.

    Reads ``tmp/tmp_encrypted_quantized_encoding_<user_id>.npy`` and
    ``tmp/tmp_evaluation_key_<user_id>.npy``, posts both (base64-encoded) as
    JSON to ``http://localhost:8000/predict`` and saves the returned encrypted
    prediction to ``tmp/tmp_encrypted_prediction_<user_id>.npy``.

    Parameters:
        user_id: Id used in the names of the input and output files.

    Returns:
        str: Two lowercase hex digits per byte for the first
        ``ENCRYPTED_DATA_BROWSER_LIMIT`` bytes of the encrypted prediction.

    Raises:
        FileNotFoundError: If one of the input files does not exist.
        requests.HTTPError: If the server answers with an error status.
    """
    encoded_data_path = Path(f"tmp/tmp_encrypted_quantized_encoding_{user_id}.npy")
    encrypted_quantized_encoding = numpy.load(encoded_data_path)

    # Read evaluation_key from the file
    evaluation_key = numpy.load(f"tmp/tmp_evaluation_key_{user_id}.npy")

    # Binary payloads are base64-encoded so they can travel inside JSON.
    query = {
        "evaluation_key": base64.b64encode(evaluation_key).decode(),
        "encrypted_encoding": base64.b64encode(encrypted_quantized_encoding).decode(),
    }
    headers = {"Content-type": "application/json"}

    response = requests.post(
        "http://localhost:8000/predict",
        data=json.dumps(query),
        headers=headers,
    )
    # Fail with the HTTP error itself instead of a confusing KeyError/JSON
    # error when the server rejects the request or crashes.
    response.raise_for_status()

    encrypted_prediction = base64.b64decode(response.json()["encrypted_prediction"])

    numpy.save(f"tmp/tmp_encrypted_prediction_{user_id}.npy", encrypted_prediction)

    # Only a truncated preview is returned for display in the browser.
    preview = encrypted_prediction[:ENCRYPTED_DATA_BROWSER_LIMIT]
    return "".join(f"{byte:02x}" for byte in preview)
def decrypt_prediction(user_id):
    """
    Decrypt the stored encrypted prediction of a user and label the result.

    Parameters:
        user_id: Id naming the prediction file
            ``tmp/tmp_encrypted_prediction_<user_id>.npy`` and the key
            directory ``.fhe_keys/<user_id>``.

    Returns:
        tuple: ``(message, predictions)``. ``message`` starts with "Safe file"
        when ``predictions[0][1] >= 0.5`` and with "Malware file" otherwise;
        ``predictions`` is the value returned by the FHE client.
    """
    # Read encrypted_prediction from the file
    prediction_path = Path(f"tmp/tmp_encrypted_prediction_{user_id}.npy")
    ciphertext = numpy.load(prediction_path).tobytes()

    client = FHEModelClient("fhe_model", f".fhe_keys/{user_id}")
    client.load()

    # We need to retrieve the private key that matches the client specs (see issue #18)
    client.generate_private_and_evaluation_keys(force=False)

    predictions = client.deserialize_decrypt_dequantize(ciphertext)

    # The second entry of the first row decides the label.
    is_safe = predictions[0][1] >= 0.5
    message = (
        "Safe file with class probabilities: "
        if is_safe
        else "Malware file with class probabilities: "
    )
    return message, predictions
if __name__ == "__main__":
    # Builds the Gradio page and wires each button to its callback. Components
    # are created in display order, so statement order matters here.
    #
    # NOTE(review): the emoji and dashes in the strings below were restored
    # from mis-encoded text. The emoji on the "Encrypt the file" and "Decrypt
    # scan result" buttons and on the "Market Opportunity" heading could not be
    # recovered unambiguously and are best guesses -- confirm them.
    # NOTE(review): blank lines and list nesting inside the Markdown literals
    # may have been lost; the line structure is kept as found -- verify the
    # rendering.
    # A dead string literal holding an older, commented-out UI (it referenced
    # an undefined process_pipeline) was removed from the top of this block.
    print("🚀 Starting the ClairVault demo...")

    with gr.Blocks(css=".gradio-container { font-size: 20px; }") as demo:
        # ---- Introduction -------------------------------------------------
        gr.Markdown(
            """
            <p align="center">
            <img width=200 src="/api/placeholder/200/200" alt="ClairVault Logo">
            </p>
            <h2 align="center">🔒 ClairVault: Privacy-Preserving Cloud-Based Malware Scanning</h2>
            <p align="center">
            <a href="#">GitHub</a>
            —
            <a href="#">Documentation</a>
            —
            <a href="#">Community</a>
            —
            <a href="#">@ClairVault</a>
            </p>
            <p align="center">
            <img src="/api/placeholder/600/300" alt="ClairVault Concept" width="60%" height="60%">
            </p>
            """
        )
        gr.Markdown("## 📋 Executive Summary")
        gr.Markdown(
            """
            ### ClairVault is a **malware scanning service**, cloud-based and machine-learning enabled, that leverages fully homomorphic encryption (FHE) to securely scan files without seeing your data.
            """
        )
        gr.Markdown("Note: This is a demonstration. Please do not rely on the prediction of this app to judge if an executable or any file is potentially harmful. Do not upload harmful executables on purpose.")
        gr.Markdown("## 🚨 The Problem")
        gr.Markdown(
            """
            1. Privacy: do you really trust the Russian-based Kaspersky and that there is no back-door built in McAfee by Uncle Sam?
            - Require access to plaintext data, posing privacy and security risks
            - Often closed-source, lacking transparency
            2. Protection:
            - The malware scanner itself might be compromised
            - Require frequent local updates
            - Mostly rule-based → need machine learning - but mostly on cloud
            3. Resources
            - Consume local resources (Windows Defender only runs ML based models on the cloud)
            """
        )
        gr.Markdown("## 💡 Our Solution: ClairVault")
        gr.Markdown(
            """
            Key features include:
            - Local extraction of features and encryption of user data (files, logs)
            - Transmission of encrypted data to secure, open-source cloud servers
            - Malware classification performed on encrypted data
            - Return of encrypted classification results
            """
        )
        gr.Markdown("### 🛠️ Technical Implementation")
        gr.Markdown(
            """
            - **Encryption Method**: Fully Homomorphic Encryption using the TFHE (Fast Fully Homomorphic Encryption over the Torus) library
            - **Machine Learning Model**: Random Forest classifier optimized for FHE computations using ConcreteML
            *Note: These are preliminary figures based on our proof-of-concept. We aim to significantly improve performance in future iterations.*
            """
        )

        # ---- Step 1: key generation ----------------------------------------
        gr.Markdown("# 🗝️ Step 1: Generate the keys")
        b_gen_key = gr.Button("🔑 Generate the keys and send public part to server")
        evaluation_key = gr.Textbox(
            label="Evaluation key (truncated):",
            max_lines=4,
            interactive=False,
        )

        # Pre-extracted malware features, read as JSON and held in a State so
        # the malware path never needs a real malware file to be uploaded.
        malware_data = load_data_from_file('malware_features.txt')
        malware_data_state = gr.State(value=malware_data)

        # ---- Step 2: file upload -------------------------------------------
        gr.Markdown("# 📤 Step 2: Upload a file for scanning")
        gr.Markdown("## Client side")
        gr.Markdown(
            "Upload a file you want to scan for malware. In reality, the file would first be processed locally and the result encrypted before being sent to the cloud."
        )
        gr.Markdown(
            "The default file is a harmless executable. You may delete it to upload your own exe file. If you want to run this app on actual malware, please do not do that! Instead, skip this step and select the second option in the next step. We have extracted feature vectors from real malware for you so it is safe to run the analysis here!"
        )
        file_input = gr.File(label="Upload a harmless exe file or use default:", file_count="single", value="./harmless.exe")

        # ---- Step 3: feature extraction ------------------------------------
        gr.Markdown("# 📥 Step 3: Extract executable file features")
        b_extract = gr.Button("📥 Extract features from harmless file and save")
        c_extract = gr.Button("📥 Load features from malware file and save")
        extracted_vector = gr.JSON(
            label="Extracted vector:",
        )

        # ---- Step 4: encryption --------------------------------------------
        gr.Markdown("# 🔒 Step 4: Encrypt the file with the private key")
        b_encrypt_file = gr.Button(
            "🔐 Encrypt the file and send to server"
        )
        encrypted_file = gr.Textbox(
            label="Encrypted file content (truncated):",
            max_lines=4,
            interactive=False,
        )

        # ---- Step 5: FHE scan on the server --------------------------------
        gr.Markdown("# 🖥️ Step 5: Run the FHE-based malware scan")
        gr.Markdown("## Server side")
        gr.Markdown(
            "The encrypted file is received by the server. Using the evaluation key and FHE, the server can perform the malware scan directly on the encrypted data. Once the scan is finished, the server returns the encrypted result to the client."
        )
        b_run_fhe_scan = gr.Button("🛡️ Run FHE-based malware scan")
        encrypted_scan_result = gr.Textbox(
            label="Encrypted scan result (truncated):",
            max_lines=4,
            interactive=False,
        )

        # ---- Step 6: decryption --------------------------------------------
        gr.Markdown("# 🔓 Step 6: Decrypt the scan result")
        gr.Markdown("## Client side")
        gr.Markdown(
            "The encrypted scan result is sent back to the client, who can finally decrypt it with their private key. Only the client is aware of the original file content and the scan result."
        )
        b_decrypt_result = gr.Button("🔍 Decrypt scan result")

        # Hidden component that carries the generated user id between steps.
        user_id_input = gr.Number(visible=False)
        scan_result = gr.Textbox(label="Scan Result:")
        eval_key_input = gr.Textbox(value=eval_key, visible=False)

        # ---- Event wiring --------------------------------------------------
        # Button for key generation
        b_gen_key.click(fn=keygen, inputs=[eval_key_input, user_id_input], outputs=[evaluation_key, user_id_input])

        encodings_input = gr.Textbox(value=encodings, visible=False)

        # Button to extract vector
        b_extract.click(
            fn=encode_quantize,
            inputs=[file_input, eval_key_input, encodings_input],
            outputs=[extracted_vector],
        )
        # The malware button skips extraction: it encrypts the pre-extracted
        # malware features directly and shows the result in the Step 4 box.
        c_extract.click(
            fn=encrypt_encoded_quantize,
            inputs=[malware_data_state, user_id_input, eval_key_input],
            outputs=[encrypted_file],
        )

        # Button to encrypt file
        b_encrypt_file.click(
            fn=encrypt_encoded_quantize,
            inputs=[extracted_vector, user_id_input, eval_key_input],
            outputs=[encrypted_file],
        )

        # Button to run FHE-based malware scan
        b_run_fhe_scan.click(fn=run_fhe, inputs=[user_id_input], outputs=[encrypted_scan_result])

        # Button to decrypt the scan result
        b_decrypt_result.click(fn=decrypt_prediction, inputs=[user_id_input], outputs=[scan_result])

        # ---- Closing pitch -------------------------------------------------
        gr.Markdown(
            "ClairVault is built using advanced Fully Homomorphic Encryption techniques to ensure your data remains private and secure throughout the entire malware scanning process."
        )
        gr.Markdown("## 🌍 Market Opportunity")
        gr.Markdown(
            """
            The global cybersecurity market is valued at $60 billion in 2024 with an annual growth rate of 15% projected by Morgan Stanley. In 2023, there were 6 billion cyberattacks, a 10% increase from 2022.
            **Target Industries**: Finance, Healthcare, Government, Legal Services, Individuals
            **Estimated TAM (Total Addressable Market) for Privacy-Preserving Malware Scanning**:
            While exact figures for malware scanning are not available, we estimate it to be approximately 10% of the total cybersecurity market, or $6 billion. (Kaspersky has 700 million of revenue, Norton 1.5 billion, McAfee 2.5 billion.)
            Based on the critical need in our target industries we estimate privacy-preserving to be 5-10% of that, approximately $300-600 million.
            """
        )
        gr.Markdown("## 🏆 Competitive Advantage")
        gr.Markdown(
            """
            Unlike traditional solutions:
            1. Complete data privacy through FHE
            2. Open-source transparency
            3. Cloud-based scanning without local resource consumption
            4. Immunity to local malware compromise
            """
        )
        gr.Markdown("## 🚀 Go-To-Market Strategy")
        gr.Markdown("### 🎯 Target Customers")
        gr.Markdown(
            """
            1. Enterprise Clients in sensitive industries
            2. Cloud Service Providers
            3. Cybersecurity Firms
            4. Privacy-conscious individuals
            """
        )
        gr.Markdown("### 💰 Revenue Model")
        gr.Markdown(
            """
            1. Enterprise Licensing
            2. Tiered Subscription Plans
            3. API Access Fees
            """
        )
        gr.Markdown("### 📈 Sales and Marketing Channels")
        gr.Markdown(
            """
            1. Direct Enterprise Sales
            2. Partnerships with cloud providers and cybersecurity firms
            3. Industry events and conferences
            4. Content marketing (whitepapers, case studies)
            """
        )
        gr.Markdown("## 🏅 Achievements and Roadmap")
        gr.Markdown("### 🏆 Current Achievements")
        gr.Markdown(
            """
            - Developed a proof-of-concept multiscanner using the TFHE library
            - Successfully demonstrated end-to-end process from local encryption to cloud scanning
            - Implemented a linear classifier for malware detection on encrypted data
            """
        )
        gr.Markdown("### 🔮 Future Development")
        gr.Markdown(
            """
            1. Develop real-time scanning capabilities
            2. Add behavior analysis through encrypted log processing
            3. Expand to support a wider range of file types and encryption schemes
            """
        )

    demo.launch(share=False)