Spaces:
Running
on
Zero
Running
on
Zero
import copy | |
import julius | |
import numpy as np | |
import scipy | |
import torch | |
import torch.nn.functional as F | |
import torchaudio | |
class Meter(torch.nn.Module): | |
"""Tensorized version of pyloudnorm.Meter. Works with batched audio tensors. | |
Parameters | |
---------- | |
rate : int | |
Sample rate of audio. | |
filter_class : str, optional | |
Class of weighting filter used. | |
K-weighting' (default), 'Fenton/Lee 1' | |
'Fenton/Lee 2', 'Dash et al.' | |
by default "K-weighting" | |
block_size : float, optional | |
Gating block size in seconds, by default 0.400 | |
zeros : int, optional | |
Number of zeros to use in FIR approximation of | |
IIR filters, by default 512 | |
use_fir : bool, optional | |
Whether to use FIR approximation or exact IIR formulation. | |
If computing on GPU, ``use_fir=True`` will be used, as its | |
much faster, by default False | |
""" | |
def __init__( | |
self, | |
rate: int, | |
filter_class: str = "K-weighting", | |
block_size: float = 0.400, | |
zeros: int = 512, | |
use_fir: bool = False, | |
): | |
super().__init__() | |
self.rate = rate | |
self.filter_class = filter_class | |
self.block_size = block_size | |
self.use_fir = use_fir | |
G = torch.from_numpy(np.array([1.0, 1.0, 1.0, 1.41, 1.41])) | |
self.register_buffer("G", G) | |
# Compute impulse responses so that filtering is fast via | |
# a convolution at runtime, on GPU, unlike lfilter. | |
impulse = np.zeros((zeros,)) | |
impulse[..., 0] = 1.0 | |
firs = np.zeros((len(self._filters), 1, zeros)) | |
passband_gain = torch.zeros(len(self._filters)) | |
for i, (_, filter_stage) in enumerate(self._filters.items()): | |
firs[i] = scipy.signal.lfilter(filter_stage.b, filter_stage.a, impulse) | |
passband_gain[i] = filter_stage.passband_gain | |
firs = torch.from_numpy(firs[..., ::-1].copy()).float() | |
self.register_buffer("firs", firs) | |
self.register_buffer("passband_gain", passband_gain) | |
def apply_filter_gpu(self, data: torch.Tensor): | |
"""Performs FIR approximation of loudness computation. | |
Parameters | |
---------- | |
data : torch.Tensor | |
Audio data of shape (nb, nch, nt). | |
Returns | |
------- | |
torch.Tensor | |
Filtered audio data. | |
""" | |
# Data is of shape (nb, nch, nt) | |
# Reshape to (nb*nch, 1, nt) | |
nb, nt, nch = data.shape | |
data = data.permute(0, 2, 1) | |
data = data.reshape(nb * nch, 1, nt) | |
# Apply padding | |
pad_length = self.firs.shape[-1] | |
# Apply filtering in sequence | |
for i in range(self.firs.shape[0]): | |
data = F.pad(data, (pad_length, pad_length)) | |
data = julius.fftconv.fft_conv1d(data, self.firs[i, None, ...]) | |
data = self.passband_gain[i] * data | |
data = data[..., 1 : nt + 1] | |
data = data.permute(0, 2, 1) | |
data = data[:, :nt, :] | |
return data | |
def apply_filter_cpu(self, data: torch.Tensor): | |
"""Performs IIR formulation of loudness computation. | |
Parameters | |
---------- | |
data : torch.Tensor | |
Audio data of shape (nb, nch, nt). | |
Returns | |
------- | |
torch.Tensor | |
Filtered audio data. | |
""" | |
for _, filter_stage in self._filters.items(): | |
passband_gain = filter_stage.passband_gain | |
a_coeffs = torch.from_numpy(filter_stage.a).float().to(data.device) | |
b_coeffs = torch.from_numpy(filter_stage.b).float().to(data.device) | |
_data = data.permute(0, 2, 1) | |
filtered = torchaudio.functional.lfilter( | |
_data, a_coeffs, b_coeffs, clamp=False | |
) | |
data = passband_gain * filtered.permute(0, 2, 1) | |
return data | |
def apply_filter(self, data: torch.Tensor): | |
"""Applies filter on either CPU or GPU, depending | |
on if the audio is on GPU or is on CPU, or if | |
``self.use_fir`` is True. | |
Parameters | |
---------- | |
data : torch.Tensor | |
Audio data of shape (nb, nch, nt). | |
Returns | |
------- | |
torch.Tensor | |
Filtered audio data. | |
""" | |
if data.is_cuda or self.use_fir: | |
data = self.apply_filter_gpu(data) | |
else: | |
data = self.apply_filter_cpu(data) | |
return data | |
def forward(self, data: torch.Tensor): | |
"""Computes integrated loudness of data. | |
Parameters | |
---------- | |
data : torch.Tensor | |
Audio data of shape (nb, nch, nt). | |
Returns | |
------- | |
torch.Tensor | |
Filtered audio data. | |
""" | |
return self.integrated_loudness(data) | |
def _unfold(self, input_data): | |
T_g = self.block_size | |
overlap = 0.75 # overlap of 75% of the block duration | |
step = 1.0 - overlap # step size by percentage | |
kernel_size = int(T_g * self.rate) | |
stride = int(T_g * self.rate * step) | |
unfolded = julius.core.unfold(input_data.permute(0, 2, 1), kernel_size, stride) | |
unfolded = unfolded.transpose(-1, -2) | |
return unfolded | |
def integrated_loudness(self, data: torch.Tensor): | |
"""Computes integrated loudness of data. | |
Parameters | |
---------- | |
data : torch.Tensor | |
Audio data of shape (nb, nch, nt). | |
Returns | |
------- | |
torch.Tensor | |
Filtered audio data. | |
""" | |
if not torch.is_tensor(data): | |
data = torch.from_numpy(data).float() | |
else: | |
data = data.float() | |
input_data = copy.copy(data) | |
# Data always has a batch and channel dimension. | |
# Is of shape (nb, nt, nch) | |
if input_data.ndim < 2: | |
input_data = input_data.unsqueeze(-1) | |
if input_data.ndim < 3: | |
input_data = input_data.unsqueeze(0) | |
nb, nt, nch = input_data.shape | |
# Apply frequency weighting filters - account | |
# for the acoustic respose of the head and auditory system | |
input_data = self.apply_filter(input_data) | |
G = self.G # channel gains | |
T_g = self.block_size # 400 ms gating block standard | |
Gamma_a = -70.0 # -70 LKFS = absolute loudness threshold | |
unfolded = self._unfold(input_data) | |
z = (1.0 / (T_g * self.rate)) * unfolded.square().sum(2) | |
l = -0.691 + 10.0 * torch.log10((G[None, :nch, None] * z).sum(1, keepdim=True)) | |
l = l.expand_as(z) | |
# find gating block indices above absolute threshold | |
z_avg_gated = z | |
z_avg_gated[l <= Gamma_a] = 0 | |
masked = l > Gamma_a | |
z_avg_gated = z_avg_gated.sum(2) / masked.sum(2) | |
# calculate the relative threshold value (see eq. 6) | |
Gamma_r = ( | |
-0.691 + 10.0 * torch.log10((z_avg_gated * G[None, :nch]).sum(-1)) - 10.0 | |
) | |
Gamma_r = Gamma_r[:, None, None] | |
Gamma_r = Gamma_r.expand(nb, nch, l.shape[-1]) | |
# find gating block indices above relative and absolute thresholds (end of eq. 7) | |
z_avg_gated = z | |
z_avg_gated[l <= Gamma_a] = 0 | |
z_avg_gated[l <= Gamma_r] = 0 | |
masked = (l > Gamma_a) * (l > Gamma_r) | |
z_avg_gated = z_avg_gated.sum(2) / masked.sum(2) | |
# # Cannot use nan_to_num (pytorch 1.8 does not come with GCP-supported cuda version) | |
# z_avg_gated = torch.nan_to_num(z_avg_gated) | |
z_avg_gated = torch.where( | |
z_avg_gated.isnan(), torch.zeros_like(z_avg_gated), z_avg_gated | |
) | |
z_avg_gated[z_avg_gated == float("inf")] = float(np.finfo(np.float32).max) | |
z_avg_gated[z_avg_gated == -float("inf")] = float(np.finfo(np.float32).min) | |
LUFS = -0.691 + 10.0 * torch.log10((G[None, :nch] * z_avg_gated).sum(1)) | |
return LUFS.float() | |
def filter_class(self): | |
return self._filter_class | |
def filter_class(self, value): | |
from pyloudnorm import Meter | |
meter = Meter(self.rate) | |
meter.filter_class = value | |
self._filter_class = value | |
self._filters = meter._filters | |
class LoudnessMixin: | |
_loudness = None | |
MIN_LOUDNESS = -70 | |
"""Minimum loudness possible.""" | |
def loudness( | |
self, filter_class: str = "K-weighting", block_size: float = 0.400, **kwargs | |
): | |
"""Calculates loudness using an implementation of ITU-R BS.1770-4. | |
Allows control over gating block size and frequency weighting filters for | |
additional control. Measure the integrated gated loudness of a signal. | |
API is derived from PyLoudnorm, but this implementation is ported to PyTorch | |
and is tensorized across batches. When on GPU, an FIR approximation of the IIR | |
filters is used to compute loudness for speed. | |
Uses the weighting filters and block size defined by the meter | |
the integrated loudness is measured based upon the gating algorithm | |
defined in the ITU-R BS.1770-4 specification. | |
Parameters | |
---------- | |
filter_class : str, optional | |
Class of weighting filter used. | |
K-weighting' (default), 'Fenton/Lee 1' | |
'Fenton/Lee 2', 'Dash et al.' | |
by default "K-weighting" | |
block_size : float, optional | |
Gating block size in seconds, by default 0.400 | |
kwargs : dict, optional | |
Keyword arguments to :py:func:`audiotools.core.loudness.Meter`. | |
Returns | |
------- | |
torch.Tensor | |
Loudness of audio data. | |
""" | |
if self._loudness is not None: | |
return self._loudness.to(self.device) | |
original_length = self.signal_length | |
if self.signal_duration < 0.5: | |
pad_len = int((0.5 - self.signal_duration) * self.sample_rate) | |
self.zero_pad(0, pad_len) | |
# create BS.1770 meter | |
meter = Meter( | |
self.sample_rate, filter_class=filter_class, block_size=block_size, **kwargs | |
) | |
meter = meter.to(self.device) | |
# measure loudness | |
loudness = meter.integrated_loudness(self.audio_data.permute(0, 2, 1)) | |
self.truncate_samples(original_length) | |
min_loudness = ( | |
torch.ones_like(loudness, device=loudness.device) * self.MIN_LOUDNESS | |
) | |
self._loudness = torch.maximum(loudness, min_loudness) | |
return self._loudness.to(self.device) | |