import datetime
import json
import os
import os.path as osp
import random
import re
import subprocess
import sys
import time
from functools import partial
from typing import Any, Dict, List, Optional, Tuple

import mmengine
from mmengine.config import ConfigDict
from mmengine.utils import track_parallel_progress

from opencompass.registry import RUNNERS, TASKS
from opencompass.utils import get_logger

from .base import BaseRunner


@RUNNERS.register_module()
class DLCRunner(BaseRunner):
    """Distributed runner based on Alibaba Cloud Deep Learning Cluster (DLC).

    It will launch multiple tasks in parallel with the 'dlc' command. Please
    install and configure DLC first before using this runner.

    Args:
        task (ConfigDict): Task type config.
        aliyun_cfg (ConfigDict): Alibaba Cloud config.
        max_num_workers (int): Max number of workers. Default: 32.
        eval_with_gpu (list): Keywords of evaluation tasks that still need a
            GPU even if no GPU was assigned. Default: ['plugin_eval'].
        retry (int): Number of retries when a job failed. Default: 2.
        debug (bool): Whether to run in debug mode. Default: False.
        lark_bot_url (str): Lark bot url. Default: None.
    """

    def __init__(self,
                 task: ConfigDict,
                 aliyun_cfg: ConfigDict,
                 max_num_workers: int = 32,
                 eval_with_gpu: list = ['plugin_eval'],
                 retry: int = 2,
                 debug: bool = False,
                 lark_bot_url: str = None):
        super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
        self.aliyun_cfg = aliyun_cfg
        self.max_num_workers = max_num_workers
        self.retry = retry
        self.eval_with_gpu = eval_with_gpu

        logger = get_logger()
        logger.warning(
            'To ensure the integrity of the log results, the log displayed '
            f'by {self.__class__.__name__} has a 10-second delay.')

    def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
        """Launch multiple tasks.

        Args:
            tasks (list[dict]): A list of task configs, usually generated by
                Partitioner.

        Returns:
            list[tuple[str, int]]: A list of (task name, exit code).
        """
        if not self.debug:
            status = track_parallel_progress(self._launch,
                                             tasks,
                                             nproc=self.max_num_workers,
                                             keep_order=False)
        else:
            status = [
                self._launch(task, random_sleep=False) for task in tasks
            ]
        return status

    def _launch(self, cfg: ConfigDict, random_sleep: Optional[bool] = None):
        """Launch a single task.

        Args:
            cfg (ConfigDict): Task config.
            random_sleep (bool): Whether to sleep for a random time before
                running the command. When Aliyun has many tasks to schedule,
                its stability decreases. Therefore, when we need to submit a
                large number of tasks at once, we adopt the "random_sleep"
                strategy. Tasks that would have been submitted all at once are
                now evenly spread out over a 10-second period. Default: None.

        Returns:
            tuple[str, int]: Task name and exit code.
        """
        if random_sleep is None:
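            # Stagger submissions only when there are enough workers to hit
            # the DLC scheduler with many simultaneous job submissions.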
            random_sleep = (self.max_num_workers > 32)

        task = TASKS.build(dict(cfg=cfg, type=self.task_cfg['type']))
        num_gpus = task.num_gpus
        task_name = task.name
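        # Some evaluation tasks (those whose names match an entry in
        # ``eval_with_gpu``, e.g. plugin_eval) still need a GPU even when
        # the partitioner assigned none.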
        is_eval_task = 'OpenICLEval' in task_name
        if is_eval_task and num_gpus == 0:
            for check_name in self.eval_with_gpu:
                if check_name in task_name:
                    num_gpus = 1
                    break

        # Dump task config to file
        mmengine.mkdir_or_exist('tmp/')
        param_file = f'tmp/{os.getpid()}_params.py'
        pwd = os.getcwd()
        try:
            cfg.dump(param_file)
            if self.aliyun_cfg.get('bashrc_path') is not None:
                # use the user's conda env
                bashrc_path = self.aliyun_cfg['bashrc_path']
                assert osp.exists(bashrc_path)
                assert self.aliyun_cfg.get('conda_env_name') is not None
                conda_env_name = self.aliyun_cfg['conda_env_name']
                shell_cmd = (f'source {bashrc_path}; '
                             f'conda activate {conda_env_name}; ')
            else:
                # use the public conda env; users can also set
                # `python_env_path` to their own env's python path
                assert self.aliyun_cfg.get('python_env_path') is not None
                shell_cmd = (
                    f'export PATH={self.aliyun_cfg["python_env_path"]}/bin:$PATH; '  # noqa: E501
                    f'export PYTHONPATH={pwd}:$PYTHONPATH; ')

            huggingface_cache = self.aliyun_cfg.get('huggingface_cache')
            if huggingface_cache is not None:
                # HUGGINGFACE_HUB_CACHE is a legacy env variable; set both
                # `HF_HUB_CACHE` and `HUGGINGFACE_HUB_CACHE` for backward
                # compatibility
                shell_cmd += f'export HF_HUB_CACHE={huggingface_cache}; '
                shell_cmd += f'export HUGGINGFACE_HUB_CACHE={huggingface_cache}; '  # noqa: E501

            torch_cache = self.aliyun_cfg.get('torch_cache')
            if torch_cache is not None:
                shell_cmd += f'export TORCH_HOME={torch_cache}; '

            hf_offline = self.aliyun_cfg.get('hf_offline', True)
            if hf_offline:
                shell_cmd += 'export HF_DATASETS_OFFLINE=1; export TRANSFORMERS_OFFLINE=1; export HF_EVALUATE_OFFLINE=1; '  # noqa: E501

            http_proxy = self.aliyun_cfg.get('http_proxy')
            if http_proxy is not None:
                shell_cmd += f'export http_proxy={http_proxy}; export https_proxy={http_proxy}; '  # noqa: E501
                shell_cmd += f'export HTTP_PROXY={http_proxy}; export HTTPS_PROXY={http_proxy}; '  # noqa: E501

            hf_endpoint = self.aliyun_cfg.get('hf_endpoint')
            if hf_endpoint is not None:
                shell_cmd += f'export HF_ENDPOINT={hf_endpoint}; '

            shell_cmd += f'cd {pwd}; '
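            # '{task_cmd}' is left as a literal placeholder here; it is
            # substituted by task.get_command() when the template below is
            # rendered into the final command.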
            shell_cmd += '{task_cmd}'

            tmpl = ('dlc create job'
                    f" --command '{shell_cmd}'"
                    f' --name {task_name[:512]}'
                    ' --kind BatchJob'
                    f" -c {self.aliyun_cfg['dlc_config_path']}"
                    f" --workspace_id {self.aliyun_cfg['workspace_id']}"
                    ' --worker_count 1'
                    f' --worker_cpu {max(num_gpus * 8, 32)}'
                    f' --worker_gpu {num_gpus}'
                    f' --worker_memory {max(num_gpus * 128, 256)}'
                    f" --worker_image {self.aliyun_cfg['worker_image']}")
            get_cmd = partial(task.get_command,
                              cfg_path=param_file,
                              template=tmpl)
            cmd = get_cmd()

            logger = get_logger()
            logger.debug(f'Running command: {cmd}')

            # Run command with retry
            if self.debug:
                stdout = sys.stdout
            else:
                out_path = task.get_log_path(file_extension='out')
                mmengine.mkdir_or_exist(osp.split(out_path)[0])
                stdout = open(out_path, 'w', encoding='utf-8')

            if random_sleep:
                time.sleep(random.randint(0, 10))

            def _run_within_retry():
                output = subprocess.getoutput(cmd)
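                # The dlc CLI prints the new job as a table row such as
                # '| dlcXXXXXXXX | ... |'; extract the job id so the job
                # can be polled below.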
                match = re.search(r'\|\s+(dlc[0-9a-z]+)\s+\|', output)
                if match is None:
                    raise RuntimeError(
                        f'Failed to launch dlc job for {output}')
                else:
                    job_id = match.group(1)

                stdout.write(output)

                pod_create_time = None
                pri_time = None
                initial_time = datetime.datetime.now()
                while True:
                    # 1. Avoid requesting dlc too frequently.
                    # 2. The DLC job may not be ready immediately after
                    #    creation.
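                    # 3. 'dlc get job' can transiently return non-JSON
                    #    output, hence the bounded retry below.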
                    for _ in range(5):
                        time.sleep(2)
                        try:
                            job_info = json.loads(
                                subprocess.getoutput(f'dlc get job {job_id}'))
                            break
                        except:  # noqa: E722
                            pass
                    else:
                        raise RuntimeError(
                            f'Failed to get job info for {job_id}')

                    status = job_info['Status']
                    if status == 'Failed':
                        return -1
                    elif status == 'Succeeded':
                        return 0
                    elif status != 'Running':
                        continue

                    # The pod time could differ from the real time, so we
                    # extract the pod start time from `job_info` and compute
                    # the `start_time` and `end_time` in pod time.
                    if pod_create_time is None:
                        pod_create_time = job_info['GmtCreateTime']
                        pri_time = pod_create_time
                        pod_create_time = datetime.datetime.strptime(
                            pod_create_time, '%Y-%m-%dT%H:%M:%SZ')
                    elapsed_time = datetime.datetime.now() - initial_time
                    cur_time = (pod_create_time +
                                elapsed_time).strftime('%Y-%m-%dT%H:%M:%SZ')
                    logs_cmd = ('dlc logs'
                                f' {job_id} {job_id}-worker-0'
                                f" -c {self.aliyun_cfg['dlc_config_path']}"
                                f' --start_time {pri_time}'
                                f' --end_time {cur_time}')
                    log_output = subprocess.getoutput(logs_cmd)
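                    # Only advance the log window once logs were actually
                    # returned, so no lines are skipped between polls.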
                    if '[WARN] No logs found for the pod' not in log_output:
                        pri_time = cur_time
                        stdout.write(log_output)
                        stdout.flush()

            return_code = _run_within_retry()
            retry = self.retry
            output_paths = task.get_output_paths()
            while self._job_failed(return_code, output_paths) and retry > 0:
                retry -= 1
                cmd = get_cmd()
                return_code = _run_within_retry()

        finally:
            # Clean up
            os.remove(param_file)

        return task_name, return_code

    def _job_failed(self, return_code: int, output_paths: List[str]) -> bool:
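        """Treat a job as failed if it exited non-zero or any expected
        output file is missing."""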
        return return_code != 0 or not all(
            osp.exists(output_path) for output_path in output_paths)
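

# A minimal wiring sketch (assumption: the usual OpenCompass config pattern,
# not taken from this file) showing where DLCRunner is typically plugged in:
#
#   from opencompass.partitioners import SizePartitioner
#   from opencompass.runners import DLCRunner
#   from opencompass.tasks import OpenICLInferTask
#
#   infer = dict(
#       partitioner=dict(type=SizePartitioner, max_task_size=2000),
#       runner=dict(
#           type=DLCRunner,
#           max_num_workers=32,
#           aliyun_cfg=dict(...),  # see the sketch near the top of the class
#           task=dict(type=OpenICLInferTask),
#       ),
#   )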