Spaces:
Runtime error
Runtime error
import os | |
import subprocess | |
import time | |
from pathlib import Path | |
from threading import Thread | |
from typing import List, Union | |
import gradio as gr | |
from huggingface_hub import HfFolder, delete_repo, upload_folder, get_space_runtime, request_space_hardware, DatasetCard | |
def process_is_complete(process_pid): | |
'''Checks if the process with the given PID is still running''' | |
p = subprocess.Popen(["ps", "-p", process_pid], stdout=subprocess.PIPE) | |
out = p.communicate()[0].decode("utf-8").strip().split("\n") | |
return len(out) == 1 | |
def get_task_status(output_dataset_id): | |
'''Gets the task status from the output dataset repo''' | |
card = DatasetCard.load(output_dataset_id) | |
return card.data.fuego['status'] | |
def set_task_status(output_dataset_id, status="done"): | |
'''Sets the task status in the output dataset repo''' | |
card = DatasetCard.load(output_dataset_id) | |
card.data.fuego['status'] = status | |
card.push_to_hub(output_dataset_id) | |
def check_for_status( | |
process_pid, this_space_id, output_dataset_id, output_dirs, delete_on_completion, downgrade_hardware_on_completion | |
): | |
task_status = get_task_status(output_dataset_id) | |
print("Task status (found in dataset repo)", task_status) | |
if task_status == "done": | |
print("Task was already done, exiting...") | |
return | |
elif task_status == "preparing": | |
print("Setting task status to running...") | |
set_task_status(output_dataset_id, "running") | |
print("Watching PID of script to see if it is done running") | |
while True: | |
if process_is_complete(process_pid): | |
print("Process is complete! Uploading assets to output dataset repo") | |
for output_dir in output_dirs: | |
if Path(output_dir).exists(): | |
print("Uploading folder", output_dir) | |
upload_folder( | |
repo_id=output_dataset_id, | |
folder_path=str(output_dir), | |
path_in_repo=str(Path('.outputs') / output_dir), | |
repo_type="dataset", | |
) | |
else: | |
print("Folder", output_dir, "does not exist, skipping") | |
print("Finished uploading outputs to dataset repo...Finishing up...") | |
if delete_on_completion: | |
print("Deleting space...") | |
delete_repo(repo_id=this_space_id, repo_type="space") | |
elif downgrade_hardware_on_completion: | |
runtime = get_space_runtime(this_space_id) | |
if runtime.hardware not in [None, "cpu-basic"]: | |
print("Requesting downgrade to CPU Basic...") | |
request_space_hardware(repo_id=this_space_id, hardware="cpu-basic") | |
else: | |
print("Space is already on cpu-basic, not downgrading.") | |
print("Done! Setting task status to done in dataset repo") | |
set_task_status(output_dataset_id, "done") | |
return | |
time.sleep(5) | |
def main( | |
this_space_repo_id: str, | |
output_dataset_id: str, | |
output_dirs: Union[str, List[str]] = "./outputs", | |
delete_on_completion: bool = True, | |
downgrade_hardware_on_completion: bool = True, | |
): | |
token_env_var = os.getenv("HF_TOKEN") | |
if token_env_var is None: | |
raise ValueError( | |
"Please set HF_TOKEN environment variable to your Hugging Face token. You can do this in the settings tab of your space." | |
) | |
if isinstance(output_dirs, str): | |
output_dirs = [output_dirs] | |
HfFolder().save_token(token_env_var) | |
# Watch python script's process to see when it's done running | |
process_pid = os.getenv("USER_SCRIPT_PID", None) | |
with gr.Blocks() as demo: | |
gr.Markdown(Path("about.md").read_text()) | |
thread = Thread( | |
target=check_for_status, | |
daemon=True, | |
args=( | |
process_pid, | |
this_space_repo_id, | |
output_dataset_id, | |
output_dirs, | |
delete_on_completion, | |
downgrade_hardware_on_completion, | |
), | |
) | |
thread.start() | |
demo.launch() | |
if __name__ == "__main__": | |
import fire | |
fire.Fire(main) | |