voice_clone_v4 / transformers /utils /get_previous_daily_ci.py
ahassoun's picture
Upload 3018 files
ee6e328
import os
import zipfile
import requests
from get_ci_error_statistics import download_artifact, get_artifacts_links
def get_daily_ci_runs(token, num_runs=7):
"""Get the workflow runs of the scheduled (daily) CI.
This only selects the runs triggered by the `schedule` event on the `main` branch.
"""
headers = None
if token is not None:
headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {token}"}
# The id of a workflow (not of a workflow run)
workflow_id = "636036"
url = f"https://api.github.com/repos/huggingface/transformers/actions/workflows/{workflow_id}/runs"
# On `main` branch + event being `schedule` + not returning PRs + only `num_runs` results
url += f"?branch=main&event=schedule&exclude_pull_requests=true&per_page={num_runs}"
result = requests.get(url, headers=headers).json()
return result["workflow_runs"]
def get_last_daily_ci_runs(token):
"""Get the last completed workflow run id of the scheduled (daily) CI."""
workflow_runs = get_daily_ci_runs(token)
workflow_run_id = None
for workflow_run in workflow_runs:
if workflow_run["status"] == "completed":
workflow_run_id = workflow_run["id"]
break
return workflow_run_id
def get_last_daily_ci_artifacts(artifact_names, output_dir, token):
"""Get the artifacts of last completed workflow run id of the scheduled (daily) CI."""
workflow_run_id = get_last_daily_ci_runs(token)
if workflow_run_id is not None:
artifacts_links = get_artifacts_links(worflow_run_id=workflow_run_id, token=token)
for artifact_name in artifact_names:
if artifact_name in artifacts_links:
artifact_url = artifacts_links[artifact_name]
download_artifact(
artifact_name=artifact_name, artifact_url=artifact_url, output_dir=output_dir, token=token
)
def get_last_daily_ci_reports(artifact_names, output_dir, token):
"""Get the artifacts' content of the last completed workflow run id of the scheduled (daily) CI."""
get_last_daily_ci_artifacts(artifact_names, output_dir, token)
results = {}
for artifact_name in artifact_names:
artifact_zip_path = os.path.join(output_dir, f"{artifact_name}.zip")
if os.path.isfile(artifact_zip_path):
results[artifact_name] = {}
with zipfile.ZipFile(artifact_zip_path) as z:
for filename in z.namelist():
if not os.path.isdir(filename):
# read the file
with z.open(filename) as f:
results[artifact_name][filename] = f.read().decode("UTF-8")
return results