File size: 2,908 Bytes
256a159
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import getpass
from abc import abstractmethod
from typing import Any, Dict, List, Tuple

from mmengine.config import Config, ConfigDict

from opencompass.utils import LarkReporter, get_logger


class BaseRunner:
    """Base class for all runners. A runner is responsible for launching
    multiple tasks.

    Args:
        task (ConfigDict): Task type config.
        debug (bool): Whether to run in debug mode.
        lark_bot_url (str): Lark bot url.
    """

    def __init__(self,
                 task: ConfigDict,
                 debug: bool = False,
                 lark_bot_url: str = None):
        self.task_cfg = Config(task)
        self.debug = debug
        if lark_bot_url:
            self.lark_reporter = LarkReporter(lark_bot_url)
        else:
            self.lark_reporter = None

    def __call__(self, tasks: List[Dict[str, Any]]):
        """Launch multiple tasks and summarize the results.

        Args:
            tasks (list[dict]): A list of task configs, usually generated by
                Partitioner.
        """
        status = self.launch(tasks)
        self.summarize(status)

    @abstractmethod
    def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
        """Launch multiple tasks.

        Args:
            tasks (list[dict]): A list of task configs, usually generated by
                Partitioner.

        Returns:
            list[tuple[str, int]]: A list of (task name, exit code).
        """

    def summarize(self, status: List[Tuple[str, int]]) -> None:
        """Summarize the results of the tasks.

        Args:
            status (list[tuple[str, int]]): A list of (task name, exit code).
        """

        failed_logs = []
        for _task, code in status:
            if code != 0:
                get_logger().error(f'{_task} failed with code {code}')
                failed_logs.append(_task)
        if self.lark_reporter:
            num_succeeded = len(status) - len(failed_logs)
            if len(failed_logs) > 0:
                content = f'{getpass.getuser()} \'s '
                content += f'{self.task_cfg.type} tasks finished. '
                content += f'{num_succeeded} tasks succeeded, '
                content += f'{len(failed_logs)} tasks failed. Failed tasks are'
                content += ':\n' + '\n'.join(failed_logs)
                self.lark_reporter.post(title=f'Bad news: {len(failed_logs)} '
                                        'failed.',
                                        content=content)
            else:
                content = f'{getpass.getuser()}\'s '
                content += f'{self.task_cfg.type} tasks finished. '
                content += f'{num_succeeded} tasks succeeded.'
                self.lark_reporter.post(title='Great news: all tasks '
                                        'finished!',
                                        content=content)