openvino_notebooks / .ci /validate_notebooks.py
malvika2003's picture
Upload folder using huggingface_hub
db5855f verified
import sys
import time
import os
import subprocess # nosec - disable B404:import-subprocess check
import csv
import json
import shutil
import platform
from argparse import ArgumentParser
from pathlib import Path
from typing import Dict, List, Optional, Tuple, TypedDict
# Parent of the directory that contains this script; used as the repository root.
ROOT = Path(__file__).parents[1]
# Notebooks directory, expressed relative to the repository root.
NOTEBOOKS_DIR = Path("notebooks")
class NotebookStatus:
    """String constants describing the outcome of a notebook in the test plan.

    The values are written verbatim into the generated test report, so they
    are plain strings rather than enum members.
    """

    # The patched notebook ran and exited with a zero return code.
    SUCCESS = "SUCCESS"
    # The patched notebook exited with a non-zero return code.
    FAILED = "FAILED"
    # The run was interrupted because it exceeded the configured timeout.
    TIMEOUT = "TIMEOUT"
    # The notebook was excluded from the run (ignored or not selected).
    SKIPPED = "SKIPPED"
    # Report placeholder for a notebook whose status was never filled in.
    NOT_RUN = "NOT_RUN"
    # The test runner produced no result for the notebook.
    EMPTY = "EMPTY"
class NotebookReport(TypedDict):
    """Per-notebook entry of the test plan.

    TypedDict fields cannot carry default values (PEP 589), so every key must
    be supplied explicitly when an entry is created.
    """

    # One of the NotebookStatus values, or "" while the notebook is still pending.
    status: str
    # Path to the notebook file on disk.
    path: Path
    # Wall-clock run time of the notebook in seconds (0 until it has been run).
    duration: float
# Maps a notebook path (relative to the notebooks directory) to its report entry.
TestPlan = Dict[Path, NotebookReport]
def parse_arguments():
    """Declare the validator's command-line options and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` holding the parsed option values.
    """
    cli = ArgumentParser()
    # (flag, keyword arguments for add_argument), registered in this order.
    option_table = (
        ("--ignore_list", {"required": False, "nargs": "+"}),
        ("--test_list", {"required": False, "nargs": "+"}),
        ("--early_stop", {"action": "store_true"}),
        ("--report_dir", {"default": "report"}),
        ("--keep_artifacts", {"action": "store_true"}),
        ("--collect_reports", {"action": "store_true"}),
        ("--move_notebooks_dir", {}),
        ("--job_name", {}),
        ("--device_used", {}),
        ("--upload_to_db", {}),
        (
            "--timeout",
            {
                "type": int,
                "default": 7200,
                "help": "Timeout for running single notebook in seconds",
            },
        ),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def move_notebooks(nb_dir):
    """Copy the repository's notebooks tree into ``nb_dir``.

    Despite the name, the source tree is copied, not moved. The copy is done
    with ``shutil.copytree`` using its default arguments.
    """
    shutil.copytree(ROOT / NOTEBOOKS_DIR, nb_dir)
def collect_python_packages(output_file: Path):
    """Save the output of ``pip freeze`` for the current interpreter.

    Args:
        output_file: File that receives the raw ``pip freeze`` output.
    """
    freeze_command = [sys.executable, "-m", "pip", "freeze"]
    # The command is run through the shell only on Windows.
    run_in_shell = platform.system() == "Windows"
    frozen_packages = subprocess.check_output(freeze_command, shell=run_in_shell)
    output_file.write_bytes(frozen_packages)
def prepare_test_plan(test_list: Optional[List[str]], ignore_list: List[str], nb_dir: Optional[Path] = None) -> TestPlan:
    """Build the test plan: every discovered notebook with its initial status.

    Args:
        test_list: Either empty/None (test every notebook) or a single-item list
            holding the path of a ``.txt`` file with changed files, one per line,
            relative to the repo root.
        ignore_list: Notebooks to skip. Each item is either a notebook path
            relative to the repo root or a ``.txt`` file listing such paths.
            May be None.
        nb_dir: Directory to scan for notebooks; defaults to ``ROOT / NOTEBOOKS_DIR``.

    Returns:
        Mapping of notebook path (relative to the scanned directory) to its
        report entry. Notebooks that are ignored or not selected are marked
        ``NotebookStatus.SKIPPED``; the others keep an empty status.

    Raises:
        ValueError: If an ignored/tested path is not under ``notebooks/`` or if
            ``test_list`` is neither empty nor a single ``.txt`` file.
    """
    orig_nb_dir = ROOT / NOTEBOOKS_DIR
    notebooks_dir = nb_dir or orig_nb_dir
    # rglob() is already recursive; "test_*" files are the patched copies, not originals.
    notebooks: List[Path] = sorted(n for n in notebooks_dir.rglob("*.ipynb") if not n.name.startswith("test_"))
    test_plan: TestPlan = {notebook.relative_to(notebooks_dir): NotebookReport(status="", path=notebook, duration=0) for notebook in notebooks}

    ignored_notebooks: List[Path] = []
    if ignore_list is not None:
        for ignore_item in ignore_list:
            if ignore_item.endswith(".txt"):
                # Paths to ignore files are provided to `--ignore_list` argument
                with open(ignore_item, "r") as f:
                    for line in f:
                        stripped = line.strip()
                        # Blank lines would turn into Path(".") and fail the
                        # relative_to() check below, so they are skipped.
                        if stripped:
                            ignored_notebooks.append(Path(stripped))
            else:
                # Ignored notebooks are provided as several items to `--ignore_list` argument
                ignored_notebooks.append(Path(ignore_item))
    try:
        ignored_notebooks = list({n.relative_to(NOTEBOOKS_DIR) for n in ignored_notebooks})
    except ValueError as err:
        raise ValueError(
            f"Ignore list items should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\nInvalid ignored notebooks: {ignored_notebooks}"
        ) from err
    print(f"Ignored notebooks: {ignored_notebooks}")

    testing_notebooks: List[Path] = []
    if not test_list:
        testing_notebooks = list(test_plan)
    elif len(test_list) == 1 and test_list[0].endswith(".txt"):
        requirements_path = (ROOT / "requirements.txt").resolve()
        with open(test_list[0], "r") as f:
            for line in f:
                changed_file_path = Path(line.strip())
                if changed_file_path.resolve() == requirements_path:
                    # A dependency change can affect any notebook.
                    print("requirements.txt changed, check all notebooks")
                    testing_notebooks = list(test_plan)
                    break
                if changed_file_path.suffix != ".ipynb":
                    continue
                try:
                    testing_notebook_path = changed_file_path.relative_to(NOTEBOOKS_DIR)
                except ValueError as err:
                    raise ValueError(
                        "Items in test list file should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\n"
                        f"Invalid line: {changed_file_path}"
                    ) from err
                testing_notebooks.append(testing_notebook_path)
    else:
        raise ValueError(
            "Testing notebooks should be provided to '--test_list' argument as a txt file or should be empty to test all notebooks.\n"
            f"Received test list: {test_list}"
        )
    testing_notebooks = list(set(testing_notebooks))
    print(f"Testing notebooks: {testing_notebooks}")

    # Sets give constant-time membership checks in the loop below.
    selected = set(testing_notebooks)
    ignored = set(ignored_notebooks)
    for notebook in test_plan:
        if notebook not in selected or notebook in ignored:
            test_plan[notebook]["status"] = NotebookStatus.SKIPPED
    return test_plan
def clean_test_artifacts(before_test_files: List[Path], after_test_files: List[Path]):
    """Remove everything a test run created, on a best-effort basis.

    Args:
        before_test_files: Paths that existed before the test; never touched.
        after_test_files: Paths present after the test; entries that are not in
            ``before_test_files`` are deleted.

    Failures to delete are reported on stdout and otherwise ignored, so a
    locked or already-removed artifact never aborts the validation run.
    """
    preserved = set(before_test_files)
    for file_path in after_test_files:
        if file_path in preserved:
            continue
        # Symlinks (including dangling ones and links to directories) must be
        # unlinked: shutil.rmtree refuses to operate on a symbolic link.
        if file_path.is_symlink() or file_path.is_file():
            try:
                file_path.unlink()
            except OSError as err:
                print(f'Failed to remove test artifact "{file_path}": {err}')
        elif file_path.exists():
            shutil.rmtree(file_path, ignore_errors=True)
def get_openvino_version() -> str:
    """Return the installed OpenVINO version string.

    When ``openvino`` cannot be imported, a notice is printed and a
    placeholder string is returned instead of raising.
    """
    try:
        import openvino as ov

        return ov.get_version()
    except ImportError:
        print("Openvino is missing in validation environment.")
        return "Openvino is missing"
def run_test(notebook_path: Path, root, timeout=7200, keep_artifacts=False, report_dir=".") -> Optional[Tuple[str, int, float, str, str]]:
    """Run the patched copy (``test_<name>.ipynb``) of one notebook with treon.

    Args:
        notebook_path: Path to the original ``*.ipynb`` file.
        root: Directory that ``notebook_path`` is printed relative to.
        timeout: Maximum run time of the notebook in seconds.
        keep_artifacts: When false, files created during the run are removed.
        report_dir: Directory receiving the ``pip freeze`` snapshots taken
            before and after the run. May be a ``str`` or a ``Path``.

    Returns:
        ``(patched notebook name, return code, duration in seconds,
        OpenVINO version before, OpenVINO version after)``, or ``None`` when
        the path is not a notebook file or its patched copy does not exist.
        A timed-out run is reported with return code ``-42``.
    """
    # Resolve against the caller's working directory *before* cd() below, and
    # accept plain strings (the default "." cannot be combined with "/").
    report_dir = Path(report_dir).absolute()

    # NOTE(review): presumably keeps Hugging Face downloads next to the notebook
    # so they are treated as test artifacts - confirm.
    os.environ["HUGGINGFACE_HUB_CACHE"] = str(notebook_path.parent)
    print(f"RUN {notebook_path.relative_to(root)}", flush=True)

    if notebook_path.is_dir():
        print(f'Notebook path "{notebook_path}" is a directory, but path to "*.ipynb" file was expected.')
        return None
    if notebook_path.suffix != ".ipynb":
        print(f'Notebook path "{notebook_path}" should have "*.ipynb" extension.')
        return None

    with cd(notebook_path.parent):
        files_before_test = sorted(Path(".").iterdir())
        ov_version_before = get_openvino_version()
        patched_notebook = Path(f"test_{notebook_path.name}")
        if not patched_notebook.exists():
            print(f'Patched notebook "{patched_notebook}" does not exist.')
            return None

        collect_python_packages(report_dir / (patched_notebook.stem + "_env_before.txt"))

        main_command = [sys.executable, "-m", "treon", str(patched_notebook)]
        start = time.perf_counter()
        try:
            retcode = subprocess.run(
                main_command,
                shell=(platform.system() == "Windows"),
                timeout=timeout,
            ).returncode
        except subprocess.TimeoutExpired:
            # Sentinel return code that callers interpret as "timed out".
            retcode = -42
        duration = time.perf_counter() - start
        # The notebook may (re)install packages, so the version is sampled again.
        ov_version_after = get_openvino_version()
        result = (str(patched_notebook), retcode, duration, ov_version_before, ov_version_after)

        if not keep_artifacts:
            clean_test_artifacts(files_before_test, sorted(Path(".").iterdir()))
        collect_python_packages(report_dir / (patched_notebook.stem + "_env_after.txt"))
    return result
def finalize_status(failed_notebooks: List[str], timeout_notebooks: List[str], test_plan: TestPlan, report_dir: Path, root: Path) -> int:
    """Print the run summary, write ``test_report.csv`` and compute the exit code.

    Args:
        failed_notebooks: Names of notebooks that exited with an error.
        timeout_notebooks: Names of notebooks that hit the timeout.
        test_plan: Full test plan with the final status of every notebook.
        report_dir: Directory where ``test_report.csv`` is written.
        root: Directory that notebook paths in the report are made relative to.

    Returns:
        1 if any notebook failed, otherwise 0. Timed-out notebooks are listed
        in the output but do not affect the returned value.
    """
    return_status = 0
    if failed_notebooks:
        return_status = 1
        print("FAILED: \n{}".format("\n".join(failed_notebooks)))
    if timeout_notebooks:
        print("FAILED BY TIMEOUT: \n{}".format("\n".join(timeout_notebooks)))

    test_report = [
        {
            "name": notebook.as_posix(),
            # An empty status means the notebook was never reached.
            "status": status["status"] or NotebookStatus.NOT_RUN,
            "full_path": str(status["path"].relative_to(root)),
            "duration": status["duration"],
        }
        for notebook, status in test_plan.items()
    ]
    # newline="" is required by the csv module; without it every row gets an
    # extra blank line on platforms that translate "\n" to "\r\n".
    with (report_dir / "test_report.csv").open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "status", "full_path", "duration"])
        writer.writeheader()
        writer.writerows(test_report)
    return return_status
class cd:
    """Context manager that switches the working directory for a ``with`` block."""

    def __init__(self, new_path):
        # A leading "~" is expanded once, when the manager is created.
        self.new_path = os.path.expanduser(new_path)

    def __enter__(self):
        # Remember where we started so __exit__ can go back.
        self.saved_path = os.getcwd()
        os.chdir(self.new_path)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Restore the previous directory; exceptions are not suppressed.
        os.chdir(self.saved_path)
def write_single_notebook_report(
    base_version: str,
    notebook_name: str,
    status_code: int,
    duration: float,
    ov_version_before: str,
    ov_version_after: str,
    job_name: str,
    device_used: str,
    saving_dir: Path,
) -> Path:
    """Write the JSON report of a single notebook run.

    Args:
        base_version: OpenVINO version sampled at the start of the validation.
        notebook_name: File name of the patched notebook (``test_<name>.ipynb``).
        status_code: Return code of the notebook run.
        duration: Run time in seconds.
        ov_version_before: OpenVINO version before the notebook ran.
        ov_version_after: OpenVINO version after the notebook ran.
        job_name: Name of the CI job.
        device_used: Device the notebook was run on.
        saving_dir: Directory where the report file is created.

    Returns:
        Path of the written report: ``notebook_name`` with its ``.ipynb``
        extension replaced by ``.json``, inside ``saving_dir``.
    """
    patched_prefix = "test_"
    notebook_suffix = ".ipynb"

    # Only the trailing extension is swapped; str.replace() would also rewrite
    # an ".ipynb" occurring in the middle of the name.
    if notebook_name.endswith(notebook_suffix):
        report_name = notebook_name[: -len(notebook_suffix)] + ".json"
    else:
        report_name = notebook_name
    report_file = saving_dir / report_name

    # Only the leading "test_" marks a patched copy; str.replace() would also
    # mangle names that merely contain it (e.g. "latest_model" -> "lamodel").
    if notebook_name.startswith(patched_prefix):
        original_name = notebook_name[len(patched_prefix):]
    else:
        original_name = notebook_name

    report = {
        "version": base_version,
        "notebook_name": original_name,
        "status": status_code,
        "duration": duration,
        "ov_version_before": ov_version_before,
        "ov_version_after": ov_version_after,
        "job_name": job_name,
        "device_used": device_used,
    }
    with report_file.open("w") as f:
        json.dump(report, f)
    return report_file
def main():
    """Run the validation: build the test plan, execute notebooks, write reports.

    Returns:
        The exit status computed by ``finalize_status``.
    """
    failed_notebooks = []
    timeout_notebooks = []
    args = parse_arguments()
    reports_dir = Path(args.report_dir)
    reports_dir.mkdir(exist_ok=True, parents=True)

    notebooks_moving_dir = args.move_notebooks_dir
    root = ROOT
    if notebooks_moving_dir is not None:
        # Run against a copy of the notebooks instead of the repository tree.
        notebooks_moving_dir = Path(notebooks_moving_dir)
        root = notebooks_moving_dir.parent
        move_notebooks(notebooks_moving_dir)

    base_version = get_openvino_version()
    test_plan = prepare_test_plan(args.test_list, args.ignore_list, notebooks_moving_dir)

    for notebook, report in test_plan.items():
        if report["status"] == NotebookStatus.SKIPPED:
            continue

        test_result = run_test(report["path"], root, args.timeout, args.keep_artifacts, reports_dir.absolute())
        if not test_result:
            print(f'Testing notebooks "{str(notebook)}" is not found.')
            report["status"] = NotebookStatus.EMPTY
            report["duration"] = 0
            continue

        patched_notebook, status_code, duration, ov_version_before, ov_version_after = test_result
        if status_code == -42:
            # -42 is the sentinel run_test uses for a timed-out notebook.
            report["status"] = NotebookStatus.TIMEOUT
            timeout_notebooks.append(patched_notebook)
        elif status_code:
            report["status"] = NotebookStatus.FAILED
            failed_notebooks.append(patched_notebook)
        elif report["status"] not in (NotebookStatus.TIMEOUT, NotebookStatus.FAILED):
            report["status"] = NotebookStatus.SUCCESS
        report["duration"] = duration

        if args.collect_reports:
            job_name = args.job_name or "Unknown"
            device_used = args.device_used or "Unknown"
            report_path = write_single_notebook_report(
                base_version, patched_notebook, status_code, duration, ov_version_before, ov_version_after, job_name, device_used, reports_dir
            )
            if args.upload_to_db:
                cmd = [sys.executable, args.upload_to_db, str(report_path)]
                print(f"\nUploading {report_path} to database. CMD: {cmd}")
                try:
                    # The context manager closes the pipe and waits for the
                    # uploader, so no child process is left behind.
                    with subprocess.Popen(
                        cmd, shell=(platform.system() == "Windows"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True
                    ) as dbprocess:
                        for line in dbprocess.stdout:
                            sys.stdout.write(line)
                            sys.stdout.flush()
                    if dbprocess.returncode:
                        print(f"Uploading {report_path} to database failed with exit code {dbprocess.returncode}")
                except OSError as e:
                    # Upload is auxiliary: report the problem and keep validating.
                    print(e)

        # Early stop means "stop at the first failure", not after the first notebook.
        if args.early_stop and status_code:
            break

    exit_status = finalize_status(failed_notebooks, timeout_notebooks, test_plan, reports_dir, root)
    return exit_status
if __name__ == "__main__":
    # Propagate the validation result as the process exit code.
    sys.exit(main())