File size: 4,454 Bytes
256a159
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from copy import deepcopy
from typing import Dict, List

from mmengine.config import Config, ConfigDict

from opencompass.registry import PARTITIONERS

from .base import BasePartitioner


@PARTITIONERS.register_module()
class MultimodalNaivePartitioner(BasePartitioner):
    """Multimodal naive task partitioner.

    This partitioner generates one task for each aligned
    model-dataset-evaluator-checkpoint tuple (the i-th entry of every
    input list is paired together).

    Args:
        config (ConfigDict): The full config dict.
    """

    def partition(self, models: List[ConfigDict], datasets: List[ConfigDict],
                  evaluators: List[ConfigDict], load_froms: List[ConfigDict],
                  work_dir: str, num_gpus: int, num_procs: int,
                  launcher: str) -> List[Dict]:
        """Partition model-dataset pairs into tasks. Each task is defined as a
        dict and will run independently as a unit. Its structure is as follows:

        .. code-block:: python

            {
                'model': {},  # a model config
                'dataset': {},  # a dataset config
                'evaluator': [],  # a list of evaluator configs
                'load_from': '',  # a load_from path
                'work_dir': '',  # the work dir
                'num_gpus': int, # integer, number of gpus for each task
                'num_procs': int, # integer, number of gpus on single machine
                'launcher': str, # string, how to launch distributed training
            }

        Args:
            models (List[ConfigDict]): A list of model configs.
            datasets (List[ConfigDict]): A list of dataset configs.
            evaluators (List[ConfigDict]): A list of evaluator configs.
            load_froms (List[ConfigDict]): A list of load_from paths.
            work_dir (str): The work dir for the task.
            num_gpus (int): Number of gpus for each task.
            num_procs (int): Number of gpus on single machine.
            launcher (str): How to launch distributed training.
                Only `slurm`, `pytorch` and `mpi` are available.

        Returns:
            List[Dict]: A list of tasks.

        Raises:
            ValueError: If the four input lists do not have equal lengths.
        """
        # ``zip`` would silently truncate to the shortest list on a length
        # mismatch, dropping tasks without any warning — fail loudly instead.
        lengths = {
            len(models),
            len(datasets),
            len(evaluators),
            len(load_froms)
        }
        if len(lengths) > 1:
            raise ValueError(
                'models, datasets, evaluators and load_froms must have equal '
                f'lengths, but got {len(models)}, {len(datasets)}, '
                f'{len(evaluators)} and {len(load_froms)} respectively.')

        tasks = []
        for model, dataset, evaluator, load_from in zip(
                models, datasets, evaluators, load_froms):
            task = Config({
                'model': model,
                'dataset': dataset,
                'evaluator': evaluator,
                'load_from': load_from,
                'work_dir': work_dir,
                'num_gpus': num_gpus,
                'num_procs': num_procs,
                'launcher': launcher
            })
            tasks.append(task)

        return tasks

    def __call__(self, cfg: ConfigDict) -> List[Dict]:
        """Generate tasks from config. Each task is defined as a
        dict and will run independently as a unit. Its structure is as
        follows:

        .. code-block:: python

            {
                'model': {},  # a model config
                'dataset': {},  # a dataset config
                'evaluator': [],  # a list of evaluator configs
                'load_from': '',  # a load_from path
                'work_dir': '',  # the work dir
                'num_gpus': int, # integer, number of gpus for each task
                'num_procs': int, # integer, number of gpus on single machine
            }

        Args:
            cfg (ConfigDict): The config dict, containing "models",
                "datasets", "evaluators", "load_froms", "work_dir",
                "num_gpus", "num_procs" and "launcher" keys.

        Returns:
            List[Dict]: A list of tasks.
        """
        # Deep-copy so that downstream mutation of task configs cannot leak
        # back into the caller's config.
        cfg = deepcopy(cfg)
        models = cfg['models']
        datasets = cfg['datasets']
        evaluators = cfg['evaluators']
        load_froms = cfg['load_froms']
        work_dir = cfg['work_dir']
        num_gpus = cfg['num_gpus']
        num_procs = cfg['num_procs']
        launcher = cfg['launcher']

        tasks = self.partition(models, datasets, evaluators, load_froms,
                               work_dir, num_gpus, num_procs, launcher)

        self.logger.info(f'Partitioned into {len(tasks)} tasks.')
        for i, task in enumerate(tasks):
            model_name = task['model']['type']
            # NOTE(review): assumes a nested dataset config and a non-empty
            # evaluator list — verify against caller configs.
            dataset_name = task['dataset']['dataset']['type']
            evaluator_name = task['evaluator'][0]['type']
            self.logger.debug(
                f'Task {i}: {model_name}-{dataset_name}-{evaluator_name}')

        return tasks