# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Matching-based scores between generated and reference time series.

Every generated series is compared with every reference series using a
point-wise distance (MSE, MAE or RMSE) averaged over time steps.  From the
resulting distance tensor the module derives nearest-neighbour ("matching")
distances, matching precision / recall / F1 and a coverage-under-curve score.
"""

import concurrent.futures
import math
import statistics
from typing import List, Optional, Union

import datasets
import evaluate
import numpy as np

# TODO: Add BibTeX citation
_CITATION = """\
@InProceedings{huggingface:module,
title = {A great new module},
authors={huggingface, Inc.},
year={2020}
}
"""

# TODO: Add description of the module here
_DESCRIPTION = """\
This new module is designed to solve this great ML task and is crafted with a lot of care.
"""


_KWARGS_DESCRIPTION = """
Calculates how good are predictions given some references, using certain scores
Args:
    predictions: list of generated time series.
        shape: (num_generation, num_timesteps, num_features)
    references: list of reference
        shape: (num_reference, num_timesteps, num_features)
Returns:
    dict with the scalar scores `precision_distance`, `recall_distance`,
    `f1_distance`, `mean_distance`, `index_distance`, `matching_precision`,
    `matching_recall`, `matching_f1` and `cuc`, plus a `macro_*` variant of
    each (average of the per-feature scores).
Examples:
    >>> my_new_module = evaluate.load("bowdbeg/matching_series")
    >>> results = my_new_module.compute(references=[[[0.0, 1.0]]], predictions=[[[0.0, 1.0]]])
    >>> print(results["matching_f1"])
    1.0
"""


@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class matching_series(evaluate.Metric):
    """Nearest-neighbour matching scores between generated and reference time series."""

    def _info(self):
        """Return the `evaluate.MetricInfo` describing this module and its input schema."""
        return evaluate.MetricInfo(
            # This is the description that will appear on the modules page.
            module_type="metric",
            description=_DESCRIPTION,
            citation=_CITATION,
            inputs_description=_KWARGS_DESCRIPTION,
            # One example is a 2-D float sequence: (num_timesteps, num_features).
            features=datasets.Features(
                {
                    "predictions": datasets.Sequence(datasets.Sequence(datasets.Value("float"))),
                    "references": datasets.Sequence(datasets.Sequence(datasets.Value("float"))),
                }
            ),
            # Homepage of the module for documentation
            homepage="https://huggingface.co/spaces/bowdbeg/matching_series",
            # Additional links to the codebase or references
            codebase_urls=["http://github.com/path/to/codebase/of/new_module"],
            reference_urls=["http://path.to.reference.url/new_module"],
        )

    def _download_and_prepare(self, dl_manager):
        """Optional: download external resources useful to compute the scores (nothing to do)."""
        pass

    def compute(self, *, predictions=None, references=None, **kwargs) -> Optional[dict]:
        """Compute the evaluation module.

        Positional arguments are not allowed, to prevent mistakes.  This
        override hands the inputs directly to `_compute`; any keyword that is
        not one of the module's feature names is forwarded to `_compute` as an
        option (e.g. `batch_size`, `metric`, `return_all`).

        Args:
            predictions (`list/array/tensor`, *optional*): Generated series.
            references (`list/array/tensor`, *optional*): Reference series.
            **kwargs (optional): Extra keyword arguments for `_compute`.

        Return:
            `dict`: the scores produced by `_compute`.

        Raises:
            ValueError: if a required input is missing.
        """
        feature_names = self._feature_names()
        all_kwargs = {"predictions": predictions, "references": references, **kwargs}
        if predictions is None and references is None:
            missing_kwargs = {k: None for k in feature_names if k not in all_kwargs}
            all_kwargs.update(missing_kwargs)
        else:
            missing_inputs = [k for k in feature_names if k not in all_kwargs]
            if missing_inputs:
                raise ValueError(
                    f"Evaluation module inputs are missing: {missing_inputs}. All required inputs are {list(feature_names)}"
                )
        inputs = {input_name: all_kwargs[input_name] for input_name in feature_names}
        compute_kwargs = {k: v for k, v in kwargs.items() if k not in feature_names}
        return self._compute(**inputs, **compute_kwargs)

    def _compute(
        self,
        predictions: Union[List, np.ndarray],
        references: Union[List, np.ndarray],
        batch_size: Optional[int] = None,
        cuc_n_calculation: int = 3,
        cuc_n_samples: Union[List[int], str] = "auto",
        metric: str = "mse",
        num_process: int = 1,
        return_distance: bool = False,
        return_matching: bool = False,
        return_each_features: bool = False,
        return_coverages: bool = False,
        return_all: bool = False,
        dtype=np.float32,
    ):
        """Compute the scores of the module given the predictions and references.

        Args:
            predictions: generated time series,
                shape (num_generation, num_timesteps, num_features).
            references: reference time series,
                shape (num_reference, num_timesteps, num_features).
            batch_size: block size used to tile the pairwise distance
                computation. If None, all pairs are processed at once.
            cuc_n_calculation: number of random repetitions per sample size in
                the Coverage Under Curve computation.
            cuc_n_samples: sample sizes for Coverage Under Curve, or "auto"
                (see `compute_cuc`).
            metric: point-wise distance, one of "mse", "mae", "rmse".
            num_process: number of worker processes; only used when
                `batch_size` is given and `num_process > 1`.
            return_distance: also return the full distance tensor.
            return_matching: also return the best-match indices.
            return_each_features: also return the per-feature score lists.
            return_coverages: also return the coverage curves.
            return_all: switch on every `return_*` option.
            dtype: numpy dtype the inputs are converted to.

        Returns:
            dict of scores; see the `out` dictionary below for the keys.

        Raises:
            ValueError: if the inputs are not 3-D, their trailing shapes
                differ, `batch_size` is not positive, or `metric` is unknown.
        """
        if return_all:
            return_distance = return_matching = return_each_features = return_coverages = True

        predictions = np.asarray(predictions, dtype=dtype)
        references = np.asarray(references, dtype=dtype)
        if predictions.ndim != 3 or references.ndim != 3:
            raise ValueError(
                "predictions and references must be 3-dimensional "
                "(num_series, num_timesteps, num_features). "
                "predictions: {}, references: {}".format(predictions.shape, references.shape)
            )
        if predictions.shape[1:] != references.shape[1:]:
            raise ValueError(
                "The number of features in the predictions and references should be the same. predictions: {}, references: {}".format(
                    predictions.shape[1:], references.shape[1:]
                )
            )

        n_reference = len(references)

        # Distance between every (generated, reference) pair, per feature.
        # shape: (num_generation, num_reference, num_features)
        distance = self._pairwise_distance(predictions, references, metric, batch_size, num_process, dtype)

        # Mean distance of the index-aligned pairs (i-th generation vs i-th reference).
        index_distance = distance.diagonal(axis1=0, axis2=1).mean()

        # Overall scores are computed on the feature-averaged distance matrix.
        # shape: (num_generation, num_reference)
        distance_mean = distance.mean(axis=-1)
        overall = self._match_scores(distance_mean)

        # Per-feature scores.  The CUC of every feature is drawn before the
        # overall CUC so that the order of random draws is stable.
        feature_scores = []
        coverages_features = []
        cuc_features = []
        for f in range(predictions.shape[-1]):
            distance_f = distance[:, :, f]
            scores_f = self._match_scores(distance_f)
            scores_f["index_distance"] = distance_f.diagonal(axis1=0, axis2=1).mean()
            feature_scores.append(scores_f)

            coverages_f, cuc_f = self.compute_cuc(scores_f["match"], n_reference, cuc_n_calculation, cuc_n_samples)
            coverages_features.append(coverages_f)
            cuc_features.append(cuc_f)

        precision_distance_features = [s["precision_distance"] for s in feature_scores]
        recall_distance_features = [s["recall_distance"] for s in feature_scores]
        f1_distance_features = [s["f1_distance"] for s in feature_scores]
        mean_distance_features = [s["mean_distance"] for s in feature_scores]
        index_distance_features = [s["index_distance"] for s in feature_scores]
        matching_precision_features = [s["matching_precision"] for s in feature_scores]
        matching_recall_features = [s["matching_recall"] for s in feature_scores]
        matching_f1_features = [s["matching_f1"] for s in feature_scores]

        coverages, cuc = self.compute_cuc(overall["match"], n_reference, cuc_n_calculation, cuc_n_samples)
        macro_coverages = [statistics.mean(c) for c in zip(*coverages_features)]

        out = {
            "precision_distance": overall["precision_distance"],
            "f1_distance": overall["f1_distance"],
            "recall_distance": overall["recall_distance"],
            "mean_distance": overall["mean_distance"],
            "index_distance": index_distance,
            "macro_precision_distance": statistics.mean(precision_distance_features),
            "macro_recall_distance": statistics.mean(recall_distance_features),
            "macro_f1_distance": statistics.mean(f1_distance_features),
            "macro_mean_distance": statistics.mean(mean_distance_features),
            "macro_index_distance": statistics.mean(index_distance_features),
            "matching_precision": overall["matching_precision"],
            "matching_recall": overall["matching_recall"],
            "matching_f1": overall["matching_f1"],
            "macro_matching_precision": statistics.mean(matching_precision_features),
            "macro_matching_recall": statistics.mean(matching_recall_features),
            "macro_matching_f1": statistics.mean(matching_f1_features),
            "cuc": cuc,
            "macro_cuc": statistics.mean(cuc_features),
        }
        if return_distance:
            out["distance"] = distance
        if return_matching:
            out["match"] = overall["match"]
            out["match_inv"] = overall["match_inv"]
        if return_each_features:
            if return_distance:
                # NOTE: despite the key name this is the feature-averaged
                # (num_generation, num_reference) matrix; kept for compatibility.
                out["distance_features"] = distance_mean
            out.update(
                {
                    "precision_distance_features": precision_distance_features,
                    "f1_distance_features": f1_distance_features,
                    "recall_distance_features": recall_distance_features,
                    "index_distance_features": index_distance_features,
                    "matching_precision_features": matching_precision_features,
                    "matching_recall_features": matching_recall_features,
                    "matching_f1_features": matching_f1_features,
                    "cuc_features": cuc_features,
                    "coverages_features": coverages_features,
                }
            )
        if return_coverages:
            out["coverages"] = coverages
            out["macro_coverages"] = macro_coverages
        return out

    def _pairwise_distance(self, predictions, references, metric, batch_size, num_process, dtype):
        """Return the (num_generation, num_reference, num_features) distance tensor."""
        if batch_size is None:
            # (G, 1, T, F) against (1, R, T, F), reduced over the time axis.
            return self._compute_metric(predictions[:, None], references[None], metric=metric, axis=-2)
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer, got {}".format(batch_size))

        n_generation, n_reference = len(predictions), len(references)
        distance = np.zeros((n_generation, n_reference, predictions.shape[-1]), dtype=dtype)
        # Top-left corner of every (batch_size x batch_size) tile; slicing
        # clips the last tile of each row/column automatically.
        blocks = [
            (i, j)
            for i in range(0, n_generation, batch_size)
            for j in range(0, n_reference, batch_size)
        ]
        if num_process > 1:
            args = [
                (predictions[i : i + batch_size, None], references[None, j : j + batch_size], metric, -2)
                for i, j in blocks
            ]
            with concurrent.futures.ProcessPoolExecutor(max_workers=num_process) as executor:
                results = executor.map(self._compute_metric, *zip(*args))
                for (i, j), d in zip(blocks, results):
                    distance[i : i + batch_size, j : j + batch_size] = d
        else:
            for i, j in blocks:
                distance[i : i + batch_size, j : j + batch_size] = self._compute_metric(
                    predictions[i : i + batch_size, None],
                    references[None, j : j + batch_size],
                    metric=metric,
                    axis=-2,
                )
        return distance

    @staticmethod
    def _harmonic_mean(a, b):
        """Return the harmonic mean of two non-negative scores (0 if either is 0)."""
        if a == 0 or b == 0:
            # Limit value of the harmonic mean; avoids a division by zero.
            return a * b
        return 2 / (1 / a + 1 / b)

    def _match_scores(self, distance_matrix):
        """Return nearest-neighbour matching scores for a (num_generation, num_reference) matrix."""
        n_generation, n_reference = distance_matrix.shape
        # Closest reference for every generated series, shape (num_generation,).
        best_match = np.argmin(distance_matrix, axis=-1)
        # Closest generated series for every reference, shape (num_reference,).
        best_match_inv = np.argmin(distance_matrix, axis=0)

        precision_distance = distance_matrix[np.arange(n_generation), best_match].mean()
        recall_distance = distance_matrix[best_match_inv, np.arange(n_reference)].mean()

        # Recall: fraction of references that are the best match of some generation.
        matching_recall = np.unique(best_match).size / n_reference
        # Precision: fraction of generations that are the best match of some reference.
        matching_precision = np.unique(best_match_inv).size / n_generation

        return {
            "match": best_match,
            "match_inv": best_match_inv,
            "precision_distance": precision_distance,
            "recall_distance": recall_distance,
            "f1_distance": self._harmonic_mean(precision_distance, recall_distance),
            "mean_distance": (precision_distance + recall_distance) / 2,
            "matching_precision": matching_precision,
            "matching_recall": matching_recall,
            "matching_f1": self._harmonic_mean(matching_precision, matching_recall),
        }

    def compute_cuc(
        self,
        match: np.ndarray,
        n_reference: int,
        n_calculation: int,
        n_samples: Union[List[int], str],
    ):
        """Compute Coverage Under Curve.

        For each sample size, `n_calculation` random subsets of `match` are
        drawn without replacement (using the global `np.random` state) and
        the fraction of distinct references they hit is averaged.  The curve
        of coverage against sample size is integrated with the trapezoidal
        rule and divided by the number of sample sizes and the largest one.

        Args:
            match: best-matching reference index for each generated series.
            n_reference: number of reference time series.
            n_calculation: number of random repetitions per sample size.
            n_samples: list of sample sizes, or "auto" to use the powers of
                two below `len(match)` followed by `len(match)` itself.

        Returns:
            `(coverages, cuc)`: the mean coverage per sample size, and the
            normalised area under that curve.

        Raises:
            ValueError: if `match` is empty, `n_calculation < 1`, or
                `n_samples` is neither "auto" nor a non-empty list of ints.
        """
        n_generation = len(match)
        if n_generation == 0:
            raise ValueError("match must contain at least one element")
        if n_calculation < 1:
            raise ValueError("n_calculation must be at least 1, got {}".format(n_calculation))

        if isinstance(n_samples, str) and n_samples == "auto":
            exp = int(math.log2(n_generation))
            n_samples = [int(2**i) for i in range(exp)]
            n_samples.append(n_generation)
        elif (
            isinstance(n_samples, (list, tuple))
            and len(n_samples) > 0
            and all(isinstance(n, (int, np.integer)) for n in n_samples)
        ):
            n_samples = [int(n) for n in n_samples]
        else:
            raise ValueError('n_samples must be "auto" or a non-empty list of integers, got {!r}'.format(n_samples))

        coverages = []
        for n_sample in n_samples:
            coverage = 0
            for _ in range(n_calculation):
                sample = np.random.choice(match, size=n_sample, replace=False)  # type: ignore
                coverage += len(np.unique(sample)) / n_reference
            coverages.append(coverage / n_calculation)

        # `np.trapz` is deprecated in NumPy 2 in favour of `np.trapezoid`.
        trapezoid = getattr(np, "trapezoid", None) or np.trapz
        cuc = trapezoid(coverages, n_samples) / len(n_samples) / max(n_samples)
        return coverages, cuc

    @staticmethod
    def _compute_metric(x, y, metric: str = "mse", axis: int = -1):
        """Return the point-wise distance between `x` and `y`, averaged over `axis`.

        Args:
            x, y: broadcast-compatible arrays.
            metric: "mse", "mae" or "rmse" (case-insensitive).
            axis: axis over which the point-wise errors are averaged.

        Raises:
            ValueError: if `metric` is not one of the supported names.
        """
        name = metric.lower()
        if name not in ("mse", "mae", "rmse"):
            raise ValueError("Unknown metric: {}".format(metric))
        diff = x - y
        if name == "mae":
            return np.mean(np.abs(diff), axis=axis)
        mse = np.mean(diff**2, axis=axis)
        return np.sqrt(mse) if name == "rmse" else mse