Spaces:
Running
Running
from typing import Dict, Optional, Tuple, Union | |
from transformers.models.bark import BarkSemanticModel, BarkCoarseModel, BarkFineModel, BarkPreTrainedModel | |
from transformers.models.bark.generation_configuration_bark import ( | |
BarkCoarseGenerationConfig, | |
BarkFineGenerationConfig, | |
BarkSemanticGenerationConfig, | |
) | |
from transformers import BarkConfig, AutoModel | |
from transformers.modeling_utils import get_parameter_device | |
from transformers.utils import ( | |
is_accelerate_available, | |
) | |
import torch | |
class BarkModel(BarkPreTrainedModel):
    """
    Composite Bark model chaining three generative sub-models (semantic, coarse acoustics, fine acoustics) and
    holding a codec model, built from `config.codec_config`, that is used by `codec_decode`.
    """

    config_class = BarkConfig

    def __init__(self, config):
        """
        Build every sub-model from its dedicated sub-configuration.

        Args:
            config (`BarkConfig`):
                Must expose `semantic_config`, `coarse_acoustics_config`, `fine_acoustics_config` and
                `codec_config`.
        """
        super().__init__(config)

        self.semantic = BarkSemanticModel(config.semantic_config)
        self.coarse_acoustics = BarkCoarseModel(config.coarse_acoustics_config)
        self.fine_acoustics = BarkFineModel(config.fine_acoustics_config)

        self.codec_model = AutoModel.from_config(config.codec_config)

        self.config = config

    # Must be a property: `enable_cpu_offload` and `generate` read `self.device` as an attribute
    # (`self.device.type`, `.to(self.device)`), which would fail on a bound method.
    @property
    def device(self) -> torch.device:
        """
        `torch.device`: The device on which the module is (assuming that all the module parameters are on the same
        device).
        """
        # for bark_model, device must be verified on its sub-models
        # if has _hf_hook, has been offloaded so the device has to be found in the hook
        if not hasattr(self.semantic, "_hf_hook"):
            return get_parameter_device(self)

        for module in self.semantic.modules():
            hf_hook = getattr(module, "_hf_hook", None)
            execution_device = getattr(hf_hook, "execution_device", None)
            if execution_device is not None:
                return torch.device(execution_device)

        # No hook advertises an execution device: fall back to where the parameters currently live
        # instead of implicitly returning `None`.
        return get_parameter_device(self)

    def enable_cpu_offload(self, gpu_id: Optional[int] = 0):
        r"""
        Offloads all sub-models to CPU using accelerate, reducing memory usage with a low impact on performance. This
        method moves one whole sub-model at a time to the GPU when it is used, and the sub-model remains in GPU until
        the next sub-model runs.

        The hooks of the fine acoustics model and of the codec model are stored on `self.fine_acoustics_hook` and
        `self.codec_model_hook` so that they can be offloaded manually.

        Args:
            gpu_id (`int`, *optional*, defaults to 0):
                GPU id on which the sub-models will be loaded and offloaded.

        Raises:
            ImportError: If `accelerate` is not available.
        """
        if is_accelerate_available():
            from accelerate import cpu_offload_with_hook
        else:
            raise ImportError("`enable_cpu_offload` requires `accelerate`.")

        device = torch.device(f"cuda:{gpu_id}")

        if self.device.type != "cpu":
            self.to("cpu")
            torch.cuda.empty_cache()  # otherwise we don't see the memory savings (but they probably exist)

        # this layer is used outside the first forward pass of semantic so need to be loaded before semantic
        self.semantic.input_embeds_layer, _ = cpu_offload_with_hook(self.semantic.input_embeds_layer, device)

        # Chain the hooks: each call receives the hook of the previously registered sub-model.
        hook = None
        for cpu_offloaded_model in [
            self.semantic,
            self.coarse_acoustics,
            self.fine_acoustics,
        ]:
            _, hook = cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hook=hook)

        # After the loop, `hook` is the one attached to `fine_acoustics` (the last element of the list).
        self.fine_acoustics_hook = hook

        _, hook = cpu_offload_with_hook(self.codec_model, device, prev_module_hook=hook)

        # We'll offload the last model manually.
        self.codec_model_hook = hook

    def codec_decode(self, fine_output):
        """
        Turn quantized audio codes into audio array using encodec.

        Args:
            fine_output (`torch.Tensor`):
                Codes to decode. The first two dimensions are swapped before being handed to the codec quantizer.

        Returns:
            `torch.Tensor`: Output of the codec decoder with dimension 1 squeezed out.
        """
        fine_output = fine_output.transpose(0, 1)
        emb = self.codec_model.quantizer.decode(fine_output)
        out = self.codec_model.decoder(emb)
        audio_arr = out.squeeze(1)  # squeeze the codebook dimension

        return audio_arr

    def generate(
        self,
        input_ids: Optional[torch.Tensor] = None,
        history_prompt: Optional[Dict[str, torch.Tensor]] = None,
        **kwargs,
    ) -> torch.LongTensor:
        """
        Runs the semantic, coarse and fine sub-models in sequence on an input prompt and an additional optional
        `Bark` speaker prompt.

        This method does NOT decode the result into a waveform: it returns the output of the fine acoustics model
        unchanged. Pass that output to `codec_decode` to obtain the audio array.

        Args:
            input_ids (`Optional[torch.Tensor]` of shape (batch_size, seq_len), *optional*):
                Input ids. Will be truncated up to 256 tokens. Note that the output audios will be as long as the
                longest generation among the batch.
            history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*):
                Optional `Bark` speaker prompt. Note that for now, this model takes only one speaker prompt per batch.
            kwargs (*optional*): Remaining dictionary of keyword arguments. Keyword arguments are of two types:

                - Without a prefix, they will be entered as `**kwargs` for the `generate` method of each sub-model.
                - With a *semantic_*, *coarse_*, *fine_* prefix, they will be input for the `generate` method of the
                semantic, coarse and fine respectively. It has the priority over the keywords without a prefix.

                This means you can, for example, specify a generation strategy for all sub-models except one.

                `attention_mask` is a special case: it is only ever forwarded to the semantic sub-model.

        Returns:
            torch.LongTensor: Output of the fine acoustics sub-model's `generate` call.

        Example:

        ```python
        >>> from transformers import AutoProcessor, BarkModel

        >>> processor = AutoProcessor.from_pretrained("suno/bark-small")
        >>> model = BarkModel.from_pretrained("suno/bark-small")

        >>> # To add a voice preset, you can pass `voice_preset` to `BarkProcessor.__call__(...)`
        >>> voice_preset = "v2/en_speaker_6"

        >>> inputs = processor("Hello, my dog is cute, I need him in my life", voice_preset=voice_preset)

        >>> fine_output = model.generate(**inputs, semantic_max_new_tokens=100)
        >>> audio_array = model.codec_decode(fine_output)
        >>> audio_array = audio_array.cpu().numpy().squeeze()
        ```
        """
        # TODO (joao):workaround until nested generation config is compatible with PreTrained Model
        # todo: dict
        semantic_generation_config = BarkSemanticGenerationConfig(**self.generation_config.semantic_config)
        coarse_generation_config = BarkCoarseGenerationConfig(**self.generation_config.coarse_acoustics_config)
        fine_generation_config = BarkFineGenerationConfig(**self.generation_config.fine_acoustics_config)

        kwargs_semantic = {
            # if "attention_mask" is set, it should not be passed to CoarseModel and FineModel
            "attention_mask": kwargs.pop("attention_mask", None)
        }
        kwargs_coarse = {}
        kwargs_fine = {}
        for key, value in kwargs.items():
            if key.startswith("semantic_"):
                key = key[len("semantic_") :]
                kwargs_semantic[key] = value
            elif key.startswith("coarse_"):
                key = key[len("coarse_") :]
                kwargs_coarse[key] = value
            elif key.startswith("fine_"):
                key = key[len("fine_") :]
                kwargs_fine[key] = value
            else:
                # If the key is already in a specific config, then it's been set with a
                # submodules specific value and we don't override
                kwargs_semantic.setdefault(key, value)
                kwargs_coarse.setdefault(key, value)
                kwargs_fine.setdefault(key, value)

        # 1. Generate from the semantic model
        semantic_output = self.semantic.generate(
            input_ids,
            history_prompt=history_prompt,
            semantic_generation_config=semantic_generation_config,
            **kwargs_semantic,
        )

        # 2. Generate from the coarse model
        coarse_output = self.coarse_acoustics.generate(
            semantic_output,
            history_prompt=history_prompt,
            semantic_generation_config=semantic_generation_config,
            coarse_generation_config=coarse_generation_config,
            codebook_size=self.generation_config.codebook_size,
            **kwargs_coarse,
        )

        # 3. "generate" from the fine model
        output = self.fine_acoustics.generate(
            coarse_output,
            history_prompt=history_prompt,
            semantic_generation_config=semantic_generation_config,
            coarse_generation_config=coarse_generation_config,
            fine_generation_config=fine_generation_config,
            codebook_size=self.generation_config.codebook_size,
            **kwargs_fine,
        )

        if getattr(self, "fine_acoustics_hook", None) is not None:
            # Manually offload fine_acoustics to CPU
            # and load codec_model to GPU
            # since bark doesn't use codec_model forward pass
            self.fine_acoustics_hook.offload()
            self.codec_model = self.codec_model.to(self.device)

        return output