import functools
import getpass
import math
import os.path as osp
from datetime import datetime
from typing import Any, Dict, List, Optional

import mmengine
import tabulate
from mmengine import ConfigDict

from opencompass.utils import (LarkReporter, dataset_abbr_from_cfg,
                               get_infer_output_path, get_logger,
                               model_abbr_from_cfg)
from opencompass.utils.prompt import get_prompt_hash
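
# METRIC_WHITELIST fixes the preferred display order of metrics: the earlier a
# metric appears, the higher its priority; metrics not listed are still kept
# but sorted last. Metrics in METRIC_BLACKLIST are dropped from parsed results.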
METRIC_WHITELIST = ['score', 'auc_score', 'accuracy', 'humaneval_pass@1', 'rouge1', 'avg_toxicity_score', 'bleurt_diff', 'matthews_correlation', 'truth', 'f1', 'exact_match']
METRIC_BLACKLIST = ['bp', 'sys_len', 'ref_len']


def model_abbr_from_cfg_used_in_summarizer(model):
    """Return the model abbreviation shown in summary tables.

    Prefers the optional ``summarizer_abbr`` field of the model config and
    falls back to the regular model abbreviation otherwise.
    """
    if model.get('summarizer_abbr', None):
        return model['summarizer_abbr']
    else:
        return model_abbr_from_cfg(model)


class DefaultSummarizer:
    """Default summarizer in OpenCompass.

    Args:
        config (ConfigDict): The configuration object of the evaluation task.
            It's expected to be filled out at runtime.
        dataset_abbrs (list[str], optional): Dataset abbreviations to be
            listed in the summary.
        summary_groups (list): The dataset groups whose results need to be
            averaged out. For example, mmlu. Each item is a dict with
            'name' (str) and 'subsets' (list of dataset abbrs), and optionally
            'weights' if a weighted average is needed.
        prompt_db: A deprecated field.
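
    Example:
        An illustrative invocation, assuming ``cfg`` is a ConfigDict that has
        been filled out at runtime and contains ``models``, ``datasets`` and
        ``work_dir``::

            summarizer = DefaultSummarizer(config=cfg)
            summarizer.summarize()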
    """

    def __init__(self, config: ConfigDict, dataset_abbrs: Optional[List[str]] = None, summary_groups: List = [], prompt_db=None) -> None:
        self.tasks = []
        self.cfg = config
        self.logger = get_logger()
        self.summary_groups = summary_groups
        self.dataset_abbrs = dataset_abbrs
        if prompt_db:
            self.logger.warning('prompt_db is deprecated and no longer used. '
                                'Please remove it from your config.')

        self.lark_reporter = None
        if self.cfg.get('lark_bot_url', None):
            self.lark_reporter = LarkReporter(self.cfg['lark_bot_url'])

        self.model_cfgs = self.cfg['models']
        self.dataset_cfgs = self.cfg['datasets']
        self.work_dir = self.cfg['work_dir']
        model_abbrs = []
        for model in self.model_cfgs:
            model_abbr = model_abbr_from_cfg_used_in_summarizer(model)
            if model_abbr in model_abbrs:
                continue
            model_abbrs.append(model_abbr)
        self.model_abbrs = model_abbrs

    def _pick_up_results(self):
        """The function reads the numerical results of evaluations from the
        output folder based on the configuration file, and ultimately returns
        four dictionaries, each containing processed information in different
        formats. The contents of the four dictionaries are as follows:

        - raw_results: contains the raw results of each model on each dataset (excluding details).
        - parsed_results: contains the results of each model on each dataset for each metric, with metrics in METRIC_BLACKLIST being ignored.
        - dataset_metrics: contains the list of metrics for each dataset, consistent with the metrics in parsed_results. The list is ordered according to the METRIC_WHITELIST,
          with metrics appearing earlier considered more important.
        - dataset_eval_mode: contains the evaluation mode for each dataset.
        """
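        # Accumulator shapes:
        #   raw_results[model_abbr][dataset_abbr]            -> raw result dict
        #   parsed_results[model_abbr][dataset_abbr][metric] -> numeric score
        #   dataset_metrics[dataset_abbr]                    -> ordered metric names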
        raw_results: Dict[str, Dict[str, Any]] = {}
        parsed_results: Dict[str, Dict[str, Dict[str, float]]] = {}
        dataset_metrics: Dict[str, List[str]] = {}

        for model in self.model_cfgs:
            model_abbr = model_abbr_from_cfg_used_in_summarizer(model)
            parsed_results.setdefault(model_abbr, {})
            raw_results.setdefault(model_abbr, {})
            for dataset in self.dataset_cfgs:
                dataset_abbr = dataset_abbr_from_cfg(dataset)
                filepath = get_infer_output_path(model, dataset, osp.join(self.work_dir, 'results'))
                if not osp.exists(filepath):
                    continue
                result = mmengine.load(filepath)
                result.pop('details', None)
                raw_results[model_abbr][dataset_abbr] = result
                if 'error' in result:
                    self.logger.debug(f'error in {model_abbr} {dataset_abbr} {result["error"]}')
                    continue
                _rst, _dm = {}, []
                for metric, score in result.items():
                    if metric not in METRIC_BLACKLIST and isinstance(score, (int, float)):
                        _rst[metric] = score
                        _dm.append(metric)
                if len(_rst) == 0:
                    self.logger.warning(f'unknown result format: {result}, continue')
                    continue
                # Order metrics by their priority in METRIC_WHITELIST; unknown
                # metrics are sorted last.
                _dm = sorted(_dm, key=lambda i: METRIC_WHITELIST.index(i) if i in METRIC_WHITELIST else len(METRIC_WHITELIST))

                if dataset_abbr in dataset_metrics:
                    assert tuple(dataset_metrics[dataset_abbr]) == tuple(_dm), \
                        f'{dataset_abbr} has different metrics: {dataset_metrics[dataset_abbr]} vs {_dm}'
                else:
                    dataset_metrics[dataset_abbr] = _dm
                parsed_results[model_abbr][dataset_abbr] = _rst
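
        # Infer each dataset's evaluation mode from its inferencer type.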
        dataset_eval_mode: Dict[str, str] = {}
        for dataset in self.dataset_cfgs:
            inferencer = dataset.get('infer_cfg', {}).get('inferencer', {}).get('type', '')
            inferencer = inferencer if isinstance(inferencer, str) else inferencer.__name__
            dataset_abbr = dataset_abbr_from_cfg(dataset)
            if 'GenInferencer' in inferencer:
                dataset_eval_mode[dataset_abbr] = 'gen'
            elif 'PPLInferencer' in inferencer:
                dataset_eval_mode[dataset_abbr] = 'ppl'
            elif 'LLInferencer' in inferencer:
                dataset_eval_mode[dataset_abbr] = 'll'
            else:
                dataset_eval_mode[dataset_abbr] = 'unknown'
                self.logger.warning(f'unknown inferencer: {inferencer} - {dataset_abbr}')
        return raw_results, parsed_results, dataset_metrics, dataset_eval_mode

    def _calculate_group_metrics(self, raw_results, parsed_results, dataset_metrics, dataset_eval_mode):
        """The function calculates the numerical results for each group based
        on the configuration in summary_groups, and updates the contents of
        each dictionary accordingly."""
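        # An illustrative summary group entry (subset names are hypothetical):
        #   {'name': 'mmlu', 'subsets': ['mmlu_stem', 'mmlu_social_science'],
        #    'weights': {'mmlu_stem': 2, 'mmlu_social_science': 1}}
        # A subset may also be a (dataset_abbr, metric) tuple to select one
        # specific metric of that dataset.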
        summary_groups = self.summary_groups
        for sg in summary_groups:
            for model_abbr in self.model_abbrs:
                available_metrics, missing_metrics = [], []
                for i in sg['subsets']:
                    if isinstance(i, (list, tuple)):
                        if i[0] in parsed_results[model_abbr] and i[1] in parsed_results[model_abbr][i[0]]:
                            available_metrics.append(i)
                        else:
                            missing_metrics.append(i)
                    else:
                        if i in parsed_results[model_abbr]:
                            available_metrics.append(i)
                        else:
                            missing_metrics.append(i)

                if len(available_metrics) == 0:
                    continue
                if len(missing_metrics) != 0:
                    raw_results[model_abbr][sg['name']] = {'error': 'missing metrics: {}'.format(missing_metrics)}
                    continue
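
                # Decide which metric to aggregate on: an explicit 'metric' in
                # the group config wins; otherwise fall back to standard
                # deviation, weighted average or naive average.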
                if 'metric' in sg:
                    default_metric = sg['metric']
                    need_smart_metric = False
                else:
                    need_smart_metric = True
                    if sg.get('std', False):
                        default_metric = 'standard_deviation'
                    elif sg.get('weights', []):
                        default_metric = 'weighted_average'
                    else:
                        default_metric = 'naive_average'

                scores, eval_modes, group_metrics = {}, [], None
                if any(isinstance(dataset_abbr, (list, tuple)) for dataset_abbr in sg['subsets']) and \
                        any(isinstance(dataset_abbr, str) for dataset_abbr in sg['subsets']):
                    raise NotImplementedError('mixed dataset_abbr type is not supported')
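
                # Collect the per-subset scores, keyed as 'dataset_abbr@metric'.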
                if all(isinstance(dataset_abbr, (list, tuple)) for dataset_abbr in sg['subsets']):
                    group_metrics = [default_metric]
                    for dataset_abbr, metric in sg['subsets']:
                        scores.setdefault(default_metric, {})[dataset_abbr + '@' + metric] = parsed_results[model_abbr][dataset_abbr][metric]
                        eval_modes.append(dataset_eval_mode.get(dataset_abbr, 'unknown'))
                else:
                    group_metrics = list(functools.reduce(lambda a, b: a & b, [set(dataset_metrics[dataset_abbr]) for dataset_abbr in sg['subsets']]))
                    if need_smart_metric and len(group_metrics) > 1:
                        for metric in group_metrics:
                            for dataset_abbr in sg['subsets']:
                                scores.setdefault(metric, {})[dataset_abbr + '@' + metric] = parsed_results[model_abbr][dataset_abbr][metric]
                            eval_modes.append(dataset_eval_mode.get(sg['subsets'][0], 'unknown'))
                    else:
                        group_metrics = [default_metric]
                        for dataset_abbr in sg['subsets']:
                            metric = dataset_metrics[dataset_abbr][0]
                            scores.setdefault(default_metric, {})[dataset_abbr + '@' + metric] = parsed_results[model_abbr][dataset_abbr][metric]
                            eval_modes.append(dataset_eval_mode.get(dataset_abbr, 'unknown'))
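
                # Aggregate the collected scores into one value per metric.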
                result = {}
                for metric in scores:
                    if default_metric == 'standard_deviation':
                        avg = sum(scores[metric].values()) / len(scores[metric])
                        variance = sum((scores[metric][k] - avg) ** 2 for k in scores[metric]) / len(scores[metric])
                        scores[metric] = result[metric] = math.sqrt(variance)
                    else:
                        if sg.get('weights', []):
                            # Zero-weight subsets are excluded from the numerator.
                            try:
                                numerator = sum(scores[metric][k] * sg['weights'][k] for k in sg['weights'] if sg['weights'][k] != 0)
                            except KeyError:
                                # Weights may be keyed by plain dataset_abbr while
                                # scores are keyed by 'dataset_abbr@metric'; retry
                                # with the '@metric' suffix stripped.
                                tmp_scores = {metric: {k.split('@')[0]: v for k, v in scores[metric].items()} for metric in scores}
                                numerator = sum(tmp_scores[metric][k] * sg['weights'][k] for k in sg['weights'] if sg['weights'][k] != 0)
                            denominator = sum(sg['weights'].values())
                        else:
                            numerator = sum(scores[metric].values())
                            denominator = len(scores[metric])
                        scores[metric] = result[metric] = numerator / denominator
                eval_modes = list(set(eval_modes))
                eval_mode = eval_modes[0] if len(eval_modes) == 1 else 'mixed'
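
                # Merge the group entry back into the per-model result dicts.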
                raw_results[model_abbr].setdefault(sg['name'], {}).update(scores)
                parsed_results[model_abbr].setdefault(sg['name'], {}).update(result)
                dataset_metrics.setdefault(sg['name'], []).extend(group_metrics)
                dataset_eval_mode[sg['name']] = eval_mode

        return raw_results, parsed_results, dataset_metrics, dataset_eval_mode

    def _format_table(self, parsed_results, dataset_metrics, dataset_eval_mode):
        dataset_abbrs = [dataset_abbr_from_cfg(dataset) for dataset in self.dataset_cfgs]
        prompt_version = {dataset_abbr_from_cfg(d): get_prompt_hash(d)[:6] for d in self.dataset_cfgs}
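
        # Build the ordered list of (dataset_abbr, metric) pairs to display.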
        summarizer_dataset_abbrs = []
        if self.dataset_abbrs is None:
            # Display all dataset metrics included in the config ...
            for dataset_abbr in dataset_abbrs:
                if dataset_abbr in dataset_metrics:
                    for metric in dataset_metrics[dataset_abbr]:
                        summarizer_dataset_abbrs.append((dataset_abbr, metric))
                else:
                    summarizer_dataset_abbrs.append((dataset_abbr, None))
            # ... plus any group metrics produced by summary_groups.
            for dataset_abbr in dataset_metrics:
                for metric in dataset_metrics[dataset_abbr]:
                    if (dataset_abbr, metric) not in summarizer_dataset_abbrs:
                        summarizer_dataset_abbrs.append((dataset_abbr, metric))
        else:
            # Otherwise follow the order requested via self.dataset_abbrs.
            for item in self.dataset_abbrs:
                if isinstance(item, str):
                    summarizer_dataset_abbrs.append((item, None))
                elif isinstance(item, (list, tuple)):
                    summarizer_dataset_abbrs.append((item[0], item[1]))
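
        # Assemble the table: a header row, then one row per (dataset, metric).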
        table = []
        header = ['dataset', 'version', 'metric', 'mode'] + self.model_abbrs
        table.append(header)
        for dataset_abbr, metric in summarizer_dataset_abbrs:
            if dataset_abbr not in dataset_metrics:
                table.append([dataset_abbr, '-', '-', '-'] + ['-'] * len(self.model_abbrs))
                continue
            if metric is None:
                metric = dataset_metrics[dataset_abbr][0]
            elif metric in dataset_metrics[dataset_abbr]:
                pass
            else:
                table.append([dataset_abbr, '-', '-', '-'] + ['-'] * len(self.model_abbrs))
                continue

            row = [dataset_abbr, prompt_version.get(dataset_abbr, '-'), metric, dataset_eval_mode.get(dataset_abbr, '-')]
            for model_abbr in self.model_abbrs:
                if dataset_abbr in parsed_results[model_abbr]:
                    row.append('{:.02f}'.format(parsed_results[model_abbr][dataset_abbr][metric]))
                else:
                    row.append('-')
            table.append(row)
        return table

    def _format_raw_txt(self, raw_results):
        raw_dataset_abbrs = []
        for model_abbr in self.model_abbrs:
            for dataset_abbr in raw_results[model_abbr]:
                if dataset_abbr not in raw_dataset_abbrs:
                    raw_dataset_abbrs.append(dataset_abbr)
        raw_txts = []
        for model_abbr in self.model_abbrs:
            raw_txts.append('-------------------------------')
            raw_txts.append(f'Model: {model_abbr}')
            for dataset_abbr in raw_dataset_abbrs:
                result = raw_results[model_abbr].get(dataset_abbr, '{}')
                raw_txts.append(f'{dataset_abbr}: {result}')
        raw_txts = '\n'.join(raw_txts)
        return raw_txts

    def _output_to_file(self, output_path, time_str, table, raw_txts):
        # Output to .txt / .csv files.
        if output_path is None:
            output_path = osp.join(self.work_dir, 'summary', f'summary_{time_str}.txt')
            output_csv_path = osp.join(self.work_dir, 'summary', f'summary_{time_str}.csv')
        else:
            output_csv_path = output_path.replace('.txt', '.csv')

        output_dir = osp.split(output_path)[0]
        mmengine.mkdir_or_exist(output_dir)
        with open(output_path, 'w', encoding='utf-8') as f:
            # The .txt summary contains three sections separated by dividers:
            # the tabulate-formatted table, the csv-formatted table, and the
            # raw per-model results.
            text = f'{time_str}\n' + \
                   'tabulate format\n' + \
                   '^' * 128 + '\n' + \
                   tabulate.tabulate(table, headers='firstrow') + '\n' + \
                   '$' * 128 + '\n\n' + \
                   '-' * 128 + ' THIS IS A DIVIDER ' + '-' * 128 + '\n\n' + \
                   'csv format\n' + \
                   '^' * 128 + '\n' + \
                   '\n'.join([','.join(row) for row in table]) + '\n' + \
                   '$' * 128 + '\n\n' + \
                   '-' * 128 + ' THIS IS A DIVIDER ' + '-' * 128 + '\n\n' + \
                   'raw format\n' + \
                   '^' * 128 + '\n' + \
                   raw_txts + '\n' + \
                   '$' * 128 + '\n'
            f.write(text)
        self.logger.info(f'write summary to {osp.abspath(output_path)}')

        with open(output_csv_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join([','.join(row) for row in table]) + '\n')
        self.logger.info(f'write csv to {osp.abspath(output_csv_path)}')

    def summarize(
            self,
            output_path: str = None,
            time_str: str = datetime.now().strftime('%Y%m%d_%H%M%S')):
        # Note: the default time_str is evaluated once, at import time; pass an
        # explicit value if each call needs a fresh timestamp.

        # Pick up per-dataset results from the results folder.
        raw_results, parsed_results, dataset_metrics, dataset_eval_mode = self._pick_up_results()

        # Calculate group metrics defined in summary_groups.
        raw_results, parsed_results, dataset_metrics, dataset_eval_mode = \
            self._calculate_group_metrics(raw_results, parsed_results, dataset_metrics, dataset_eval_mode)

        # Format the summary table.
        table = self._format_table(parsed_results, dataset_metrics, dataset_eval_mode)

        # Format the raw results as text.
        raw_txts = self._format_raw_txt(raw_results)

        # Output to screen.
        print(tabulate.tabulate(table, headers='firstrow'))

        # Output to .txt / .csv files.
        self._output_to_file(output_path, time_str, table, raw_txts)

        if self.lark_reporter:
            # Mirror the default path used by _output_to_file so the message
            # below does not break when output_path is None.
            if output_path is None:
                output_path = osp.join(self.work_dir, 'summary', f'summary_{time_str}.txt')
            content = f'The detailed evaluation summary of {getpass.getuser()} '
            content += f'has been written to {osp.abspath(output_path)}'
            self.lark_reporter.post(content)