# Spaces: Running on Zero
import base64
import json
import os
import random
import shutil
import tempfile
import time
import zipfile
from datetime import datetime

import gradio as gr
from gradio_client import Client
# Global variables
# Mutable run state that the SPFMan helpers read and update.
SPFManstate = dict(
    last_file=0,
    output_dir="output",
    errors=[],
    skipped_items=[],
    is_paid_api=False,
    cost_per_item=0.1,  # Default cost per item for paid API
    timestamp=datetime.now().strftime("%Y%m%d_%H%M%S"),
    api_provider="default",  # Default API provider
    retry_delay=300,  # 5 minutes delay for retry
    max_retries=3,  # Maximum number of retries
)
# Queue of (prompt_type, prompt) entries awaiting generation.
SPFManprompt_list = list()
# SPFMan - State, Prompt and file manager
def SPFManload_state(config_file=None):
    """Merge a saved configuration into the module-level state.

    Reads ``config_file`` when one is given; otherwise reads ``state.json``
    from the working directory if it exists, and does nothing if it does not.
    The file's ``"state"`` mapping is merged into ``SPFManstate`` and its
    ``"prompt_list"`` replaces ``SPFManprompt_list`` (empty when absent).

    Failures are printed rather than raised, so callers cannot tell from the
    return value (always ``None``) whether anything was loaded.
    """
    global SPFManstate, SPFManprompt_list
    try:
        if config_file:
            source_path = config_file
        elif os.path.exists("state.json"):
            source_path = "state.json"
        else:
            return
        with open(source_path, "r", encoding="utf-8", errors="replace") as handle:
            payload = json.load(handle)
            SPFManstate.update(payload.get("state", {}))
            SPFManprompt_list = payload.get("prompt_list", [])
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
def SPFMansave_state():
    """Write the current state and prompt list to a fresh timestamped file.

    Refreshes ``SPFManstate["timestamp"]``, dumps ``{"state", "prompt_list"}``
    as JSON to ``state_<timestamp>.json`` in the working directory, then
    removes every other ``state_*.json`` file found there.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    SPFManstate["timestamp"] = stamp
    current_name = f"state_{stamp}.json"

    snapshot = {
        "state": SPFManstate,
        "prompt_list": SPFManprompt_list,
    }
    with open(current_name, "w") as out:
        json.dump(snapshot, out)

    # Delete old state files
    stale_names = [
        name
        for name in os.listdir()
        if name.startswith("state_") and name.endswith(".json") and name != current_name
    ]
    for name in stale_names:
        os.remove(name)
def SPFManensure_output_directory():
    """Create the directory named by ``SPFManstate['output_dir']`` if needed."""
    target_dir = SPFManstate['output_dir']
    os.makedirs(target_dir, exist_ok=True)
def SPFMangenerate_image(prompt, retries=0):
    """Generate an image for ``prompt`` and store it in the output directory.

    Calls the ``/infer`` endpoint of the ``black-forest-labs/FLUX.1-dev``
    Space through ``gradio_client.Client`` and moves the file named by the
    first element of the result into ``SPFManstate['output_dir']``.

    Args:
        prompt: Text prompt; its first 50 characters become part of the
            output file name.
        retries: Number of quota retries already spent (callers normally
            leave this at 0).

    Returns:
        ``"Image saved as <path>"`` on success, otherwise an
        ``"Error generating image: ..."`` message, which is also appended to
        ``SPFManstate["errors"]``. A failure whose text contains
        ``"exceeded your GPU quota"`` is retried after
        ``SPFManstate['retry_delay']`` seconds, up to
        ``SPFManstate['max_retries']`` times, before being reported.
    """
    # Path separators and characters that are illegal in file names on common
    # platforms; left in place they make the move below fail for prompts such
    # as "cat/dog" or multi-line prompts.
    unsafe_chars = '/\\:*?"<>|'

    while True:
        SPFManensure_output_directory()
        try:
            client = Client("black-forest-labs/FLUX.1-dev")
            result = client.predict(
                prompt=prompt,
                seed=0,
                randomize_seed=True,
                width=1024,
                height=1024,
                guidance_scale=3.5,
                num_inference_steps=28,
                api_name="/infer"
            )
            image_path = result[0]
            stem = "".join(
                "_" if ch.isspace() or ch in unsafe_chars or not ch.isprintable() else ch
                for ch in prompt[:50]
            )
            filename = f"{SPFManstate['output_dir']}/{stem}_image_{SPFManstate['timestamp']}.webp"
            shutil.move(image_path, filename)
            return f"Image saved as {filename}"
        except Exception as e:
            if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
                # Quota errors are transient: wait, then try the same prompt again.
                time.sleep(SPFManstate['retry_delay'])
                retries += 1
                continue
            error_msg = f"Error generating image: {str(e)}"
            SPFManstate["errors"].append(error_msg)
            return error_msg
def SPFMangenerate_audio(prompt, retries=0):
    """Generate an audio clip for ``prompt`` and store it in the output directory.

    Calls the ``/predict`` endpoint of the
    ``artificialguybr/Stable-Audio-Open-Zero`` Space through
    ``gradio_client.Client``. If the result is a string naming an existing
    file, that file is moved into ``SPFManstate['output_dir']``; otherwise
    the result is base64-decoded and written out as the ``.wav`` file.

    Args:
        prompt: Text prompt; its first 50 characters become part of the
            output file name.
        retries: Number of quota retries already spent (callers normally
            leave this at 0).

    Returns:
        ``"Audio saved as <path>"`` on success, otherwise an
        ``"Error generating audio: ..."`` message, which is also appended to
        ``SPFManstate["errors"]``. A failure whose text contains
        ``"exceeded your GPU quota"`` is retried after
        ``SPFManstate['retry_delay']`` seconds, up to
        ``SPFManstate['max_retries']`` times, before being reported.
    """
    # Path separators and characters that are illegal in file names on common
    # platforms; left in place they make the write below fail for prompts
    # such as "rain/thunder" or multi-line prompts.
    unsafe_chars = '/\\:*?"<>|'

    while True:
        SPFManensure_output_directory()
        try:
            client = Client("artificialguybr/Stable-Audio-Open-Zero")
            result = client.predict(
                prompt=prompt,
                seconds_total=30,
                steps=100,
                cfg_scale=7,
                api_name="/predict"
            )
            stem = "".join(
                "_" if ch.isspace() or ch in unsafe_chars or not ch.isprintable() else ch
                for ch in prompt[:50]
            )
            filename = f"{SPFManstate['output_dir']}/{stem}_audio_{SPFManstate['timestamp']}.wav"
            if isinstance(result, str) and os.path.exists(result):
                shutil.move(result, filename)
            else:
                # Not a local file path: treat the payload as base64 audio data.
                audio_data = base64.b64decode(result)
                with open(filename, "wb") as audio_file:
                    audio_file.write(audio_data)
            return f"Audio saved as {filename}"
        except Exception as e:
            if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
                # Quota errors are transient: wait, then try the same prompt again.
                time.sleep(SPFManstate['retry_delay'])
                retries += 1
                continue
            error_msg = f"Error generating audio: {str(e)}"
            SPFManstate["errors"].append(error_msg)
            return error_msg
def SPFManprocess_prompts(prompt_list):
    """Run each ``(prompt_type, prompt)`` pair through its generator.

    ``'image'`` and ``'audio'`` are dispatched to the matching generator
    function. Processing stops at the first result containing ``"Error"`` or
    at the first unknown prompt type; an unknown type is also recorded in
    ``SPFManstate["errors"]``.

    Returns:
        The list of result/error messages produced up to the stopping point.
    """
    handlers = {
        'image': SPFMangenerate_image,
        'audio': SPFMangenerate_audio,
    }
    outcomes = []
    for kind, text in prompt_list:
        handler = handlers.get(kind)
        if handler is None:
            error_msg = f"Unknown prompt type: {kind}"
            SPFManstate["errors"].append(error_msg)
            outcomes.append(error_msg)
            break  # Stop processing if there's an error
        outcome = handler(text)
        outcomes.append(outcome)
        if "Error" in outcome:
            break  # Stop processing if there's an error
    return outcomes
def SPFMancreate_files_with_generation(resume=True):
    """Generate files for the queued prompts, yielding a running log.

    This is a generator. After each processed item it saves the state and
    yields all log lines so far, joined with newlines.

    Args:
        resume: When true and ``SPFManstate["last_file"]`` is still inside
            the prompt list, start from that index; otherwise start from 0.

    Behaviour:
        * Indices listed in ``SPFManstate["skipped_items"]`` are logged as
          skipped and not generated.
        * With ``is_paid_api`` false, only one item is processed per call.
        * A result containing ``"Error"`` or an exception stops the run.
          ``last_file`` is not advanced for the failed item.

    Unlike the previous implementation, the log is also yielded when an item
    fails (the error text used to be dropped because the loop exited before
    the ``yield``), and any trailing "Skipped item" lines are flushed at the
    end.
    """
    global SPFManstate, SPFManprompt_list
    results = []

    if resume and SPFManstate["last_file"] < len(SPFManprompt_list):
        start = SPFManstate["last_file"]
        results.append(f"Resuming from item {start + 1}")
    else:
        start = 0

    # True while `results` holds lines that have not been yielded yet.
    unreported = bool(results)

    for i in range(start, len(SPFManprompt_list)):
        if i in SPFManstate["skipped_items"]:
            results.append(f"Skipped item {i + 1}")
            unreported = True
            continue

        prompt_type, prompt = SPFManprompt_list[i]
        failed = False
        try:
            if not SPFManstate["is_paid_api"]:
                results.append(f"Processing: {prompt_type} - {prompt}")
            generation_results = SPFManprocess_prompts([(prompt_type, prompt)])
            results.extend(generation_results)
            failed = any("Error" in result for result in generation_results)
            if not failed:
                SPFManstate["last_file"] = i + 1
        except Exception as e:
            error_msg = f"Error processing item {i + 1}: {str(e)}"
            SPFManstate["errors"].append(error_msg)
            results.append(error_msg)
            failed = True

        # Save and report on every path, including failures, so the consumer
        # sees the error text and the recorded errors are persisted.
        SPFMansave_state()
        yield "\n".join(results)
        unreported = False

        if failed:
            break  # Stop processing if there's an error
        if not SPFManstate["is_paid_api"]:
            break  # Stop after processing one item for non-paid API

    if unreported:
        # Only skip/resume notices were produced since the last yield.
        yield "\n".join(results)
def SPFManadd_prompt(prompt_type, prompt):
    """Queue a ``(prompt_type, prompt)`` pair and persist the state.

    Returns:
        A confirmation message and a ``gr.update`` carrying the new length of
        the prompt list.
    """
    global SPFManprompt_list
    entry = (prompt_type, prompt)
    SPFManprompt_list.append(entry)
    SPFMansave_state()
    status = f"Added {prompt_type}: {prompt}"
    return status, gr.update(value=len(SPFManprompt_list))
def SPFManclear_prompts():
    """Replace the prompt list with a new empty list and persist the state.

    Returns:
        A confirmation message and a ``gr.update`` that sets the count to 0.
    """
    global SPFManprompt_list
    # Rebind rather than clear in place, as the original code does.
    SPFManprompt_list = list()
    SPFMansave_state()
    status = "Prompt list cleared"
    return status, gr.update(value=0)
def SPFManview_all_prompts():
    """Return the queued prompts as numbered ``"N. type: prompt"`` lines."""
    lines = []
    for number, (kind, text) in enumerate(SPFManprompt_list, start=1):
        lines.append(f"{number}. {kind}: {text}")
    return "\n".join(lines)
def SPFManzip_files():
    """Bundle the output directory, the state file and the prompt list.

    Creates ``output_<timestamp>.zip`` in the working directory containing:

    * every file found under ``SPFManstate['output_dir']``;
    * ``state_<timestamp>.json``, taken from disk when it exists. If no state
      file has been written for the current timestamp yet, the current state
      is serialised straight into the archive instead of raising
      ``FileNotFoundError``;
    * ``prompt_list_<timestamp>.txt`` with one ``"type: prompt"`` line per
      queued prompt, written directly into the archive (UTF-8) so that no
      temporary file is left behind if something fails.

    Returns:
        A message naming the archive that was written.
    """
    SPFManensure_output_directory()
    stamp = SPFManstate['timestamp']
    zip_filename = f"output_{stamp}.zip"
    state_filename = f"state_{stamp}.json"
    prompt_list_filename = f"prompt_list_{stamp}.txt"
    prompt_text = "".join(f"{t}: {p}\n" for t, p in SPFManprompt_list)

    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for root, _, files in os.walk(SPFManstate['output_dir']):
            for file in files:
                zipf.write(os.path.join(root, file))

        if os.path.exists(state_filename):
            zipf.write(state_filename)
        else:
            # Nothing has been saved under this timestamp yet; store the
            # same structure SPFMansave_state would have written.
            snapshot = {"state": SPFManstate, "prompt_list": SPFManprompt_list}
            zipf.writestr(state_filename, json.dumps(snapshot))

        # Add prompt list to zip
        zipf.writestr(prompt_list_filename, prompt_text)
    return f"Files zipped as {zip_filename}"
def SPFMantoggle_paid_api(value):
    """Store the paid-API flag, persist the state and report the new setting."""
    SPFManstate["is_paid_api"] = value
    SPFMansave_state()
    if value:
        label = 'Enabled'
    else:
        label = 'Disabled'
    return f"Paid API: {label}"
def SPFManestimate_cost():
    """Return the estimated cost: queued prompt count times cost per item."""
    item_count = len(SPFManprompt_list)
    total = item_count * SPFManstate['cost_per_item']
    return f"Estimated cost: ${total:.2f}"
def SPFManauto_generate_prompt(prompt_type):
    """Build a random prompt for the given type from fixed word lists.

    Returns:
        ``"A <style> <subject> <action>"`` for ``"image"``,
        ``"Sound of a <subject> <action>"`` for ``"audio"``, and ``None`` for
        any other type.
    """
    subjects = ["cat", "dog", "tree", "mountain", "ocean", "city", "person", "flower", "car", "building"]
    styles = ["realistic", "cartoon", "abstract", "vintage", "futuristic", "minimalist", "surreal", "impressionist"]
    actions = ["running", "sleeping", "flying", "dancing", "singing", "jumping", "sitting", "laughing"]

    if prompt_type == "image":
        # Draw order (style, subject, action) matches the order in the text.
        style = random.choice(styles)
        subject = random.choice(subjects)
        action = random.choice(actions)
        return f"A {style} {subject} {action}"
    if prompt_type == "audio":
        subject = random.choice(subjects)
        action = random.choice(actions)
        return f"Sound of a {subject} {action}"
    return None
def SPFManview_config():
    """Return the state and the prompt-list length as indented JSON text."""
    prompt_count = len(SPFManprompt_list)
    summary = {"state": SPFManstate, "prompt_list_length": prompt_count}
    return json.dumps(summary, indent=2)
def SPFManskip_item():
    """Mark the item at ``last_file`` as skipped and advance past it.

    Returns:
        ``"Skipped item <n>"`` using the advanced ``last_file`` value, or
        ``"No more items to skip"`` when ``last_file`` is already at or past
        the end of the prompt list.
    """
    current = SPFManstate["last_file"]
    if not current < len(SPFManprompt_list):
        return "No more items to skip"

    SPFManstate["skipped_items"].append(current)
    SPFManstate["last_file"] += 1
    SPFMansave_state()
    return f"Skipped item {SPFManstate['last_file']}"
def SPFManupdate_api_details(provider, cost):
    """Record the API provider and per-item cost, then persist the state.

    ``cost`` is converted with ``float``; a value that cannot be converted
    raises from that call, after the provider has already been stored.

    Returns:
        A confirmation message echoing ``provider`` and ``cost`` as given.
    """
    SPFManstate["api_provider"] = provider
    cost_value = float(cost)
    SPFManstate["cost_per_item"] = cost_value
    SPFMansave_state()
    summary = f"API details updated: Provider - {provider}, Cost per item - ${cost}"
    return summary
def SPFManload_config_file(file):
    """Reset the state, then load a configuration from an uploaded file.

    Args:
        file: Object whose ``name`` attribute is the path of the uploaded
            file. A path ending in ``.zip`` is treated as an archive; any
            other path is passed to ``SPFManload_state``.

    For archives, the first top-level ``state_*.json`` is merged into the
    freshly reset state and supplies the prompt list. A top-level
    ``prompt_list_*.txt`` is read only when the JSON supplied no prompts:
    archives written by ``SPFManzip_files`` contain the prompts in both
    files, and loading both doubled every prompt. Blank or malformed text
    lines (no ``": "`` separator) are ignored instead of aborting the load.

    Extraction happens in a private temporary directory that is removed even
    when loading fails, so leftovers from an earlier attempt can never be
    mistaken for the new archive's contents.

    Returns:
        A status message and a ``gr.update`` carrying the prompt-list length.
        Errors are reported in the message rather than raised.

    NOTE(review): ``SPFManload_state`` prints its own errors and does not
    raise, so a broken non-zip file is still reported as loaded.
    """
    global SPFManstate, SPFManprompt_list
    try:
        # Clear existing state and prompt list
        SPFManstate = {
            "last_file": 0,
            "output_dir": "output",
            "errors": [],
            "skipped_items": [],
            "is_paid_api": False,
            "cost_per_item": 0.1,
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "api_provider": "default",
            "retry_delay": 300,
            "max_retries": 3
        }
        SPFManprompt_list = []

        # Check if the file is a ZIP archive
        if file.name.lower().endswith('.zip'):
            with tempfile.TemporaryDirectory() as extracted_folder_path:
                # Extract the ZIP file
                with zipfile.ZipFile(file.name, 'r') as zip_ref:
                    zip_ref.extractall(extracted_folder_path)
                extracted_names = os.listdir(extracted_folder_path)

                # Find and load the state JSON file
                json_files = [f for f in extracted_names if f.startswith('state_') and f.endswith('.json')]
                if json_files:
                    json_file_path = os.path.join(extracted_folder_path, json_files[0])
                    with open(json_file_path, 'r', encoding='utf-8', errors='replace') as json_file:
                        loaded_data = json.load(json_file)
                    SPFManstate.update(loaded_data.get("state", {}))
                    SPFManprompt_list = loaded_data.get("prompt_list", [])

                # Fall back to the prompt list text file only when the JSON
                # did not already provide the prompts (avoids duplicates).
                txt_files = [f for f in extracted_names if f.startswith('prompt_list_') and f.endswith('.txt')]
                if txt_files and not SPFManprompt_list:
                    txt_file_path = os.path.join(extracted_folder_path, txt_files[0])
                    with open(txt_file_path, 'r', encoding='utf-8', errors='replace') as txt_file:
                        for line in txt_file:
                            prompt_type, separator, prompt = line.strip().partition(': ')
                            if not separator:
                                continue  # blank or malformed line
                            SPFManprompt_list.append((prompt_type, prompt))
        else:
            # Load new configuration from a single file
            SPFManload_state(file.name)

        SPFMansave_state()  # Save the loaded state
        return f"Configuration loaded from {file.name}", gr.update(value=len(SPFManprompt_list))
    except Exception as e:
        return f"Error loading configuration: {str(e)}", gr.update(value=len(SPFManprompt_list))
# Handle JSON input and loading
def SPFManload_json_configuration(json_text):
    """Load a configuration from JSON text pasted by the user.

    Args:
        json_text: JSON object with optional ``"state"`` (object) and
            ``"prompt_list"`` (array) members.

    The supplied ``"state"`` is merged into the current state, as the
    file-based loaders do, rather than replacing it. Replacing it meant a
    partial state dropped keys the rest of the module requires, such as
    ``output_dir``. Input is validated before any global is touched, so a
    rejected document leaves the current configuration intact. A missing
    ``"prompt_list"`` keeps the current prompts.

    Returns:
        A status message and a ``gr.update`` carrying the prompt-list length.
        Errors are reported in the message rather than raised.
    """
    global SPFManstate, SPFManprompt_list
    try:
        loaded_data = json.loads(json_text)
        if not isinstance(loaded_data, dict):
            raise ValueError("top-level JSON value must be an object")

        new_state = loaded_data.get("state", {})
        if not isinstance(new_state, dict):
            raise ValueError('"state" must be a JSON object')

        new_prompts = loaded_data.get("prompt_list", SPFManprompt_list)
        if not isinstance(new_prompts, list):
            raise ValueError('"prompt_list" must be a JSON array')

        SPFManstate.update(new_state)
        SPFManprompt_list = new_prompts
        SPFMansave_state()
        return "Configuration loaded from JSON input", gr.update(value=len(SPFManprompt_list))
    except json.JSONDecodeError as e:
        return f"Error parsing JSON: {str(e)}", gr.update(value=len(SPFManprompt_list))
    except Exception as e:
        return f"Unexpected error: {str(e)}", gr.update(value=len(SPFManprompt_list))