Spaces:
Sleeping
Sleeping
# Importing the required libraries | |
import cv2 | |
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Depends | |
from fastapi.middleware.cors import CORSMiddleware | |
import numpy as np | |
import os | |
import datetime | |
import base64 | |
from motor.motor_asyncio import AsyncIOMotorClient | |
from pymongo.server_api import ServerApi | |
# Application instance; the docs, ReDoc and OpenAPI schema URLs are all set
# to None.
app = FastAPI(openapi_url=None, docs_url=None, redoc_url=None)

# MongoDB connection settings. The URL and database name come from the
# environment and are None when the corresponding variable is unset.
MONGODB_URL = os.getenv("mongoUrl")
DATABASE_NAME = os.getenv("databaseName")
COLLECTION_NAME = "inkognitoUsers"

# Allowing CORS (Cross-Origin Resource Sharing) for every origin, method
# and header.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive policy - confirm this is intended before exposing the
# service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
# MongoDB client initialization
async def get_database_client():
    """Yield the configured MongoDB database, closing the client afterwards.

    Written as an async generator so it can be used with ``Depends``: the
    code before ``yield`` runs before the endpoint, the code after it runs
    once the endpoint is done.

    Yields:
        The database named ``DATABASE_NAME`` on a fresh
        ``AsyncIOMotorClient`` connected to ``MONGODB_URL``.
    """
    client = AsyncIOMotorClient(MONGODB_URL)
    try:
        yield client[DATABASE_NAME]
    finally:
        # An exception raised by the endpoint is re-raised at the ``yield``;
        # without ``finally`` the client would never be closed in that case.
        client.close()
# @app.on_event("startup") | |
# def save_openapi_json(): | |
# openapi_data = app.openapi() | |
# with open("openapi.json", "w") as file: | |
# json.dump(openapi_data, file) | |
# Converting types to binary
def msg_to_bin(msg):
    """Convert text, byte sequences or integers to binary-digit strings.

    Args:
        msg: One of
            * ``str`` - every character's code point is formatted and the
              results are concatenated into one string;
            * ``bytes``, ``bytearray`` or ``np.ndarray`` - every element is
              formatted separately;
            * ``int`` or a NumPy integer scalar - formatted on its own.

    Returns:
        A single string for ``str`` and integer input, or a list with one
        string per element for byte sequences and arrays. ``"08b"`` is a
        minimum width: values above 255 produce more than 8 digits.

    Raises:
        TypeError: If ``msg`` is of any other type.
    """
    # isinstance (rather than ``type(x) ==``) also accepts bytearray, every
    # NumPy integer width and subclasses of the supported types.
    if isinstance(msg, str):
        return "".join(format(ord(char), "08b") for char in msg)
    if isinstance(msg, (bytes, bytearray, np.ndarray)):
        return [format(value, "08b") for value in msg]
    if isinstance(msg, (int, np.integer)):
        return format(msg, "08b")
    raise TypeError("Input type not supported")
# defining function to hide the secret message into the image
def hide_data(img, secret_msg):
    """Hide ``secret_msg`` in the least-significant bits of ``img``.

    The message is followed by the delimiter ``"#####"`` and written one bit
    per channel value, walking the array in row-major order (row, column,
    channel). Every character is stored as exactly one byte, most
    significant bit first. Only the lowest bit of each used channel value is
    changed; values past the end of the payload are left untouched.

    Args:
        img: ``np.ndarray`` of shape ``(height, width, 3)`` with an integer
            dtype. It is modified in place.
        secret_msg: Text to hide. Every character must have a code point
            below 256 so that it fits in one byte.

    Returns:
        The same ``img`` object, now carrying the payload.

    Raises:
        ValueError: If ``img`` is not a 3-channel image, if the message
            contains a character that does not fit in one byte, or if the
            message plus delimiter needs more bits than the image offers.
    """
    if img is None or img.ndim != 3 or img.shape[2] != 3:
        raise ValueError("Expected a 3-channel image of shape (height, width, 3)")

    # calculating the maximum bytes for encoding
    capacity_bits = img.shape[0] * img.shape[1] * 3
    nBytes = capacity_bits // 8
    print("Maximum Bytes for encoding:", nBytes)

    # The delimiter is part of what must fit, so append it before checking
    # the capacity; otherwise an exact-fit message is silently truncated.
    payload = secret_msg + "#####"
    try:
        # latin-1 maps code points 0-255 to the identical byte value, which
        # is the one-character-per-byte framing the decoder expects.
        payload_bytes = payload.encode("latin-1")
    except UnicodeEncodeError as exc:
        raise ValueError(
            "Secret message contains characters that cannot be stored in one byte"
        ) from exc

    # One array element per payload bit, most significant bit first.
    bits = np.unpackbits(np.frombuffer(payload_bytes, dtype=np.uint8))
    if bits.size > capacity_bits:
        raise ValueError("Error encountered insufficient bytes, need bigger image or less data!!")

    # ``img.flat`` reads and writes in row-major order regardless of memory
    # layout, so the assignment always lands in the caller's array.
    carrier = img.flat[:bits.size]
    # Shifting right then left clears the lowest bit for any integer width.
    img.flat[:bits.size] = ((carrier >> 1) << 1) | bits
    return img
def show_data(img):
    """Extract the message hidden in the least-significant bits of ``img``.

    The lowest bit of every channel value is read in row-major order (row,
    column, channel), grouped into bytes (most significant bit first) and
    turned into one character per byte. Everything before the first
    occurrence of the delimiter ``"#####"`` is the message.

    Args:
        img: ``np.ndarray`` of shape ``(height, width, 3)`` with an integer
            dtype.

    Returns:
        The hidden message without the delimiter.

    Raises:
        ValueError: If ``img`` is not a 3-channel image, or if the delimiter
            does not occur in the extracted data (no message is present).
    """
    if img is None or img.ndim != 3 or img.shape[2] != 3:
        raise ValueError("Expected a 3-channel image of shape (height, width, 3)")

    # Lowest bit of every channel value, in row-major order.
    lsb = (img.reshape(-1) & 1).astype(np.uint8)

    # Drop a trailing partial byte; the payload is always whole bytes.
    usable = lsb.size - lsb.size % 8
    raw = np.packbits(lsb[:usable]).tobytes()

    # checking if we have reached the delimiter which is "#####"
    end = raw.find(b"#####")
    if end == -1:
        # Without a delimiter the bits are just image noise; returning them
        # would hand back arbitrary text instead of a message.
        raise ValueError("No hidden message found in the image")

    # latin-1 turns each byte into the character with that code point.
    return raw[:end].decode("latin-1")
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered on `app`.
async def root():
    """Liveness check: return a fixed payload saying the server is up."""
    status = {"message": "Server is alive"}
    return status
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered on `app`.
async def hide_message(image: UploadFile = File(...), secret_msg: str = Form(...)):
    """Hide ``secret_msg`` in the uploaded image and return it as base64 PNG.

    Args:
        image: Uploaded image file.
        secret_msg: Text to hide in the image.

    Returns:
        ``{"base64_image": <base64 text of the PNG-encoded result>}``.

    Raises:
        HTTPException: 400 when the upload is empty or not a decodable image,
            or when ``hide_data`` rejects the input with ``ValueError``
            (for example the message does not fit); 500 for any other
            failure.
    """
    try:
        # Read image using OpenCV
        content = await image.read()
        if not content:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")
        nparr = np.frombuffer(content, np.uint8)
        # IMREAD_COLOR always yields a 3-channel image, which is what
        # hide_data unpacks per pixel; IMREAD_UNCHANGED would pass RGBA and
        # greyscale uploads through and make them fail.
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            raise HTTPException(status_code=400, detail="Uploaded file is not a readable image")

        # Hide the secret message in the image
        encoded_image = hide_data(img, secret_msg)

        # PNG is lossless, so the modified low bits survive the encoding.
        success, encoded_image_data = cv2.imencode(".png", encoded_image)
        if not success:
            raise RuntimeError("Failed to encode the image as PNG")

        # Encode the image bytes to base64
        encoded_image_base64 = base64.b64encode(encoded_image_data.tobytes()).decode()
        return {"base64_image": encoded_image_base64}
    except HTTPException:
        # Already carries the right status code; do not rewrap it as a 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# API endpoint to retrieve the secret message from an image
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered on `app`.
async def retrieve_message(image: UploadFile = File(...)):
    """Decode the uploaded image and return the message hidden in it.

    Args:
        image: Uploaded image file that carries a hidden message.

    Returns:
        ``{"decoded_message": <text returned by show_data>}``.

    Raises:
        HTTPException: 400 when the upload is empty or not a decodable image,
            or when ``show_data`` rejects the image with ``ValueError``;
            500 for any other failure.
    """
    try:
        # Read image using OpenCV
        content = await image.read()
        if not content:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")
        nparr = np.frombuffer(content, np.uint8)
        # IMREAD_UNCHANGED keeps the stored pixel values exactly as they
        # are, which matters because the message lives in the lowest bits.
        img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
        if img is None:
            raise HTTPException(status_code=400, detail="Uploaded file is not a readable image")

        # Retrieve the hidden message from the image
        decoded_message = show_data(img)
        return {"decoded_message": decoded_message}
    except HTTPException:
        # Already carries the right status code; do not rewrap it as a 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered on `app`.
async def save_user_data(name: str = Form(...), email: str = Form(...), database: AsyncIOMotorClient = Depends(get_database_client)):
    """Insert one user record into the ``COLLECTION_NAME`` collection.

    Args:
        name: User name from the submitted form.
        email: User e-mail address from the submitted form.
        database: Value yielded by ``get_database_client``.

    Returns:
        A fixed success message.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """
    try:
        collection = database[COLLECTION_NAME]
        # Local time of the insert, stored as formatted text.
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        document = {"name": name, "email": email, "first_login": timestamp}
        await collection.insert_one(document)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"message": "User data saved successfully"}
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered on `app`.
async def get_all_users(database: AsyncIOMotorClient = Depends(get_database_client)):
    """Return every document stored in the ``COLLECTION_NAME`` collection.

    Args:
        database: Value yielded by ``get_database_client``.

    Returns:
        ``{"users": [...]}`` with one entry per stored document; the
        ``_id`` field is excluded by the projection.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """
    try:
        # Empty filter matches everything; the projection drops "_id".
        cursor = database[COLLECTION_NAME].find({}, {"_id": 0})
        # length=None places no cap on the number of documents fetched.
        all_users = await cursor.to_list(length=None)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"users": all_users}