import os import subprocess import time from pathlib import Path from threading import Thread from typing import List, Union import gradio as gr from huggingface_hub import HfFolder, delete_repo, upload_folder, get_space_runtime, request_space_hardware, DatasetCard def process_is_complete(process_pid): '''Checks if the process with the given PID is still running''' p = subprocess.Popen(["ps", "-p", process_pid], stdout=subprocess.PIPE) out = p.communicate()[0].decode("utf-8").strip().split("\n") return len(out) == 1 def get_task_status(output_dataset_id): '''Gets the task status from the output dataset repo''' card = DatasetCard.load(output_dataset_id) return card.data.fuego['status'] def set_task_status(output_dataset_id, status="done"): '''Sets the task status in the output dataset repo''' card = DatasetCard.load(output_dataset_id) card.data.fuego['status'] = status card.push_to_hub(output_dataset_id) def check_for_status( process_pid, this_space_id, output_dataset_id, output_dirs, delete_on_completion, downgrade_hardware_on_completion ): task_status = get_task_status(output_dataset_id) print("Task status (found in dataset repo)", task_status) if task_status == "done": print("Task was already done, exiting...") return elif task_status == "preparing": print("Setting task status to running...") set_task_status(output_dataset_id, "running") print("Watching PID of script to see if it is done running") while True: if process_is_complete(process_pid): print("Process is complete! Uploading assets to output dataset repo") for output_dir in output_dirs: if Path(output_dir).exists(): print("Uploading folder", output_dir) upload_folder( repo_id=output_dataset_id, folder_path=str(output_dir), path_in_repo=str(Path('.outputs') / output_dir), repo_type="dataset", ) else: print("Folder", output_dir, "does not exist, skipping") print("Finished uploading outputs to dataset repo...Finishing up...") if delete_on_completion: print("Deleting space...") delete_repo(repo_id=this_space_id, repo_type="space") elif downgrade_hardware_on_completion: runtime = get_space_runtime(this_space_id) if runtime.hardware not in [None, "cpu-basic"]: print("Requesting downgrade to CPU Basic...") request_space_hardware(repo_id=this_space_id, hardware="cpu-basic") else: print("Space is already on cpu-basic, not downgrading.") print("Done! Setting task status to done in dataset repo") set_task_status(output_dataset_id, "done") return time.sleep(5) def main( this_space_repo_id: str, output_dataset_id: str, output_dirs: Union[str, List[str]] = "./outputs", delete_on_completion: bool = True, downgrade_hardware_on_completion: bool = True, ): token_env_var = os.getenv("HF_TOKEN") if token_env_var is None: raise ValueError( "Please set HF_TOKEN environment variable to your Hugging Face token. You can do this in the settings tab of your space." ) if isinstance(output_dirs, str): output_dirs = [output_dirs] HfFolder().save_token(token_env_var) # Watch python script's process to see when it's done running process_pid = os.getenv("USER_SCRIPT_PID", None) with gr.Blocks() as demo: gr.Markdown(Path("about.md").read_text()) thread = Thread( target=check_for_status, daemon=True, args=( process_pid, this_space_repo_id, output_dataset_id, output_dirs, delete_on_completion, downgrade_hardware_on_completion, ), ) thread.start() demo.launch() if __name__ == "__main__": import fire fire.Fire(main)