Spaces:
Runtime error
Runtime error
import sys | |
import time | |
import os | |
import subprocess # nosec - disable B404:import-subprocess check | |
import csv | |
import json | |
import shutil | |
import platform | |
from argparse import ArgumentParser | |
from pathlib import Path | |
from typing import Dict, List, Optional, Tuple, TypedDict | |
ROOT = Path(__file__).parents[1] | |
NOTEBOOKS_DIR = Path("notebooks") | |
class NotebookStatus: | |
SUCCESS = "SUCCESS" | |
FAILED = "FAILED" | |
TIMEOUT = "TIMEOUT" | |
SKIPPED = "SKIPPED" | |
NOT_RUN = "NOT_RUN" | |
EMPTY = "EMPTY" | |
class NotebookReport(TypedDict): | |
status: str | |
path: Path | |
duration: float = 0 | |
TestPlan = Dict[Path, NotebookReport] | |
def parse_arguments(): | |
parser = ArgumentParser() | |
parser.add_argument("--ignore_list", required=False, nargs="+") | |
parser.add_argument("--test_list", required=False, nargs="+") | |
parser.add_argument("--early_stop", action="store_true") | |
parser.add_argument("--report_dir", default="report") | |
parser.add_argument("--keep_artifacts", action="store_true") | |
parser.add_argument("--collect_reports", action="store_true") | |
parser.add_argument("--move_notebooks_dir") | |
parser.add_argument("--job_name") | |
parser.add_argument("--device_used") | |
parser.add_argument("--upload_to_db") | |
parser.add_argument( | |
"--timeout", | |
type=int, | |
default=7200, | |
help="Timeout for running single notebook in seconds", | |
) | |
return parser.parse_args() | |
def move_notebooks(nb_dir): | |
current_notebooks_dir = ROOT / NOTEBOOKS_DIR | |
shutil.copytree(current_notebooks_dir, nb_dir) | |
def collect_python_packages(output_file: Path): | |
reqs = subprocess.check_output( | |
[sys.executable, "-m", "pip", "freeze"], | |
shell=(platform.system() == "Windows"), | |
) | |
with output_file.open("wb") as f: | |
f.write(reqs) | |
def prepare_test_plan(test_list: Optional[List[str]], ignore_list: List[str], nb_dir: Optional[Path] = None) -> TestPlan: | |
orig_nb_dir = ROOT / NOTEBOOKS_DIR | |
notebooks_dir = nb_dir or orig_nb_dir | |
notebooks: List[Path] = sorted(list([n for n in notebooks_dir.rglob("**/*.ipynb") if not n.name.startswith("test_")])) | |
test_plan: TestPlan = {notebook.relative_to(notebooks_dir): NotebookReport(status="", path=notebook, duration=0) for notebook in notebooks} | |
ignored_notebooks: List[Path] = [] | |
if ignore_list is not None: | |
for ignore_item in ignore_list: | |
if ignore_item.endswith(".txt"): | |
# Paths to ignore files are provided to `--ignore_list` argument | |
with open(ignore_item, "r") as f: | |
ignored_notebooks.extend(list(map(lambda line: Path(line.strip()), f.readlines()))) | |
else: | |
# Ignored notebooks are provided as several items to `--ignore_list` argument | |
ignored_notebooks.append(Path(ignore_item)) | |
try: | |
ignored_notebooks = list(set(map(lambda n: n.relative_to(NOTEBOOKS_DIR), ignored_notebooks))) | |
except ValueError: | |
raise ValueError( | |
f"Ignore list items should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\nInvalid ignored notebooks: {ignored_notebooks}" | |
) | |
print(f"Ignored notebooks: {ignored_notebooks}") | |
testing_notebooks: List[Path] = [] | |
if not test_list: | |
testing_notebooks = [Path(n) for n in test_plan.keys()] | |
elif len(test_list) == 1 and test_list[0].endswith(".txt"): | |
with open(test_list[0], "r") as f: | |
for line in f.readlines(): | |
changed_file_path = Path(line.strip()) | |
if changed_file_path.resolve() == (ROOT / "requirements.txt").resolve(): | |
print("requirements.txt changed, check all notebooks") | |
testing_notebooks = [Path(n) for n in test_plan.keys()] | |
break | |
if changed_file_path.suffix != ".ipynb": | |
continue | |
try: | |
testing_notebook_path = changed_file_path.relative_to(NOTEBOOKS_DIR) | |
except ValueError: | |
raise ValueError( | |
"Items in test list file should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\n" | |
f"Invalid line: {changed_file_path}" | |
) | |
testing_notebooks.append(testing_notebook_path) | |
else: | |
raise ValueError( | |
"Testing notebooks should be provided to '--test_list' argument as a txt file or should be empty to test all notebooks.\n" | |
f"Received test list: {test_list}" | |
) | |
testing_notebooks = list(set(testing_notebooks)) | |
print(f"Testing notebooks: {testing_notebooks}") | |
for notebook in test_plan: | |
if notebook not in testing_notebooks: | |
test_plan[notebook]["status"] = NotebookStatus.SKIPPED | |
if notebook in ignored_notebooks: | |
test_plan[notebook]["status"] = NotebookStatus.SKIPPED | |
return test_plan | |
def clean_test_artifacts(before_test_files: List[Path], after_test_files: List[Path]): | |
for file_path in after_test_files: | |
if file_path in before_test_files or not file_path.exists(): | |
continue | |
if file_path.is_file(): | |
try: | |
file_path.unlink() | |
except Exception: | |
pass | |
else: | |
shutil.rmtree(file_path, ignore_errors=True) | |
def get_openvino_version() -> str: | |
try: | |
import openvino as ov | |
version = ov.get_version() | |
except ImportError: | |
print("Openvino is missing in validation environment.") | |
version = "Openvino is missing" | |
return version | |
def run_test(notebook_path: Path, root, timeout=7200, keep_artifacts=False, report_dir=".") -> Optional[Tuple[str, int, float, str, str]]: | |
os.environ["HUGGINGFACE_HUB_CACHE"] = str(notebook_path.parent) | |
print(f"RUN {notebook_path.relative_to(root)}", flush=True) | |
result = None | |
if notebook_path.is_dir(): | |
print(f'Notebook path "{notebook_path}" is a directory, but path to "*.ipynb" file was expected.') | |
return result | |
if notebook_path.suffix != ".ipynb": | |
print(f'Notebook path "{notebook_path}" should have "*.ipynb" extension.') | |
return result | |
with cd(notebook_path.parent): | |
files_before_test = sorted(Path(".").iterdir()) | |
ov_version_before = get_openvino_version() | |
patched_notebook = Path(f"test_{notebook_path.name}") | |
if not patched_notebook.exists(): | |
print(f'Patched notebook "{patched_notebook}" does not exist.') | |
return result | |
collect_python_packages(report_dir / (patched_notebook.stem + "_env_before.txt")) | |
main_command = [sys.executable, "-m", "treon", str(patched_notebook)] | |
start = time.perf_counter() | |
try: | |
retcode = subprocess.run( | |
main_command, | |
shell=(platform.system() == "Windows"), | |
timeout=timeout, | |
).returncode | |
except subprocess.TimeoutExpired: | |
retcode = -42 | |
duration = time.perf_counter() - start | |
ov_version_after = get_openvino_version() | |
result = (str(patched_notebook), retcode, duration, ov_version_before, ov_version_after) | |
if not keep_artifacts: | |
clean_test_artifacts(files_before_test, sorted(Path(".").iterdir())) | |
collect_python_packages(report_dir / (patched_notebook.stem + "_env_after.txt")) | |
return result | |
def finalize_status(failed_notebooks: List[str], timeout_notebooks: List[str], test_plan: TestPlan, report_dir: Path, root: Path) -> int: | |
return_status = 0 | |
if failed_notebooks: | |
return_status = 1 | |
print("FAILED: \n{}".format("\n".join(failed_notebooks))) | |
if timeout_notebooks: | |
print("FAILED BY TIMEOUT: \n{}".format("\n".join(timeout_notebooks))) | |
test_report = [] | |
for notebook, status in test_plan.items(): | |
test_status = status["status"] or NotebookStatus.NOT_RUN | |
test_report.append( | |
{"name": notebook.as_posix(), "status": test_status, "full_path": str(status["path"].relative_to(root)), "duration": status["duration"]} | |
) | |
with (report_dir / "test_report.csv").open("w") as f: | |
writer = csv.DictWriter(f, fieldnames=["name", "status", "full_path", "duration"]) | |
writer.writeheader() | |
writer.writerows(test_report) | |
return return_status | |
class cd: | |
"""Context manager for changing the current working directory""" | |
def __init__(self, new_path): | |
self.new_path = os.path.expanduser(new_path) | |
def __enter__(self): | |
self.saved_path = os.getcwd() | |
os.chdir(self.new_path) | |
def __exit__(self, etype, value, traceback): | |
os.chdir(self.saved_path) | |
def write_single_notebook_report( | |
base_version: str, | |
notebook_name: str, | |
status_code: int, | |
duration: float, | |
ov_version_before: str, | |
ov_version_after: str, | |
job_name: str, | |
device_used: str, | |
saving_dir: Path, | |
) -> Path: | |
report_file = saving_dir / notebook_name.replace(".ipynb", ".json") | |
report = { | |
"version": base_version, | |
"notebook_name": notebook_name.replace("test_", ""), | |
"status": status_code, | |
"duration": duration, | |
"ov_version_before": ov_version_before, | |
"ov_version_after": ov_version_after, | |
"job_name": job_name, | |
"device_used": device_used, | |
} | |
with report_file.open("w") as f: | |
json.dump(report, f) | |
return report_file | |
def main(): | |
failed_notebooks = [] | |
timeout_notebooks = [] | |
args = parse_arguments() | |
reports_dir = Path(args.report_dir) | |
reports_dir.mkdir(exist_ok=True, parents=True) | |
notebooks_moving_dir = args.move_notebooks_dir | |
root = ROOT | |
if notebooks_moving_dir is not None: | |
notebooks_moving_dir = Path(notebooks_moving_dir) | |
root = notebooks_moving_dir.parent | |
move_notebooks(notebooks_moving_dir) | |
keep_artifacts = False | |
if args.keep_artifacts: | |
keep_artifacts = True | |
base_version = get_openvino_version() | |
test_plan = prepare_test_plan(args.test_list, args.ignore_list, notebooks_moving_dir) | |
for notebook, report in test_plan.items(): | |
if report["status"] == NotebookStatus.SKIPPED: | |
continue | |
test_result = run_test(report["path"], root, args.timeout, keep_artifacts, reports_dir.absolute()) | |
timing = 0 | |
if not test_result: | |
print(f'Testing notebooks "{str(notebook)}" is not found.') | |
report["status"] = NotebookStatus.EMPTY | |
report["duration"] = timing | |
else: | |
patched_notebook, status_code, duration, ov_version_before, ov_version_after = test_result | |
if status_code: | |
if status_code == -42: | |
status = NotebookStatus.TIMEOUT | |
timeout_notebooks.append(patched_notebook) | |
else: | |
status = NotebookStatus.FAILED | |
failed_notebooks.append(patched_notebook) | |
report["status"] = status | |
else: | |
report["status"] = NotebookStatus.SUCCESS if not report["status"] in [NotebookStatus.TIMEOUT, NotebookStatus.FAILED] else report["status"] | |
timing += duration | |
report["duration"] = timing | |
if args.collect_reports: | |
job_name = args.job_name or "Unknown" | |
device_used = args.device_used or "Unknown" | |
report_path = write_single_notebook_report( | |
base_version, patched_notebook, status_code, duration, ov_version_before, ov_version_after, job_name, device_used, reports_dir | |
) | |
if args.upload_to_db: | |
cmd = [sys.executable, args.upload_to_db, report_path] | |
print(f"\nUploading {report_path} to database. CMD: {cmd}") | |
try: | |
dbprocess = subprocess.Popen( | |
cmd, shell=(platform.system() == "Windows"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True | |
) | |
for line in dbprocess.stdout: | |
sys.stdout.write(line) | |
sys.stdout.flush() | |
except subprocess.CalledProcessError as e: | |
print(e.output) | |
if args.early_stop: | |
break | |
exit_status = finalize_status(failed_notebooks, timeout_notebooks, test_plan, reports_dir, root) | |
return exit_status | |
if __name__ == "__main__": | |
exit_code = main() | |
sys.exit(exit_code) | |