import typing import julius import numpy as np import torch import torchaudio from . import util class EffectMixin: GAIN_FACTOR = np.log(10) / 20 """Gain factor for converting between amplitude and decibels.""" CODEC_PRESETS = { "8-bit": {"format": "wav", "encoding": "ULAW", "bits_per_sample": 8}, "GSM-FR": {"format": "gsm"}, "MP3": {"format": "mp3", "compression": -9}, "Vorbis": {"format": "vorbis", "compression": -1}, "Ogg": { "format": "ogg", "compression": -1, }, "Amr-nb": {"format": "amr-nb"}, } """Presets for applying codecs via torchaudio.""" def mix( self, other, snr: typing.Union[torch.Tensor, np.ndarray, float] = 10, other_eq: typing.Union[torch.Tensor, np.ndarray] = None, ): """Mixes noise with signal at specified signal-to-noise ratio. Optionally, the other signal can be equalized in-place. Parameters ---------- other : AudioSignal AudioSignal object to mix with. snr : typing.Union[torch.Tensor, np.ndarray, float], optional Signal to noise ratio, by default 10 other_eq : typing.Union[torch.Tensor, np.ndarray], optional EQ curve to apply to other signal, if any, by default None Returns ------- AudioSignal In-place modification of AudioSignal. """ snr = util.ensure_tensor(snr).to(self.device) pad_len = max(0, self.signal_length - other.signal_length) other.zero_pad(0, pad_len) other.truncate_samples(self.signal_length) if other_eq is not None: other = other.equalizer(other_eq) tgt_loudness = self.loudness() - snr other = other.normalize(tgt_loudness) self.audio_data = self.audio_data + other.audio_data return self def convolve(self, other, start_at_max: bool = True): """Convolves self with other. This function uses FFTs to do the convolution. Parameters ---------- other : AudioSignal Signal to convolve with. start_at_max : bool, optional Whether to start at the max value of other signal, to avoid inducing delays, by default True Returns ------- AudioSignal Convolved signal, in-place. """ from . import AudioSignal pad_len = self.signal_length - other.signal_length if pad_len > 0: other.zero_pad(0, pad_len) else: other.truncate_samples(self.signal_length) if start_at_max: # Use roll to rotate over the max for every item # so that the impulse responses don't induce any # delay. idx = other.audio_data.abs().argmax(axis=-1) irs = torch.zeros_like(other.audio_data) for i in range(other.batch_size): irs[i] = torch.roll(other.audio_data[i], -idx[i].item(), -1) other = AudioSignal(irs, other.sample_rate) delta = torch.zeros_like(other.audio_data) delta[..., 0] = 1 length = self.signal_length delta_fft = torch.fft.rfft(delta, length) other_fft = torch.fft.rfft(other.audio_data, length) self_fft = torch.fft.rfft(self.audio_data, length) convolved_fft = other_fft * self_fft convolved_audio = torch.fft.irfft(convolved_fft, length) delta_convolved_fft = other_fft * delta_fft delta_audio = torch.fft.irfft(delta_convolved_fft, length) # Use the delta to rescale the audio exactly as needed. delta_max = delta_audio.abs().max(dim=-1, keepdims=True)[0] scale = 1 / delta_max.clamp(1e-5) convolved_audio = convolved_audio * scale self.audio_data = convolved_audio return self def apply_ir( self, ir, drr: typing.Union[torch.Tensor, np.ndarray, float] = None, ir_eq: typing.Union[torch.Tensor, np.ndarray] = None, use_original_phase: bool = False, ): """Applies an impulse response to the signal. If ` is`ir_eq`` is specified, the impulse response is equalized before it is applied, using the given curve. Parameters ---------- ir : AudioSignal Impulse response to convolve with. drr : typing.Union[torch.Tensor, np.ndarray, float], optional Direct-to-reverberant ratio that impulse response will be altered to, if specified, by default None ir_eq : typing.Union[torch.Tensor, np.ndarray], optional Equalization that will be applied to impulse response if specified, by default None use_original_phase : bool, optional Whether to use the original phase, instead of the convolved phase, by default False Returns ------- AudioSignal Signal with impulse response applied to it """ if ir_eq is not None: ir = ir.equalizer(ir_eq) if drr is not None: ir = ir.alter_drr(drr) # Save the peak before max_spk = self.audio_data.abs().max(dim=-1, keepdims=True).values # Augment the impulse response to simulate microphone effects # and with varying direct-to-reverberant ratio. phase = self.phase self.convolve(ir) # Use the input phase if use_original_phase: self.stft() self.stft_data = self.magnitude * torch.exp(1j * phase) self.istft() # Rescale to the input's amplitude max_transformed = self.audio_data.abs().max(dim=-1, keepdims=True).values scale_factor = max_spk.clamp(1e-8) / max_transformed.clamp(1e-8) self = self * scale_factor return self def ensure_max_of_audio(self, max: float = 1.0): """Ensures that ``abs(audio_data) <= max``. Parameters ---------- max : float, optional Max absolute value of signal, by default 1.0 Returns ------- AudioSignal Signal with values scaled between -max and max. """ peak = self.audio_data.abs().max(dim=-1, keepdims=True)[0] peak_gain = torch.ones_like(peak) peak_gain[peak > max] = max / peak[peak > max] self.audio_data = self.audio_data * peak_gain return self def normalize(self, db: typing.Union[torch.Tensor, np.ndarray, float] = -24.0): """Normalizes the signal's volume to the specified db, in LUFS. This is GPU-compatible, making for very fast loudness normalization. Parameters ---------- db : typing.Union[torch.Tensor, np.ndarray, float], optional Loudness to normalize to, by default -24.0 Returns ------- AudioSignal Normalized audio signal. """ db = util.ensure_tensor(db).to(self.device) ref_db = self.loudness() gain = db - ref_db gain = torch.exp(gain * self.GAIN_FACTOR) self.audio_data = self.audio_data * gain[:, None, None] return self def volume_change(self, db: typing.Union[torch.Tensor, np.ndarray, float]): """Change volume of signal by some amount, in dB. Parameters ---------- db : typing.Union[torch.Tensor, np.ndarray, float] Amount to change volume by. Returns ------- AudioSignal Signal at new volume. """ db = util.ensure_tensor(db, ndim=1).to(self.device) gain = torch.exp(db * self.GAIN_FACTOR) self.audio_data = self.audio_data * gain[:, None, None] return self def _to_2d(self): waveform = self.audio_data.reshape(-1, self.signal_length) return waveform def _to_3d(self, waveform): return waveform.reshape(self.batch_size, self.num_channels, -1) def pitch_shift(self, n_semitones: int, quick: bool = True): """Pitch shift the signal. All items in the batch get the same pitch shift. Parameters ---------- n_semitones : int How many semitones to shift the signal by. quick : bool, optional Using quick pitch shifting, by default True Returns ------- AudioSignal Pitch shifted audio signal. """ device = self.device effects = [ ["pitch", str(n_semitones * 100)], ["rate", str(self.sample_rate)], ] if quick: effects[0].insert(1, "-q") waveform = self._to_2d().cpu() waveform, sample_rate = torchaudio.sox_effects.apply_effects_tensor( waveform, self.sample_rate, effects, channels_first=True ) self.sample_rate = sample_rate self.audio_data = self._to_3d(waveform) return self.to(device) def time_stretch(self, factor: float, quick: bool = True): """Time stretch the audio signal. Parameters ---------- factor : float Factor by which to stretch the AudioSignal. Typically between 0.8 and 1.2. quick : bool, optional Whether to use quick time stretching, by default True Returns ------- AudioSignal Time-stretched AudioSignal. """ device = self.device effects = [ ["tempo", str(factor)], ["rate", str(self.sample_rate)], ] if quick: effects[0].insert(1, "-q") waveform = self._to_2d().cpu() waveform, sample_rate = torchaudio.sox_effects.apply_effects_tensor( waveform, self.sample_rate, effects, channels_first=True ) self.sample_rate = sample_rate self.audio_data = self._to_3d(waveform) return self.to(device) def apply_codec( self, preset: str = None, format: str = "wav", encoding: str = None, bits_per_sample: int = None, compression: int = None, ): # pragma: no cover """Applies an audio codec to the signal. Parameters ---------- preset : str, optional One of the keys in ``self.CODEC_PRESETS``, by default None format : str, optional Format for audio codec, by default "wav" encoding : str, optional Encoding to use, by default None bits_per_sample : int, optional How many bits per sample, by default None compression : int, optional Compression amount of codec, by default None Returns ------- AudioSignal AudioSignal with codec applied. Raises ------ ValueError If preset is not in ``self.CODEC_PRESETS``, an error is thrown. """ torchaudio_version_070 = "0.7" in torchaudio.__version__ if torchaudio_version_070: return self kwargs = { "format": format, "encoding": encoding, "bits_per_sample": bits_per_sample, "compression": compression, } if preset is not None: if preset in self.CODEC_PRESETS: kwargs = self.CODEC_PRESETS[preset] else: raise ValueError( f"Unknown preset: {preset}. " f"Known presets: {list(self.CODEC_PRESETS.keys())}" ) waveform = self._to_2d() if kwargs["format"] in ["vorbis", "mp3", "ogg", "amr-nb"]: # Apply it in a for loop augmented = torch.cat( [ torchaudio.functional.apply_codec( waveform[i][None, :], self.sample_rate, **kwargs ) for i in range(waveform.shape[0]) ], dim=0, ) else: augmented = torchaudio.functional.apply_codec( waveform, self.sample_rate, **kwargs ) augmented = self._to_3d(augmented) self.audio_data = augmented return self def mel_filterbank(self, n_bands: int): """Breaks signal into mel bands. Parameters ---------- n_bands : int Number of mel bands to use. Returns ------- torch.Tensor Mel-filtered bands, with last axis being the band index. """ filterbank = ( julius.SplitBands(self.sample_rate, n_bands).float().to(self.device) ) filtered = filterbank(self.audio_data) return filtered.permute(1, 2, 3, 0) def equalizer(self, db: typing.Union[torch.Tensor, np.ndarray]): """Applies a mel-spaced equalizer to the audio signal. Parameters ---------- db : typing.Union[torch.Tensor, np.ndarray] EQ curve to apply. Returns ------- AudioSignal AudioSignal with equalization applied. """ db = util.ensure_tensor(db) n_bands = db.shape[-1] fbank = self.mel_filterbank(n_bands) # If there's a batch dimension, make sure it's the same. if db.ndim == 2: if db.shape[0] != 1: assert db.shape[0] == fbank.shape[0] else: db = db.unsqueeze(0) weights = (10**db).to(self.device).float() fbank = fbank * weights[:, None, None, :] eq_audio_data = fbank.sum(-1) self.audio_data = eq_audio_data return self def clip_distortion( self, clip_percentile: typing.Union[torch.Tensor, np.ndarray, float] ): """Clips the signal at a given percentile. The higher it is, the lower the threshold for clipping. Parameters ---------- clip_percentile : typing.Union[torch.Tensor, np.ndarray, float] Values are between 0.0 to 1.0. Typical values are 0.1 or below. Returns ------- AudioSignal Audio signal with clipped audio data. """ clip_percentile = util.ensure_tensor(clip_percentile, ndim=1) min_thresh = torch.quantile(self.audio_data, clip_percentile / 2, dim=-1) max_thresh = torch.quantile(self.audio_data, 1 - (clip_percentile / 2), dim=-1) nc = self.audio_data.shape[1] min_thresh = min_thresh[:, :nc, :] max_thresh = max_thresh[:, :nc, :] self.audio_data = self.audio_data.clamp(min_thresh, max_thresh) return self def quantization( self, quantization_channels: typing.Union[torch.Tensor, np.ndarray, int] ): """Applies quantization to the input waveform. Parameters ---------- quantization_channels : typing.Union[torch.Tensor, np.ndarray, int] Number of evenly spaced quantization channels to quantize to. Returns ------- AudioSignal Quantized AudioSignal. """ quantization_channels = util.ensure_tensor(quantization_channels, ndim=3) x = self.audio_data x = (x + 1) / 2 x = x * quantization_channels x = x.floor() x = x / quantization_channels x = 2 * x - 1 residual = (self.audio_data - x).detach() self.audio_data = self.audio_data - residual return self def mulaw_quantization( self, quantization_channels: typing.Union[torch.Tensor, np.ndarray, int] ): """Applies mu-law quantization to the input waveform. Parameters ---------- quantization_channels : typing.Union[torch.Tensor, np.ndarray, int] Number of mu-law spaced quantization channels to quantize to. Returns ------- AudioSignal Quantized AudioSignal. """ mu = quantization_channels - 1.0 mu = util.ensure_tensor(mu, ndim=3) x = self.audio_data # quantize x = torch.sign(x) * torch.log1p(mu * torch.abs(x)) / torch.log1p(mu) x = ((x + 1) / 2 * mu + 0.5).to(torch.int64) # unquantize x = (x / mu) * 2 - 1.0 x = torch.sign(x) * (torch.exp(torch.abs(x) * torch.log1p(mu)) - 1.0) / mu residual = (self.audio_data - x).detach() self.audio_data = self.audio_data - residual return self def __matmul__(self, other): return self.convolve(other) class ImpulseResponseMixin: """These functions are generally only used with AudioSignals that are derived from impulse responses, not other sources like music or speech. These methods are used to replicate the data augmentation described in [1]. 1. Bryan, Nicholas J. "Impulse response data augmentation and deep neural networks for blind room acoustic parameter estimation." ICASSP 2020-2020 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP). IEEE, 2020. """ def decompose_ir(self): """Decomposes an impulse response into early and late field responses. """ # Equations 1 and 2 # ----------------- # Breaking up into early # response + late field response. td = torch.argmax(self.audio_data, dim=-1, keepdim=True) t0 = int(self.sample_rate * 0.0025) idx = torch.arange(self.audio_data.shape[-1], device=self.device)[None, None, :] idx = idx.expand(self.batch_size, -1, -1) early_idx = (idx >= td - t0) * (idx <= td + t0) early_response = torch.zeros_like(self.audio_data, device=self.device) early_response[early_idx] = self.audio_data[early_idx] late_idx = ~early_idx late_field = torch.zeros_like(self.audio_data, device=self.device) late_field[late_idx] = self.audio_data[late_idx] # Equation 4 # ---------- # Decompose early response into windowed # direct path and windowed residual. window = torch.zeros_like(self.audio_data, device=self.device) for idx in range(self.batch_size): window_idx = early_idx[idx, 0].nonzero() window[idx, ..., window_idx] = self.get_window( "hann", window_idx.shape[-1], self.device ) return early_response, late_field, window def measure_drr(self): """Measures the direct-to-reverberant ratio of the impulse response. Returns ------- float Direct-to-reverberant ratio """ early_response, late_field, _ = self.decompose_ir() num = (early_response**2).sum(dim=-1) den = (late_field**2).sum(dim=-1) drr = 10 * torch.log10(num / den) return drr @staticmethod def solve_alpha(early_response, late_field, wd, target_drr): """Used to solve for the alpha value, which is used to alter the drr. """ # Equation 5 # ---------- # Apply the good ol' quadratic formula. wd_sq = wd**2 wd_sq_1 = (1 - wd) ** 2 e_sq = early_response**2 l_sq = late_field**2 a = (wd_sq * e_sq).sum(dim=-1) b = (2 * (1 - wd) * wd * e_sq).sum(dim=-1) c = (wd_sq_1 * e_sq).sum(dim=-1) - torch.pow(10, target_drr / 10) * l_sq.sum( dim=-1 ) expr = ((b**2) - 4 * a * c).sqrt() alpha = torch.maximum( (-b - expr) / (2 * a), (-b + expr) / (2 * a), ) return alpha def alter_drr(self, drr: typing.Union[torch.Tensor, np.ndarray, float]): """Alters the direct-to-reverberant ratio of the impulse response. Parameters ---------- drr : typing.Union[torch.Tensor, np.ndarray, float] Direct-to-reverberant ratio that impulse response will be altered to, if specified, by default None Returns ------- AudioSignal Altered impulse response. """ drr = util.ensure_tensor(drr, 2, self.batch_size).to(self.device) early_response, late_field, window = self.decompose_ir() alpha = self.solve_alpha(early_response, late_field, window, drr) min_alpha = ( late_field.abs().max(dim=-1)[0] / early_response.abs().max(dim=-1)[0] ) alpha = torch.maximum(alpha, min_alpha)[..., None] aug_ir_data = ( alpha * window * early_response + ((1 - window) * early_response) + late_field ) self.audio_data = aug_ir_data self.ensure_max_of_audio() return self