import copy
import math
import os.path as osp
from fnmatch import fnmatch
from typing import Dict, List, Optional, Tuple, Union

import mmengine
from mmengine.config import Config, ConfigDict

from opencompass.registry import PARTITIONERS
from opencompass.utils import (build_dataset_from_cfg, dataset_abbr_from_cfg,
                               get_infer_output_path)

from .base import BasePartitioner


@PARTITIONERS.register_module()
class SizePartitioner(BasePartitioner):
    """Task partitioner based on the size of the dataset (with some rough
    expansion as an estimation of computational cost).

    Args:
        out_dir (str): The output directory of tasks.
        max_task_size (int): The maximum size of a task.
        gen_task_coef (int): The dataset cost measurement coefficient for
            generation tasks.
        strategy (str): The partition strategy. Supported strategies are
            'heuristic' and 'split'. Defaults to 'heuristic'.
            heuristic: split large datasets into several tasks and merge
                small datasets into shared tasks.
            split: only split large datasets into several tasks.
        dataset_size_path (str): The path to the dataset size cache file.
        keep_keys (list[str]): The keys to be kept from the experiment config
            to the task config.
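
    Example:
        A minimal sketch of building the partitioner directly; the output
        directory below is an illustrative placeholder:

        .. code-block:: python

            partitioner = SizePartitioner(
                out_dir='outputs/demo/predictions',
                max_task_size=40000,
                strategy='heuristic')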
    """

    def __init__(self,
                 out_dir: str,
                 max_task_size: int = 40000,
                 gen_task_coef: int = 20,
                 strategy: str = 'heuristic',
                 dataset_size_path: str = '.cache/dataset_size.json',
                 keep_keys: Optional[List[str]] = None):
        super().__init__(out_dir=out_dir, keep_keys=keep_keys)
        self.max_task_size = max_task_size
        self.gen_task_coef = gen_task_coef
        self.dataset_size_path = dataset_size_path
        assert strategy in ('heuristic', 'split'), \
            f'Unsupported partition strategy: {strategy}. '\
            'Supported strategies are: `heuristic`, `split`.'
        self.strategy = strategy

    def partition(self,
                  model_dataset_combinations: List[Dict[str,
                                                        List[ConfigDict]]],
                  work_dir: str,
                  out_dir: str,
                  add_cfg: Dict = {}) -> List[ConfigDict]:
        """Partition model-dataset pairs into tasks. Each task is defined as a
        dict and will run independently as a unit. Its structure is as
        follows:

        .. code-block:: python

            {
                'models': [],  # a list of model configs
                'datasets': [[]],  # a nested list of dataset configs, each
                                   # sublist corresponds to a model
                'work_dir': '',  # the work dir
                **add_cfg  # other keys to be kept in the config
            }

        Args:
            model_dataset_combinations (List[Dict]): List of
                `{models: [...], datasets: [...]}` dicts. Each dict contains
                a list of model configs and a list of dataset configs.
            work_dir (str): The work dir for the task.
            out_dir (str): The full output path for the task, used by the
                partitioner to check whether a task is already finished via
                the existence of its result file in this directory.
            add_cfg (dict): Other common keys to be added to each task config,
                used to share the same settings among tasks. Defaults to {}.

        Returns:
            List[ConfigDict]: A list of tasks.
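
        Example:
            A minimal call sketch (``model_cfg`` and ``dataset_cfg`` are
            hypothetical config dicts, and the paths are placeholders):

            .. code-block:: python

                tasks = partitioner.partition(
                    [{'models': [model_cfg], 'datasets': [dataset_cfg]}],
                    work_dir='outputs/demo',
                    out_dir='outputs/demo/predictions')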
        """

        tasks = []
        for comb in model_dataset_combinations:
            comb['datasets'] = sorted(comb['datasets'],
                                      key=lambda x: self.get_cost(x),
                                      reverse=True)
            for model in comb['models']:
                chunks = []  # elements: tuple(size, dataset_chunk)
                for dataset in comb['datasets']:
                    filename = get_infer_output_path(model, dataset, out_dir)
                    # skip the task if the task output exists
                    if osp.exists(filename):
                        continue
                    dataset_size = self.get_cost(dataset)
                    if dataset_size > self.max_task_size:
                        root, ext = osp.splitext(filename)
                        dataset_splits = self.split_dataset(dataset)
                        for i, dataset_split in enumerate(dataset_splits):
                            if not osp.exists(f'{root}_{i}{ext}'):
                                chunks.append(
                                    (self.max_task_size, dataset_split))
                    else:
                        chunks.append((dataset_size, dataset))

                if self.strategy == 'heuristic':
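                    # Greedy packing: chunks are sorted by descending cost and
                    # the current task is flushed whenever adding the next
                    # chunk would push the running total past max_task_size.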
                    chunks = sorted(chunks, key=lambda x: x[0], reverse=True)
                    current_size, current_chunks = 0, []
                    for index in range(len(chunks)):
                        current_size += chunks[index][0]
                        current_chunks.append(chunks[index][1])
                        if index == len(chunks) - 1 or current_size + chunks[
                                index + 1][0] > self.max_task_size:
                            tasks.append(
                                Config({
                                    'models': [model],
                                    'datasets': [current_chunks],
                                    'work_dir': work_dir,
                                    **add_cfg
                                }))
                            current_size, current_chunks = 0, []
                elif self.strategy == 'split':
                    for _, dataset in chunks:
                        tasks.append(
                            Config({
                                'models': [model],
                                'datasets': [[dataset]],
                                'work_dir': work_dir,
                                **add_cfg
                            }))
        return tasks

    @property
    def dataset_size(self):
        if not hasattr(self, '_dataset_size'):
            if osp.exists(self.dataset_size_path):
                self._dataset_size = mmengine.load(self.dataset_size_path)
            else:
                self._dataset_size = {}
        return self._dataset_size

    def split_dataset(self, dataset_cfg: ConfigDict) -> List[ConfigDict]:
        """Split dataset into several parts."""
        dataset_size, num_repeats = self.get_cost(dataset_cfg,
                                                  get_raw_factors=True)
        split_configs = []
        abbr = dataset_abbr_from_cfg(dataset_cfg)
        step = self.max_task_size // num_repeats
        # recompute the step so that entries are distributed evenly across
        # the resulting splits instead of leaving a tiny remainder split
        step = math.ceil(dataset_size / math.ceil(dataset_size / step))
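        # Illustrative numbers (assumptions, not from the source): with
        # max_task_size=40000, num_repeats=4 and dataset_size=25000, the
        # initial step is 10000; ceil(25000 / ceil(25000 / 10000)) = 8334,
        # giving three splits of 8334, 8334 and 8332 entries.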
        for part, i in enumerate(range(0, dataset_size, step)):
            cfg = copy.deepcopy(dataset_cfg)
            cfg['abbr'] = abbr + f'_{part}'
            test_range = cfg['reader_cfg'].get('test_range', '')
            cfg['reader_cfg']['test_range'] = f'{test_range}[{i}:{i+step}]'
            split_configs.append(cfg)
        return split_configs

    def get_factor(self, dataset: ConfigDict) -> int:
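        """Return the expansion factor used to scale the raw dataset size
        into an estimated cost.

        Generation-style templates are scaled by ``gen_task_coef``; PPL-style
        templates are scaled by the number of labels in the template; a few
        datasets known to produce long outputs get an extra 10x multiplier.
        """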
        infer_cfg = dataset.infer_cfg
        template = (infer_cfg.prompt_template.template if 'prompt_template'
                    in infer_cfg else infer_cfg.ice_template.template)
        # For Gen templates, the dataset size is multiplied by
        # self.gen_task_coef
        factor = self.gen_task_coef
        # For PPL templates, the dataset size is multiplied by the number of
        # labels
        if isinstance(template, dict):
            ctr = sum(key in template for key in ('begin', 'round', 'end'))
            if ctr != len(template.keys()):
                factor = len(template.keys())

        dataset_abbr = dataset_abbr_from_cfg(dataset)
        if any(
                fnmatch(dataset_abbr, pattern)
                for pattern in ('bbh*', 'gsm8k*', 'math*', 'strategyqa*',
                                'agieval-jec*', 'agieval-gaokao-mathcloze',
                                'agieval-math', '*professional_law')):
            factor *= 10

        return factor

    def get_cost(self,
                 dataset: ConfigDict,
                 get_raw_factors: bool = False) -> Union[int, Tuple[int, int]]:
        """Get the computational cost of inferring on the dataset.

        Args:
            dataset (ConfigDict): The dataset config.
            get_raw_factors (bool): If True, the raw factors of computational
                cost will be returned.

        Returns:
            int or Tuple[int, int]: The estimated cost, i.e. the dataset size
                multiplied by the expansion factor. If get_raw_factors is
                True, a tuple of (raw size, expansion factor) is returned
                instead.
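
        Example:
            An illustrative calculation (the numbers are assumptions, not from
            the source): a generation-style dataset with 500 test examples and
            the default ``gen_task_coef=20`` costs 20 * 500 = 10000, while a
            two-label PPL dataset of the same size costs 2 * 500 = 1000.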
        """
        dataset_abbr = dataset_abbr_from_cfg(dataset)

        test_range = dataset.reader_cfg.get('test_range', '')
        factor = self.get_factor(dataset)

        if dataset_abbr in self.dataset_size:
            # apply the reader's test_range slice (e.g. '[0:100]') to the
            # cached full size to get the actual number of test examples
            actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
                               f'{test_range})')
            if get_raw_factors:
                return actual_size, factor
            return factor * actual_size

        dataset = build_dataset_from_cfg(dataset)
        self.dataset_size[dataset_abbr] = len(dataset.test)

        mmengine.mkdir_or_exist('.cache/')
        mmengine.dump(self.dataset_size,
                      self.dataset_size_path,
                      indent=4,
                      ensure_ascii=False)

        actual_size = eval('len(range(self.dataset_size[dataset_abbr])'
                           f'{test_range})')
        if get_raw_factors:
            return actual_size, factor
        return factor * actual_size