Img-label / app.py
rayochoajr's picture
Update app.py
df1624c verified
raw
history blame contribute delete
No virus
49.7 kB
project_index = 0 # Initialize project_index with a default value
import gradio as gr,os,shutil,numpy as np,hashlib,subprocess,pandas as pd
from PIL import Image
from datetime import datetime
import base64
import requests
import io
import json
import logging
import re
"""
Gradio App Interface Specification:
Inputs:
- prompt: Text Input. Description: 'Enter a prompt for image generation.'
- steps: Slider. Range: [1, 32]. value: 16. Description: 'Number of steps for image generation.'
- model: Dropdown. Options: ['TurboAnime.saftensors', 'OtherModel']. value: 'TurboAnime.saftensors'. Description: 'Select the model for image generation.'
- styles: CheckboxGroup. Options: ['RayORender', 'OtherStyle']. value: ['RayORender']. Description: 'Select styles for image generation.'
Outputs:
- image: Image. Description: 'Generated image based on the prompt.'
- log: Text. Description: 'Log of the image generation process.'
"""
import gradio as gr
import subprocess
import json
import base64
from PIL import Image
import io
import time
# πŸ•’ Start time for the entire script
start_time = time.time()
def analyze_all_images(custom_prompt):
global file_array
descriptions = []
for file_info in file_array:
file_path = find_file_by_hash(file_info["hash"])
if file_path:
with Image.open(file_path) as img:
description = get_image_description(img, custom_prompt)
save_text(file_info["hash"], description, "", "", True) # Save the description to the text file
descriptions.append(description)
return descriptions
def generate_status_message(index, total, file_name):
return f"Processing {index + 1}/{total}: {file_name}"
def analyze_thumbs_up_images(custom_prompt):
thumbs_up_images = load_thumbs_up_gallery() # Assuming this function returns full paths
total_images = len(thumbs_up_images)
descriptions = []
status_messages = []
for index, image_path in enumerate(thumbs_up_images):
with Image.open(image_path) as img:
description = get_image_description(img, custom_prompt)
descriptions.append((os.path.basename(image_path), description))
status_messages.append(generate_status_message(index, total_images, os.path.basename(image_path)))
save_text(file_info["hash"], description, "", "", True) # Save the description to the text file
return descriptions, status_messages
# πŸ“’ Log Status
def log_status(progress, message):
elapsed_time = time.time() - start_time
log_message = {"status": message, "progress": f"{progress}%", "time_elapsed": f"{elapsed_time:.2f} seconds"}
return json.dumps(log_message)
def process_files(file_info):
try:
if file_info:
return [_process_file(file_path) for file_path in file_info] if isinstance(file_info, list) else [_process_file(file_info)]
return []
except Exception as e:
print(f"Error processing files: {e}")
raise e
def clear_uploads_folder():
os.makedirs(recycle_bin_folder, exist_ok=True)
for file_name in os.listdir(_get_project_folder()):
dest_file_path = os.path.join(recycle_bin_folder, file_name)
if os.path.exists(dest_file_path):
base, extension = os.path.splitext(file_name)
i = 1
new_file_name = f"{base}_{i}{extension}"
new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name)
while os.path.exists(new_dest_file_path):
i += 1
new_file_name = f"{base}_{i}{extension}"
new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name)
dest_file_path = new_dest_file_path
shutil.move(os.path.join(_get_project_folder(), file_name), dest_file_path)
return "Uploads folder cleared!"
def undo_last_deletion():
file_list = os.listdir(recycle_bin_folder)
if file_list:
last_deleted_file = file_list[-1]
shutil.move(os.path.join(recycle_bin_folder, last_deleted_file), _get_project_folder())
return f"Restored: {last_deleted_file}"
return "No files to restore"
def next_session():
global project_index
project_index += 1
_init_project_directory()
return f"Switched to the next session! Current session index: {project_index}"
def previous_session():
global project_index
if project_index > 0: project_index -= 1
return f"Switched to the previous session! Current session index: {project_index}"
def get_next_image():
global current_image
images = get_images()
if images:
current_image = images[0] if current_image is None else images[(images.index(current_image) + 1) % len(images)]
else:
current_image = default_image
return current_image
def get_previous_image():
global current_image
images = get_images()
if images:
current_image = images[-1] if current_image is None else images[(images.index(current_image) - 1) % len(images)]
else:
current_image = default_image
return current_image
def process_all_zips():
for file_name in os.listdir(_get_project_folder()):
file_path = os.path.join(_get_project_folder(), file_name)
if zipfile.is_zipfile(file_path): _process_file(file_path)
return "All zip files processed!"
# 🌐 API Call
def make_api_call(url, payload):
try:
response = subprocess.run(["curl", "-X", "POST", url, "-H", "Content-Type: application/json", "--data", json.dumps(payload)], capture_output=True, text=True)
if response.returncode == 0:
return json.loads(response.stdout), None
else:
return None, log_status(progress, f"API call failed with return code: {response.returncode}")
except Exception as e:
return None, log_status(progress, f"API call failed with exception: {str(e)}")
# πŸ–ΌοΈ Save Image
def save_image(image_data):
image = Image.open(io.BytesIO(base64.b64decode(image_data)))
return image
# πŸš€ Main Execution Function
def generate_image(prompt, steps, model, styles):
base_url = "http://73.255.78.150:7909"
base_payload = {
"prompt": prompt,
"steps": steps,
"model": model,
"styles": styles,
"negative_prompt": "album, duplicate, crowded, multiple, stuff, messy, photo, collage, doll, caricature, render. mannequin.",
"seed": -1,
"height": 768,
"width": 1280,
"sampler_index": "DPM++ 2M SDE Karras",
"restore_faces": False,
"tiling": False,
"n_iter": 1,
"batch_size": 1,
"cfg_scale": 2.0,
"subseed": -1,
"subseed_strength": 0.0,
"seed_resize_from_h": -1,
"seed_resize_from_w": -1,
"seed_enable_extras": False,
"enable_hr": False,
"denoising_strength": 0.0,
"hr_scale": 2.0,
"hr_upscaler": "Latent",
"hr_second_pass_steps": 0,
"hr_resize_x": 0,
"hr_resize_y": 0,
"hr_sampler_index": "",
"hr_prompt": "",
"hr_negative_prompt": "",
"override_settings_texts": ""
}
progress = 0
log = log_status(progress, "Starting image generation...")
response, error_log = make_api_call(f"{base_url}/sdapi/v1/txt2img", base_payload)
if error_log:
return None, error_log
if response and 'images' in response:
image_data = response['images'][0]
image = save_image(image_data)
log = log_status(100, "Image generated successfully.")
return image, log
else:
log = log_status(progress, "Failed to generate image.")
return None, log
max_image_size = 768
image_folder = "/Users/gev7418/Library/CloudStorage/OneDrive-Personal/Gradio Script Building Blocks/HatchDuo-Gradio/Images"
thumbs_up_folder = os.path.join(image_folder, "Thumbs_Up")
thumbs_down_folder = os.path.join(image_folder, "Thumbs_Down")
allowed_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp']
folders_cycle = [image_folder, thumbs_up_folder, thumbs_down_folder]
# Set up logging to print to the terminal
logging.basicConfig(level=logging.INFO)
# OpenAI API Key
api_key = "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" # Replace with your OpenAI API Key
import subprocess
def convert_and_rename_files_in_directory(directory, original_extension, new_extension, new_base_name):
"""
Converts all files in the specified directory from the original extension to PNG, and renames them to a new base name followed by a number.
"""
files = [f for f in os.listdir(directory) if f.endswith(original_extension)]
for index, file in enumerate(sorted(files)):
new_name = f"{new_base_name}_{index}{new_extension}"
original_file_path = os.path.join(directory, file)
new_file_path = os.path.join(directory, new_name)
# Convert to PNG using a subprocess call to a command line tool like ImageMagick
subprocess.run(['convert', original_file_path, new_file_path])
os.rename(new_file_path, os.path.join(directory, new_name))
logging.info(f"All {original_extension} files in {directory} have been converted to PNG and renamed to {new_base_name} format.")
def encode_image(image):
"""
This function converts the image to bytes and returns the base64 encoding of the image file
"""
logging.info("Encoding image to base64")
image_bytes = io.BytesIO()
if image.mode == 'RGBA':
# Convert RGBA to RGB
image = image.convert('RGB')
image.save(image_bytes, format='JPEG')
image_bytes = image_bytes.getvalue()
base64_image = base64.b64encode(image_bytes).decode('utf-8')
return base64_image
def get_image_description(image, custom_prompt):
"""
This function sends the image to the OpenAI API and returns the response.
It now checks if there's custom prompt content before sending it to the API.
"""
logging.info("Getting image description from OpenAI API")
base64_image = encode_image(image)
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
# Check if custom_prompt is not empty, otherwise use a default prompt
if not custom_prompt.strip():
custom_prompt = "Describe this image"
payload = {
"model": "gpt-4-vision-preview",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": custom_prompt},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
]
}
],
"max_tokens": 300
}
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
response_payload = json.loads(response.text)
content = response_payload['choices'][0]['message']['content']
logging.info("Received response from OpenAI API")
return content
def create_directories():
print("Creating directories...")
os.makedirs(thumbs_up_folder, exist_ok=True)
os.makedirs(thumbs_down_folder, exist_ok=True)
create_directories()
def get_files(folder):
print(f"Getting files from {folder}...")
return [os.path.join(dp, f) for dp, dn, filenames in os.walk(folder) for f in filenames if os.path.splitext(f)[1].lower() in allowed_extensions]
def hash_image_pixels(file_path):
"""
Generates a hash for an image based on its pixel content.
"""
if os.path.splitext(file_path)[1].lower() in allowed_extensions:
with Image.open(file_path) as img:
# Convert the image to RGBA (to standardize if images are in different modes)
img_rgba = img.convert('RGBA')
# Calculate the new height and width to maintain aspect ratio
aspect_ratio = img_rgba.width / img_rgba.height
new_height = min(max_image_size, img_rgba.height)
new_width = int(aspect_ratio * new_height)
if img_rgba.height > max_image_size:
new_height = max_image_size
new_width = int(new_height * aspect_ratio)
# Use Image.Resampling.LANCZOS for better quality resizing
img_resized = img_rgba.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Get the bytes of the resized image data
img_bytes = io.BytesIO()
img_resized.save(img_bytes, format='PNG') # PNG format to ensure consistency across platforms
img_bytes = img_bytes.getvalue()
# Generate a hash of the resized image bytes
hash_obj = hashlib.sha256(img_bytes)
return hash_obj.hexdigest()
else:
return None
# Global cache for hashes to file paths
hash_to_path_cache = {}
def update_hash_to_path_cache():
global hash_to_path_cache
hash_to_path_cache.clear()
for folder in folders_cycle:
for file_path in get_files(folder):
file_hash = hash_image_pixels(file_path)
hash_to_path_cache[file_hash] = file_path
def find_file_by_hash(file_hash):
# Use the cache to find the file path
file_path = hash_to_path_cache.get(file_hash, None)
if file_path and os.path.exists(file_path):
return file_path
# If the file was not found or doesn't exist at the cached location,
# search in the thumbs up and thumbs down folders
for folder in [thumbs_up_folder, thumbs_down_folder]:
for dp, dn, filenames in os.walk(folder):
for f in filenames:
potential_path = os.path.join(dp, f)
if os.path.splitext(f)[1].lower() in allowed_extensions:
if hash_image_pixels(potential_path) == file_hash:
# Update the cache with the new location
hash_to_path_cache[file_hash] = potential_path
return potential_path
# If the file is not found in any of the locations, return None
return None
def build_file_array():
print("Building file array...")
files = []
for folder in folders_cycle:
for file_path in get_files(folder):
file_hash = hash_image_pixels(file_path)
status = "πŸ‘ Rated Up" if folder == thumbs_up_folder else "πŸ‘Ž Rated Down" if folder == thumbs_down_folder else "⏳ Pending"
files.append({"path": file_path, "hash": file_hash, "status": status, "date_modified": os.path.getmtime(file_path)})
# Create text files for any images missing their text file counterpart
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt')
if not os.path.exists(text_file_path):
with open(text_file_path, 'w') as text_file:
text_file.write("") # Create an empty text file
files = sorted(files, key=lambda x: x["date_modified"], reverse=True)
return files
def refresh_file_array_and_hashes():
global file_array, hash_list
file_array = build_file_array()
hash_list = [file['hash'] for file in file_array]
print("File array and hash list updated.")
update_hash_to_path_cache()
refresh_file_array_and_hashes()
current_index = 0
def save_text(file_hash, text, prepend_text="", append_text="", save_and_overwrite_changes=True):
if not save_and_overwrite_changes:
print("Skipping saving due to user preference.")
return
file_path = find_file_by_hash(file_hash)
if file_path:
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt')
final_text = ""
if not save_and_overwrite_changes and os.path.exists(text_file_path):
with open(text_file_path, 'r') as existing_file:
existing_content = existing_file.read()
final_text = (prepend_text + ", " if prepend_text else "") + existing_content + (", " + append_text if append_text else "")
else:
final_text = (prepend_text + ", " if prepend_text else "") + text + (", " + append_text if append_text else "")
print(f"Saving text for file hash {file_hash} to {text_file_path}...")
with open(text_file_path, 'w') as text_file:
text_file.write(final_text)
def get_text(file_hash):
file_path = find_file_by_hash(file_hash)
if file_path:
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt')
if os.path.exists(text_file_path):
print(f"Getting text for file hash {file_hash} from {text_file_path}...")
with open(text_file_path) as text_file:
return text_file.read()
return ""
def update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward=True, save_and_overwrite_changes=False):
global current_index, file_array, hash_list
if current_text or not save_and_overwrite_changes: # Save only if there's something to save or if saving changes is not required
file_hash = hash_list[current_index]
save_text(file_hash, current_text, prepend_text, append_text, save_and_overwrite_changes)
if navigate_forward:
current_index = (current_index + 1) % len(hash_list) # Cycle to the first item if at the end
else:
current_index = (current_index - 1) % len(hash_list) # Cycle to the last item if at the beginning
def get_file(navigate_forward, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
global current_index, file_array, hash_list
print(f"Getting file, navigate_forward: {navigate_forward}...")
update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward, save_and_overwrite_changes)
if current_index < len(hash_list):
current_hash = hash_list[current_index]
file_info = next((item for item in file_array if item["hash"] == current_hash), None)
if file_info:
file_path = find_file_by_hash(current_hash)
if file_path:
with Image.open(file_path) as img:
# Calculate the new height and width to maintain aspect ratio, with a max height of 768 for display in gradio
aspect_ratio = img.width / img.height
new_height = min(768, img.height) # Set max height to 768 for gradio display
new_width = int(aspect_ratio * new_height)
# Use Image.Resampling.LANCZOS for better quality resizing
img_resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
img_resized_bytes = io.BytesIO()
img_resized.save(img_resized_bytes, format='PNG')
img_resized_bytes = img_resized_bytes.getvalue()
text = get_text(current_hash) # Load text file contents into textbox
if not pause_api_call:
text = get_image_description(img, custom_prompt) # Get new description from OpenAI
save_text(current_hash, text, prepend_text, append_text, save_and_overwrite_changes) # Optionally save the new description
# Convert the resized image to a numpy array for display
img_resized_np = np.array(Image.open(io.BytesIO(img_resized_bytes)))
return img_resized_np, current_hash, file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), os.path.relpath(find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), start=image_folder)
return None, "File not found", "⏳ Pending", "", "", "", "", "", "", ""
def move_file(direction, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
global current_index, file_array, hash_list
print(f"Moving file in direction {direction}...")
if current_index < len(hash_list):
current_hash = hash_list[current_index]
file_info = next((item for item in file_array if item["hash"] == current_hash), None)
if file_info:
source_file = find_file_by_hash(current_hash)
if source_file:
destination = thumbs_up_folder if direction == "up" else thumbs_down_folder if direction == "down" else None
new_status = "πŸ‘ Rated Up" if direction == "up" else "πŸ‘Ž Rated Down" if direction == "down" else None
if destination:
shutil.move(source_file, os.path.join(destination, os.path.basename(source_file)))
file_info["status"] = new_status
# Get new description and save it
with Image.open(source_file) as img:
description = get_image_description(img, custom_prompt)
save_text(current_hash, description, prepend_text, append_text, save_and_overwrite_changes)
return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt)
return None, "Invalid direction or file not found", "⏳ Pending", "", "", "", "", "", "", ""
def reset_files(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
global file_array, current_index, hash_list
print("Resetting files and clearing text...")
for file_info in file_array:
source_file = find_file_by_hash(file_info["hash"])
if source_file and os.path.exists(source_file):
source_text = source_file.replace(os.path.splitext(source_file)[1], '.txt')
if os.path.exists(source_text):
shutil.move(source_text, os.path.join(image_folder, os.path.basename(source_text)))
shutil.move(source_file, os.path.join(image_folder, os.path.basename(source_file)))
with open(os.path.join(image_folder, os.path.basename(source_text)), 'w') as text_file:
text_file.write("")
file_info["status"] = "⏳ Pending"
file_array = build_file_array()
hash_list = [file['hash'] for file in file_array]
update_hash_to_path_cache()
return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt)
def delete_file(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
global current_index, file_array, hash_list
print("Deleting file...")
if current_index < len(hash_list):
current_hash = hash_list[current_index]
source_file = find_file_by_hash(current_hash)
if source_file:
source_text = source_file.replace(os.path.splitext(source_file)[1], '.txt')
os.remove(source_file)
os.remove(source_text)
hash_list.remove(current_hash) # Remove the hash from the hash_list
file_array = [file for file in file_array if file["hash"] != current_hash] # Rebuild file_array without the deleted file
if current_index >= len(hash_list): current_index = len(hash_list) - 1
update_hash_to_path_cache()
return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt)
def load_image_details():
print("Loading image details...")
image_details = []
for folder in folders_cycle:
for file_path in get_files(folder):
file_hash = hash_image_pixels(file_path)
status = "πŸ‘ Rated Up" if folder == thumbs_up_folder else "πŸ‘Ž Rated Down" if folder == thumbs_down_folder else "⏳ Pending"
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt')
text_content = ""
if os.path.exists(text_file_path):
with open(text_file_path, 'r') as text_file:
text_content = text_file.read()
image_details.append({
"Text Content": text_content,
"Rating Status": status,
"File Hash": file_hash,
"Image Path": file_path,
"Text Path": text_file_path if os.path.exists(text_file_path) else "N/A"
})
return pd.DataFrame(image_details)
def load_text_files_as_df():
print("Loading text files into dataframe...")
text_files = []
for file in os.listdir(image_folder):
if file.endswith('.txt'):
file_path = os.path.join(image_folder, file)
with open(file_path, 'r') as text_file:
text_content = text_file.read()
text_files.append({
"File Name": file.replace('.txt', ''),
"Content": text_content,
"Date Modified": datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%d %H:%M:%S')
})
return pd.DataFrame(text_files)
# Function to merge text_files_df and image_details_df into a single dataframe
def load_combined_details():
print("Loading text file details...")
text_df = load_text_files_as_df()
image_details_df = load_image_details()
# Ensure the 'File Name' column exists in image_details_df by extracting it from 'Image Path'
image_details_df['File Name'] = image_details_df['Image Path'].apply(lambda x: os.path.basename(x).replace(os.path.splitext(os.path.basename(x))[1], ''))
combined_df = pd.merge(text_df, image_details_df, on="File Name", how="outer")
# Reorder dataframe fields according to specified order: Date Modified, Rating Status, File Name, Text Content, File Hash, Text Path, Image Path
combined_df = combined_df[['Date Modified', 'Rating Status', 'File Name', 'Text Content', 'File Hash', 'Text Path', 'Image Path']]
return combined_df
def load_gallery():
print("Loading gallery...")
return sorted([os.path.join(image_folder, f) for f in os.listdir(image_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower())
def load_thumbs_up_gallery():
print("Loading Thumbs Up gallery...")
return sorted([os.path.join(thumbs_up_folder, f) for f in os.listdir(thumbs_up_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower())
def load_thumbs_down_gallery():
print("Loading Thumbs Down gallery...")
return sorted([os.path.join(thumbs_down_folder, f) for f in os.listdir(thumbs_down_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower())
def load_all_folders_gallery():
print("Loading all folders gallery...")
all_images = []
for folder in folders_cycle:
all_images.extend(sorted([os.path.join(folder, f) for f in os.listdir(folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower()))
return all_images
def load_text_files():
print("Loading text files...")
return sorted([[file, open(os.path.join(image_folder, file),'r').read()] for file in os.listdir(image_folder) if file.endswith('.txt')], key=lambda x: x[0].lower())
def update_text_content(df):
global text_files_df
print("Updating text content...")
for row in df:
if len(row) == 3:
file_name, new_content, date_added = row
file_path = os.path.join(image_folder, file_name)
if os.path.exists(file_path):
file_hash = hash_file(file_path)
save_text(file_hash, new_content)
else:
print("Error: Unexpected number of values in row. Expected 3 values per row.")
# Step 1: Define a function to update the dataframe
def refresh_text_files():
print("Refreshing text files...")
updated_text_files = sorted([[file, open(os.path.join(image_folder, file),'r').read()] for file in os.listdir(image_folder) if file.endswith('.txt')], key=lambda x: x[0].lower())
return updated_text_files
def save_uploaded_image(image_file):
"""
Attempts to save the uploaded image to the designated image folder. Returns the path of the saved image or None if an error occurs.
"""
try:
os.makedirs(image_folder, exist_ok=True) # Ensure the image folder exists
image_path = os.path.join(image_folder, image_file.name)
with open(image_path, "wb") as file:
file.write(image_file.content) # Write the byte content of the file
print(f"Image saved successfully: {image_path}")
return image_path
except Exception as e:
print(f"Error saving image: {e}")
return None
def create_text_file_for_image(image_path):
"""
Generates an empty text file corresponding to an uploaded image, using the same base name.
"""
text_file_path = f"{os.path.splitext(image_path)[0]}.txt"
open(text_file_path, 'w').close() # Efficiently create an empty text file
return text_file_path
def handle_image_upload(uploaded_files):
status_messages = []
for uploaded_file in uploaded_files:
file_extension = os.path.splitext(uploaded_file.name)[1].lower()
if file_extension in allowed_extensions:
# Ensure the image folder exists
os.makedirs(image_folder, exist_ok=True)
# Save the file with a unique name based on its hash
unique_name = hashlib.sha256(uploaded_file.name.encode()).hexdigest() + file_extension
save_path = os.path.join(image_folder, unique_name)
with open(save_path, "wb") as file:
file.write(uploaded_file.content)
# Optionally, create a corresponding text file
text_file_path = save_path + ".txt"
with open(text_file_path, "w") as text_file:
text_file.write(f"Image file: {unique_name} uploaded successfully.")
status_messages.append(f"Uploaded and saved {uploaded_file.name} successfully.")
else:
status_messages.append(f"File {uploaded_file.name} has an unsupported extension ({file_extension}) and was not uploaded.")
return "\n".join(status_messages)
def refresh_image_description(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
global current_index, file_array
if not pause_api_call:
file_info = file_array[current_index]
file_path = find_file_by_hash(file_info["hash"])
if file_path:
with Image.open(file_path) as img:
# Use the custom_prompt parameter when calling get_image_description
text = get_image_description(img, custom_prompt)
save_text(file_info["hash"], text, prepend_text, append_text, save_and_overwrite_changes)
return np.array(img), file_info["hash"], file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), os.path.relpath(find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), start=image_folder)
# If API call is paused or file not found, return current state without changes
return None, file_info["hash"], file_info["status"], current_text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), os.path.relpath(find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), start=image_folder)
# Backup functionality for successful app launches
backup_folder = "Backup_Scripts"
os.makedirs(backup_folder, exist_ok=True)
def backup_script():
"""
This function creates a backup of the current script in the designated backup folder.
"""
current_script_path = os.path.realpath(__file__)
backup_script_path = os.path.join(backup_folder, f"backup_{os.path.basename(current_script_path)}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.py")
shutil.copy2(current_script_path, backup_script_path)
print(f"Backup of the script created at {backup_script_path}")
def app_launch_success():
"""
This function is called when the app successfully launches.
It triggers the backup of the current script.
"""
print("App successfully launched. Creating a backup of the script...")
backup_script()
def refresh_all():
combined_details = load_combined_details()
image_gallery_content = load_gallery()
thumbs_up_gallery_content = load_thumbs_up_gallery()
# Reinitialize the hash array and the index for new images
update_hash_to_path_cache()
refresh_file_array_and_hashes()
current_index = 0 # Reset the index to start from the first image
return combined_details, image_gallery_content, thumbs_up_gallery_content
def combined_action(prompt, steps, model, styles, custom_prompt):
# Trigger the OpenAI API for description and wait for the response
description, status = trigger_openai_api(custom_prompt)
if status == "success":
# Once the response is received, generate the image based on the prompt and other parameters
generated_image, log = generate_image(prompt, steps, model, styles)
else:
generated_image, log = None, "Failed to generate image due to API call failure."
return generated_image, log, description
# πŸ“‚ Setup Environment πŸ“‚
import gradio as gr
import os
from pathlib import Path
import subprocess
# Define the directory to store uploaded images
image_folder = "./Images"
os.makedirs(image_folder, exist_ok=True) # Create the directory if it doesn't exist
# πŸ”„ Function Definitions πŸ”„
def save_and_show_images(uploaded_files):
images_to_display = []
# Process each uploaded file
for index, uploaded_file in enumerate(uploaded_files):
# Define a base file name with index
base_file_name = f"Img-{index}"
# Check for existing files and increment index to avoid overwriting
existing_files = [f for f in Path(image_folder).rglob('*') if f.is_file() and f.suffix in ['.jpg', '.png', '.txt']]
while any(base_file_name in file.name for file in existing_files):
index += 1
base_file_name = f"Img-{index}"
# Determine the file extension (jpg or png)
file_extension = "jpg" if uploaded_file[:3] == b'\xff\xd8\xff' else "png"
final_file_name = f"{base_file_name}.{file_extension}"
image_path = os.path.join(image_folder, final_file_name)
# Write the bytes to a new file in the specified directory
with open(image_path, "wb") as file:
file.write(uploaded_file)
# Create a corresponding text file for the image
text_file_path = os.path.join(image_folder, f"{base_file_name}.txt")
with open(text_file_path, "w") as text_file:
text_file.write(f"Image file: {final_file_name}")
# Add the path (or ideally, a URL to the file) to the list to display
images_to_display.append(image_path)
# Ensure all images have a corresponding text file
for image_file_path in Path(image_folder).rglob('*'):
if image_file_path.suffix in ['.jpg', '.png']:
base_name = os.path.splitext(image_file_path.name)[0]
text_file_path = os.path.join(image_folder, f"{base_name}.txt")
if not os.path.exists(text_file_path):
with open(text_file_path, "w") as text_file:
text_file.write(f"Image file: {image_file_path.name}")
return images_to_display
with gr.Blocks() as app:
with gr.Row():
upload_btn = gr.File(label="Upload Images", type="binary", file_count='multiple')
gallery = gr.Gallery(label="Uploaded Images Gallery")
upload_btn.change(fn=save_and_show_images, inputs=upload_btn, outputs=gallery)
with gr.Row():
with gr.Column():
gr.Markdown("## Image Viewer")
with gr.Column():
# Add an upload button for uploading files to the images folder
image = gr.Image(height=768, label="Current Image", image_mode="RGBA", width="max", elem_id="current_image", type="numpy")
with gr.Row():
thumbs_down_btn = gr.Button("πŸ‘Ž Rate Down")
thumbs_up_btn = gr.Button("πŸ‘ Rate Up")
with gr.Row():
prev_btn = gr.Button("β¬… Previous")
next_btn = gr.Button("Next ➑")
with gr.Row():
status_display = gr.Textbox(label="Rating Status", interactive=False)
image_path_display = gr.Textbox(label="Image Path", interactive=False)
with gr.Column():
# Existing setup for checkboxes
with gr.Row():
pause_api_call_checkbox = gr.Checkbox(label="Pause OpenAI API Calls", value=True)
save_and_overwrite_changes_checkbox = gr.Checkbox(label="Save and Overwrite Changes", value=True)
# New textbox for custom OpenAI API instructions
custom_instructions = gr.Textbox(label="Custom Instructions", value="Use the art of deduction and creativity to generate a persona profile and 3 inspiration words to describe the image without describing the subject. Wrap it up with a concise 1 sentence caption of the image with the subject in high detail. JSON Format needed: Futuristic Persona Profile: The subject exudes a sense of readiness and authority, dressed in attire that hints at a future dominated by advanced technology and interstellar travel. This character could be envisioned as a commander or a pioneer in a futuristic saga, marked by their composed nature and the streamlined design of their gear. Futuristic Caption: A resolute figure of tomorrow, standing firm with a serene resolve, garbed in a sleek, technologically superior suit that narrates tales of distant realms and adventures. Poised Persona Profile: The individual appears as a beacon of confidence and tactical acumen. Their outfit suggests a world of progressive technology and potential space conquests. This persona might be a strategist or a guardian in a speculative fiction setting, distinguished by their steady presence and the modernistic cut of their uniform. Poised Caption: A visionary sentinel of the cosmos, poised with a calm yet assertive demeanor, dressed in an advanced, form-enhancing suit that hints at the mysteries of the universe yet to unfold. Advanced Persona Profile: The figure stands as a symbol of assurance and innovation, clad in a costume that forecasts an era of sophisticated technology and cosmic exploration. This character could represent an expert or a defender in a futuristic tale, highlighted by their tranquil posture and the elegant configuration of their attire. Advanced Caption: An unwavering pioneer of the future, the individual poses with a composed assurance, enveloped in a cutting-edge suit that speaks of advanced civilizations and uncharted frontiers.", placeholder="Enter custom instructions...", lines=2, max_lines=5)
with gr.Accordion("πŸ‘ Thumbs Up Gallery", open=False):
thumbs_up_gallery = gr.Gallery(label="Thumbs Up Image Gallery", value=load_thumbs_up_gallery(), every=5, columns=3, rows=1, object_fit="contain", height="auto")
with gr.Row():
text_display = prompt = gr.Textbox(label="Image Analysis", lines=3, max_lines=10, interactive=True, placeholder="API call paused. Manually enter description.")
output_image = gr.Image(type="pil", label="Visual Analysis Analog")
with gr.Row():
trigger_api_btn = gr.Button("Analyze Aesthetic!")
combined_btn = gr.Button("Analyze & Generate") # New combined action button
generate_btn = gr.Button("Generate")
analyze_all_btn = gr.Button("Analyze All Thumbs Up")
descriptions_display = gr.Dataframe()
status_display = gr.Textbox(label="Status", lines=10, interactive=False)
analyze_all_btn.click(analyze_thumbs_up_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display])
with gr.Row():
analyze_everything_btn = gr.Button("Analyze All Images")
analyze_all_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display])
analyze_everything_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display])
with gr.Row():
gr.Image(type="pil")
gr.Image(type="pil")
gr.Image(type="pil")
with gr.Accordion("Append / Prepend", open=False):
with gr.Row():
prepend_text = gr.Textbox(label="Prepend", lines=2, max_lines=5, placeholder="Enter text to prepend...")
append_text = gr.Textbox(label="Append", lines=2, max_lines=5, placeholder="Enter text to append...")
def trigger_openai_api(custom_prompt):
global current_index, file_array
if current_index < len(file_array):
file_info = file_array[current_index]
file_path = find_file_by_hash(file_info["hash"])
if file_path:
with Image.open(file_path) as img:
# Use the custom_prompt parameter when calling get_image_description
text = get_image_description(img, custom_prompt)
save_text(file_info["hash"], text, "", "", True) # Assuming you want to save and overwrite changes by default
return text, "Triggered OpenAI API successfully."
return "", "Failed to trigger OpenAI API."
trigger_api_btn.click(trigger_openai_api, inputs=[custom_instructions], outputs=[text_display, status_display])
with gr.Row():
delete_btn = gr.Button("πŸ—‘οΈ Delete")
reset_btn = gr.Button("πŸ”„ Reset All Ratings πŸ‘πŸ‘Ž")
with gr.Accordion(label="Edit Project", open=False):
with gr.Column(scale=0):
file_input = gr.File(file_count="multiple", type="binary", label="Upload Images or Zip Files")
with gr.Row():
previous_session_button = gr.Button("πŸ‘ˆ Previous Project")
next_session_button = gr.Button("Next ProjectπŸ‘‰")
with gr.Row():
next_img = gr.Button("πŸ–ΌοΈ Show Next Image")
prev_img = gr.Button("πŸ–ΌοΈ Show Previous Image")
with gr.Row():
with gr.Row():
clear_button = gr.Button("πŸ—‘οΈ Clear Uploads Folder")
undo_button = gr.Button("↩️ Undo Last Deletion")
process_zip_button = gr.Button("πŸ“‚ Process Zip Files")
with gr.Column():
randomize_checkbox = gr.Checkbox(label="πŸ”€ Randomize Every 2 Seconds", value=True)
refresh_time_input = gr.Number(label="⏱️ Refresh Time", value=2)
index_display = gr.Textbox(value=str(project_index), label="πŸ”’ Session Index", every=5)
file_input.change(process_files, inputs=[file_input], outputs=[])
clear_button.click(clear_uploads_folder, inputs=[], outputs=[])
undo_button.click(undo_last_deletion, inputs=[], outputs=[])
next_session_button.click(next_session, inputs=[], outputs=[index_display])
previous_session_button.click(previous_session, inputs=[], outputs=[index_display])
next_img.click(get_next_image, inputs=[], outputs=[gallery])
prev_img.click(get_previous_image, inputs=[], outputs=[gallery])
process_zip_button.click(process_all_zips, inputs=[], outputs=[])
with gr.Accordion("Advanced Generation Settings", open=False):
with gr.Column():
steps = gr.Slider(minimum=1, maximum=32, value=16, label="Steps")
model = gr.Dropdown(choices=['TurboAnime.saftensors', 'OtherModel'], value='TurboAnime.saftensors', label="Model")
styles = gr.CheckboxGroup(choices=['RayORender', 'OtherStyle'], value=['RayORender'], label="Styles")
output_log = gr.Textbox(label="Log")
# Step 2: Add a button to trigger the update
generate_btn.click(fn=generate_image, inputs=[prompt, steps, model, styles], outputs=[output_image, output_log])
# Set the click action for the combined button
combined_btn.click(combined_action, inputs=[prompt, steps, model, styles, custom_instructions], outputs=[output_image, output_log, text_display])
with gr.Accordion("Training Data", open=True):
refresh_btn = gr.Button("Refresh")
with gr.Row():
combined_details_df = gr.Dataframe(label="Combined Text and Image Details", value=load_combined_details(), headers=["Date Modified", "Rating Status", "File Name", "Text Content", "File Hash", "Text Path", "Image Path"], datatype=["str", "str", "str", "str", "str", "str", "str"], every=5)
gallery = gr.Gallery(label="Image Gallery", value=load_gallery(), every=5)
with gr.Accordion("File Info", open=False):
file_hash_display = gr.Textbox(label="File Hash", interactive=False)
text_path_display = gr.Textbox(label="Text Path", interactive=False)
def delete_all_text_files_content():
text_files = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith('.txt')]
for file_path in text_files:
with open(file_path, 'w') as f:
f.write('') # Clear the content of the text file
return "All text files' content deleted."
with gr.Row():
# Add a button to manually trigger a backup of the script
backup_btn = gr.Button("Backup Script")
backup_btn.click(backup_script, inputs=[], outputs=[])
delete_all_text_btn = gr.Button("Delete All Text Files Content")
delete_all_text_btn.click(delete_all_text_files_content, inputs=[], outputs=status_display)
# Modify the button click actions to include the pause_api_call_checkbox state and the save_and_overwrite_changes_checkbox state as arguments
reset_btn.click(lambda t, pt, at, p, s, ci: reset_files(t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
prev_btn.click(lambda t, pt, at, p, s, ci: get_file(False, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
next_btn.click(lambda t, pt, at, p, s, ci: get_file(True, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
thumbs_up_btn.click(lambda t, pt, at, p, s, ci: move_file("up", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
thumbs_down_btn.click(lambda t, pt, at, p, s, ci: move_file("down", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
delete_btn.click(lambda t, pt, at, p, s, ci: delete_file(t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
refresh_btn.click(refresh_all, inputs=[], outputs=[combined_details_df, gallery, thumbs_up_gallery])
# Check for successful app launch and backup the script
try:
app.launch(share=True, server_port=7866, server_name="0.0.0.0")
app_launch_success()
except Exception as e:
print(f"Error launching the app: {e}")