import copy
import math
import os.path as osp
from fnmatch import fnmatch
from typing import Dict, List, Optional, Tuple, Union

import mmengine
from mmengine.config import Config, ConfigDict

from opencompass.registry import PARTITIONERS
from opencompass.utils import (build_dataset_from_cfg, dataset_abbr_from_cfg,
                               get_infer_output_path)

from .base import BasePartitioner


@PARTITIONERS.register_module()
class SizePartitioner(BasePartitioner):
"""Task partitioner based on the size of the dataset (with some rough |
|
expansion as an estimation of computational cost). |
|
|
|
Args: |
|
out_dir (str): The output directory of tasks. |
|
max_task_size (int): The maximum size of a task. |
|
gen_task_coef (int): The dataset cost measurement coefficient for |
|
generation tasks. |
|
strategy (str): The partition strategy. Supported strategies are: |
|
'heuristic' and 'split'. Defaults to 'heuristic'. |
|
heuristic: split large datasets into several tasks, merge small |
|
datasets into one task. |
|
split: split large datasets into several tasks only. |
|
dataset_size_path (str): The path to the dataset size cache file. |
|
keep_keys (list[str]): The keys to be kept from the experiment config |
|
to the task config. |
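
    Example:
        A typical partitioner config (the values here are illustrative)
        might look like:

        .. code-block:: python

            size_partitioner = dict(type='SizePartitioner',
                                    out_dir='outputs/predictions/',
                                    max_task_size=40000,
                                    gen_task_coef=20,
                                    strategy='heuristic')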
|
""" |
|
|
|
def __init__(self, |
|
out_dir: str, |
|
max_task_size: int = 40000, |
|
gen_task_coef: int = 20, |
|
strategy: str = 'heuristic', |
|
dataset_size_path: str = '.cache/dataset_size.json', |
|
keep_keys: Optional[List[str]] = None): |
|
super().__init__(out_dir=out_dir, keep_keys=keep_keys) |
|
self.max_task_size = max_task_size |
|
self.gen_task_coef = gen_task_coef |
|
self.dataset_size_path = dataset_size_path |
|
assert strategy in ('heuristic', 'split'), \ |
|
f'Unsupported partition strategy: {strategy}. '\ |
|
'Supported strategies are: `heuristic`, `split` .' |
|
self.strategy = strategy |
|
|
|
    def partition(self,
                  model_dataset_combinations: List[Dict[str,
                                                        List[ConfigDict]]],
                  work_dir: str,
                  out_dir: str,
                  add_cfg: Dict = {}) -> List[ConfigDict]:
        """Partition model-dataset pairs into tasks. Each task is defined as a
        dict and will run independently as a unit. Its structure is as
        follows:

        .. code-block:: python

            {
                'models': [],  # a list of model configs
                'datasets': [[]],  # a nested list of dataset configs, each
                                   # list corresponds to a model
                'work_dir': '',  # the work dir
                **add_cfg  # other keys to be kept in the config
            }

        Args:
            model_dataset_combinations (List[Dict]): List of
                `{models: [...], datasets: [...]}` dicts. Each dict contains
                a list of model configs and a list of dataset configs.
            work_dir (str): The work dir for the task.
            out_dir (str): The full output path for the task, intended for
                Partitioners to check whether the task is finished via the
                existence of the result file in this directory.
            add_cfg (dict): Other common keys to be added to the task config,
                used to share the same config among tasks. Defaults to {}.

        Returns:
            List[ConfigDict]: A list of tasks.
        """
        tasks = []
        for comb in model_dataset_combinations:
            # sort the datasets by estimated cost, largest first, so the
            # greedy packing below yields fuller tasks
            comb['datasets'] = sorted(comb['datasets'],
                                      key=lambda x: self.get_cost(x),
                                      reverse=True)
            for model in comb['models']:
                chunks = []  # elements: tuple(estimated_cost, dataset_chunk)
                for dataset in comb['datasets']:
                    filename = get_infer_output_path(model, dataset, out_dir)
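                    # skip the dataset if its inference output already exists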
                    if osp.exists(filename):
                        continue
                    dataset_size = self.get_cost(dataset)
                    if dataset_size > self.max_task_size:
                        root, ext = osp.splitext(filename)
                        dataset_splits = self.split_dataset(dataset)
                        for i, dataset_split in enumerate(dataset_splits):
                            if not osp.exists(f'{root}_{i}{ext}'):
                                chunks.append(
                                    (self.max_task_size, dataset_split))
                    else:
                        chunks.append((dataset_size, dataset))

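                # 'heuristic': greedily pack the size-sorted chunks into
                # tasks, cutting a new task whenever adding the next chunk
                # would exceed max_task_size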
                if self.strategy == 'heuristic':
                    chunks = sorted(chunks, key=lambda x: x[0], reverse=True)
                    current_size, current_chunks = 0, []
                    for index in range(len(chunks)):
                        current_size += chunks[index][0]
                        current_chunks.append(chunks[index][1])
                        if index == len(chunks) - 1 or current_size + chunks[
                                index + 1][0] > self.max_task_size:
                            tasks.append(
                                Config({
                                    'models': [model],
                                    'datasets': [current_chunks],
                                    'work_dir': work_dir,
                                    **add_cfg
                                }))
                            current_size, current_chunks = 0, []
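                # 'split': each chunk becomes its own single-dataset task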
                elif self.strategy == 'split':
                    for _, dataset in chunks:
                        tasks.append(
                            Config({
                                'models': [model],
                                'datasets': [[dataset]],
                                'work_dir': work_dir,
                                **add_cfg
                            }))
        return tasks

    @property
    def dataset_size(self):
        # lazily load the dataset-size cache file on first access
        if not hasattr(self, '_dataset_size'):
            if osp.exists(self.dataset_size_path):
                self._dataset_size = mmengine.load(self.dataset_size_path)
            else:
                self._dataset_size = {}
        return self._dataset_size

    def split_dataset(self, dataset_cfg: ConfigDict) -> List[ConfigDict]:
        """Split a dataset into several parts, each fitting in one task."""
        dataset_size, num_repeats = self.get_cost(dataset_cfg,
                                                  get_raw_factors=True)
        split_configs = []
        abbr = dataset_abbr_from_cfg(dataset_cfg)
        # the maximum number of examples one split can hold, since each
        # example is counted `num_repeats` times in the cost estimate
        step = self.max_task_size // num_repeats
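        # rebalance the step so examples are spread evenly over the minimum
        # number of splits: e.g. dataset_size=1000 and step=400 give
        # ceil(1000 / 400) = 3 splits, so step becomes ceil(1000 / 3) = 334
        # (splits of 334, 334 and 332 instead of 400, 400 and 200)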
        step = math.ceil(dataset_size / math.ceil(dataset_size / step))
        for part, i in enumerate(range(0, dataset_size, step)):
            cfg = copy.deepcopy(dataset_cfg)
            cfg['abbr'] = abbr + f'_{part}'
            test_range = cfg['reader_cfg'].get('test_range', '')
            cfg['reader_cfg']['test_range'] = f'{test_range}[{i}:{i+step}]'
            split_configs.append(cfg)
        return split_configs

    def get_factor(self, dataset: ConfigDict) -> int:
        infer_cfg = dataset.infer_cfg
        template = (infer_cfg.prompt_template.template if 'prompt_template'
                    in infer_cfg else infer_cfg.ice_template.template)
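        # for generation-style templates, each example's cost is scaled by
        # gen_task_coef to account for the extra decoding work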
        factor = self.gen_task_coef
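        # for PPL-style templates (a dict with one sub-template per label),
        # the cost scales with the number of labels instead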
        if isinstance(template, dict):
            ctr = sum(key in template for key in ('begin', 'round', 'end'))
            if ctr != len(template.keys()):
                factor = len(template.keys())

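        # these benchmarks typically elicit long (e.g. chain-of-thought)
        # outputs, so their cost estimate is scaled up further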
        dataset_abbr = dataset_abbr_from_cfg(dataset)
        if any(
                fnmatch(dataset_abbr, pattern)
                for pattern in ('bbh*', 'gsm8k*', 'math*', 'strategyqa*',
                                'agieval-jec*', 'agieval-gaokao-mathcloze',
                                'agieval-math', '*professional_law')):
            factor *= 10

        return factor

    def get_cost(self,
                 dataset: ConfigDict,
                 get_raw_factors: bool = False) -> Union[int, Tuple[int, int]]:
        """Get the computational cost of inferring on the dataset.

        Args:
            dataset (ConfigDict): The dataset config.
            get_raw_factors (bool): If True, the raw factors of computational
                cost will be returned.

        Returns:
            int or Tuple[int, int]: The estimated cost of the dataset, i.e.
                its size multiplied by the cost factor. If get_raw_factors
                is True, the raw size and the cost factor are returned
                instead.
        """
        dataset_abbr = dataset_abbr_from_cfg(dataset)

        test_range = dataset.reader_cfg.get('test_range', '')
        factor = self.get_factor(dataset)
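        # if the size is cached, apply the optional `test_range` slice
        # (e.g. '[0:100]') by evaluating len(range(size)<slice>) so the
        # dataset itself never has to be loaded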
        if dataset_abbr in self.dataset_size:
            actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
                               f'{test_range})')
            if get_raw_factors:
                return actual_size, factor
            return factor * actual_size

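        # cache miss: build the dataset once to measure its test split, then
        # persist the size so later runs can skip this step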
        dataset = build_dataset_from_cfg(dataset)
        self.dataset_size[dataset_abbr] = len(dataset.test)

        # make sure the cache directory exists (dataset_size_path is
        # configurable, so derive the directory from it rather than
        # hard-coding '.cache/')
        mmengine.mkdir_or_exist(osp.dirname(osp.abspath(
            self.dataset_size_path)))
        mmengine.dump(self.dataset_size,
                      self.dataset_size_path,
                      indent=4,
                      ensure_ascii=False)

        actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
                           f'{test_range})')
        if get_raw_factors:
            return actual_size, factor
        return factor * actual_size