# fuego-20230428-014702-04e0f5 / status_checker.py
# davanstrien's picture
# davanstrien HF staff
# Upload status_checker.py with huggingface_hub
# 29317f3
import os
import subprocess
import time
from pathlib import Path
from threading import Thread
from typing import List, Union
import gradio as gr
from huggingface_hub import HfFolder, delete_repo, upload_folder, get_space_runtime, request_space_hardware, DatasetCard
def process_is_complete(process_pid):
    """Check whether the process with the given PID has finished running.

    Runs ``ps -p <pid>`` and counts the lines it writes to standard output.
    A single line (or no output at all) is treated as "no such process",
    i.e. the process is complete; anything more means it is still listed.

    Args:
        process_pid: PID of the process to look up. May be a string or an
            integer; it is converted to a string before being passed to ``ps``.

    Returns:
        True if ``ps`` reported no row for the PID, False otherwise.

    Raises:
        ValueError: If ``process_pid`` is None, since there is nothing to
            look up (previously this surfaced as an opaque TypeError raised
            from inside ``subprocess``).
    """
    if process_pid is None:
        raise ValueError(
            "process_pid is required to check whether the process is complete, got None"
        )

    # subprocess only accepts str/bytes/path arguments, so coerce integer PIDs.
    # A non-zero exit status is expected when the PID is gone, hence check=False.
    result = subprocess.run(
        ["ps", "-p", str(process_pid)],
        stdout=subprocess.PIPE,
        check=False,
    )
    lines = result.stdout.decode("utf-8").strip().split("\n")
    return len(lines) == 1
def get_task_status(output_dataset_id):
    """Return the ``status`` entry stored under ``fuego`` in the dataset card.

    Args:
        output_dataset_id: Repo id of the output dataset whose card is loaded.

    Returns:
        The value found at ``card.data.fuego['status']``.
    """
    fuego_metadata = DatasetCard.load(output_dataset_id).data.fuego
    return fuego_metadata['status']
def set_task_status(output_dataset_id, status="done"):
    """Write ``status`` into the dataset card's ``fuego`` metadata and push it.

    Args:
        output_dataset_id: Repo id of the output dataset whose card is updated.
        status: New value for ``card.data.fuego['status']``; defaults to "done".
    """
    dataset_card = DatasetCard.load(output_dataset_id)
    fuego_metadata = dataset_card.data.fuego
    fuego_metadata['status'] = status
    # Publish the modified card back to the same dataset repo.
    dataset_card.push_to_hub(output_dataset_id)
def check_for_status(
    process_pid, this_space_id, output_dataset_id, output_dirs, delete_on_completion, downgrade_hardware_on_completion
):
    """Wait for the watched process to finish, then upload outputs and clean up.

    Steps, in order:
      1. Read the task status from the output dataset repo; return right away
         if it is already "done", and move it to "running" if it is "preparing".
      2. Poll ``process_is_complete`` every 5 seconds until it reports True.
      3. Upload each existing directory in ``output_dirs`` to the dataset repo
         under ``.outputs/<output_dir>``.
      4. Delete this space, or otherwise request a downgrade to "cpu-basic",
         depending on the two completion flags.
      5. Set the task status to "done".

    Args:
        process_pid: PID handed to ``process_is_complete``.
        this_space_id: Repo id of the space to delete or downgrade.
        output_dataset_id: Repo id of the dataset holding status and outputs.
        output_dirs: Iterable of directory paths to upload.
        delete_on_completion: If truthy, delete the space once uploads finish.
        downgrade_hardware_on_completion: If truthy (and the space is not being
            deleted), request "cpu-basic" hardware unless already on it.
    """
    current_status = get_task_status(output_dataset_id)
    print("Task status (found in dataset repo)", current_status)

    if current_status == "done":
        print("Task was already done, exiting...")
        return
    if current_status == "preparing":
        print("Setting task status to running...")
        set_task_status(output_dataset_id, "running")

    print("Watching PID of script to see if it is done running")
    # Check first, sleep only while the process is still alive.
    while not process_is_complete(process_pid):
        time.sleep(5)

    print("Process is complete! Uploading assets to output dataset repo")
    for directory in output_dirs:
        if not Path(directory).exists():
            print("Folder", directory, "does not exist, skipping")
            continue
        print("Uploading folder", directory)
        upload_folder(
            repo_id=output_dataset_id,
            folder_path=str(directory),
            path_in_repo=str(Path('.outputs') / directory),
            repo_type="dataset",
        )
    print("Finished uploading outputs to dataset repo...Finishing up...")

    if delete_on_completion:
        print("Deleting space...")
        delete_repo(repo_id=this_space_id, repo_type="space")
    elif downgrade_hardware_on_completion:
        current_hardware = get_space_runtime(this_space_id).hardware
        if current_hardware in (None, "cpu-basic"):
            print("Space is already on cpu-basic, not downgrading.")
        else:
            print("Requesting downgrade to CPU Basic...")
            request_space_hardware(repo_id=this_space_id, hardware="cpu-basic")

    print("Done! Setting task status to done in dataset repo")
    set_task_status(output_dataset_id, "done")
def main(
    this_space_repo_id: str,
    output_dataset_id: str,
    output_dirs: Union[str, List[str]] = "./outputs",
    delete_on_completion: bool = True,
    downgrade_hardware_on_completion: bool = True,
):
    """Launch the status page and start the background status watcher.

    Saves the token from the ``HF_TOKEN`` environment variable, builds a Gradio
    page from ``about.md``, starts ``check_for_status`` in a daemon thread with
    the PID read from ``USER_SCRIPT_PID``, and finally launches the page.

    Args:
        this_space_repo_id: Repo id of this space, forwarded to the watcher.
        output_dataset_id: Repo id of the output dataset, forwarded to the watcher.
        output_dirs: One directory path or a list of them; a single string is
            wrapped into a one-element list.
        delete_on_completion: Forwarded to the watcher.
        downgrade_hardware_on_completion: Forwarded to the watcher.

    Raises:
        ValueError: If the ``HF_TOKEN`` environment variable is not set.
    """
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token is None:
        raise ValueError(
            "Please set HF_TOKEN environment variable to your Hugging Face token. You can do this in the settings tab of your space."
        )

    dirs_to_upload = [output_dirs] if isinstance(output_dirs, str) else output_dirs

    HfFolder().save_token(hf_token)

    # PID of the user's script; the watcher polls it to detect completion.
    script_pid = os.environ.get("USER_SCRIPT_PID")

    with gr.Blocks() as demo:
        about_text = Path("about.md").read_text()
        gr.Markdown(about_text)

    watcher_args = (
        script_pid,
        this_space_repo_id,
        output_dataset_id,
        dirs_to_upload,
        delete_on_completion,
        downgrade_hardware_on_completion,
    )
    # Daemon thread: it must not keep the interpreter alive on its own.
    watcher = Thread(target=check_for_status, args=watcher_args, daemon=True)
    watcher.start()
    demo.launch()
if __name__ == "__main__":
    # Imported lazily so `fire` is only needed when run as a script.
    from fire import Fire

    # Expose `main`'s parameters as command-line arguments.
    Fire(main)