import copy
import math
import os.path as osp
from typing import Dict, List, Optional

import mmengine
from mmengine.config import Config, ConfigDict

from opencompass.registry import PARTITIONERS
from opencompass.utils import (build_dataset_from_cfg, dataset_abbr_from_cfg,
                               get_infer_output_path)

from .base import BasePartitioner


@PARTITIONERS.register_module()
class NumWorkerPartitioner(BasePartitioner):
    """Task partitioner based on the pre-defined number of workers.

    Args:
        out_dir (str): The output directory of tasks.
        num_worker (int): The number of workers. Defaults to 8.
        min_task_size (int): The minimum size of a task. Defaults to 16.
        strategy (str): The partition strategy. 'heuristic' packs the
            resulting chunks round-robin into at most `num_worker` tasks;
            'split' creates one task per chunk. Defaults to 'heuristic'.
        dataset_size_path (str): The path to the dataset size cache file.
        keep_keys (list[str]): The keys to be kept from the experiment config
            to the task config.
    """

    def __init__(self,
                 out_dir: str,
                 num_worker: int = 8,
                 min_task_size: int = 16,
                 strategy: str = 'heuristic',
                 dataset_size_path: str = '.cache/dataset_size.json',
                 keep_keys: Optional[List[str]] = None):
        super().__init__(out_dir=out_dir, keep_keys=keep_keys)
        self.num_worker = num_worker
        self.min_task_size = min_task_size
        self.dataset_size_path = dataset_size_path
        assert strategy in ('heuristic', 'split'), \
            f'Unsupported partition strategy: {strategy}. ' \
            'Supported strategies are: `heuristic`, `split`.'
        self.strategy = strategy
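
    # Usage sketch (illustrative only; the output path is hypothetical):
    # a partitioner that turns every chunk into its own task:
    #
    #   partitioner = NumWorkerPartitioner(out_dir='outputs/predictions',
    #                                      num_worker=8,
    #                                      strategy='split')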

    def partition(self,
                  model_dataset_combinations: List[Dict[str, List]],
                  work_dir: str,
                  out_dir: str,
                  add_cfg: Dict = {}) -> List[ConfigDict]:
        # Intentionally avoid any sorting here so that users retain the
        # ability to manipulate the task order.
        tasks = []
        for comb in model_dataset_combinations:
            for model in comb['models']:
                chunks = []  # datasets or dataset splits not yet inferred
                for dataset in comb['datasets']:
                    filename = get_infer_output_path(model, dataset, out_dir)
                    # skip the task if the task output exists
                    if osp.exists(filename):
                        continue
                    dataset_size = self.get_size(dataset)
                    if dataset_size > self.min_task_size:
                        root, ext = osp.splitext(filename)
                        dataset_splits = self.split_dataset(dataset)
                        for i, dataset_split in enumerate(dataset_splits):
                            if not osp.exists(f'{root}_{i}{ext}'):
                                chunks.append(dataset_split)
                    else:
                        chunks.append(dataset)

                if self.strategy == 'heuristic':
                    # Deal the chunks round-robin into num_worker buckets,
                    # producing at most num_worker tasks per model.
                    buckets = [[] for _ in range(self.num_worker)]
                    for i, chunk in enumerate(chunks):
                        buckets[i % self.num_worker].append(chunk)

                    for bucket in buckets:
                        if len(bucket) > 0:
                            tasks.append(
                                Config({
                                    'models': [model],
                                    'datasets': [bucket],
                                    'work_dir': work_dir,
                                    **add_cfg
                                }))
                elif self.strategy == 'split':
                    # One task per chunk, regardless of num_worker.
                    for dataset in chunks:
                        tasks.append(
                            Config({
                                'models': [model],
                                'datasets': [[dataset]],
                                'work_dir': work_dir,
                                **add_cfg
                            }))
        return tasks
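
    # Illustrative sketch of the 'heuristic' bucketing above, assuming ten
    # chunks and four workers (values are hypothetical): chunk i lands in
    # bucket i % num_worker, which is equivalent to strided slicing:
    #
    #   chunks = list(range(10))
    #   buckets = [chunks[i::4] for i in range(4)]
    #   # -> [[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]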

    @property
    def dataset_size(self):
        # Lazily load the dataset size cache from disk on first access.
        if not hasattr(self, '_dataset_size'):
            if osp.exists(self.dataset_size_path):
                self._dataset_size = mmengine.load(self.dataset_size_path)
            else:
                self._dataset_size = {}
        return self._dataset_size

    def split_dataset(self, dataset_cfg: ConfigDict) -> List[ConfigDict]:
        """Split dataset into several parts."""
        dataset_size = self.get_size(dataset_cfg)
        split_configs = []
        abbr = dataset_abbr_from_cfg(dataset_cfg)
        # Evenly distribute the dataset over the workers, but never create
        # a split smaller than min_task_size.
        num_split = self.num_worker
        step = max(math.ceil(dataset_size / num_split), self.min_task_size)
        for part, i in enumerate(range(0, dataset_size, step)):
            cfg = copy.deepcopy(dataset_cfg)
            cfg['abbr'] = abbr + f'_{part}'
            # Append a slice to any existing test_range so the slices
            # compose rather than overwrite each other.
            test_range = cfg['reader_cfg'].get('test_range', '')
            cfg['reader_cfg']['test_range'] = f'{test_range}[{i}:{i+step}]'
            split_configs.append(cfg)
        return split_configs
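
    # Worked example (hypothetical sizes): a 100-item dataset with
    # num_worker=4 and min_task_size=16 gives step = max(ceil(100 / 4), 16)
    # = 25, yielding four splits whose test_range values are '[0:25]',
    # '[25:50]', '[50:75]' and '[75:100]'. If an upstream test_range such
    # as '[0:50]' is already set, the slices compose, e.g. '[0:50][0:25]'.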

    def get_size(self, dataset: ConfigDict) -> int:
        dataset_abbr = dataset_abbr_from_cfg(dataset)

        test_range = dataset.reader_cfg.get('test_range', '')

        if dataset_abbr in self.dataset_size:
            # Apply the (possibly empty) test_range slice to a range of the
            # cached size to get the effective number of test items.
            actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
                               f'{test_range})')
            return actual_size

        # Cache miss: build the dataset once to measure it, then persist the
        # size so later runs can skip this step.
        dataset = build_dataset_from_cfg(dataset)
        self.dataset_size[dataset_abbr] = len(dataset.test)

        mmengine.mkdir_or_exist('.cache/')
        mmengine.dump(self.dataset_size,
                      self.dataset_size_path,
                      indent=4,
                      ensure_ascii=False)

        actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
                           f'{test_range})')
        return actual_size
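
    # Worked example of the eval-based slicing in get_size (hypothetical
    # numbers): with a cached size of 1000 and test_range == '[0:100]', the
    # evaluated expression is len(range(1000)[0:100]) == 100; with an empty
    # test_range it is simply len(range(1000)) == 1000.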