File size: 2,908 Bytes
256a159 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import getpass
from abc import abstractmethod
from typing import Any, Dict, List, Tuple
from mmengine.config import Config, ConfigDict
from opencompass.utils import LarkReporter, get_logger
class BaseRunner:
"""Base class for all runners. A runner is responsible for launching
multiple tasks.
Args:
task (ConfigDict): Task type config.
debug (bool): Whether to run in debug mode.
lark_bot_url (str): Lark bot url.
"""
def __init__(self,
task: ConfigDict,
debug: bool = False,
lark_bot_url: str = None):
self.task_cfg = Config(task)
self.debug = debug
if lark_bot_url:
self.lark_reporter = LarkReporter(lark_bot_url)
else:
self.lark_reporter = None
def __call__(self, tasks: List[Dict[str, Any]]):
"""Launch multiple tasks and summarize the results.
Args:
tasks (list[dict]): A list of task configs, usually generated by
Partitioner.
"""
status = self.launch(tasks)
self.summarize(status)
@abstractmethod
def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
"""Launch multiple tasks.
Args:
tasks (list[dict]): A list of task configs, usually generated by
Partitioner.
Returns:
list[tuple[str, int]]: A list of (task name, exit code).
"""
def summarize(self, status: List[Tuple[str, int]]) -> None:
"""Summarize the results of the tasks.
Args:
status (list[tuple[str, int]]): A list of (task name, exit code).
"""
failed_logs = []
for _task, code in status:
if code != 0:
get_logger().error(f'{_task} failed with code {code}')
failed_logs.append(_task)
if self.lark_reporter:
num_succeeded = len(status) - len(failed_logs)
if len(failed_logs) > 0:
content = f'{getpass.getuser()} \'s '
content += f'{self.task_cfg.type} tasks finished. '
content += f'{num_succeeded} tasks succeeded, '
content += f'{len(failed_logs)} tasks failed. Failed tasks are'
content += ':\n' + '\n'.join(failed_logs)
self.lark_reporter.post(title=f'Bad news: {len(failed_logs)} '
'failed.',
content=content)
else:
content = f'{getpass.getuser()}\'s '
content += f'{self.task_cfg.type} tasks finished. '
content += f'{num_succeeded} tasks succeeded.'
self.lark_reporter.post(title='Great news: all tasks '
'finished!',
content=content)
|