ITO-Master / modules /common_audioeffects.py
jhtonyKoo's picture
modify app
6bbce1b
raw
history blame
59.2 kB
"""
Audio effects for data augmentation.
Several audio effects can be combined into an augmentation chain.
Important note: We assume that the parallelization during training is done using
multi-processing and not multi-threading. Hence, we do not need the
`@sox.sox_context()` decorators as discussed in this
[thread](https://github.com/pseeth/soxbindings/issues/4).
AI Music Technology Group, Sony Group Corporation
AI Speech and Sound Group, Sony Europe
This implementation originally belongs to Sony Group Corporation,
which has been introduced in the work "Automatic music mixing with deep learning and out-of-domain data".
Original repo link: https://github.com/sony/FxNorm-automix
This work modifies a few implementations from the original repo to suit the task.
"""
from itertools import permutations
import logging
import numpy as np
import pymixconsole as pymc
from pymixconsole.parameter import Parameter
from pymixconsole.parameter_list import ParameterList
from pymixconsole.processor import Processor
from random import shuffle
from scipy.signal import oaconvolve
import soxbindings as sox
from typing import List, Optional, Tuple, Union
from numba import jit
# prevent pysox from logging warnings regarding non-opimal timestretch factors
logging.getLogger('sox').setLevel(logging.ERROR)
# Monkey-Patch `Processor` for convenience
# (a) Allow `None` as blocksize if processor can work on variable-length audio
def new_init(self, name, parameters, block_size, sample_rate, dtype='float32'):
"""
Initialize processor.
Args:
self: Reference to object
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
block_size (int): Size of blocks for blockwise processing.
Can also be `None` if full audio can be processed at once.
sample_rate (int): Sample rate of input audio. Use `None` if effect is independent of this value.
dtype (str): data type of samples
"""
self.name = name
self.parameters = parameters
self.block_size = block_size
self.sample_rate = sample_rate
self.dtype = dtype
# (b) make code simpler
def new_update(self, parameter_name):
"""
Update processor after randomization of parameters.
Args:
self: Reference to object.
parameter_name (str): Parameter whose value has changed.
"""
pass
# (c) representation for nice print
def new_repr(self):
"""
Create human-readable representation.
Args:
self: Reference to object.
Returns:
string representation of object.
"""
return f'Processor(name={self.name!r}, parameters={self.parameters!r}'
Processor.__init__ = new_init
Processor.__repr__ = new_repr
Processor.update = new_update
class AugmentationChain:
"""Basic audio Fx chain which is used for data augmentation."""
def __init__(self,
fxs: Optional[List[Tuple[Union[Processor, 'AugmentationChain'], float, bool]]] = [],
shuffle: Optional[bool] = False,
parallel: Optional[bool] = False,
parallel_weight_factor = None,
randomize_param_value=True):
"""
Create augmentation chain from the dictionary `fxs`.
Args:
fxs (list of tuples): First tuple element is an instances of `pymc.processor` or `AugmentationChain` that
we want to use for data augmentation. Second element gives probability that effect should be applied.
Third element defines, whether the processed signal is normalized by the RMS of the input.
shuffle (bool): If `True` then order of Fx are changed whenever chain is applied.
"""
self.fxs = fxs
self.shuffle = shuffle
self.parallel = parallel
self.parallel_weight_factor = parallel_weight_factor
self.randomize_param_value = randomize_param_value
def apply_processor(self, x, processor: Processor, rms_normalize):
"""
Pass audio in `x` through `processor` and output the respective processed audio.
Args:
x (Numpy array): Input audio of shape `n_samples` x `n_channels`.
processor (Processor): Audio effect that we want to apply.
rms_normalize (bool): If `True`, the processed signal is normalized by the RMS of the signal.
Returns:
Numpy array: Processed audio of shape `n_samples` x `n_channels` (same size as `x')
"""
n_samples_input = x.shape[0]
if processor.block_size is None:
y = processor.process(x)
else:
# make sure that n_samples is a multiple of `processor.block_size`
if x.shape[0] % processor.block_size != 0:
n_pad = processor.block_size - x.shape[0] % processor.block_size
x = np.pad(x, ((0, n_pad), (0, 0)), mode='reflective')
y = np.zeros_like(x)
for idx in range(0, x.shape[0], processor.block_size):
y[idx:idx+processor.block_size, :] = processor.process(x[idx:idx+processor.block_size, :])
if rms_normalize:
# normalize output energy such that it is the same as the input energy
scale = np.sqrt(np.mean(np.square(x)) / np.maximum(1e-7, np.mean(np.square(y))))
y *= scale
# return audio of same length as x
return y[:n_samples_input, :]
def apply_same_processor(self, x_list, processor: Processor, rms_normalize):
for i in range(len(x_list)):
x_list[i] = self.apply_processor(x_list[i], processor, rms_normalize)
return x_list
def __call__(self, x_list):
"""
Apply the same augmentation chain to audio tracks in list `x_list`.
Args:
x_list (list of Numpy array) : List of audio samples of shape `n_samples` x `n_channels`.
Returns:
y_list (list of Numpy array) : List of processed audio of same shape as `x_list` where the same effects have been applied.
"""
# randomly shuffle effect order if `self.shuffle` is True
if self.shuffle:
shuffle(self.fxs)
# apply effects with probabilities given in `self.fxs`
y_list = x_list.copy()
for fx, p, rms_normalize in self.fxs:
if np.random.rand() < p:
if isinstance(fx, Processor):
# randomize all effect parameters (also calls `update()` for each processor)
if self.randomize_param_value:
fx.randomize()
else:
fx.update(None)
# apply processor
y_list = self.apply_same_processor(y_list, fx, rms_normalize)
else:
y_list = fx(y_list)
if self.parallel:
# weighting factor of input signal in the range of (0.0 ~ 0.5)
weight_in = self.parallel_weight_factor if self.parallel_weight_factor else np.random.rand() / 2.
for i in range(len(y_list)):
y_list[i] = weight_in*x_list[i] + (1-weight_in)*y_list[i]
return y_list
def __repr__(self):
"""
Human-readable representation.
Returns:
string representation of object.
"""
return f'AugmentationChain(fxs={self.fxs!r}, shuffle={self.shuffle!r})'
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% DISTORTION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
def hard_clip(x, threshold_dB, drive):
"""
Hard clip distortion.
Args:
x: input audio
threshold_dB: threshold
drive: drive
Returns:
(Numpy array): distorted audio
"""
drive_linear = np.power(10., drive / 20.).astype(np.float32)
threshold_linear = 10. ** (threshold_dB / 20.)
return np.clip(x * drive_linear, -threshold_linear, threshold_linear)
def overdrive(x, drive, colour, sample_rate):
"""
Overdrive distortion.
Args:
x: input audio
drive: Controls the amount of distortion (dB).
colour: Controls the amount of even harmonic content in the output(dB)
sample_rate: sampling rate
Returns:
(Numpy array): distorted audio
"""
scale = np.max(np.abs(x))
if scale > 0.9:
clips = True
x = x * (0.9 / scale)
else:
clips = False
tfm = sox.Transformer()
tfm.overdrive(gain_db=drive, colour=colour)
y = tfm.build_array(input_array=x, sample_rate_in=sample_rate).astype(np.float32)
if clips:
y *= scale / 0.9 # rescale output to original scale
return y
def hyperbolic_tangent(x, drive):
"""
Hyperbolic Tanh distortion.
Args:
x: input audio
drive: drive
Returns:
(Numpy array): distorted audio
"""
drive_linear = np.power(10., drive / 20.).astype(np.float32)
return np.tanh(2. * x * drive_linear)
def soft_sine(x, drive):
"""
Soft sine distortion.
Args:
x: input audio
drive: drive
Returns:
(Numpy array): distorted audio
"""
drive_linear = np.power(10., drive / 20.).astype(np.float32)
y = np.clip(x * drive_linear, -np.pi/4.0, np.pi/4.0)
return np.sin(2. * y)
def bit_crusher(x, bits):
"""
Bit crusher distortion.
Args:
x: input audio
bits: bits
Returns:
(Numpy array): distorted audio
"""
return np.rint(x * (2 ** bits)) / (2 ** bits)
class Distortion(Processor):
"""
Distortion processor.
Processor parameters:
mode (str): Currently supports the following five modes: hard_clip, waveshaper, soft_sine, tanh, bit_crusher.
Each mode has different parameters such as threshold, factor, or bits.
threshold (float): threshold
drive (float): drive
factor (float): factor
limit_range (float): limit range
bits (int): bits
"""
def __init__(self, sample_rate, name='Distortion', parameters=None):
"""
Initialize processor.
Args:
sample_rate (int): sample rate.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name, None, block_size=None, sample_rate=sample_rate)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('mode', 'hard_clip', 'string',
options=['hard_clip',
'overdrive',
'soft_sine',
'tanh',
'bit_crusher']))
self.parameters.add(Parameter('threshold', 0.0, 'float',
units='dB', maximum=0.0, minimum=-20.0))
self.parameters.add(Parameter('drive', 0.0, 'float',
units='dB', maximum=20.0, minimum=0.0))
self.parameters.add(Parameter('colour', 20.0, 'float',
maximum=100.0, minimum=0.0))
self.parameters.add(Parameter('bits', 12, 'int',
maximum=12, minimum=8))
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): distorted audio of size `n_samples x n_channels`.
"""
if self.parameters.mode.value == 'hard_clip':
y = hard_clip(x, self.parameters.threshold.value, self.parameters.drive.value)
elif self.parameters.mode.value == 'overdrive':
y = overdrive(x, self.parameters.drive.value,
self.parameters.colour.value, self.sample_rate)
elif self.parameters.mode.value == 'soft_sine':
y = soft_sine(x, self.parameters.drive.value)
elif self.parameters.mode.value == 'tanh':
y = hyperbolic_tangent(x, self.parameters.drive.value)
elif self.parameters.mode.value == 'bit_crusher':
y = bit_crusher(x, self.parameters.bits.value)
# If the output has low amplitude, (some distortion settigns can "crush" down the amplitude)
# Then it`s normalised to the input's amplitude
x_max = np.max(np.abs(x)) + 1e-8
o_max = np.max(np.abs(y)) + 1e-8
if x_max > o_max:
y = y*(x_max/o_max)
return y
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% EQUALISER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class Equaliser(Processor):
"""
Five band parametric equaliser (two shelves and three central bands).
All gains are set in dB values and range from `MIN_GAIN` dB to `MAX_GAIN` dB.
This processor is implemented as cascade of five biquad IIR filters
that are implemented using the infamous cookbook formulae from RBJ.
Processor parameters:
low_shelf_gain (float), low_shelf_freq (float)
first_band_gain (float), first_band_freq (float), first_band_q (float)
second_band_gain (float), second_band_freq (float), second_band_q (float)
third_band_gain (float), third_band_freq (float), third_band_q (float)
original from https://github.com/csteinmetz1/pymixconsole/blob/master/pymixconsole/processors/equaliser.py
"""
def __init__(self, n_channels,
sample_rate,
gain_range=(-15.0, 15.0),
q_range=(0.1, 2.0),
bands=['low_shelf', 'first_band', 'second_band', 'third_band', 'high_shelf'],
hard_clip=False,
name='Equaliser', parameters=None):
"""
Initialize processor.
Args:
n_channels (int): Number of audio channels.
sample_rate (int): Sample rate of audio.
gain_range (tuple of floats): minimum and maximum gain that can be used.
q_range (tuple of floats): minimum and maximum q value.
hard_clip (bool): Whether we clip to [-1, 1.] after processing.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name, parameters=parameters, block_size=None, sample_rate=sample_rate)
self.n_channels = n_channels
MIN_GAIN, MAX_GAIN = gain_range
MIN_Q, MAX_Q = q_range
if not parameters:
self.parameters = ParameterList()
# low shelf parameters -------
self.parameters.add(Parameter('low_shelf_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN))
self.parameters.add(Parameter('low_shelf_freq', 80.0, 'float', minimum=30.0, maximum=200.0))
# first band parameters ------
self.parameters.add(Parameter('first_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN))
self.parameters.add(Parameter('first_band_freq', 400.0, 'float', minimum=200.0, maximum=1000.0))
self.parameters.add(Parameter('first_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q))
# second band parameters -----
self.parameters.add(Parameter('second_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN))
self.parameters.add(Parameter('second_band_freq', 2000.0, 'float', minimum=1000.0, maximum=3000.0))
self.parameters.add(Parameter('second_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q))
# third band parameters ------
self.parameters.add(Parameter('third_band_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN))
self.parameters.add(Parameter('third_band_freq', 4000.0, 'float', minimum=3000.0, maximum=8000.0))
self.parameters.add(Parameter('third_band_q', 0.7, 'float', minimum=MIN_Q, maximum=MAX_Q))
# high shelf parameters ------
self.parameters.add(Parameter('high_shelf_gain', 0.0, 'float', minimum=MIN_GAIN, maximum=MAX_GAIN))
self.parameters.add(Parameter('high_shelf_freq', 8000.0, 'float', minimum=5000.0, maximum=10000.0))
self.bands = bands
self.filters = self.setup_filters()
self.hard_clip = hard_clip
def setup_filters(self):
"""
Create IIR filters.
Returns:
IIR filters
"""
filters = {}
for band in self.bands:
G = getattr(self.parameters, band + '_gain').value
fc = getattr(self.parameters, band + '_freq').value
rate = self.sample_rate
if band in ['low_shelf', 'high_shelf']:
Q = 0.707
filter_type = band
else:
Q = getattr(self.parameters, band + '_q').value
filter_type = 'peaking'
filters[band] = pymc.components.iirfilter.IIRfilter(G, Q, fc, rate, filter_type, n_channels=self.n_channels)
return filters
def update_filter(self, band):
"""
Update filters.
Args:
band (str): Band that should be updated.
"""
self.filters[band].G = getattr(self.parameters, band + '_gain').value
self.filters[band].fc = getattr(self.parameters, band + '_freq').value
self.filters[band].rate = self.sample_rate
if band in ['first_band', 'second_band', 'third_band']:
self.filters[band].Q = getattr(self.parameters, band + '_q').value
def update(self, parameter_name=None):
"""
Update processor after randomization of parameters.
Args:
parameter_name (str): Parameter whose value has changed.
"""
if parameter_name is not None:
bands = ['_'.join(parameter_name.split('_')[:2])]
else:
bands = self.bands
for band in bands:
self.update_filter(band)
for _band, iirfilter in self.filters.items():
iirfilter.reset_state()
def reset_state(self):
"""Reset state."""
for _band, iirfilter in self.filters.items():
iirfilter.reset_state()
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): equalized audio of size `n_samples x n_channels`.
"""
for _band, iirfilter in self.filters.items():
iirfilter.reset_state()
x = iirfilter.apply_filter(x)
if self.hard_clip:
x = np.clip(x, -1.0, 1.0)
# make sure that we have float32 as IIR filtering returns float64
x = x.astype(np.float32)
# make sure that we have two dimensions (if `n_channels == 1`)
if x.ndim == 1:
x = x[:, np.newaxis]
return x
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% COMPRESSOR %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@jit(nopython=True)
def compressor_process(x, threshold, attack_time, release_time, ratio, makeup_gain, sample_rate, yL_prev):
"""
Apply compressor.
Args:
x (Numpy array): audio data.
threshold: threshold in dB.
attack_time: attack_time in ms.
release_time: release_time in ms.
ratio: ratio.
makeup_gain: makeup_gain.
sample_rate: sample rate.
yL_prev: internal state of the envelop gain.
Returns:
compressed audio.
"""
M = x.shape[0]
x_g = np.zeros(M)
x_l = np.zeros(M)
y_g = np.zeros(M)
y_l = np.zeros(M)
c = np.zeros(M)
yL_prev = 0.
alpha_attack = np.exp(-1/(0.001 * sample_rate * attack_time))
alpha_release = np.exp(-1/(0.001 * sample_rate * release_time))
for i in np.arange(M):
if np.abs(x[i]) < 0.000001:
x_g[i] = -120.0
else:
x_g[i] = 20 * np.log10(np.abs(x[i]))
if ratio > 1:
if x_g[i] >= threshold:
y_g[i] = threshold + (x_g[i] - threshold) / ratio
else:
y_g[i] = x_g[i]
elif ratio < 1:
if x_g[i] <= threshold:
y_g[i] = threshold + (x_g[i] - threshold) / (1/ratio)
else:
y_g[i] = x_g[i]
x_l[i] = x_g[i] - y_g[i]
if x_l[i] > yL_prev:
y_l[i] = alpha_attack * yL_prev + (1 - alpha_attack) * x_l[i]
else:
y_l[i] = alpha_release * yL_prev + (1 - alpha_release) * x_l[i]
c[i] = np.power(10.0, (makeup_gain - y_l[i]) / 20.0)
yL_prev = y_l[i]
y = x * c
return y, yL_prev
class Compressor(Processor):
"""
Single band stereo dynamic range compressor.
Processor parameters:
threshold (float)
attack_time (float)
release_time (float)
ratio (float)
makeup_gain (float)
"""
def __init__(self, sample_rate, name='Compressor', parameters=None):
"""
Initialize processor.
Args:
sample_rate (int): Sample rate of input audio.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('threshold', -20.0, 'float', units='dB', minimum=-80.0, maximum=-5.0))
self.parameters.add(Parameter('attack_time', 2.0, 'float', units='ms', minimum=1., maximum=20.0))
self.parameters.add(Parameter('release_time', 100.0, 'float', units='ms', minimum=50.0, maximum=500.0))
self.parameters.add(Parameter('ratio', 4.0, 'float', minimum=4., maximum=40.0))
# we remove makeup_gain parameter inside the Compressor
# store internal state (for block-wise processing)
self.yL_prev = None
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): compressed audio of size `n_samples x n_channels`.
"""
if self.yL_prev is None:
self.yL_prev = [0.] * x.shape[1]
if not self.parameters.threshold.value == 0.0 or not self.parameters.ratio.value == 1.0:
y = np.zeros_like(x)
for ch in range(x.shape[1]):
y[:, ch], self.yL_prev[ch] = compressor_process(x[:, ch],
self.parameters.threshold.value,
self.parameters.attack_time.value,
self.parameters.release_time.value,
self.parameters.ratio.value,
0.0, # makeup_gain = 0
self.sample_rate,
self.yL_prev[ch])
else:
y = x
return y
def update(self, parameter_name=None):
"""
Update processor after randomization of parameters.
Args:
parameter_name (str): Parameter whose value has changed.
"""
self.yL_prev = None
# %%%%%%%%%%%%%%%%%%%%%%%%%% CONVOLUTIONAL REVERB %%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class ConvolutionalReverb(Processor):
"""
Convolutional Reverb.
Processor parameters:
wet_dry (float): Wet/dry ratio.
decay (float): Applies a fade out to the impulse response.
pre_delay (float): Value in ms. Shifts the IR in time and allows.
A positive value produces a traditional delay between the dry signal and the wet.
A negative delay is, in reality, zero delay, but effectively trims off the start of IR,
so the reverb response begins at a point further in.
"""
def __init__(self, impulse_responses, sample_rate, name='ConvolutionalReverb', parameters=None):
"""
Initialize processor.
Args:
impulse_responses (list): List with impulse responses created by `common_dataprocessing.create_dataset`
sample_rate (int): Sample rate that we should assume (used for fade-out computation)
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
Raises:
ValueError: if no impulse responses are provided.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate)
if impulse_responses is None:
raise ValueError('List of impulse responses must be provided for ConvolutionalReverb processor.')
self.impulse_responses = impulse_responses
if not parameters:
self.parameters = ParameterList()
self.max_ir_num = len(max(impulse_responses, key=len))
self.parameters.add(Parameter('index', 0, 'int', minimum=0, maximum=len(impulse_responses)))
self.parameters.add(Parameter('index_ir', 0, 'int', minimum=0, maximum=self.max_ir_num))
self.parameters.add(Parameter('wet', 1.0, 'float', minimum=1.0, maximum=1.0))
self.parameters.add(Parameter('dry', 0.0, 'float', minimum=0.0, maximum=0.0))
self.parameters.add(Parameter('decay', 1.0, 'float', minimum=1.0, maximum=1.0))
self.parameters.add(Parameter('pre_delay', 0, 'int', units='ms', minimum=0, maximum=0))
def update(self, parameter_name=None):
"""
Update processor after randomization of parameters.
Args:
parameter_name (str): Parameter whose value has changed.
"""
# we sample IR with a uniform random distribution according to RT60 values
chosen_ir_duration = self.impulse_responses[self.parameters.index.value]
chosen_ir_idx = self.parameters.index_ir.value % len(chosen_ir_duration)
self.h = np.copy(chosen_ir_duration[chosen_ir_idx]['impulse_response']())
# fade out the impulse based on the decay setting (starting from peak value)
if self.parameters.decay.value < 1.:
idx_peak = np.argmax(np.max(np.abs(self.h), axis=1), axis=0)
fstart = np.minimum(self.h.shape[0],
idx_peak + int(self.parameters.decay.value * (self.h.shape[0] - idx_peak)))
fstop = np.minimum(self.h.shape[0], fstart + int(0.020*self.sample_rate)) # constant 20 ms fade out
flen = fstop - fstart
fade = np.arange(1, flen+1, dtype=self.dtype)/flen
fade = np.power(0.1, fade * 5)
self.h[fstart:fstop, :] *= fade[:, np.newaxis]
self.h = self.h[:fstop]
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): reverbed audio of size `n_samples x n_channels`.
"""
# reshape IR to the correct size
n_channels = x.shape[1]
if self.h.shape[1] == 1 and n_channels > 1:
self.h = np.hstack([self.h] * n_channels) # repeat mono IR for multi-channel input
if self.h.shape[1] > 1 and n_channels == 1:
self.h = self.h[:, np.random.randint(self.h.shape[1]), np.newaxis] # randomly choose one IR channel
if self.parameters.wet.value == 0.0:
return x
else:
# perform convolution to get wet signal
y = oaconvolve(x, self.h, mode='full', axes=0)
# cut out wet signal (compensating for the delay that the IR is introducing + predelay)
idx = np.argmax(np.max(np.abs(self.h), axis=1), axis=0)
idx += int(0.001 * np.abs(self.parameters.pre_delay.value) * self.sample_rate)
idx = np.clip(idx, 0, self.h.shape[0]-1)
y = y[idx:idx+x.shape[0], :]
# return weighted sum of dry and wet signal
return self.parameters.dry.value * x + self.parameters.wet.value * y
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%% HAAS EFFECT %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
def haas_process(x, delay, feedback, wet_channel):
"""
Add Haas effect to audio.
Args:
x (Numpy array): input audio.
delay: Delay that we apply to one of the channels (in samples).
feedback: Feedback value.
wet_channel: Which channel we process (`left` or `right`).
Returns:
(Numpy array): Audio with Haas effect.
"""
y = np.copy(x)
if wet_channel == 'left':
y[:, 0] += feedback * np.roll(x[:, 0], delay)
elif wet_channel == 'right':
y[:, 1] += feedback * np.roll(x[:, 1], delay)
return y
class Haas(Processor):
"""
Haas Effect Processor.
Randomly selects one channel and applies a short delay to it.
Processor parameters:
delay (int)
feedback (float)
wet_channel (string)
"""
def __init__(self, sample_rate, delay_range=(-0.040, 0.040), name='Haas', parameters=None,
):
"""
Initialize processor.
Args:
sample_rate (int): Sample rate of input audio.
delay_range (tuple of floats): minimum/maximum delay for Haas effect.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('delay', int(delay_range[1] * sample_rate), 'int', units='samples',
minimum=int(delay_range[0] * sample_rate),
maximum=int(delay_range[1] * sample_rate)))
self.parameters.add(Parameter('feedback', 0.35, 'float', minimum=0.33, maximum=0.66))
self.parameters.add(Parameter('wet_channel', 'left', 'string', options=['left', 'right']))
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): audio with Haas effect of size `n_samples x n_channels`.
"""
assert x.shape[1] == 1 or x.shape[1] == 2, 'Haas effect only works with monaural or stereo audio.'
if x.shape[1] < 2:
x = np.repeat(x, 2, axis=1)
y = haas_process(x, self.parameters.delay.value,
self.parameters.feedback.value, self.parameters.wet_channel.value)
return y
def update(self, parameter_name=None):
"""
Update processor after randomization of parameters.
Args:
parameter_name (str): Parameter whose value has changed.
"""
self.reset_state()
def reset_state(self):
"""Reset state."""
self.read_idx = 0
self.write_idx = self.parameters.delay.value
self.buffer = np.zeros((65536, 2))
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PANNER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class Panner(Processor):
"""
Simple stereo panner.
If input is mono, output is stereo.
Original edited from https://github.com/csteinmetz1/pymixconsole/blob/master/pymixconsole/processors/panner.py
"""
def __init__(self, name='Panner', parameters=None):
"""
Initialize processor.
Args:
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
# default processor class constructor
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('pan', 0.5, 'float', minimum=0., maximum=1.))
self.parameters.add(Parameter('pan_law', '-4.5dB', 'string',
options=['-4.5dB', 'linear', 'constant_power']))
# setup the coefficents based on default params
self.update()
def _calculate_pan_coefficents(self):
"""
Calculate panning coefficients from the chosen pan law.
Based on the set pan law determine the gain value
to apply for the left and right channel to achieve panning effect.
This operates on the assumption that the input channel is mono.
The output data will be stereo at the moment, but could be expanded
to a higher channel count format.
The panning value is in the range [0, 1], where
0 means the signal is panned completely to the left, and
1 means the signal is apanned copletely to the right.
Raises:
ValueError: `self.parameters.pan_law` is not supported.
"""
self.gains = np.zeros(2, dtype=self.dtype)
# first scale the linear [0, 1] to [0, pi/2]
theta = self.parameters.pan.value * (np.pi/2)
if self.parameters.pan_law.value == 'linear':
self.gains[0] = ((np.pi/2) - theta) * (2/np.pi)
self.gains[1] = theta * (2/np.pi)
elif self.parameters.pan_law.value == 'constant_power':
self.gains[0] = np.cos(theta)
self.gains[1] = np.sin(theta)
elif self.parameters.pan_law.value == '-4.5dB':
self.gains[0] = np.sqrt(((np.pi/2) - theta) * (2/np.pi) * np.cos(theta))
self.gains[1] = np.sqrt(theta * (2/np.pi) * np.sin(theta))
else:
raise ValueError(f'Invalid pan_law {self.parameters.pan_law.value}.')
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): panned audio of size `n_samples x n_channels`.
"""
assert x.shape[1] == 1 or x.shape[1] == 2, 'Panner only works with monaural or stereo audio.'
if x.shape[1] < 2:
x = np.repeat(x, 2, axis=1)
return x * self.gains
def update(self, parameter_name=None):
"""
Update processor after randomization of parameters.
Args:
parameter_name (str): Parameter whose value has changed.
"""
self._calculate_pan_coefficents()
def reset_state(self):
"""Reset state."""
self._output_buffer = np.empty([self.block_size, 2])
self.update()
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% STEREO IMAGER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class MidSideImager(Processor):
def __init__(self, name='IMAGER', parameters=None):
super().__init__(name, parameters=parameters, block_size=None, sample_rate=None)
if not parameters:
self.parameters = ParameterList()
# values of 0.0~1.0 indicate making the signal more centered while 1.0~2.0 means making the signal more wider
self.parameters.add(Parameter("bal", 0.0, "float", processor=self, minimum=0.0, maximum=2.0))
def process(self, data):
"""
# input shape : [signal length, 2]
### note! stereo imager won't work if the input signal is a mono signal (left==right)
### if you want to apply stereo imager to a mono signal, first stereoize it with Haas effects
"""
# to mid-side channels
mid, side = self.lr_to_ms(data[:,0], data[:,1])
# apply mid-side weights according to energy
mid_e, side_e = np.sum(mid**2), np.sum(side**2)
total_e = mid_e + side_e
# apply weights
max_side_multiplier = np.sqrt(total_e / (side_e + 1e-3))
# compute current multiply factor
cur_bal = round(getattr(self.parameters, "bal").value, 3)
side_gain = cur_bal if cur_bal <= 1. else max_side_multiplier * (cur_bal-1)
# multiply weighting factor
new_side = side * side_gain
new_side_e = side_e * (side_gain ** 2)
left_mid_e = total_e - new_side_e
mid_gain = np.sqrt(left_mid_e / (mid_e + 1e-3))
new_mid = mid * mid_gain
# convert back to left-right channels
left, right = self.ms_to_lr(new_mid, new_side)
imaged = np.stack([left, right], 1)
return imaged
# left-right channeled signal to mid-side signal
def lr_to_ms(self, left, right):
mid = left + right
side = left - right
return mid, side
# mid-side channeled signal to left-right signal
def ms_to_lr(self, mid, side):
left = (mid + side) / 2
right = (mid - side) / 2
return left, right
def update(self, parameter_name=None):
return parameter_name
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GAIN %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class Gain(Processor):
"""
Gain Processor.
Applies gain in dB and can also randomly inverts polarity.
Processor parameters:
gain (float): Gain that should be applied (dB scale).
invert (bool): If True, then we also invert the waveform.
"""
def __init__(self, name='Gain', parameters=None):
"""
Initialize processor.
Args:
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name, parameters=parameters, block_size=None, sample_rate=None)
if not parameters:
self.parameters = ParameterList()
# self.parameters.add(Parameter('gain', 1.0, 'float', units='dB', minimum=-12.0, maximum=6.0))
self.parameters.add(Parameter('gain', 1.0, 'float', units='dB', minimum=-6.0, maximum=9.0))
self.parameters.add(Parameter('invert', False, 'bool'))
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): gain-augmented audio of size `n_samples x n_channels`.
"""
gain = 10 ** (self.parameters.gain.value / 20.)
if self.parameters.invert.value:
gain = -gain
return gain * x
# %%%%%%%%%%%%%%%%%%%%%%% SIMPLE CHANNEL SWAP %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class SwapChannels(Processor):
"""
Swap channels in multi-channel audio.
Processor parameters:
index (int) Selects the permutation that we are using.
Please note that "no permutation" is one of the permutations in `self.permutations` at index `0`.
"""
def __init__(self, n_channels, name='SwapChannels', parameters=None):
"""
Initialize processor.
Args:
n_channels (int): Number of channels in audio that we want to process.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None)
self.permutations = tuple(permutations(range(n_channels), n_channels))
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('index', 0, 'int', minimum=0, maximum=len(self.permutations)))
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): channel-swapped audio of size `n_samples x n_channels`.
"""
return x[:, self.permutations[self.parameters.index.value]]
# %%%%%%%%%%%%%%%%%%%%%%% Monauralize %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class Monauralize(Processor):
"""
Monauralizes audio (i.e., removes spatial information).
Process parameters:
seed_channel (int): channel that we use for overwriting the others.
"""
def __init__(self, n_channels, name='Monauralize', parameters=None):
"""
Initialize processor.
Args:
n_channels (int): Number of channels in audio that we want to process.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=None)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('seed_channel', 0, 'int', minimum=0, maximum=n_channels))
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): monauralized audio of size `n_samples x n_channels`.
"""
return np.tile(x[:, [self.parameters.seed_channel.value]], (1, x.shape[1]))
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PITCH SHIFT %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class PitchShift(Processor):
"""
Simple pitch shifter using SoX and soxbindings (https://github.com/pseeth/soxbindings).
Processor parameters:
steps (float): Pitch shift as positive/negative semitones
quick (bool): If True, this effect will run faster but with lower sound quality.
"""
def __init__(self, sample_rate, fix_length=True, name='PitchShift', parameters=None):
"""
Initialize processor.
Args:
sample_rate (int): Sample rate of input audio.
fix_length (bool): If True, then output has same length as input.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('steps', 0.0, 'float', minimum=-6., maximum=6.))
self.parameters.add(Parameter('quick', False, 'bool'))
self.fix_length = fix_length
self.clips = False
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): pitch-shifted audio of size `n_samples x n_channels`.
"""
if self.parameters.steps.value == 0.0:
y = x
else:
scale = np.max(np.abs(x))
if scale > 0.9:
clips = True
x = x * (0.9 / scale)
else:
clips = False
tfm = sox.Transformer()
tfm.pitch(self.parameters.steps.value, quick=bool(self.parameters.quick.value))
y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32)
if clips:
y *= scale / 0.9 # rescale output to original scale
if self.fix_length:
n_samples_input = x.shape[0]
n_samples_output = y.shape[0]
if n_samples_input < n_samples_output:
idx1 = (n_samples_output - n_samples_input) // 2
idx2 = idx1 + n_samples_input
y = y[idx1:idx2]
elif n_samples_input > n_samples_output:
n_pad = n_samples_input - n_samples_output
y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0)))
return y
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% TIME STRETCH %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class TimeStretch(Processor):
"""
Simple time stretcher using SoX and soxbindings (https://github.com/pseeth/soxbindings).
Processor parameters:
factor (float): Time stretch factor.
quick (bool): If True, this effect will run faster but with lower sound quality.
stretch_type (str): Algorithm used for stretching (`tempo` or `stretch`).
audio_type (str): Sets which time segments are most optmial when finding
the best overlapping points for time stretching.
"""
def __init__(self, sample_rate, fix_length=True, name='TimeStretch', parameters=None):
"""
Initialize processor.
Args:
sample_rate (int): Sample rate of input audio.
fix_length (bool): If True, then output has same length as input.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('factor', 1.0, 'float', minimum=1/1.33, maximum=1.33))
self.parameters.add(Parameter('quick', False, 'bool'))
self.parameters.add(Parameter('stretch_type', 'tempo', 'string', options=['tempo', 'stretch']))
self.parameters.add(Parameter('audio_type', 'l', 'string', options=['m', 's', 'l']))
self.fix_length = fix_length
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): time-stretched audio of size `n_samples x n_channels`.
"""
if self.parameters.factor.value == 1.0:
y = x
else:
scale = np.max(np.abs(x))
if scale > 0.9:
clips = True
x = x * (0.9 / scale)
else:
clips = False
tfm = sox.Transformer()
if self.parameters.stretch_type.value == 'stretch':
tfm.stretch(self.parameters.factor.value)
elif self.parameters.stretch_type.value == 'tempo':
tfm.tempo(self.parameters.factor.value,
audio_type=self.parameters.audio_type.value,
quick=bool(self.parameters.quick.value))
y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32)
if clips:
y *= scale / 0.9 # rescale output to original scale
if self.fix_length:
n_samples_input = x.shape[0]
n_samples_output = y.shape[0]
if n_samples_input < n_samples_output:
idx1 = (n_samples_output - n_samples_input) // 2
idx2 = idx1 + n_samples_input
y = y[idx1:idx2]
elif n_samples_input > n_samples_output:
n_pad = n_samples_input - n_samples_output
y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0)))
return y
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PLAYBACK SPEED %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class PlaybackSpeed(Processor):
"""
Simple playback speed effect using SoX and soxbindings (https://github.com/pseeth/soxbindings).
Processor parameters:
factor (float): Playback speed factor.
"""
def __init__(self, sample_rate, fix_length=True, name='PlaybackSpeed', parameters=None):
"""
Initialize processor.
Args:
sample_rate (int): Sample rate of input audio.
fix_length (bool): If True, then output has same length as input.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('factor', 1.0, 'float', minimum=1./1.33, maximum=1.33))
self.fix_length = fix_length
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): resampled audio of size `n_samples x n_channels`.
"""
if self.parameters.factor.value == 1.0:
y = x
else:
scale = np.max(np.abs(x))
if scale > 0.9:
clips = True
x = x * (0.9 / scale)
else:
clips = False
tfm = sox.Transformer()
tfm.speed(self.parameters.factor.value)
y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32)
if clips:
y *= scale / 0.9 # rescale output to original scale
if self.fix_length:
n_samples_input = x.shape[0]
n_samples_output = y.shape[0]
if n_samples_input < n_samples_output:
idx1 = (n_samples_output - n_samples_input) // 2
idx2 = idx1 + n_samples_input
y = y[idx1:idx2]
elif n_samples_input > n_samples_output:
n_pad = n_samples_input - n_samples_output
y = np.pad(y, ((n_pad//2, n_pad - n_pad//2), (0, 0)))
return y
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% BEND %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class Bend(Processor):
"""
Simple bend effect using SoX and soxbindings (https://github.com/pseeth/soxbindings).
Processor parameters:
n_bends (int): Number of segments or intervals to pitch shift
"""
def __init__(self, sample_rate, pitch_range=(-600, 600), fix_length=True, name='Bend', parameters=None):
"""
Initialize processor.
Args:
sample_rate (int): Sample rate of input audio.
pitch_range (tuple of ints): min and max pitch bending ranges in cents
fix_length (bool): If True, then output has same length as input.
name (str): Name of processor.
parameters (parameter_list): Parameters for this processor.
"""
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter('n_bends', 2, 'int', minimum=2, maximum=10))
self.pitch_range_min, self.pitch_range_max = pitch_range
def process(self, x):
"""
Process audio.
Args:
x (Numpy array): input audio of size `n_samples x n_channels`.
Returns:
(Numpy array): pitch-bended audio of size `n_samples x n_channels`.
"""
n_bends = self.parameters.n_bends.value
max_length = x.shape[0] / self.sample_rate
# Generates random non-overlapping segments
delta = 1. / self.sample_rate
boundaries = np.sort(delta + np.random.rand(n_bends-1) * (max_length - delta))
start, end = np.zeros(n_bends), np.zeros(n_bends)
start[0] = delta
for i, b in enumerate(boundaries):
end[i] = b
start[i+1] = b
end[-1] = max_length
# randomly sample pitch-shifts in cents
cents = np.random.randint(self.pitch_range_min, self.pitch_range_max+1, n_bends)
# remove segment if cent value is zero or start == end (as SoX does not allow such values)
idx_keep = np.logical_and(cents != 0, start != end)
n_bends, start, end, cents = sum(idx_keep), start[idx_keep], end[idx_keep], cents[idx_keep]
scale = np.max(np.abs(x))
if scale > 0.9:
clips = True
x = x * (0.9 / scale)
else:
clips = False
tfm = sox.Transformer()
tfm.bend(n_bends=int(n_bends), start_times=list(start), end_times=list(end), cents=list(cents))
y = tfm.build_array(input_array=x, sample_rate_in=self.sample_rate).astype(np.float32)
if clips:
y *= scale / 0.9 # rescale output to original scale
return y
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%% ALGORITHMIC REVERB %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
class AlgorithmicReverb(Processor):
def __init__(self, name="algoreverb", parameters=None, sample_rate=44100, **kwargs):
super().__init__(name=name, parameters=parameters, block_size=None, sample_rate=sample_rate, **kwargs)
if not parameters:
self.parameters = ParameterList()
self.parameters.add(Parameter("room_size", 0.5, "float", minimum=0.05, maximum=0.85))
self.parameters.add(Parameter("damping", 0.1, "float", minimum=0.0, maximum=1.0))
self.parameters.add(Parameter("dry_mix", 0.9, "float", minimum=0.0, maximum=1.0))
self.parameters.add(Parameter("wet_mix", 0.1, "float", minimum=0.0, maximum=1.0))
self.parameters.add(Parameter("width", 0.7, "float", minimum=0.0, maximum=1.0))
# Tuning
self.stereospread = 23
self.scalegain = 0.2
def process(self, data):
if data.ndim >= 2:
dataL = data[:,0]
if data.shape[1] == 2:
dataR = data[:,1]
else:
dataR = data[:,0]
else:
dataL = data
dataR = data
output = np.zeros((data.shape[0], 2))
xL, xR = self.process_filters(dataL.copy(), dataR.copy())
wet1_g = self.parameters.wet_mix.value * ((self.parameters.width.value/2) + 0.5)
wet2_g = self.parameters.wet_mix.value * ((1-self.parameters.width.value)/2)
dry_g = self.parameters.dry_mix.value
output[:,0] = (wet1_g * xL) + (wet2_g * xR) + (dry_g * dataL)
output[:,1] = (wet1_g * xR) + (wet2_g * xL) + (dry_g * dataR)
return output
def process_filters(self, dataL, dataR):
xL = self.combL1.process(dataL.copy() * self.scalegain)
xL += self.combL2.process(dataL.copy() * self.scalegain)
xL += self.combL3.process(dataL.copy() * self.scalegain)
xL += self.combL4.process(dataL.copy() * self.scalegain)
xL = self.combL5.process(dataL.copy() * self.scalegain)
xL += self.combL6.process(dataL.copy() * self.scalegain)
xL += self.combL7.process(dataL.copy() * self.scalegain)
xL += self.combL8.process(dataL.copy() * self.scalegain)
xR = self.combR1.process(dataR.copy() * self.scalegain)
xR += self.combR2.process(dataR.copy() * self.scalegain)
xR += self.combR3.process(dataR.copy() * self.scalegain)
xR += self.combR4.process(dataR.copy() * self.scalegain)
xR = self.combR5.process(dataR.copy() * self.scalegain)
xR += self.combR6.process(dataR.copy() * self.scalegain)
xR += self.combR7.process(dataR.copy() * self.scalegain)
xR += self.combR8.process(dataR.copy() * self.scalegain)
yL1 = self.allpassL1.process(xL)
yL2 = self.allpassL2.process(yL1)
yL3 = self.allpassL3.process(yL2)
yL4 = self.allpassL4.process(yL3)
yR1 = self.allpassR1.process(xR)
yR2 = self.allpassR2.process(yR1)
yR3 = self.allpassR3.process(yR2)
yR4 = self.allpassR4.process(yR3)
return yL4, yR4
def update(self, parameter_name):
rs = self.parameters.room_size.value
dp = self.parameters.damping.value
ss = self.stereospread
# initialize allpass and feedback comb-filters
# (with coefficients optimized for fs=44.1kHz)
self.allpassL1 = pymc.components.allpass.Allpass(556, rs, self.block_size)
self.allpassR1 = pymc.components.allpass.Allpass(556+ss, rs, self.block_size)
self.allpassL2 = pymc.components.allpass.Allpass(441, rs, self.block_size)
self.allpassR2 = pymc.components.allpass.Allpass(441+ss, rs, self.block_size)
self.allpassL3 = pymc.components.allpass.Allpass(341, rs, self.block_size)
self.allpassR3 = pymc.components.allpass.Allpass(341+ss, rs, self.block_size)
self.allpassL4 = pymc.components.allpass.Allpass(225, rs, self.block_size)
self.allpassR4 = pymc.components.allpass.Allpass(255+ss, rs, self.block_size)
self.combL1 = pymc.components.comb.Comb(1116, dp, rs, self.block_size)
self.combR1 = pymc.components.comb.Comb(1116+ss, dp, rs, self.block_size)
self.combL2 = pymc.components.comb.Comb(1188, dp, rs, self.block_size)
self.combR2 = pymc.components.comb.Comb(1188+ss, dp, rs, self.block_size)
self.combL3 = pymc.components.comb.Comb(1277, dp, rs, self.block_size)
self.combR3 = pymc.components.comb.Comb(1277+ss, dp, rs, self.block_size)
self.combL4 = pymc.components.comb.Comb(1356, dp, rs, self.block_size)
self.combR4 = pymc.components.comb.Comb(1356+ss, dp, rs, self.block_size)
self.combL5 = pymc.components.comb.Comb(1422, dp, rs, self.block_size)
self.combR5 = pymc.components.comb.Comb(1422+ss, dp, rs, self.block_size)
self.combL6 = pymc.components.comb.Comb(1491, dp, rs, self.block_size)
self.combR6 = pymc.components.comb.Comb(1491+ss, dp, rs, self.block_size)
self.combL7 = pymc.components.comb.Comb(1557, dp, rs, self.block_size)
self.combR7 = pymc.components.comb.Comb(1557+ss, dp, rs, self.block_size)
self.combL8 = pymc.components.comb.Comb(1617, dp, rs, self.block_size)
self.combR8 = pymc.components.comb.Comb(1617+ss, dp, rs, self.block_size)