import os
import time
import torch
import random
import torch.multiprocessing as mp
import numpy as np

# Child processes may initialize CUDA, so force the 'spawn' start method:
# forked children cannot safely reuse the parent's CUDA context.
try:
    mp.set_start_method('spawn', force=True)
except RuntimeError:
    pass

import colbert.utils.distributed as distributed
from colbert.infra.run import Run
from colbert.infra.config import BaseConfig, RunConfig, RunSettings
from colbert.utils.utils import print_message


class Launcher:
    """Runs `callee(config, *args)` in `nranks` spawned processes and collects their return values."""

    def __init__(self, callee, run_config=None, return_all=False):
        self.callee = callee
        self.return_all = return_all

        self.run_config = RunConfig.from_existing(Run().config, run_config)
        self.nranks = self.run_config.nranks

    def launch(self, custom_config, *args):
        return_value_queue = mp.Queue()

        rng = random.Random(time.time())
        port = str(12355 + rng.randint(0, 1000))  # randomize the port to avoid collisions when launching several jobs

        all_procs = []
        for new_rank in range(0, self.nranks):
            assert isinstance(custom_config, BaseConfig)
            assert isinstance(custom_config, RunSettings)

            new_config = type(custom_config).from_existing(custom_config, self.run_config, RunConfig(rank=new_rank))

            args_ = (self.callee, port, return_value_queue, new_config, *args)
            all_procs.append(mp.Process(target=setup_new_process, args=args_))

        # Clear GPU space (e.g., after a `Searcher` on GPU-0 is deleted).
        # TODO: Generalize this beyond GPU-0!
        # TODO: Move this into a function, and call it from __del__ in a base class inherited by Searcher, Indexer, etc.

        # t = torch.cuda.get_device_properties(0).total_memory
        # r = torch.cuda.memory_reserved(0)
        # a = torch.cuda.memory_allocated(0)
        # f = r - a
        # print_message(f"[Pre-Emptying] GPU memory check: r={r}, a={a}, f={f}")

        torch.cuda.empty_cache()

        # t = torch.cuda.get_device_properties(0).total_memory
        # r = torch.cuda.memory_reserved(0)
        # a = torch.cuda.memory_allocated(0)
        # f = r - a
        # print_message(f"[Post-Emptying] GPU memory check: r={r}, a={a}, f={f}")

        print_memory_stats('MAIN')

        for proc in all_procs:
            print("#> Starting...")
            proc.start()

        print_memory_stats('MAIN')

        # TODO: If a process crashes before returning, raise an exception instead of blocking
        # on the .get() calls below (see the sketch after this method).
        return_values = sorted([return_value_queue.get() for _ in all_procs])
        return_values = [val for rank, val in return_values]

        if not self.return_all:
            return_values = return_values[0]

        for proc in all_procs:
            proc.join()
            print("#> Joined...")

        print_memory_stats('MAIN')

        return return_values
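
    # One possible shape for the TODO above: gather results without blocking
    # forever if a child dies. This is only a sketch; the 5-second poll and the
    # bare RuntimeError are illustrative choices, not the project's settled design.
    #
    #     import queue
    #
    #     return_values = []
    #     while len(return_values) < len(all_procs):
    #         try:
    #             return_values.append(return_value_queue.get(timeout=5))
    #         except queue.Empty:
    #             if any(p.exitcode not in (None, 0) for p in all_procs):
    #                 raise RuntimeError("A launched process exited before returning a value.")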


def setup_new_process(callee, port, return_value_queue, config, *args):
    print_memory_stats()

    # Seed every RNG identically so all ranks make consistent random choices.
    random.seed(12345)
    np.random.seed(12345)
    torch.manual_seed(12345)
    torch.cuda.manual_seed_all(12345)

    rank, nranks = config.rank, config.nranks

    # These are the variables torch.distributed's default env:// rendezvous reads.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = port
    os.environ["WORLD_SIZE"] = str(nranks)
    os.environ["RANK"] = str(rank)

    # TODO: Ideally the `gpus` getter handles this max-nranks thing!
    os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, config.gpus_[:nranks]))

    nranks_, distributed_ = distributed.init(rank)
    assert nranks_ == nranks

    # Run.init(args.rank, args.root, args.experiment, args.run)
    with Run().context(config, inherit_config=False):
        return_val = callee(config, *args)

    return_value_queue.put((rank, return_val))


def print_memory_stats(message=''):
    return  # FIXME: Add this back before release.

    import psutil  # Remove before release? Or at least make optional with try/except.

    global_info = psutil.virtual_memory()
    total, available, used, free = global_info.total, global_info.available, global_info.used, global_info.free

    info = psutil.Process().memory_info()
    rss, vms, shared = info.rss, info.vms, info.shared
    uss = psutil.Process().memory_full_info().uss

    gib = 1024 ** 3

    summary = f"""
    [PID: {os.getpid()}]
    [{message}]
    Available: {available / gib:,.1f} / {total / gib:,.1f}
    Free: {free / gib:,.1f} / {total / gib:,.1f}
    Usage: {used / gib:,.1f} / {total / gib:,.1f}
    RSS: {rss / gib:,.1f}
    VMS: {vms / gib:,.1f}
    USS: {uss / gib:,.1f}
    SHARED: {shared / gib:,.1f}
    """.strip().replace('\n', '\t')

    print_message(summary, pad=True)
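

# A minimal usage sketch. `_echo_rank` is a hypothetical worker added for
# illustration; it assumes `ColBERTConfig` (a BaseConfig + RunSettings subclass,
# per the asserts in `launch`) and at least one visible GPU.

def _echo_rank(config, message):
    # Runs inside a spawned child; `config` carries this child's rank.
    return f"{message} (rank {config.rank}/{config.nranks})"


if __name__ == '__main__':
    from colbert.infra.config import ColBERTConfig

    with Run().context(RunConfig(nranks=1)):
        launcher = Launcher(_echo_rank, return_all=True)
        print(launcher.launch(ColBERTConfig(), "hello"))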