Spaces:
Runtime error
Runtime error
import os | |
import gradio as gr | |
from scipy.io.wavfile import write | |
import subprocess | |
import argparse | |
from concurrent.futures import ProcessPoolExecutor | |
import time | |
import typing as tp | |
import warnings | |
from pathlib import Path | |
import torch | |
import gradio as gr | |
from audiocraft.data.audio_utils import convert_audio | |
from audiocraft.data.audio import audio_write | |
from audiocraft.models import MusicGen | |
# Module-level state shared by the prediction helpers below.

# Currently loaded model instance; assigned by load_model() and read by
# _do_predictions() / predict_full().
MODEL = None  # Last used model
# True when the SPACE_ID environment variable contains "facebook/MusicGen".
# NOTE(review): not read anywhere in the code visible here.
IS_BATCHED = "facebook/MusicGen" in os.environ.get('SPACE_ID', '')
# NOTE(review): not referenced in the visible code; presumably the maximum
# number of prompts per batch in a batched demo -- confirm before relying on it.
MAX_BATCH_SIZE = 6
# Duration passed to _do_predictions() by predict_batched(); melodies are
# trimmed to sample_rate * duration samples, so the unit is seconds.
BATCHED_DURATION = 15
# Cancellation flag: set to True by interrupt(), reset to False at the start of
# predict_full(), and checked by the progress callback predict_full() installs.
INTERRUPTING = False
def interrupt():
    """Request cancellation of the generation that is currently running.

    Only raises the module-level ``INTERRUPTING`` flag; the progress callback
    installed by ``predict_full`` checks that flag and aborts with a
    ``gr.Error`` the next time it is invoked.
    """
    global INTERRUPTING
    INTERRUPTING = True
class FileCleaner:
    """Track temporary files and delete them once they outlive a fixed lifetime.

    Files are registered with :meth:`add`. Every registration first purges the
    entries that are older than ``file_lifetime`` seconds, removing the
    corresponding files from disk.
    """

    def __init__(self, file_lifetime: float = 3600):
        """Create an empty cleaner.

        Args:
            file_lifetime: Age in seconds after which a registered file is
                deleted on the next call to :meth:`add`.
        """
        self.file_lifetime = file_lifetime
        # (time_added, path) pairs in insertion order, i.e. oldest first.
        self.files = []

    def add(self, path: tp.Union[str, Path]):
        """Purge expired files, then register ``path`` with the current time."""
        self._cleanup()
        self.files.append((time.time(), Path(path)))

    def _cleanup(self):
        """Delete every expired file and drop its entry from ``self.files``.

        Entries are ordered oldest first, so the scan stops at the first one
        that is still within its lifetime.
        """
        now = time.time()
        expired = 0
        try:
            for time_added, path in self.files:
                if now - time_added <= self.file_lifetime:
                    break
                try:
                    # EAFP instead of exists()+unlink(): the file may vanish
                    # between the check and the removal.
                    path.unlink()
                except FileNotFoundError:
                    pass
                expired += 1
        finally:
            # One slice deletion instead of repeated O(n) pop(0) calls. Runs
            # even if unlink() raised so already-removed files are forgotten.
            del self.files[:expired]


file_cleaner = FileCleaner()
def make_waveform(*args, **kwargs):
    """Call ``gr.make_waveform`` with all warnings suppressed and time the call.

    Every positional and keyword argument is forwarded unchanged. The elapsed
    wall-clock time is printed, and whatever ``gr.make_waveform`` returns is
    handed back to the caller.
    """
    started = time.time()
    with warnings.catch_warnings():
        # Silence everything emitted while the waveform is being rendered.
        warnings.simplefilter('ignore')
        result = gr.make_waveform(*args, **kwargs)
        elapsed = time.time() - started
        print("Make a video took", elapsed)
    return result
def load_model(version='melody'):
    """Make sure the global ``MODEL`` is the requested pretrained version.

    The model is fetched with ``MusicGen.get_pretrained`` only when nothing is
    loaded yet or when the loaded model's ``name`` differs from ``version``;
    otherwise the existing instance is kept.

    Args:
        version: Name of the pretrained model to load.
    """
    global MODEL
    print("Loading model", version)
    already_loaded = MODEL is not None and MODEL.name == version
    if already_loaded:
        return
    MODEL = MusicGen.get_pretrained(version)
# Process pool used to render waveforms; created on first use by
# _get_waveform_pool() so that importing this module does not spawn workers.
_WAVEFORM_POOL = None


def _get_waveform_pool():
    """Return the shared waveform-rendering process pool, creating it if needed."""
    global _WAVEFORM_POOL
    if _WAVEFORM_POOL is None:
        _WAVEFORM_POOL = ProcessPoolExecutor(4)
    return _WAVEFORM_POOL


def _do_predictions(texts, melodies, duration, progress=False, **gen_kwargs):
    """Generate one clip per prompt with the loaded ``MODEL``.

    Args:
        texts: List of text descriptions, one per clip.
        melodies: List aligned with ``texts``; each item is either ``None`` or a
            ``(sample_rate, numpy_array)`` pair used as melody conditioning.
        duration: Clip duration handed to ``MODEL.set_generation_params``; each
            melody is also trimmed to ``sample_rate * duration`` samples.
        progress: Forwarded to the model's generate call.
        **gen_kwargs: Extra generation parameters forwarded to
            ``MODEL.set_generation_params``.

    Returns:
        A list with the result of ``make_waveform`` for every generated clip,
        in generation order.
    """
    # Local import: the module's import block does not provide this name.
    from tempfile import NamedTemporaryFile

    MODEL.set_generation_params(duration=duration, **gen_kwargs)
    print("new batch", len(texts), texts, [None if m is None else (m[0], m[1].shape) for m in melodies])
    be = time.time()

    processed_melodies = []
    target_sr = 32000
    target_ac = 1
    for melody in melodies:
        if melody is None:
            processed_melodies.append(None)
            continue
        sr, melody = melody[0], torch.from_numpy(melody[1]).to(MODEL.device).float().t()
        if melody.dim() == 1:
            # Single-channel input: add the missing leading dimension.
            melody = melody[None]
        melody = melody[..., :int(sr * duration)]
        melody = convert_audio(melody, sr, target_sr, target_ac)
        processed_melodies.append(melody)

    if any(m is not None for m in processed_melodies):
        outputs = MODEL.generate_with_chroma(
            descriptions=texts,
            melody_wavs=processed_melodies,
            melody_sample_rate=target_sr,
            progress=progress,
        )
    else:
        outputs = MODEL.generate(texts, progress=progress)
    outputs = outputs.detach().cpu().float()

    pool = _get_waveform_pool()
    pending = []
    for output in outputs:
        # delete=False: the file must outlive this block so the worker process
        # can read it; file_cleaner removes it later.
        with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file:
            audio_write(
                file.name, output, MODEL.sample_rate, strategy="loudness",
                loudness_headroom_db=16, loudness_compressor=True, add_suffix=False)
            pending.append(pool.submit(make_waveform, file.name))
            file_cleaner.add(file.name)

    # Block until every waveform has been rendered.
    res = [future.result() for future in pending]
    for rendered in res:
        file_cleaner.add(rendered)
    print("batch finished", len(texts), time.time() - be)
    print("Tempfiles currently stored: ", len(file_cleaner.files))
    return res
def predict_batched(texts, melodies):
    """Generate clips for a batch of prompts with the 'melody' model.

    Each prompt is truncated to 512 characters, the 'melody' model is loaded,
    and the batch is generated with ``BATCHED_DURATION`` as the duration.

    Args:
        texts: List of text prompts.
        melodies: List of melody inputs aligned with ``texts``.

    Returns:
        A one-element list wrapping the list returned by ``_do_predictions``.
    """
    max_text_length = 512
    clipped = [prompt[:max_text_length] for prompt in texts]
    load_model('melody')
    return [_do_predictions(clipped, melodies, BATCHED_DURATION)]
def predict_full(model, text, melody, duration, topk, topp, temperature, cfg_coef, progress=gr.Progress()):
    """Generate a single clip with explicit sampling parameters.

    Args:
        model: Model version handed to ``load_model``.
        text: Text prompt.
        melody: Optional melody input (``None`` for text-only generation).
        duration: Clip duration forwarded to ``_do_predictions``.
        topk: Top-k sampling value; converted to ``int``.
        topp: Top-p sampling value.
        temperature: Sampling temperature.
        cfg_coef: Value forwarded as the ``cfg_coef`` generation parameter.
        progress: Gradio progress tracker updated from the model's callback.

    Returns:
        The first (and only) item produced by ``_do_predictions``.

    Raises:
        gr.Error: If ``temperature``, ``topk`` or ``topp`` is negative, or if
            ``interrupt()`` was called while generation was in progress.
    """
    global INTERRUPTING
    INTERRUPTING = False

    # Checked in this order so the first offending parameter is reported.
    checks = (
        (temperature, "Temperature must be >= 0."),
        (topk, "Topk must be non-negative."),
        (topp, "Topp must be non-negative."),
    )
    for value, message in checks:
        if value < 0:
            raise gr.Error(message)

    top_k = int(topk)
    load_model(model)

    def _report(generated, to_generate):
        # Forward progress to Gradio and honour a pending interrupt request.
        progress((generated, to_generate))
        if INTERRUPTING:
            raise gr.Error("Interrupted.")

    MODEL.set_custom_progress_callback(_report)

    results = _do_predictions(
        [text], [melody], duration, progress=True,
        top_k=top_k, top_p=topp, temperature=temperature, cfg_coef=cfg_coef)
    return results[0]
def toggle_audio_src(choice):
    """Build the update that switches the melody input between mic and file.

    Args:
        choice: Selected radio value; ``"mic"`` selects the microphone and any
            other value selects file upload.

    Returns:
        A ``gr.update`` that sets the source and label and clears the value.
    """
    if choice == "mic":
        source, label = "microphone", "Microphone"
    else:
        source, label = "upload", "File"
    return gr.update(source=source, value=None, label=label)
def ui_full(launch_kwargs):
    """Build the combined MusicGen / Demucs demo and launch it.

    The MusicGen page takes a text prompt and an optional melody and generates
    music with fixed settings (model 'melody', duration 10, top-k 250, top-p 0,
    temperature 1.0, cfg_coef 3.0). When a module-level ``inference`` callable
    is available, a second tab exposes it for source separation into vocals,
    bass, drums and other; otherwise only the MusicGen page is served.

    Args:
        launch_kwargs: Keyword arguments forwarded to ``launch()``.
    """

    def _generate(text, melody, progress=gr.Progress()):
        # Event listeners may only receive components as inputs, so the fixed
        # generation settings are bound here rather than listed in `inputs`.
        return predict_full("melody", text, melody, 10, 250, 0, 1.0, 3.0, progress=progress)

    with gr.Blocks() as interface:
        gr.Markdown(
            """
            # MusicGen and Demucs Combination
            This is a combined demo of MusicGen and Demucs.
            MusicGen is a model for music generation based on text prompts,
            and Demucs is a model for music source separation.
            """
        )
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    text = gr.Text(label="Input Text", interactive=True)
                    with gr.Column():
                        radio = gr.Radio(["file", "mic"], value="file",
                                         label="Condition on a Melody (optional) File or Mic")
                        melody = gr.Audio(source="upload", type="numpy", label="Melody File",
                                          interactive=True, elem_id="melody-input")
                with gr.Row():
                    submit = gr.Button("Generate Music")
            with gr.Row():
                # NOTE(review): predict_full returns the path produced by
                # make_waveform (which logs "Make a video took"); a gr.Video
                # component may be a better fit -- confirm.
                audio_output = gr.Audio(type="numpy", label="Generated Music")
        # predict_full returns a single value, so exactly one output is wired.
        submit.click(_generate, inputs=[text, melody], outputs=[audio_output])
        radio.change(toggle_audio_src, radio, [melody], queue=False, show_progress=False)
        gr.Examples(
            fn=_generate,
            examples=[
                [
                    "An 80s driving pop song with heavy drums and synth pads in the background",
                    "./assets/bach.mp3",
                ],
                [
                    "A cheerful country song with acoustic guitars",
                    "./assets/bolero_ravel.mp3",
                ],
                [
                    "90s rock song with electric guitar and heavy drums",
                    None,
                ],
                [
                    "a light and cheerly EDM track, with syncopated drums, aery pads, and strong emotions",
                    "./assets/bach.mp3",
                ],
                [
                    "lofi slow bpm electro chill with organic samples",
                    None,
                ],
            ],
            inputs=[text, melody],
            outputs=[audio_output],
        )

    # `inference` is not defined in the code visible here; look it up at call
    # time so the MusicGen page still works when it is absent.
    separation_fn = globals().get("inference")
    if separation_fn is None:
        warnings.warn("No `inference` function is defined; the Demucs tab is disabled.")
        demo = interface
    else:
        separation = gr.Interface(
            fn=separation_fn,
            inputs=gr.Audio(type="numpy", label="Input Audio"),
            outputs=[
                gr.Audio(type="filepath", label="Vocals"),
                gr.Audio(type="filepath", label="Bass"),
                gr.Audio(type="filepath", label="Drums"),
                gr.Audio(type="filepath", label="Other"),
            ],
            title="MusicGen and Demucs Combination",
            description="A combined demo of MusicGen and Demucs",
            article="",
        )
        demo = gr.TabbedInterface([interface, separation], ["MusicGen", "Demucs"])

    # Single launch point: enables the request queue and honours the
    # host / port / auth / share options collected by the caller.
    demo.queue().launch(**launch_kwargs)
if __name__ == "__main__":
    # Command-line entry point: parse the server options, translate them into
    # launch keyword arguments and start the UI.
    default_host = '0.0.0.0' if 'SPACE_ID' in os.environ else '127.0.0.1'

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--listen', type=str, default=default_host,
        help='IP to listen on for connections to Gradio',
    )
    parser.add_argument('--username', type=str, default='', help='Username for authentication')
    parser.add_argument('--password', type=str, default='', help='Password for authentication')
    parser.add_argument(
        '--server_port', type=int, default=0,
        help='Port to run the server listener on',
    )
    parser.add_argument('--inbrowser', action='store_true', help='Open in browser')
    parser.add_argument('--share', action='store_true', help='Share the gradio UI')
    args = parser.parse_args()

    launch_kwargs = {'server_name': args.listen}
    # Authentication is enabled only when both credentials are supplied.
    if args.username and args.password:
        launch_kwargs['auth'] = (args.username, args.password)
    # Remaining options are forwarded only when set to a truthy value.
    for option in ('server_port', 'inbrowser', 'share'):
        value = getattr(args, option)
        if value:
            launch_kwargs[option] = value

    ui_full(launch_kwargs)