Spaces:
Running
Running
import gradio as gr | |
import torch | |
import soundfile as sf | |
import numpy as np | |
import yaml | |
from inference import MasteringStyleTransfer | |
from utils import download_youtube_audio | |
from config import args | |
import pyloudnorm as pyln | |
import tempfile | |
import os | |
import pandas as pd | |
# Shared mastering style-transfer engine, constructed once at import time from
# the project's `config.args`; every handler below reuses this single instance.
mastering_transfer = MasteringStyleTransfer(args)
def denormalize_audio(audio, dtype=np.int16):
    """
    Convert audio from the range [-1, 1] to the representation of ``dtype``.

    Args:
        audio: Array-like of samples (NumPy array, list, tuple, ...).
        dtype: Target sample type. Any spelling NumPy resolves to 16-bit
            signed integer or 32-bit float is accepted (``np.int16``,
            ``"int16"``, ``np.dtype("float32")``, ...).

    Returns:
        ``np.ndarray`` of the requested dtype. For int16 the samples are
        clipped to [-1, 1] and scaled by 32767 (fractional part truncated by
        the cast); for float32 they are only cast, not clipped.

    Raises:
        ValueError: If ``dtype`` is neither int16 nor float32.
    """
    # Normalise the dtype spec so strings and np.dtype instances compare
    # equal to the scalar types; an unparseable spec is simply unsupported.
    try:
        target = np.dtype(dtype)
    except (TypeError, ValueError):
        target = None

    # Accept plain sequences as well as arrays.
    samples = np.asarray(audio)

    if target == np.dtype(np.int16):
        # Clip first so out-of-range samples saturate instead of wrapping.
        clipped = np.clip(samples, -1, 1)
        return (clipped * 32767).astype(np.int16)
    if target == np.dtype(np.float32):
        return samples.astype(np.float32)
    raise ValueError("Unsupported dtype. Use np.int16 or np.float32.")
def loudness_normalize(audio, sample_rate, target_loudness=-12.0):
    """
    Normalize audio to a target integrated loudness using pyloudnorm.

    Args:
        audio: Array-like of samples; 1-D input is treated as mono and
            reshaped to ``(samples, 1)``.
        sample_rate: Sample rate passed to the BS.1770 meter.
        target_loudness: Desired integrated loudness (default -12.0).

    Returns:
        The loudness-normalized audio with shape ``(samples, channels)``.
        If the measured loudness is not finite (e.g. a silent signal), the
        float32 audio is returned without any gain applied.

    Note:
        Errors raised by pyloudnorm itself are propagated unchanged
        (presumably including clips too short for the meter — confirm
        against the pyloudnorm documentation).
    """
    # Ensure float32 samples; also lets plain sequences through.
    audio = np.asarray(audio, dtype=np.float32)

    # If audio is mono, reshape to (samples, 1)
    if audio.ndim == 1:
        audio = audio.reshape(-1, 1)

    meter = pyln.Meter(sample_rate)  # create BS.1770 meter
    loudness = meter.integrated_loudness(audio)

    # A silent signal measures as -inf; the resulting gain would be infinite
    # and turn every sample into NaN, so leave such audio untouched.
    if not np.isfinite(loudness):
        return audio

    return pyln.normalize.loudness(audio, loudness, target_loudness)
def process_youtube_url(url):
    """
    Download the audio track of a YouTube URL.

    Args:
        url: The YouTube URL handed to ``download_youtube_audio``.

    Returns:
        A ``(audio, error)`` pair. On success ``audio`` is the
        ``(sample_rate, samples)`` tuple and ``error`` is ``None``; on failure
        ``audio`` is ``None`` and ``error`` is a human-readable message.
    """
    try:
        audio, sr = download_youtube_audio(url)
    except Exception as e:
        # Deliberately broad: any download/decoding failure is reported to the
        # UI as a message instead of crashing the handler.
        return None, f"Error processing YouTube URL: {str(e)}"
    # Same two-element shape as the error branch, so callers can always
    # unpack `audio, error = process_youtube_url(...)`.
    return (sr, audio), None
def process_audio_with_youtube(input_audio, input_youtube_url, reference_audio, reference_youtube_url):
    """
    Run mastering style transfer, optionally sourcing either side from YouTube.

    A non-empty URL takes precedence over the corresponding uploaded audio.

    Returns:
        The result of ``process_audio`` on success, or the 3-tuple
        ``(None, None, message)`` when a download fails or either audio
        is still missing.
    """
    # Input side: a URL, when given, replaces the uploaded input.
    if input_youtube_url:
        fetched = process_youtube_url(input_youtube_url)
        input_audio, error = fetched
        if error:
            return None, None, error

    # Reference side: same rule.
    if reference_youtube_url:
        fetched = process_youtube_url(reference_youtube_url)
        reference_audio, error = fetched
        if error:
            return None, None, error

    have_both = not (input_audio is None or reference_audio is None)
    if not have_both:
        return None, None, "Both input and reference audio are required."

    return process_audio(input_audio, reference_audio)
def process_audio(input_audio, reference_audio):
    """
    Apply mastering style transfer and prepare the result for playback.

    Args:
        input_audio: Audio to be mastered, in whatever form
            ``mastering_transfer.process_audio`` accepts (presumably Gradio's
            ``(sample_rate, ndarray)`` tuple — confirm against the caller).
        reference_audio: Audio whose mastering style is transferred; it is
            passed twice to ``mastering_transfer.process_audio``.

    Returns:
        ``((sr, samples), param_output)`` where ``samples`` is an int16 array
        of shape ``(samples, channels)`` after loudness normalization and
        ``param_output`` is the parameter summary string.
    """
    output_audio, predicted_params, sr = mastering_transfer.process_audio(
        input_audio, reference_audio, reference_audio
    )
    param_output = mastering_transfer.get_param_output_string(predicted_params)

    # Convert output_audio to numpy array if it's a tensor. detach() first:
    # .numpy() raises on tensors that are still attached to the autograd graph.
    if isinstance(output_audio, torch.Tensor):
        output_audio = output_audio.detach().cpu().numpy()

    # Drop singleton (e.g. batch) dimensions, then make mono explicit. The
    # 1-D check runs after squeeze because squeezing can itself yield 1-D.
    if output_audio.ndim > 2:
        output_audio = output_audio.squeeze()
    if output_audio.ndim == 1:
        output_audio = output_audio.reshape(-1, 1)

    # Ensure the audio is in the correct shape (samples, channels); the longer
    # axis is taken to be the sample axis.
    if output_audio.shape[1] > output_audio.shape[0]:
        output_audio = output_audio.transpose(1, 0)

    print(output_audio.shape)
    print(f"sr: {sr}")

    # Normalize output audio
    output_audio = loudness_normalize(output_audio, sr)

    # Denormalize the audio to int16
    output_audio = denormalize_audio(output_audio, dtype=np.int16)

    return (sr, output_audio), param_output
def perform_ito(input_audio, reference_audio, ito_reference_audio, num_steps, optimizer, learning_rate, af_weights):
    """
    Run inference-time optimization (ITO) and stream each intermediate result.

    Args:
        input_audio: Audio to be mastered (form expected by
            ``mastering_transfer.preprocess_audio`` — confirm).
        reference_audio: Reference used to compute the initial reference
            embedding.
        ito_reference_audio: Reference used as the optimization target;
            falls back to ``reference_audio`` when ``None``.
        num_steps: Number of optimization steps.
        optimizer: Optimizer name.
        learning_rate: Optimizer learning rate.
        af_weights: Audio-feature loss weights, forwarded unchanged in the
            ITO config.

    Yields:
        ``((sample_rate, samples), param_string, step, log_so_far, loss_df)``
        once per item produced by
        ``mastering_transfer.inference_time_optimization``. ``samples`` is an
        int16 array of shape ``(samples, channels)``; ``loss_df`` is a
        DataFrame with ``step`` and ``loss`` columns for all steps so far.
    """
    if ito_reference_audio is None:
        ito_reference_audio = reference_audio

    ito_config = {
        'optimizer': optimizer,
        'learning_rate': learning_rate,
        'num_steps': num_steps,
        'af_weights': af_weights,
        'sample_rate': args.sample_rate,
    }

    input_tensor = mastering_transfer.preprocess_audio(input_audio, args.sample_rate)
    reference_tensor = mastering_transfer.preprocess_audio(reference_audio, args.sample_rate)
    ito_reference_tensor = mastering_transfer.preprocess_audio(ito_reference_audio, args.sample_rate)

    initial_reference_feature = mastering_transfer.get_reference_embedding(reference_tensor)

    ito_log = ""
    loss_values = []
    for log_entry, current_output, current_params, step, loss in mastering_transfer.inference_time_optimization(
        input_tensor, ito_reference_tensor, ito_config, initial_reference_feature
    ):
        # The cumulative log is yielded every step, so it is built incrementally.
        ito_log += log_entry
        ito_param_output = mastering_transfer.get_param_output_string(current_params)
        loss_values.append({"step": int(step), "loss": loss})

        # Convert current_output to numpy array if it's a tensor. detach()
        # first: .numpy() raises on tensors still attached to the autograd
        # graph, which is likely mid-optimization.
        if isinstance(current_output, torch.Tensor):
            current_output = current_output.detach().cpu().numpy()

        # Drop singleton dimensions, then make mono explicit. The 1-D check
        # runs after squeeze because squeezing can itself yield 1-D.
        if current_output.ndim > 2:
            current_output = current_output.squeeze()
        if current_output.ndim == 1:
            current_output = current_output.reshape(-1, 1)

        # Ensure the audio is in the correct shape (samples, channels)
        if current_output.shape[1] > current_output.shape[0]:
            current_output = current_output.transpose(1, 0)

        # Loudness normalize output audio
        current_output = loudness_normalize(current_output, args.sample_rate)

        # Denormalize the audio to int16
        current_output = denormalize_audio(current_output, dtype=np.int16)

        yield (args.sample_rate, current_output), ito_param_output, step, ito_log, pd.DataFrame(loss_values)
def run_ito(input_audio, reference_audio, ito_reference_audio, num_steps, optimizer, learning_rate, af_weights):
    """
    Run ITO to completion and summarise the collected steps.

    Args:
        input_audio, reference_audio, ito_reference_audio, num_steps,
        optimizer, learning_rate: Forwarded positionally, unchanged, to
            ``mastering_transfer.inference_time_optimization``.
        af_weights: Comma-separated string of floats, e.g. ``"0.1,0.001,1.0"``.

    Returns:
        ``(all_results, min_loss_step, loss_df)``: every item produced by the
        optimizer, the 0-based index of the item with the lowest ``'loss'``,
        and a DataFrame with ``step`` / ``loss`` columns.

    Raises:
        ValueError: If a weight is not a valid float, or no results were
            produced.
    """
    # NOTE(review): this passes seven raw arguments and reads each result as a
    # mapping with 'step'/'loss' keys, whereas perform_ito calls the same
    # method with (tensor, tensor, config, feature) and unpacks 5-tuples.
    # One of the two usages is presumably stale — confirm against
    # MasteringStyleTransfer.inference_time_optimization.
    parsed_weights = []
    for token in af_weights.split(','):
        parsed_weights.append(float(token.strip()))
    af_weights = parsed_weights

    all_results = list(
        mastering_transfer.inference_time_optimization(
            input_audio, reference_audio, ito_reference_audio, num_steps, optimizer, learning_rate, af_weights
        )
    )

    # First index holding the smallest loss (min() keeps the earliest on ties).
    min_loss_step, _ = min(enumerate(all_results), key=lambda pair: pair[1]['loss'])

    rows = [(entry['step'], entry['loss']) for entry in all_results]
    loss_df = pd.DataFrame(rows, columns=['step', 'loss'])
    return all_results, min_loss_step, loss_df
def update_ito_output(all_results, selected_step):
    """
    Return the audio, parameters and log of the ITO step chosen on the slider.

    Args:
        all_results: Sequence of per-step results; each item is indexed with
            the keys ``'audio'``, ``'params'`` and ``'log'``.
        selected_step: 1-based step number from the "ITO Step" slider (the
            slider's minimum is 1). May arrive as a float.

    Returns:
        ``((sample_rate, audio), params, log)`` for the selected step, or
        ``(None, "", "")`` when there are no results yet.
    """
    # Nothing to show before ITO has run (the slider can still fire changes).
    if not all_results:
        return None, "", ""

    # The slider is 1-based while the list is 0-based; clamp so a slider value
    # beyond the number of collected steps shows the nearest valid step
    # instead of raising IndexError.
    index = int(selected_step) - 1
    index = min(max(index, 0), len(all_results) - 1)

    selected_result = all_results[index]
    return (args.sample_rate, selected_result['audio']), selected_result['params'], selected_result['log']
""" APP display """ | |
# Gradio UI: Step 1 (style transfer from uploads or YouTube) and Step 2 (ITO).
# NOTE(review): the nesting below is reconstructed from statement order; the
# original indentation was not available — confirm the layout visually.
with gr.Blocks() as demo:
    gr.Markdown("# ITO-Master: Inference Time Optimization for Mastering Style Transfer")
    gr.Markdown("# Step 1: Mastering Style Transfer")

    with gr.Tab("Upload Audio"):
        with gr.Row():
            input_audio = gr.Audio(label="Input Audio")
            reference_audio = gr.Audio(label="Reference Audio")

        process_button = gr.Button("Process Mastering Style Transfer")

        with gr.Row():
            output_audio = gr.Audio(label="Output Audio", type='numpy')
            param_output = gr.Textbox(label="Predicted Parameters", lines=5)

        process_button.click(
            process_audio,
            inputs=[input_audio, reference_audio],
            outputs=[output_audio, param_output],
        )

    with gr.Tab("YouTube Audio"):
        with gr.Row():
            input_audio_yt = gr.Audio(label="Input Audio (Optional)")
            input_youtube_url = gr.Textbox(label="Input YouTube URL (Optional)")
        with gr.Row():
            reference_audio_yt = gr.Audio(label="Reference Audio (Optional)")
            reference_youtube_url = gr.Textbox(label="Reference YouTube URL (Optional)")

        process_button_yt = gr.Button("Process Mastering Style Transfer")

        with gr.Row():
            output_audio_yt = gr.Audio(label="Output Audio", type='numpy')
            param_output_yt = gr.Textbox(label="Predicted Parameters", lines=5)

        # Hidden until a download or validation error has to be shown.
        error_message_yt = gr.Textbox(label="Error Message", visible=False)

        def process_and_handle_errors(input_audio, input_youtube_url, reference_audio, reference_youtube_url):
            """Run the YouTube-aware pipeline and route errors to the error box."""
            result = process_audio_with_youtube(input_audio, input_youtube_url, reference_audio, reference_youtube_url)
            # Failures come back as (None, None, message); successes as a 2-tuple.
            if len(result) == 3 and isinstance(result[2], str):  # Error occurred
                return None, None, gr.update(visible=True, value=result[2])
            return result[0], result[1], gr.update(visible=False, value="")

        process_button_yt.click(
            process_and_handle_errors,
            inputs=[input_audio_yt, input_youtube_url, reference_audio_yt, reference_youtube_url],
            outputs=[output_audio_yt, param_output_yt, error_message_yt],
        )

    gr.Markdown("## Step 2: Inference Time Optimization (ITO)")

    with gr.Row():
        ito_reference_audio = gr.Audio(label="ITO Reference Audio (optional)")
        with gr.Column():
            num_steps = gr.Slider(minimum=1, maximum=100, value=10, step=1, label="Number of Steps")
            optimizer = gr.Dropdown(["Adam", "RAdam", "SGD"], value="RAdam", label="Optimizer")
            learning_rate = gr.Slider(minimum=0.0001, maximum=0.1, value=0.001, step=0.0001, label="Learning Rate")
            af_weights = gr.Textbox(label="AudioFeatureLoss Weights (comma-separated)", value="0.1,0.001,1.0,1.0,0.1")

    ito_button = gr.Button("Perform ITO")

    with gr.Row():
        with gr.Column():
            ito_output_audio = gr.Audio(label="ITO Output Audio")
            ito_param_output = gr.Textbox(label="ITO Predicted Parameters", lines=15)
            ito_step_slider = gr.Slider(minimum=1, maximum=100, step=1, label="ITO Step", interactive=True)
        with gr.Column():
            ito_loss_plot = gr.LinePlot(
                x="step",
                y="loss",
                title="ITO Loss Curve",
                x_title="Step",
                y_title="Loss",
                height=300,
                width=600,
            )
            ito_log = gr.Textbox(label="ITO Log", lines=10)

    # Per-session storage for the collected ITO steps and the best step index.
    all_results = gr.State([])
    min_loss_step = gr.State(0)

    def on_ito_complete(results, min_step, loss_df):
        """Build the plot data and the slider update once ITO has finished.

        The slider is 1-based, so the 0-based best index is shifted by one and
        the maximum is set to the number of collected steps. The State
        components are updated through the event outputs rather than by
        assigning to their `.value` here.
        """
        return loss_df, gr.update(maximum=len(results), value=min_step + 1)

    def run_ito_and_sync(in_audio, ref_audio, ito_ref_audio, steps, optimizer_name, lr, weights):
        """Run ITO and return one value per output component of the click.

        `run_ito` returns three values while the click feeds four components
        (results state, best-step state, loss plot, step slider); this wrapper
        appends the plot/slider values produced by `on_ito_complete`.
        """
        results, min_step, loss_df = run_ito(
            in_audio, ref_audio, ito_ref_audio, steps, optimizer_name, lr, weights
        )
        plot_data, slider_update = on_ito_complete(results, min_step, loss_df)
        return results, min_step, plot_data, slider_update

    ito_button.click(
        run_ito_and_sync,
        inputs=[input_audio, reference_audio, ito_reference_audio, num_steps, optimizer, learning_rate, af_weights],
        outputs=[all_results, min_loss_step, ito_loss_plot, ito_step_slider],
    ).then(
        # Show the step the slider now points at.
        update_ito_output,
        inputs=[all_results, ito_step_slider],
        outputs=[ito_output_audio, ito_param_output, ito_log],
    )

    # Browsing the slider swaps in the stored result for that step.
    ito_step_slider.change(
        update_ito_output,
        inputs=[all_results, ito_step_slider],
        outputs=[ito_output_audio, ito_param_output, ito_log],
    )

demo.launch()