Spaces:
Sleeping
Sleeping
# String keys and channel-layout identifiers used throughout this module.
STR_CLIP_ID = "clip_id"
STR_AUDIO_SIGNAL = "audio_signal"
STR_TARGET_VECTOR = "target_vector"
# Accepted values for the `ch_format` argument of `load_audio`.
STR_CH_FIRST = "channels_first"
STR_CH_LAST = "channels_last"
import io
import itertools
import logging
import os
import subprocess
from pathlib import Path
from typing import Optional, Tuple, Union

# import librosa
import numpy as np
import soundfile as sf
import tqdm
from numpy.fft import irfft
def _resample_load_ffmpeg(path: str, sample_rate: int, downmix_to_mono: bool) -> Tuple[np.ndarray, int]:
    """
    Decoding, downmixing, and downsampling by ffmpeg.

    ffmpeg is run as a subprocess that writes a WAV stream to stdout; the
    stream is then parsed with `soundfile`. The decoded array is transposed
    before being returned, so multi-channel audio comes back channel-first.
    A single-channel result stays 1-dimensional (transposing is a no-op).

    Args:
        path: path of the audio file to decode.
        sample_rate: target sampling rate. If falsy (None or 0), no `-ar`
            option is passed and ffmpeg keeps the rate of the file.
        downmix_to_mono: if True, pass `-ac 1` so ffmpeg downmixes to mono.

    Returns:
        (audio signal, sample rate)

    Raises:
        RuntimeError: if ffmpeg cannot be started or exits with a non-zero
            status. The tail of ffmpeg's stderr is included in the message.
    """
    # Build an argument list and run without a shell, so that quotes, `$`,
    # backticks, etc. in `path` can never be interpreted as shell syntax.
    cmd = ['ffmpeg', '-i', str(path)]
    if downmix_to_mono:
        cmd += ['-ac', '1']  # downmixing option
    if sample_rate:
        cmd += ['-ar', str(sample_rate)]  # resampling option
    cmd += ['-f', 'wav', '-']  # WAV container, written to stdout

    try:
        proc = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except FileNotFoundError as err:
        raise RuntimeError('ffmpeg executable was not found') from err

    if proc.returncode != 0:
        # Surface the end of ffmpeg's log instead of failing later with an
        # opaque parse error on an empty or truncated stream.
        log_tail = proc.stderr.decode(errors='replace')[-2000:]
        raise RuntimeError(
            f'ffmpeg failed with exit code {proc.returncode} for "{path}":\n{log_tail}'
        )

    src, sr = sf.read(io.BytesIO(proc.stdout))
    return src.T, sr
def _resample_load_librosa(path: str, sample_rate: int, downmix_to_mono: bool, **kwargs) -> Tuple[np.ndarray, int]:
    """
    Decoding, downmixing, and downsampling by librosa.

    Thin pass-through to `librosa.load`; the array is returned exactly as
    librosa produces it (per the librosa documentation: 1-dimensional for
    mono output, channel-first for multi-channel output).

    Args:
        path: path of the audio file to decode.
        sample_rate: target sampling rate, forwarded as `sr`.
        downmix_to_mono: forwarded as `mono`.
        **kwargs: extra keyword arguments forwarded to `librosa.load`.

    Returns:
        (audio signal, sample rate)

    Raises:
        ImportError: if librosa is not installed.
    """
    # Imported lazily: the module-level `import librosa` is disabled, so
    # without this the call below would fail with a NameError.
    import librosa

    return librosa.load(path, sr=sample_rate, mono=downmix_to_mono, **kwargs)
def load_audio(
    path: Union[str, Path],
    ch_format: str,
    sample_rate: Optional[int] = None,
    downmix_to_mono: bool = False,
    resample_by: str = 'ffmpeg',
    **kwargs,
) -> Tuple[np.ndarray, int]:
    """Load an audio file with the selected decoding/resampling backend.

    Defaults to `sample_rate=None` and `downmix_to_mono=False`.

    NOTE: `ch_format` is validated but not applied. The audio is returned
    exactly as the backend produced it, which means it is NOT forced to be
    2-dimensional and is never converted to channels-last.

    Args:
        path: audio file path
        ch_format: one of 'channels_first' or 'channels_last' (validated only)
        sample_rate: target sampling rate. if None, use the rate of the audio file
        downmix_to_mono: if True, ask the backend to downmix to a single channel
        resample_by (str): 'librosa' or 'ffmpeg'. it decides backend for audio decoding and resampling.
        **kwargs: keyword args for librosa.load - offset, duration, dtype, res_type.
            Only forwarded when `resample_by == 'librosa'`; the ffmpeg backend ignores them.

    Returns:
        (audio, sr) tuple

    Raises:
        ValueError: if `ch_format` is not a recognised value, or if the file
            is 8000 bytes or smaller ("too short").
        NotImplementedError: if `resample_by` is not a known backend.
    """
    if ch_format not in (STR_CH_FIRST, STR_CH_LAST):
        raise ValueError(f'ch_format is wrong here -> {ch_format}')

    # The on-disk size is the only "length" check performed here: files of
    # 8000 bytes or fewer are rejected before any decoding is attempted.
    if os.stat(path).st_size <= 8000:
        raise ValueError('Given audio is too short!')

    if resample_by == 'librosa':
        return _resample_load_librosa(path, sample_rate, downmix_to_mono, **kwargs)
    if resample_by == 'ffmpeg':
        return _resample_load_ffmpeg(path, sample_rate, downmix_to_mono)
    raise NotImplementedError(f'resample_by: "{resample_by}" is not supported yet')
def ms(x):
    """Mean square of signal `x`.

    The magnitude of every sample is squared and the results are averaged,
    so complex-valued input yields a real result.

    :param x: Dynamic quantity.
    :returns: Mean squared of `x`.
    """
    squared_magnitude = np.abs(x) ** 2.0
    return squared_magnitude.mean()
def normalize(y, x=None):
    """Scale `y` so that its mean-square power matches a target power.

    Without `x` the target power is 1.0, i.e. the power of a (standard
    normal) white noise signal. With `x` the target is the power of `x`.

    :param y: Signal to rescale.
    :param x: Optional reference signal whose power `y` should match.
    :returns: `y` multiplied by the gain that brings it to the target power.
    """
    target_power = 1.0 if x is None else ms(x)
    gain = np.sqrt(target_power / ms(y))
    return y * gain
def noise(N, color='white', state=None):
    """Noise generator.

    Looks up the generator registered for `color` and returns its output.

    :param N: Amount of samples.
    :param color: Color of noise. Must be a key of `_noise_generators`.
    :param state: State of PRNG.
    :type state: :class:`np.random.RandomState`
    :returns: Whatever the selected generator returns for `(N, state)`.
    :raises ValueError: If `color` is not a registered color.
    """
    # Only the lookup is guarded: a KeyError raised inside the generator
    # itself must propagate instead of being reported as a bad color.
    try:
        generator = _noise_generators[color]
    except KeyError:
        raise ValueError("Incorrect color.") from None
    return generator(N, state)
def white(N, state=None):
    """
    White noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh, unseeded one is created when omitted.
    :type state: :class:`np.random.RandomState`
    :returns: `N` samples drawn from a standard normal distribution.

    White noise has a constant power density. Its narrowband spectrum is therefore flat.
    The power in white noise will increase by a factor of two for each octave band,
    and therefore increases with 3 dB per octave.
    """
    if state is None:
        state = np.random.RandomState()
    return state.randn(N)
def pink(N, state=None):
    """
    Pink noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh, unseeded one is created when omitted.
    :type state: :class:`np.random.RandomState`
    :returns: `N` samples, normalized to unit mean-square power.

    Pink noise has equal power in bands that are proportionally wide.
    Power density decreases with 3 dB per octave.
    """
    if state is None:
        state = np.random.RandomState()

    is_odd = N % 2
    # One extra bin for odd N so the inverse transform yields N + 1 samples,
    # of which the last is dropped below.
    n_bins = N // 2 + 1 + is_odd

    # Random complex spectrum: real part is drawn first, then the imaginary part.
    spectrum = state.randn(n_bins) + 1j * state.randn(n_bins)
    shaping = np.sqrt(np.arange(n_bins) + 1.)  # +1 to avoid divide by zero

    samples = irfft(spectrum / shaping).real
    if is_odd:
        samples = samples[:-1]
    return normalize(samples)
def blue(N, state=None):
    """
    Blue noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh, unseeded one is created when omitted.
    :type state: :class:`np.random.RandomState`
    :returns: `N` samples, normalized to unit mean-square power.

    Power increases with 6 dB per octave.
    Power density increases with 3 dB per octave.
    """
    if state is None:
        state = np.random.RandomState()

    is_odd = N % 2
    # One extra bin for odd N so the inverse transform yields N + 1 samples,
    # of which the last is dropped below.
    n_bins = N // 2 + 1 + is_odd

    # Random complex spectrum: real part is drawn first, then the imaginary part.
    spectrum = state.randn(n_bins) + 1j * state.randn(n_bins)
    shaping = np.sqrt(np.arange(n_bins))  # Filter; the first (DC) bin is scaled by 0

    samples = irfft(spectrum * shaping).real
    if is_odd:
        samples = samples[:-1]
    return normalize(samples)
def brown(N, state=None):
    """
    Brown noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh, unseeded one is created when omitted.
    :type state: :class:`np.random.RandomState`
    :returns: `N` samples, normalized to unit mean-square power.

    Power decreases with 3 dB per octave.
    Power density decreases with 6 dB per octave.
    """
    if state is None:
        state = np.random.RandomState()

    uneven = N % 2
    # One extra bin for odd N so the inverse transform yields N + 1 samples,
    # of which the last is dropped below.
    n_bins = N // 2 + 1 + uneven

    # Random complex spectrum: real part is drawn first, then the imaginary part.
    X = state.randn(n_bins) + 1j * state.randn(n_bins)
    S = np.arange(n_bins) + 1  # Filter; +1 to avoid divide by zero

    y = irfft(X / S).real
    if uneven:
        y = y[:-1]
    return normalize(y)
def violet(N, state=None):
    """
    Violet noise.

    :param N: Amount of samples.
    :param state: State of PRNG. A fresh, unseeded one is created when omitted.
    :type state: :class:`np.random.RandomState`
    :returns: `N` samples, normalized to unit mean-square power.

    Power increases with +9 dB per octave.
    Power density increases with +6 dB per octave.
    """
    if state is None:
        state = np.random.RandomState()

    uneven = N % 2
    # One extra bin for odd N so the inverse transform yields N + 1 samples,
    # of which the last is dropped below.
    n_bins = N // 2 + 1 + uneven

    # Random complex spectrum: real part is drawn first, then the imaginary part.
    X = state.randn(n_bins) + 1j * state.randn(n_bins)
    S = np.arange(n_bins)  # Filter; the first (DC) bin is scaled by 0

    y = irfft(X * S).real
    if uneven:
        y = y[:-1]
    return normalize(y)
# Registry mapping a color name to the function that generates that noise.
_noise_generators = dict(
    white=white,
    pink=pink,
    blue=blue,
    brown=brown,
    violet=violet,
)
def noise_generator(N=44100, color='white', state=None):
    """Noise generator.

    :param N: Amount of unique samples to generate.
    :param color: Color of noise.
    :param state: State of PRNG, passed through to `noise`.
    :type state: :class:`np.random.RandomState`

    Generate `N` amount of unique samples and cycle over these samples.
    The samples are generated once, when the first value is requested,
    and are then repeated in the same order indefinitely.
    """
    yield from itertools.cycle(noise(N, color, state))
def heaviside(N):
    """Heaviside step function.

    :param N: Input value or array of values.
    :returns: 0 where the input is negative, 1 where it is positive,
        and 1/2 where it is exactly zero.
    """
    signs = np.sign(N)  # -1, 0 or +1
    return (signs + 1) * 0.5