from datetime import datetime import json import os import gradio as gr from gradio_client import Client import shutil import time import random import zipfile import base64 # Global variables SPFManstate = { "last_file": 0, "output_dir": "output", "errors": [], "skipped_items": [], "is_paid_api": False, "cost_per_item": 0.1, # Default cost per item for paid API "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), "api_provider": "default", # Default API provider "retry_delay": 300, # 5 minutes delay for retry "max_retries": 3 # Maximum number of retries } SPFManprompt_list = [] #SPFMan - State, Prompt and file manager def SPFManload_state(config_file=None): global SPFManstate, SPFManprompt_list try: if config_file: with open(config_file, "r", encoding="utf-8", errors="replace") as f: loaded_data = json.load(f) SPFManstate.update(loaded_data.get("state", {})) SPFManprompt_list = loaded_data.get("prompt_list", []) elif os.path.exists("state.json"): with open("state.json", "r", encoding="utf-8", errors="replace") as f: loaded_data = json.load(f) SPFManstate.update(loaded_data.get("state", {})) SPFManprompt_list = loaded_data.get("prompt_list", []) except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") except Exception as e: print(f"Unexpected error: {e}") def SPFMansave_state(): SPFManstate["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S") data_to_save = { "state": SPFManstate, "prompt_list": SPFManprompt_list } with open(f"state_{SPFManstate['timestamp']}.json", "w") as f: json.dump(data_to_save, f) # Delete old state files for file in os.listdir(): if file.startswith("state_") and file.endswith(".json") and file != f"state_{SPFManstate['timestamp']}.json": os.remove(file) def SPFManensure_output_directory(): os.makedirs(SPFManstate['output_dir'], exist_ok=True) def SPFMangenerate_image(prompt, retries=0): SPFManensure_output_directory() try: client = Client("black-forest-labs/FLUX.1-dev") result = client.predict( prompt=prompt, seed=0, randomize_seed=True, width=1024, height=1024, guidance_scale=3.5, num_inference_steps=28, api_name="/infer" ) image_path = result[0] filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_image_{SPFManstate['timestamp']}.webp" shutil.move(image_path, filename) return f"Image saved as {filename}" except Exception as e: error_msg = f"Error generating image: {str(e)}" if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']: time.sleep(SPFManstate['retry_delay']) return SPFMangenerate_image(prompt, retries + 1) SPFManstate["errors"].append(error_msg) return error_msg def SPFMangenerate_audio(prompt, retries=0): SPFManensure_output_directory() try: client = Client("artificialguybr/Stable-Audio-Open-Zero") result = client.predict( prompt=prompt, seconds_total=30, steps=100, cfg_scale=7, api_name="/predict" ) if isinstance(result, str) and os.path.exists(result): filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_audio_{SPFManstate['timestamp']}.wav" shutil.move(result, filename) else: audio_data = base64.b64decode(result) filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_audio_{SPFManstate['timestamp']}.wav" with open(filename, "wb") as audio_file: audio_file.write(audio_data) return f"Audio saved as {filename}" except Exception as e: error_msg = f"Error generating audio: {str(e)}" if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']: time.sleep(SPFManstate['retry_delay']) return SPFMangenerate_audio(prompt, retries + 1) SPFManstate["errors"].append(error_msg) return error_msg def SPFManprocess_prompts(prompt_list): router = { 'image': SPFMangenerate_image, 'audio': SPFMangenerate_audio, } results = [] for prompt_type, prompt in prompt_list: if prompt_type in router: result = router[prompt_type](prompt) results.append(result) if "Error" in result: break # Stop processing if there's an error else: error_msg = f"Unknown prompt type: {prompt_type}" SPFManstate["errors"].append(error_msg) results.append(error_msg) break # Stop processing if there's an error return results def SPFMancreate_files_with_generation(resume=True): global SPFManstate, SPFManprompt_list results = [] if resume and SPFManstate["last_file"] < len(SPFManprompt_list): start = SPFManstate["last_file"] results.append(f"Resuming from item {start + 1}") else: start = 0 end = len(SPFManprompt_list) for i in range(start, end): if i in SPFManstate["skipped_items"]: results.append(f"Skipped item {i + 1}") continue prompt_type, prompt = SPFManprompt_list[i] try: if SPFManstate["is_paid_api"]: generation_results = SPFManprocess_prompts([(prompt_type, prompt)]) results.extend(generation_results) else: results.append(f"Processing: {prompt_type} - {prompt}") generation_results = SPFManprocess_prompts([(prompt_type, prompt)]) results.extend(generation_results) if any("Error" in result for result in generation_results): break # Stop processing if there's an error SPFManstate["last_file"] = i + 1 except Exception as e: error_msg = f"Error processing item {i + 1}: {str(e)}" SPFManstate["errors"].append(error_msg) results.append(error_msg) break # Stop processing if there's an error SPFMansave_state() yield "\n".join(results) if not SPFManstate["is_paid_api"]: break # Stop after processing one item for non-paid API def SPFManadd_prompt(prompt_type, prompt): global SPFManprompt_list SPFManprompt_list.append((prompt_type, prompt)) SPFMansave_state() return f"Added {prompt_type}: {prompt}", gr.update(value=len(SPFManprompt_list)) def SPFManclear_prompts(): global SPFManprompt_list SPFManprompt_list = [] SPFMansave_state() return "Prompt list cleared", gr.update(value=0) def SPFManview_all_prompts(): return "\n".join([f"{i+1}. {t}: {p}" for i, (t, p) in enumerate(SPFManprompt_list)]) def SPFManzip_files(): SPFManensure_output_directory() zip_filename = f"output_{SPFManstate['timestamp']}.zip" with zipfile.ZipFile(zip_filename, 'w') as zipf: for root, _, files in os.walk(SPFManstate['output_dir']): for file in files: zipf.write(os.path.join(root, file)) zipf.write(f"state_{SPFManstate['timestamp']}.json") # Add prompt list to zip prompt_list_filename = f"prompt_list_{SPFManstate['timestamp']}.txt" with open(prompt_list_filename, 'w') as f: for t, p in SPFManprompt_list: f.write(f"{t}: {p}\n") zipf.write(prompt_list_filename) os.remove(prompt_list_filename) # Remove the temporary file return f"Files zipped as {zip_filename}" def SPFMantoggle_paid_api(value): SPFManstate["is_paid_api"] = value SPFMansave_state() return f"Paid API: {'Enabled' if value else 'Disabled'}" def SPFManestimate_cost(): return f"Estimated cost: ${len(SPFManprompt_list) * SPFManstate['cost_per_item']:.2f}" def SPFManauto_generate_prompt(prompt_type): subjects = ["cat", "dog", "tree", "mountain", "ocean", "city", "person", "flower", "car", "building"] styles = ["realistic", "cartoon", "abstract", "vintage", "futuristic", "minimalist", "surreal", "impressionist"] actions = ["running", "sleeping", "flying", "dancing", "singing", "jumping", "sitting", "laughing"] if prompt_type == "image": return f"A {random.choice(styles)} {random.choice(subjects)} {random.choice(actions)}" elif prompt_type == "audio": return f"Sound of a {random.choice(subjects)} {random.choice(actions)}" def SPFManview_config(): config = { "state": SPFManstate, "prompt_list_length": len(SPFManprompt_list) } return json.dumps(config, indent=2) def SPFManskip_item(): if SPFManstate["last_file"] < len(SPFManprompt_list): SPFManstate["skipped_items"].append(SPFManstate["last_file"]) SPFManstate["last_file"] += 1 SPFMansave_state() return f"Skipped item {SPFManstate['last_file']}" return "No more items to skip" def SPFManupdate_api_details(provider, cost): SPFManstate["api_provider"] = provider SPFManstate["cost_per_item"] = float(cost) SPFMansave_state() return f"API details updated: Provider - {provider}, Cost per item - ${cost}" def SPFManload_config_file(file): global SPFManstate, SPFManprompt_list try: # Clear existing state and prompt list SPFManstate = { "last_file": 0, "output_dir": "output", "errors": [], "skipped_items": [], "is_paid_api": False, "cost_per_item": 0.1, "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), "api_provider": "default", "retry_delay": 300, "max_retries": 3 } SPFManprompt_list = [] # Check if the file is a ZIP archive if file.name.endswith('.zip'): # Extract the ZIP file extracted_folder_path = 'extracted_files' os.makedirs(extracted_folder_path, exist_ok=True) with zipfile.ZipFile(file.name, 'r') as zip_ref: zip_ref.extractall(extracted_folder_path) # Find and load the state JSON file json_files = [f for f in os.listdir(extracted_folder_path) if f.startswith('state_') and f.endswith('.json')] if json_files: json_file_path = os.path.join(extracted_folder_path, json_files[0]) with open(json_file_path, 'r') as json_file: loaded_data = json.load(json_file) SPFManstate.update(loaded_data.get("state", {})) SPFManprompt_list = loaded_data.get("prompt_list", []) # Find and load the prompt list text file txt_files = [f for f in os.listdir(extracted_folder_path) if f.startswith('prompt_list_') and f.endswith('.txt')] if txt_files: txt_file_path = os.path.join(extracted_folder_path, txt_files[0]) with open(txt_file_path, 'r') as txt_file: for line in txt_file: prompt_type, prompt = line.strip().split(': ', 1) SPFManprompt_list.append((prompt_type, prompt)) # Clean up extracted files shutil.rmtree(extracted_folder_path) else: # Load new configuration from a single file SPFManload_state(file.name) SPFMansave_state() # Save the loaded state return f"Configuration loaded from {file.name}", gr.update(value=len(SPFManprompt_list)) except Exception as e: return f"Error loading configuration: {str(e)}", gr.update(value=len(SPFManprompt_list)) # Handle JSON input and loading def SPFManload_json_configuration(json_text): global SPFManstate, SPFManprompt_list try: loaded_data = json.loads(json_text) SPFManstate = loaded_data.get("state", SPFManstate) SPFManprompt_list = loaded_data.get("prompt_list", SPFManprompt_list) SPFMansave_state() return "Configuration loaded from JSON input", gr.update(value=len(SPFManprompt_list)) except json.JSONDecodeError as e: return f"Error parsing JSON: {str(e)}", gr.update(value=len(SPFManprompt_list)) except Exception as e: return f"Unexpected error: {str(e)}", gr.update(value=len(SPFManprompt_list))