|
"""Utilities and tools for tracking runs with Weights & Biases.""" |
|
|
|
import logging |
|
import os |
|
import sys |
|
from contextlib import contextmanager |
|
from pathlib import Path |
|
|
|
import yaml |
|
from tqdm import tqdm |
|
|
|
# Absolute path of this module; the directory three levels above its parent is
# put on sys.path so the `utils.*` imports below resolve.
FILE = Path(__file__).absolute()
_REPO_ROOT = FILE.parents[3].as_posix()
if _REPO_ROOT not in sys.path:  # avoid stacking duplicate entries on re-import
    sys.path.append(_REPO_ROOT)
|
|
|
from utils.datasets import LoadImagesAndLabels |
|
from utils.datasets import img2label_paths |
|
from utils.general import check_dataset, check_file |
|
|
|
# wandb is an optional dependency: the module-level name `wandb` is None when it is
# unavailable, and the rest of this file checks it for truthiness.
try:
    import wandb
except ImportError:
    wandb = None
else:
    # An importable `wandb` without `__version__` is treated as "not installed"
    # (presumably a stray local directory shadowing the real package). This is an
    # explicit check rather than an `assert` so it still runs under `python -O`.
    if not hasattr(wandb, '__version__'):
        wandb = None
|
|
|
# Process rank read from the RANK environment variable; -1 when the variable is unset.
RANK = int(os.getenv('RANK', -1))
# URI-style prefix marking a string as a link to a W&B artifact rather than a local path.
WANDB_ARTIFACT_PREFIX = 'wandb-artifact://'
|
|
|
|
|
def remove_prefix(from_string, prefix=WANDB_ARTIFACT_PREFIX):
    """Return `from_string` with a leading `prefix` removed.

    arguments:
    from_string (str) -- string to strip
    prefix (str) -- prefix to remove; defaults to WANDB_ARTIFACT_PREFIX

    returns:
    The remainder of `from_string` after `prefix`. A string that does not start
    with `prefix` is returned unchanged instead of losing its first
    len(prefix) characters.
    """
    if from_string.startswith(prefix):
        return from_string[len(prefix):]
    return from_string
|
|
|
|
|
def check_wandb_config_file(data_config_file):
    """Prefer the `_wandb` variant of a dataset config file when it exists.

    arguments:
    data_config_file (str) -- path of the dataset config, e.g. 'data/coco.yaml'

    returns:
    'data/coco_wandb.yaml' (the same path with '_wandb' inserted before the
    extension) if that file exists on disk, otherwise `data_config_file` unchanged.
    """
    # splitext only looks at the final path component, so a dot inside a directory
    # name is never mistaken for the extension separator.
    root, ext = os.path.splitext(data_config_file)
    wandb_config = root + '_wandb' + ext
    if Path(wandb_config).is_file():
        return wandb_config
    return data_config_file
|
|
|
|
|
def check_wandb_dataset(data_file):
    """Load a dataset config, bypassing local checks when it points at W&B artifacts.

    arguments:
    data_file (str) -- path of the dataset .yaml file

    returns:
    The parsed yaml dictionary when its 'train' or 'val' entry is a string starting
    with WANDB_ARTIFACT_PREFIX; otherwise whatever check_dataset(data_file) returns.
    """
    # Open the path check_file resolved, as process_wandb_config_ddp_mode does,
    # rather than the raw argument.
    resolved_file = check_file(data_file)
    if resolved_file and data_file.endswith('.yaml'):
        with open(resolved_file, errors='ignore') as f:
            data_dict = yaml.safe_load(f)
        if isinstance(data_dict, dict):
            # 'train'/'val' may be missing or may be lists of paths; only a string
            # entry can be an artifact link.
            for split in ('train', 'val'):
                entry = data_dict.get(split)
                if isinstance(entry, str) and entry.startswith(WANDB_ARTIFACT_PREFIX):
                    return data_dict
    return check_dataset(data_file)
|
|
|
|
|
def get_run_info(run_path):
    """Split a W&B run link into its components.

    arguments:
    run_path (str) -- link of the form WANDB_ARTIFACT_PREFIX + 'entity/project/run_id'

    returns:
    (entity, project, run_id, model_artifact_name), where model_artifact_name is
    'run_<run_id>_model'.
    """
    run_path = Path(remove_prefix(run_path, WANDB_ARTIFACT_PREFIX))
    # Use .name rather than .stem: .stem drops everything after the last dot, which
    # would truncate an entity/project/run name such as 'my.project' to 'my'.
    run_id = run_path.name
    project = run_path.parent.name
    entity = run_path.parent.parent.name
    model_artifact_name = 'run_' + run_id + '_model'
    return entity, project, run_id, model_artifact_name
|
|
|
|
|
def check_wandb_resume(opt):
    """Detect a resume-from-W&B-artifact request and prepare secondary processes for it.

    For processes whose RANK is neither -1 nor 0 (presumably the non-primary DDP
    workers), dataset artifacts referenced by opt.data are downloaded first, and
    when resuming from an artifact the checkpoint is downloaded and opt.weights is
    pointed at its 'last.pt'.

    arguments:
    opt (namespace) -- Commandline arguments for this run; reads opt.resume and
                       opt.data, may rewrite opt.data and opt.weights

    returns:
    True if opt.resume is a string starting with WANDB_ARTIFACT_PREFIX, else None.
    """
    is_secondary_rank = RANK not in [-1, 0]
    if is_secondary_rank:
        process_wandb_config_ddp_mode(opt)

    if isinstance(opt.resume, str) and opt.resume.startswith(WANDB_ARTIFACT_PREFIX):
        if is_secondary_rank:
            assert wandb, 'install wandb to resume wandb runs'
            entity, project, run_id, model_artifact_name = get_run_info(opt.resume)
            api = wandb.Api()
            artifact = api.artifact(entity + '/' + project + '/' + model_artifact_name + ':latest')
            modeldir = artifact.download()
            opt.weights = str(Path(modeldir) / "last.pt")
        return True
    return None
|
|
|
|
|
def process_wandb_config_ddp_mode(opt):
    """Download dataset artifacts named in opt.data and switch opt.data to local paths.

    Each of the 'train' and 'val' entries of the dataset yaml that is a string
    starting with WANDB_ARTIFACT_PREFIX is downloaded through the W&B public API
    (using alias opt.artifact_alias) and replaced by '<download dir>/data/images'.
    If anything was downloaded, the updated dictionary is written to
    'wandb_local_data.yaml' inside a download directory and opt.data is set to it.

    arguments:
    opt (namespace) -- Commandline arguments for this run; reads opt.data and
                       opt.artifact_alias, may rewrite opt.data
    """
    with open(check_file(opt.data), errors='ignore') as f:
        data_dict = yaml.safe_load(f)

    api = None
    download_dirs = {}
    for split in ('train', 'val'):
        link = data_dict.get(split)
        if isinstance(link, str) and link.startswith(WANDB_ARTIFACT_PREFIX):
            api = api or wandb.Api()
            artifact = api.artifact(remove_prefix(link) + ':' + opt.artifact_alias)
            local_dir = artifact.download()
            download_dirs[split] = local_dir
            data_dict[split] = str(Path(local_dir) / 'data/images/')

    # Prefer the val directory as before, but fall back to the train directory so a
    # config with only a train artifact no longer fails on Path(None).
    target_dir = download_dirs.get('val') or download_dirs.get('train')
    if target_dir:
        ddp_data_path = str(Path(target_dir) / 'wandb_local_data.yaml')
        with open(ddp_data_path, 'w') as f:
            yaml.safe_dump(data_dict, f)
        opt.data = ddp_data_path
|
|
|
|
|
class WandbLogger:
    """Log training runs, datasets, models, and predictions to Weights & Biases.

    This logger sends information to W&B at wandb.ai. By default, this information
    includes hyperparameters, system configuration and metrics, model metrics,
    and basic data metrics and analyses.

    By providing additional command line arguments to train.py, datasets,
    models and predictions can also be logged.

    For more on how this logger is used, see the Weights & Biases documentation:
    https://docs.wandb.com/guides/integrations/yolov5
    """

    # Column layout of the per-epoch evaluation table.
    _RESULT_TABLE_COLUMNS = ("epoch", "id", "ground truth", "prediction", "avg_confidence")

    def __init__(self, opt, run_id, job_type='Training'):
        """
        - Initialize WandbLogger instance
        - Upload dataset if opt.upload_dataset is True
        - Setup training processes if job_type is 'Training'

        arguments:
        opt (namespace) -- Commandline arguments for this run
        run_id (str) -- Run ID of W&B run to be resumed
        job_type (str) -- To set the job_type for this run
        """
        self.job_type = job_type
        self.wandb = wandb
        self.wandb_run = wandb.run if wandb else None
        self.val_artifact, self.train_artifact = None, None
        self.train_artifact_path, self.val_artifact_path = None, None
        self.result_artifact = None
        self.val_table, self.result_table = None, None
        self.bbox_media_panel_images = []
        self.val_table_path_map = None
        self.max_imgs_to_log = 16
        self.wandb_artifact_data_dict = None
        self.data_dict = None
        # Defined here as well as in setup_training so log()/end_epoch()/finish_run()
        # are safe to call for job types that never run setup_training.
        self.log_dict, self.current_epoch = {}, 0

        if isinstance(opt.resume, str):
            # Resuming from an artifact link: re-attach to the run encoded in the link.
            if opt.resume.startswith(WANDB_ARTIFACT_PREFIX):
                entity, project, run_id, model_artifact_name = get_run_info(opt.resume)
                model_artifact_name = WANDB_ARTIFACT_PREFIX + model_artifact_name
                assert wandb, 'install wandb to resume wandb runs'
                self.wandb_run = wandb.init(id=run_id,
                                            project=project,
                                            entity=entity,
                                            resume='allow',
                                            allow_val_change=True)
                opt.resume = model_artifact_name
        elif self.wandb:
            # Reuse an already-active run if there is one, otherwise start a new one.
            if wandb.run:
                self.wandb_run = wandb.run
            else:
                self.wandb_run = wandb.init(
                    config=opt,
                    resume="allow",
                    project='YOLOv5' if opt.project == 'runs/train' else Path(opt.project).stem,
                    entity=opt.entity,
                    name=opt.name if opt.name != 'exp' else None,
                    job_type=job_type,
                    id=run_id,
                    allow_val_change=True)

        if self.wandb_run:
            if self.job_type == 'Training':
                if opt.upload_dataset and not opt.resume:
                    self.wandb_artifact_data_dict = self.check_and_upload_dataset(opt)

                if opt.resume:
                    if isinstance(opt.resume, str) and opt.resume.startswith(WANDB_ARTIFACT_PREFIX):
                        # Resume from artifact: the data dict stored in the run config is reused.
                        self.data_dict = dict(self.wandb_run.config.data_dict)
                    else:
                        # Local resume.
                        self.data_dict = check_wandb_dataset(opt.data)
                else:
                    self.data_dict = check_wandb_dataset(opt.data)
                    self.wandb_artifact_data_dict = self.wandb_artifact_data_dict or self.data_dict
                    # Store the data dict in the run config only for fresh runs; the
                    # resume-from-artifact branch above reads it back from there.
                    self.wandb_run.config.update({'data_dict': self.wandb_artifact_data_dict},
                                                 allow_val_change=True)
                self.setup_training(opt)

            if self.job_type == 'Dataset Creation':
                self.data_dict = self.check_and_upload_dataset(opt)

    @staticmethod
    def _epoch_aliases(epoch, is_best):
        """Return the artifact aliases for `epoch`, adding 'best' only when `is_best` is truthy."""
        aliases = ['latest', 'last', 'epoch ' + str(epoch)]
        if is_best:
            aliases.append('best')
        return aliases

    def check_and_upload_dataset(self, opt):
        """
        Check if the dataset format is compatible and upload it as W&B artifact

        arguments:
        opt (namespace) -- Commandline arguments for current run

        returns:
        Updated dataset info dictionary where local dataset paths are replaced by
        WANDB_ARTIFACT_PREFIX links.
        """
        assert wandb, 'Install wandb to upload dataset'
        project = 'YOLOv5' if opt.project == 'runs/train' else Path(opt.project).stem
        config_path = self.log_dataset_artifact(opt.data, opt.single_cls, project)
        print("Created dataset config file ", config_path)
        with open(config_path, errors='ignore') as f:
            wandb_data_dict = yaml.safe_load(f)
        return wandb_data_dict

    def setup_training(self, opt):
        """
        Setup the necessary processes for training YOLO models:
        - Attempt to download model checkpoint and dataset artifacts if opt.resume starts with WANDB_ARTIFACT_PREFIX
        - Update data_dict, to contain info of previous run if resumed and the paths of dataset artifact if downloaded
        - Setup log_dict, initialize bbox_interval

        arguments:
        opt (namespace) -- commandline arguments for this run
        """
        self.log_dict, self.current_epoch = {}, 0
        self.bbox_interval = opt.bbox_interval
        if isinstance(opt.resume, str):
            modeldir, _ = self.download_model_artifact(opt)
            if modeldir:
                self.weights = Path(modeldir) / "last.pt"
                # Restore the settings of the resumed run from its stored config.
                config = self.wandb_run.config
                opt.weights = str(self.weights)
                opt.save_period = config.save_period
                opt.batch_size = config.batch_size
                opt.bbox_interval = config.bbox_interval
                opt.epochs = config.epochs
                opt.hyp = config.hyp

        data_dict = self.data_dict
        if self.val_artifact is None:
            # No artifact was created by an upload in this process, so try downloading.
            self.train_artifact_path, self.train_artifact = self.download_dataset_artifact(
                data_dict.get('train'), opt.artifact_alias)
            self.val_artifact_path, self.val_artifact = self.download_dataset_artifact(
                data_dict.get('val'), opt.artifact_alias)

        if self.train_artifact_path is not None:
            data_dict['train'] = str(Path(self.train_artifact_path) / 'data/images/')
        if self.val_artifact_path is not None:
            data_dict['val'] = str(Path(self.val_artifact_path) / 'data/images/')

        if self.val_artifact is not None:
            self.result_artifact = wandb.Artifact("run_" + wandb.run.id + "_progress", "evaluation")
            self.result_table = wandb.Table(list(self._RESULT_TABLE_COLUMNS))
            self.val_table = self.val_artifact.get("val")
            if self.val_table_path_map is None:
                self.map_val_table_path()

        if opt.bbox_interval == -1:
            # -1 selects an automatic interval: one tenth of the epochs, at least 1.
            self.bbox_interval = opt.bbox_interval = (opt.epochs // 10) if opt.epochs > 10 else 1

        train_from_artifact = self.train_artifact_path is not None and self.val_artifact_path is not None
        if train_from_artifact:
            # Point the data dict at the downloaded artifact directories.
            self.data_dict = data_dict

    def download_dataset_artifact(self, path, alias):
        """
        download the dataset artifact if the path starts with WANDB_ARTIFACT_PREFIX

        arguments:
        path -- path of the dataset to be used for training
        alias (str) -- alias of the artifact to be downloaded/used for training

        returns:
        (str, wandb.Artifact) -- path of the downloaded dataset and its corresponding artifact object if
        `path` is an artifact link, otherwise (None, None)
        """
        if isinstance(path, str) and path.startswith(WANDB_ARTIFACT_PREFIX):
            artifact_path = Path(remove_prefix(path, WANDB_ARTIFACT_PREFIX) + ":" + alias)
            # Artifact names always use forward slashes, whatever the host OS.
            dataset_artifact = wandb.use_artifact(artifact_path.as_posix().replace("\\", "/"))
            assert dataset_artifact is not None, "Error: W&B dataset artifact doesn't exist"
            datadir = dataset_artifact.download()
            return datadir, dataset_artifact
        return None, None

    def download_model_artifact(self, opt):
        """
        download the model checkpoint artifact if the resume path starts with WANDB_ARTIFACT_PREFIX

        arguments:
        opt (namespace) -- Commandline arguments for this run

        returns:
        (str, wandb.Artifact) -- download directory and artifact object, or (None, None) when
        opt.resume is not an artifact link
        """
        if opt.resume.startswith(WANDB_ARTIFACT_PREFIX):
            model_artifact = wandb.use_artifact(remove_prefix(opt.resume, WANDB_ARTIFACT_PREFIX) + ":latest")
            assert model_artifact is not None, 'Error: W&B model artifact doesn\'t exist'
            modeldir = model_artifact.download()
            total_epochs = model_artifact.metadata.get('total_epochs')
            # NOTE(review): this only rejects artifacts with no 'total_epochs' metadata; it does
            # not compare 'epochs_trained' with 'total_epochs' although the message suggests so.
            is_finished = total_epochs is None
            assert not is_finished, 'training is finished, can only resume incomplete runs.'
            return modeldir, model_artifact
        return None, None

    def log_model(self, path, opt, epoch, fitness_score, best_model=False):
        """
        Log the model checkpoint as W&B artifact

        arguments:
        path (Path) -- Path of directory containing the checkpoints
        opt (namespace) -- Command line arguments for this run
        epoch (int) -- Current epoch number
        fitness_score (float) -- fitness score for current epoch
        best_model (boolean) -- Boolean representing if the current checkpoint is the best yet.
        """
        model_artifact = wandb.Artifact('run_' + wandb.run.id + '_model', type='model', metadata={
            'original_url': str(path),
            'epochs_trained': epoch + 1,
            'save period': opt.save_period,
            'project': opt.project,
            'total_epochs': opt.epochs,
            'fitness_score': fitness_score
        })
        model_artifact.add_file(str(path / 'last.pt'), name='last.pt')
        wandb.log_artifact(model_artifact, aliases=self._epoch_aliases(self.current_epoch, best_model))
        print("Saving model artifact on epoch ", epoch + 1)

    def log_dataset_artifact(self, data_file, single_cls, project, overwrite_config=False):
        """
        Log the dataset as W&B artifact and return the new data file with W&B links

        arguments:
        data_file (str) -- the .yaml file with information about the dataset like - path, classes etc.
        single_cls (boolean) -- train multi-class data as single-class
        project (str) -- project name. Used to construct the artifact path
        overwrite_config (boolean) -- overwrites the data.yaml file if set to true otherwise creates a new
        file with _wandb postfix. Eg -> data_wandb.yaml

        returns:
        the new .yaml file with artifact links. it can be used to start training directly from artifacts
        """
        self.data_dict = check_dataset(data_file)
        data = dict(self.data_dict)
        names = ['item'] if single_cls else data['names']
        names = dict(enumerate(names))  # class index -> class name

        self.train_artifact = None
        if data.get('train'):
            train_set = LoadImagesAndLabels(data['train'], rect=True, batch_size=1)
            self.train_artifact = self.create_dataset_table(train_set, names, name='train')
        self.val_artifact = None
        if data.get('val'):
            val_set = LoadImagesAndLabels(data['val'], rect=True, batch_size=1)
            self.val_artifact = self.create_dataset_table(val_set, names, name='val')

        # Replace the local paths with artifact links in the config that gets written out.
        if data.get('train'):
            data['train'] = WANDB_ARTIFACT_PREFIX + str(Path(project) / 'train')
        if data.get('val'):
            data['val'] = WANDB_ARTIFACT_PREFIX + str(Path(project) / 'val')

        path = Path(data_file).stem
        path = (path if overwrite_config else path + '_wandb') + '.yaml'
        data.pop('download', None)
        data.pop('path', None)
        with open(path, 'w') as f:
            yaml.safe_dump(data, f)

        if self.job_type == 'Training':
            # Mark the artifacts as inputs of the training run, then build the val lookup.
            self.wandb_run.use_artifact(self.val_artifact)
            self.wandb_run.use_artifact(self.train_artifact)
            self.val_artifact.wait()
            self.val_table = self.val_artifact.get('val')
            self.map_val_table_path()
        else:
            self.wandb_run.log_artifact(self.train_artifact)
            self.wandb_run.log_artifact(self.val_artifact)
        return path

    def map_val_table_path(self):
        """
        Map the validation dataset Table like name of file -> its id in the W&B Table.
        Useful for - referencing artifacts for evaluation.
        """
        self.val_table_path_map = {}
        print("Mapping dataset")
        for row in tqdm(self.val_table.data):
            # Row layout follows create_dataset_table: index 0 is the id, index 3 the file name.
            self.val_table_path_map[row[3]] = row[0]

    def create_dataset_table(self, dataset, class_to_id, name='dataset'):
        """
        Create and return W&B artifact containing W&B Table of the dataset.

        arguments:
        dataset (LoadImagesAndLabels) -- instance of LoadImagesAndLabels class used to iterate over the data to build Table
        class_to_id (dict(int, str)) -- hash map that maps class ids to labels
        name (str) -- name of the artifact

        returns:
        dataset artifact to be logged or used
        """
        artifact = wandb.Artifact(name=name, type="dataset")

        # A dataset given as a single directory is added as a whole; otherwise add file by file.
        if isinstance(dataset.path, str) and Path(dataset.path).is_dir():
            sources = [dataset.path]
        else:
            sources = dataset.img_files
        for img_file in tqdm(sources):
            if Path(img_file).is_dir():
                artifact.add_dir(img_file, name='data/images')
                labels_path = 'labels'.join(dataset.path.rsplit('images', 1))
                artifact.add_dir(labels_path, name='data/labels')
            else:
                artifact.add_file(img_file, name='data/images/' + Path(img_file).name)
                label_file = Path(img2label_paths([img_file])[0])
                if label_file.exists():
                    artifact.add_file(str(label_file), name='data/labels/' + label_file.name)

        table = wandb.Table(columns=["id", "train_image", "Classes", "name"])
        class_set = wandb.Classes([{'id': class_id, 'name': class_name}
                                   for class_id, class_name in class_to_id.items()])
        for si, (img, labels, paths, shapes) in enumerate(tqdm(dataset)):
            box_data, img_classes = [], {}
            for cls, *xywh in labels[:, 1:].tolist():
                cls = int(cls)
                box_data.append({"position": {"middle": [xywh[0], xywh[1]], "width": xywh[2], "height": xywh[3]},
                                 "class_id": cls,
                                 "box_caption": "%s" % (class_to_id[cls])})
                img_classes[cls] = class_to_id[cls]
            boxes = {"ground_truth": {"box_data": box_data, "class_labels": class_to_id}}
            table.add_data(si, wandb.Image(paths, classes=class_set, boxes=boxes), list(img_classes.values()),
                           Path(paths).name)
        artifact.add(table, name)
        return artifact

    def log_training_progress(self, predn, path, names):
        """
        Build evaluation Table. Uses reference from validation dataset table.

        arguments:
        predn (list): list of predictions in the native space in the format - [xmin, ymin, xmax, ymax, confidence, class]
        path (str): local path of the current evaluation image
        names (dict(int, str)): hash map that maps class ids to labels
        """
        class_set = wandb.Classes([{'id': class_id, 'name': class_name}
                                   for class_id, class_name in names.items()])
        box_data = []
        total_conf = 0
        for *xyxy, conf, cls in predn.tolist():
            if conf >= 0.25:
                cls = int(cls)  # tolist() yields floats; index names with an int
                box_data.append(
                    {"position": {"minX": xyxy[0], "minY": xyxy[1], "maxX": xyxy[2], "maxY": xyxy[3]},
                     "class_id": cls,
                     "box_caption": "%s %.3f" % (names[cls], conf),
                     "scores": {"class_score": conf},
                     "domain": "pixel"})
                total_conf = total_conf + conf
        boxes = {"predictions": {"box_data": box_data, "class_labels": names}}
        row_id = self.val_table_path_map[Path(path).name]
        reference_image = self.val_table.data[row_id][1]
        self.result_table.add_data(self.current_epoch,
                                   row_id,
                                   reference_image,
                                   wandb.Image(reference_image, boxes=boxes, classes=class_set),
                                   total_conf / max(1, len(box_data))  # avoid dividing by zero boxes
                                   )

    def val_one_image(self, pred, predn, path, names, im):
        """
        Log validation data for one image. updates the result Table if validation dataset is uploaded and log bbox media panel

        arguments:
        pred (list): list of scaled predictions in the format - [xmin, ymin, xmax, ymax, confidence, class]
        predn (list): list of predictions in the native space - [xmin, ymin, xmax, ymax, confidence, class]
        path (str or Path): local path of the current evaluation image
        names: mapping from class ids to labels
        im: image passed to wandb.Image for the bounding-box media panel
        """
        if self.val_table and self.result_table:
            self.log_training_progress(predn, path, names)

        if len(self.bbox_media_panel_images) < self.max_imgs_to_log and self.current_epoch > 0:
            if self.current_epoch % self.bbox_interval == 0:
                box_data = [{"position": {"minX": xyxy[0], "minY": xyxy[1], "maxX": xyxy[2], "maxY": xyxy[3]},
                             "class_id": int(cls),
                             "box_caption": "%s %.3f" % (names[int(cls)], conf),
                             "scores": {"class_score": conf},
                             "domain": "pixel"} for *xyxy, conf, cls in pred.tolist()]
                boxes = {"predictions": {"box_data": box_data, "class_labels": names}}
                # Path(...) so that both str and Path inputs work, as in log_training_progress.
                self.bbox_media_panel_images.append(wandb.Image(im, boxes=boxes, caption=Path(path).name))

    def log(self, log_dict):
        """
        save the metrics to the logging dictionary

        arguments:
        log_dict (Dict) -- metrics/media to be logged in current step
        """
        if self.wandb_run:
            self.log_dict.update(log_dict)

    def end_epoch(self, best_result=False):
        """
        commit the log_dict, model artifacts and Tables to W&B and flush the log_dict.

        arguments:
        best_result (boolean): Boolean representing if the result of this evaluation is best or not
        """
        if not self.wandb_run:
            return
        with all_logging_disabled():
            if self.bbox_media_panel_images:
                self.log_dict["Bounding Box Debugger/Images"] = self.bbox_media_panel_images
            wandb.log(self.log_dict)
            self.log_dict = {}
            self.bbox_media_panel_images = []
        if self.result_artifact:
            self.result_artifact.add(self.result_table, 'result')
            wandb.log_artifact(self.result_artifact,
                               aliases=self._epoch_aliases(self.current_epoch, best_result))
            wandb.log({"evaluation": self.result_table})
            # Start a fresh table and artifact for the next epoch.
            self.result_table = wandb.Table(list(self._RESULT_TABLE_COLUMNS))
            self.result_artifact = wandb.Artifact("run_" + wandb.run.id + "_progress", "evaluation")

    def finish_run(self):
        """
        Log metrics if any and finish the current W&B run
        """
        if self.wandb_run:
            if self.log_dict:
                with all_logging_disabled():
                    wandb.log(self.log_dict)
            wandb.run.finish()
|
|
|
|
|
@contextmanager
def all_logging_disabled(highest_level=logging.CRITICAL):
    """Temporarily disable logging for the duration of a `with` block.

    source - https://gist.github.com/simon-weber/7853144

    A context manager that will prevent any logging messages triggered during the body
    from being processed. The previous `logging.disable` level is restored on exit,
    including when the body raises.

    :param highest_level: the maximum logging level in use.
        This would only need to be changed if a custom level greater than CRITICAL is defined.
    """
    # Remember the current global disable threshold so it can be put back afterwards.
    previous_level = logging.root.manager.disable
    logging.disable(highest_level)
    try:
        yield
    finally:
        logging.disable(previous_level)
|
|