Spaces:
Running
Running
import logging | |
import sys | |
from functools import lru_cache | |
import torch | |
from modules import config | |
logger = logging.getLogger(__name__) | |
if sys.platform == "darwin": | |
from modules.devices import mac_devices | |
def has_mps() -> bool: | |
if sys.platform != "darwin": | |
return False | |
else: | |
return mac_devices.has_mps | |
def get_cuda_device_id(): | |
return ( | |
int(config.runtime_env_vars.device_id) | |
if config.runtime_env_vars.device_id is not None | |
and config.runtime_env_vars.device_id.isdigit() | |
else 0 | |
) or torch.cuda.current_device() | |
def get_cuda_device_string(): | |
if config.runtime_env_vars.device_id is not None: | |
return f"cuda:{config.runtime_env_vars.device_id}" | |
return "cuda" | |
def get_available_gpus() -> list[tuple[int, int]]: | |
""" | |
Get the list of available GPUs and their free memory. | |
:return: A list of tuples where each tuple contains (GPU index, free memory in bytes). | |
""" | |
available_gpus = [] | |
for i in range(torch.cuda.device_count()): | |
props = torch.cuda.get_device_properties(i) | |
free_memory = props.total_memory - torch.cuda.memory_reserved(i) | |
available_gpus.append((i, free_memory)) | |
return available_gpus | |
def get_memory_available_gpus(min_memory=2048): | |
available_gpus = get_available_gpus() | |
memory_available_gpus = [ | |
gpu for gpu, free_memory in available_gpus if free_memory > min_memory | |
] | |
return memory_available_gpus | |
def get_target_device_id_or_memory_available_gpu(): | |
memory_available_gpus = get_memory_available_gpus() | |
device_id = get_cuda_device_id() | |
if device_id not in memory_available_gpus: | |
if len(memory_available_gpus) != 0: | |
logger.warning( | |
f"Device {device_id} is not available or does not have enough memory. will try to use {memory_available_gpus}" | |
) | |
config.runtime_env_vars.device_id = str(memory_available_gpus[0]) | |
else: | |
logger.warning( | |
f"Device {device_id} is not available or does not have enough memory. Using CPU instead." | |
) | |
return "cpu" | |
return get_cuda_device_string() | |
def get_optimal_device_name(): | |
if config.runtime_env_vars.use_cpu == "all": | |
return "cpu" | |
if torch.cuda.is_available(): | |
return get_target_device_id_or_memory_available_gpu() | |
if has_mps(): | |
return "mps" | |
return "cpu" | |
def get_optimal_device(): | |
return torch.device(get_optimal_device_name()) | |
def get_device_for(task): | |
if ( | |
task in config.runtime_env_vars.use_cpu | |
or "all" in config.runtime_env_vars.use_cpu | |
): | |
return cpu | |
return get_optimal_device() | |
def torch_gc(): | |
try: | |
if torch.cuda.is_available(): | |
with torch.cuda.device(get_cuda_device_string()): | |
torch.cuda.empty_cache() | |
torch.cuda.ipc_collect() | |
if has_mps(): | |
mac_devices.torch_mps_gc() | |
except Exception as e: | |
logger.error(f"Error in torch_gc", exc_info=True) | |
cpu: torch.device = torch.device("cpu") | |
device: torch.device = None | |
dtype: torch.dtype = torch.float32 | |
dtype_dvae: torch.dtype = torch.float32 | |
dtype_vocos: torch.dtype = torch.float32 | |
dtype_gpt: torch.dtype = torch.float32 | |
dtype_decoder: torch.dtype = torch.float32 | |
def reset_device(): | |
global device | |
global dtype | |
global dtype_dvae | |
global dtype_vocos | |
global dtype_gpt | |
global dtype_decoder | |
if config.runtime_env_vars.use_cpu is None: | |
config.runtime_env_vars.use_cpu = [] | |
if "all" in config.runtime_env_vars.use_cpu and not config.runtime_env_vars.no_half: | |
logger.warning( | |
"Cannot use half precision with CPU, using full precision instead" | |
) | |
config.runtime_env_vars.no_half = True | |
if not config.runtime_env_vars.no_half: | |
dtype = torch.float16 | |
dtype_dvae = torch.float16 | |
dtype_vocos = torch.float16 | |
dtype_gpt = torch.float16 | |
dtype_decoder = torch.float16 | |
logger.info("Using half precision: torch.float16") | |
else: | |
dtype = torch.float32 | |
dtype_dvae = torch.float32 | |
dtype_vocos = torch.float32 | |
dtype_gpt = torch.float32 | |
dtype_decoder = torch.float32 | |
logger.info("Using full precision: torch.float32") | |
if "all" in config.runtime_env_vars.use_cpu: | |
device = cpu | |
else: | |
device = get_optimal_device() | |
logger.info(f"Using device: {device}") | |
def first_time_calculation(): | |
""" | |
just do any calculation with pytorch layers - the first time this is done it allocaltes about 700MB of memory and | |
spends about 2.7 seconds doing that, at least wih NVidia. | |
""" | |
x = torch.zeros((1, 1)).to(device, dtype) | |
linear = torch.nn.Linear(1, 1).to(device, dtype) | |
linear(x) | |
x = torch.zeros((1, 1, 3, 3)).to(device, dtype) | |
conv2d = torch.nn.Conv2d(1, 1, (3, 3)).to(device, dtype) | |
conv2d(x) | |