|
import audioread |
|
import librosa |
|
|
|
import os |
|
import sys |
|
import json |
|
import time |
|
from tqdm import tqdm |
|
import pickle |
|
import hashlib |
|
import logging |
|
import traceback |
|
import shutil |
|
import soundfile as sf |
|
|
|
import torch |
|
|
|
from gui_data.constants import * |
|
from gui_data.old_data_check import file_check, remove_unneeded_yamls, remove_temps |
|
from lib_v5.vr_network.model_param_init import ModelParameters |
|
from lib_v5 import spec_utils |
|
from pathlib import Path |
|
from separate import SeperateAttributes, SeperateDemucs, SeperateMDX, SeperateVR, save_format |
|
from typing import List |
|
|
|
|
|
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) |
|
logging.info('UVR BEGIN') |
|
|
|
PREVIOUS_PATCH_WIN = 'UVR_Patch_1_12_23_14_54' |
|
|
|
is_dnd_compatible = True |
|
banner_placement = -2 |
|
|
|
def save_data(data): |
|
""" |
|
Saves given data as a .pkl (pickle) file |
|
|
|
Paramters: |
|
data(dict): |
|
Dictionary containing all the necessary data to save |
|
""" |
|
|
|
with open('data.pkl', 'wb') as data_file: |
|
pickle.dump(data, data_file) |
|
|
|
def load_data() -> dict: |
|
""" |
|
Loads saved pkl file and returns the stored data |
|
|
|
Returns(dict): |
|
Dictionary containing all the saved data |
|
""" |
|
try: |
|
with open('data.pkl', 'rb') as data_file: |
|
data = pickle.load(data_file) |
|
|
|
return data |
|
except (ValueError, FileNotFoundError): |
|
|
|
|
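        # No saved settings yet (or the file could not be read): write the defaults and reload them.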
|
save_data(data=DEFAULT_DATA) |
|
|
|
return load_data() |
|
|
|
def load_model_hash_data(dictionary):
    '''Loads a model data dictionary from the given JSON file'''

    with open(dictionary) as d:
        return json.load(d)
|
|
|
|
|
|
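# When running as a PyInstaller bundle, resources are unpacked to sys._MEIPASS;
# otherwise paths are resolved relative to this source file.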
|
if getattr(sys, 'frozen', False):
    BASE_PATH = sys._MEIPASS
else:
    BASE_PATH = os.path.dirname(os.path.abspath(__file__))
|
|
|
os.chdir(BASE_PATH) |
|
|
|
debugger = [] |
|
|
|
|
|
|
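# Model, cache, and asset locations, all resolved relative to BASE_PATH.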
|
MODELS_DIR = os.path.join(BASE_PATH, 'models') |
|
VR_MODELS_DIR = os.path.join(MODELS_DIR, 'VR_Models') |
|
MDX_MODELS_DIR = os.path.join(MODELS_DIR, 'MDX_Net_Models') |
|
DEMUCS_MODELS_DIR = os.path.join(MODELS_DIR, 'Demucs_Models') |
|
DEMUCS_NEWER_REPO_DIR = os.path.join(DEMUCS_MODELS_DIR, 'v3_v4_repo') |
|
MDX_MIXER_PATH = os.path.join(BASE_PATH, 'lib_v5', 'mixer.ckpt') |
|
|
|
|
|
VR_HASH_DIR = os.path.join(VR_MODELS_DIR, 'model_data') |
|
VR_HASH_JSON = os.path.join(VR_MODELS_DIR, 'model_data', 'model_data.json') |
|
MDX_HASH_DIR = os.path.join(MDX_MODELS_DIR, 'model_data') |
|
MDX_HASH_JSON = os.path.join(MDX_MODELS_DIR, 'model_data', 'model_data.json') |
|
DEMUCS_MODEL_NAME_SELECT = os.path.join(DEMUCS_MODELS_DIR, 'model_data', 'model_name_mapper.json') |
|
MDX_MODEL_NAME_SELECT = os.path.join(MDX_MODELS_DIR, 'model_data', 'model_name_mapper.json') |
|
ENSEMBLE_CACHE_DIR = os.path.join(BASE_PATH, 'gui_data', 'saved_ensembles') |
|
SETTINGS_CACHE_DIR = os.path.join(BASE_PATH, 'gui_data', 'saved_settings') |
|
VR_PARAM_DIR = os.path.join(BASE_PATH, 'lib_v5', 'vr_network', 'modelparams') |
|
SAMPLE_CLIP_PATH = os.path.join(BASE_PATH, 'temp_sample_clips') |
|
ENSEMBLE_TEMP_PATH = os.path.join(BASE_PATH, 'ensemble_temps') |
|
|
|
|
|
ICON_IMG_PATH = os.path.join(BASE_PATH, 'gui_data', 'img', 'GUI-Icon.ico') |
|
FONT_PATH = os.path.join(BASE_PATH, 'gui_data', 'fonts', 'centurygothic', 'GOTHIC.TTF') |
|
|
|
|
|
COMPLETE_CHIME = os.path.join(BASE_PATH, 'gui_data', 'complete_chime.wav') |
|
FAIL_CHIME = os.path.join(BASE_PATH, 'gui_data', 'fail_chime.wav') |
|
CHANGE_LOG = os.path.join(BASE_PATH, 'gui_data', 'change_log.txt') |
|
SPLASH_DOC = os.path.join(BASE_PATH, 'tmp', 'splash.txt') |
|
|
|
file_check(os.path.join(MODELS_DIR, 'Main_Models'), VR_MODELS_DIR) |
|
file_check(os.path.join(DEMUCS_MODELS_DIR, 'v3_repo'), DEMUCS_NEWER_REPO_DIR) |
|
remove_unneeded_yamls(DEMUCS_MODELS_DIR) |
|
|
|
remove_temps(ENSEMBLE_TEMP_PATH) |
|
remove_temps(SAMPLE_CLIP_PATH) |
|
remove_temps(os.path.join(BASE_PATH, 'img')) |
|
|
|
if not os.path.isdir(ENSEMBLE_TEMP_PATH): |
|
os.mkdir(ENSEMBLE_TEMP_PATH) |
|
|
|
if not os.path.isdir(SAMPLE_CLIP_PATH): |
|
os.mkdir(SAMPLE_CLIP_PATH) |
|
|
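# Maps model file paths to their hashes so each model file is only hashed once per session.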
|
model_hash_table = {} |
|
data = load_data() |
|
|
|
class ModelData(): |
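    """Collects the user settings and model metadata needed to run a single separation model."""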
|
def __init__(self, model_name: str, |
|
selected_process_method=ENSEMBLE_MODE, |
|
is_secondary_model=False, |
|
primary_model_primary_stem=None, |
|
is_primary_model_primary_stem_only=False, |
|
is_primary_model_secondary_stem_only=False, |
|
is_pre_proc_model=False, |
|
is_dry_check=False): |
|
|
|
self.is_gpu_conversion = 0 if root.is_gpu_conversion_var.get() else -1 |
|
self.is_normalization = root.is_normalization_var.get() |
|
self.is_primary_stem_only = root.is_primary_stem_only_var.get() |
|
self.is_secondary_stem_only = root.is_secondary_stem_only_var.get() |
|
self.is_denoise = root.is_denoise_var.get() |
|
self.mdx_batch_size = 1 if root.mdx_batch_size_var.get() == DEF_OPT else int(root.mdx_batch_size_var.get()) |
|
self.is_mdx_ckpt = False |
|
self.wav_type_set = root.wav_type_set |
|
self.mp3_bit_set = root.mp3_bit_set_var.get() |
|
self.save_format = root.save_format_var.get() |
|
self.is_invert_spec = root.is_invert_spec_var.get() |
|
self.is_mixer_mode = root.is_mixer_mode_var.get() |
|
self.demucs_stems = root.demucs_stems_var.get() |
|
self.demucs_source_list = [] |
|
self.demucs_stem_count = 0 |
|
self.mixer_path = MDX_MIXER_PATH |
|
self.model_name = model_name |
|
self.process_method = selected_process_method |
|
        self.model_status = self.model_name not in (CHOOSE_MODEL, NO_MODEL)
|
self.primary_stem = None |
|
self.secondary_stem = None |
|
self.is_ensemble_mode = False |
|
self.ensemble_primary_stem = None |
|
self.ensemble_secondary_stem = None |
|
self.primary_model_primary_stem = primary_model_primary_stem |
|
self.is_secondary_model = is_secondary_model |
|
self.secondary_model = None |
|
self.secondary_model_scale = None |
|
self.demucs_4_stem_added_count = 0 |
|
self.is_demucs_4_stem_secondaries = False |
|
self.is_4_stem_ensemble = False |
|
self.pre_proc_model = None |
|
self.pre_proc_model_activated = False |
|
self.is_pre_proc_model = is_pre_proc_model |
|
self.is_dry_check = is_dry_check |
|
self.model_samplerate = 44100 |
|
self.model_capacity = 32, 128 |
|
self.is_vr_51_model = False |
|
self.is_demucs_pre_proc_model_inst_mix = False |
|
self.manual_download_Button = None |
|
self.secondary_model_4_stem = [] |
|
self.secondary_model_4_stem_scale = [] |
|
self.secondary_model_4_stem_names = [] |
|
self.secondary_model_4_stem_model_names_list = [] |
|
self.all_models = [] |
|
self.secondary_model_other = None |
|
self.secondary_model_scale_other = None |
|
self.secondary_model_bass = None |
|
self.secondary_model_scale_bass = None |
|
self.secondary_model_drums = None |
|
self.secondary_model_scale_drums = None |
|
|
|
if selected_process_method == ENSEMBLE_MODE: |
|
partitioned_name = model_name.partition(ENSEMBLE_PARTITION) |
|
self.process_method = partitioned_name[0] |
|
self.model_name = partitioned_name[2] |
|
self.model_and_process_tag = model_name |
|
self.ensemble_primary_stem, self.ensemble_secondary_stem = root.return_ensemble_stems() |
|
            self.is_ensemble_mode = not is_secondary_model and not is_pre_proc_model
            self.is_4_stem_ensemble = root.ensemble_main_stem_var.get() == FOUR_STEM_ENSEMBLE and self.is_ensemble_mode
            self.pre_proc_model_activated = root.is_demucs_pre_proc_model_activate_var.get() if self.ensemble_primary_stem != VOCAL_STEM else False
|
|
|
if self.process_method == VR_ARCH_TYPE: |
|
self.is_secondary_model_activated = root.vr_is_secondary_model_activate_var.get() if not self.is_secondary_model else False |
|
self.aggression_setting = float(int(root.aggression_setting_var.get())/100) |
|
self.is_tta = root.is_tta_var.get() |
|
self.is_post_process = root.is_post_process_var.get() |
|
self.window_size = int(root.window_size_var.get()) |
|
self.batch_size = 1 if root.batch_size_var.get() == DEF_OPT else int(root.batch_size_var.get()) |
|
self.crop_size = int(root.crop_size_var.get()) |
|
self.is_high_end_process = 'mirroring' if root.is_high_end_process_var.get() else 'None' |
|
self.post_process_threshold = float(root.post_process_threshold_var.get()) |
|
self.model_capacity = 32, 128 |
|
self.model_path = os.path.join(VR_MODELS_DIR, f"{self.model_name}.pth") |
|
self.get_model_hash() |
|
if self.model_hash: |
|
self.model_data = self.get_model_data(VR_HASH_DIR, root.vr_hash_MAPPER) if not self.model_hash == WOOD_INST_MODEL_HASH else WOOD_INST_PARAMS |
|
if self.model_data: |
|
vr_model_param = os.path.join(VR_PARAM_DIR, "{}.json".format(self.model_data["vr_model_param"])) |
|
self.primary_stem = self.model_data["primary_stem"] |
|
self.secondary_stem = STEM_PAIR_MAPPER[self.primary_stem] |
|
self.vr_model_param = ModelParameters(vr_model_param) |
|
self.model_samplerate = self.vr_model_param.param['sr'] |
|
if "nout" in self.model_data.keys() and "nout_lstm" in self.model_data.keys(): |
|
self.model_capacity = self.model_data["nout"], self.model_data["nout_lstm"] |
|
self.is_vr_51_model = True |
|
else: |
|
self.model_status = False |
|
|
|
if self.process_method == MDX_ARCH_TYPE: |
|
self.is_secondary_model_activated = root.mdx_is_secondary_model_activate_var.get() if not is_secondary_model else False |
|
self.margin = int(root.margin_var.get()) |
|
self.chunks = root.determine_auto_chunks(root.chunks_var.get(), self.is_gpu_conversion) if root.is_chunk_mdxnet_var.get() else 0 |
|
self.get_mdx_model_path() |
|
self.get_model_hash() |
|
if self.model_hash: |
|
self.model_data = self.get_model_data(MDX_HASH_DIR, root.mdx_hash_MAPPER) |
|
if self.model_data: |
|
self.compensate = self.model_data["compensate"] if root.compensate_var.get() == AUTO_SELECT else float(root.compensate_var.get()) |
|
self.mdx_dim_f_set = self.model_data["mdx_dim_f_set"] |
|
self.mdx_dim_t_set = self.model_data["mdx_dim_t_set"] |
|
self.mdx_n_fft_scale_set = self.model_data["mdx_n_fft_scale_set"] |
|
self.primary_stem = self.model_data["primary_stem"] |
|
self.secondary_stem = STEM_PAIR_MAPPER[self.primary_stem] |
|
else: |
|
self.model_status = False |
|
|
|
if self.process_method == DEMUCS_ARCH_TYPE: |
|
self.is_secondary_model_activated = root.demucs_is_secondary_model_activate_var.get() if not is_secondary_model else False |
|
if not self.is_ensemble_mode: |
|
self.pre_proc_model_activated = root.is_demucs_pre_proc_model_activate_var.get() if not root.demucs_stems_var.get() in [VOCAL_STEM, INST_STEM] else False |
|
self.overlap = float(root.overlap_var.get()) |
|
self.margin_demucs = int(root.margin_demucs_var.get()) |
|
self.chunks_demucs = root.determine_auto_chunks(root.chunks_demucs_var.get(), self.is_gpu_conversion) |
|
self.shifts = int(root.shifts_var.get()) |
|
self.is_split_mode = root.is_split_mode_var.get() |
|
self.segment = root.segment_var.get() |
|
self.is_chunk_demucs = root.is_chunk_demucs_var.get() |
|
self.is_demucs_combine_stems = root.is_demucs_combine_stems_var.get() |
|
self.is_primary_stem_only = root.is_primary_stem_only_var.get() if self.is_ensemble_mode else root.is_primary_stem_only_Demucs_var.get() |
|
self.is_secondary_stem_only = root.is_secondary_stem_only_var.get() if self.is_ensemble_mode else root.is_secondary_stem_only_Demucs_var.get() |
|
self.get_demucs_model_path() |
|
self.get_demucs_model_data() |
|
|
|
self.model_basename = os.path.splitext(os.path.basename(self.model_path))[0] if self.model_status else None |
|
self.pre_proc_model_activated = self.pre_proc_model_activated if not self.is_secondary_model else False |
|
|
|
self.is_primary_model_primary_stem_only = is_primary_model_primary_stem_only |
|
self.is_primary_model_secondary_stem_only = is_primary_model_secondary_stem_only |
|
|
|
if self.is_secondary_model_activated and self.model_status: |
|
if (not self.is_ensemble_mode and root.demucs_stems_var.get() == ALL_STEMS and self.process_method == DEMUCS_ARCH_TYPE) or self.is_4_stem_ensemble: |
|
for key in DEMUCS_4_SOURCE_LIST: |
|
self.secondary_model_data(key) |
|
self.secondary_model_4_stem.append(self.secondary_model) |
|
self.secondary_model_4_stem_scale.append(self.secondary_model_scale) |
|
self.secondary_model_4_stem_names.append(key) |
|
self.demucs_4_stem_added_count = sum(i is not None for i in self.secondary_model_4_stem) |
|
self.is_secondary_model_activated = False if all(i is None for i in self.secondary_model_4_stem) else True |
|
self.demucs_4_stem_added_count = self.demucs_4_stem_added_count - 1 if self.is_secondary_model_activated else self.demucs_4_stem_added_count |
|
if self.is_secondary_model_activated: |
|
self.secondary_model_4_stem_model_names_list = [None if i is None else i.model_basename for i in self.secondary_model_4_stem] |
|
self.is_demucs_4_stem_secondaries = True |
|
else: |
|
primary_stem = self.ensemble_primary_stem if self.is_ensemble_mode and self.process_method == DEMUCS_ARCH_TYPE else self.primary_stem |
|
self.secondary_model_data(primary_stem) |
|
|
|
if self.process_method == DEMUCS_ARCH_TYPE and not is_secondary_model: |
|
if self.demucs_stem_count >= 3 and self.pre_proc_model_activated: |
|
self.pre_proc_model_activated = True |
|
self.pre_proc_model = root.process_determine_demucs_pre_proc_model(self.primary_stem) |
|
self.is_demucs_pre_proc_model_inst_mix = root.is_demucs_pre_proc_model_inst_mix_var.get() if self.pre_proc_model else False |
|
|
|
def secondary_model_data(self, primary_stem): |
|
secondary_model_data = root.process_determine_secondary_model(self.process_method, primary_stem, self.is_primary_stem_only, self.is_secondary_stem_only) |
|
self.secondary_model = secondary_model_data[0] |
|
self.secondary_model_scale = secondary_model_data[1] |
|
        self.is_secondary_model_activated = bool(self.secondary_model)
        if self.secondary_model:
            self.is_secondary_model_activated = self.secondary_model.model_basename != self.model_basename
|
|
|
def get_mdx_model_path(self): |
|
|
|
if self.model_name.endswith(CKPT): |
|
|
|
|
|
self.is_mdx_ckpt = True |
|
|
|
ext = '' if self.is_mdx_ckpt else ONNX |
|
|
|
for file_name, chosen_mdx_model in root.mdx_name_select_MAPPER.items(): |
|
if self.model_name in chosen_mdx_model: |
|
self.model_path = os.path.join(MDX_MODELS_DIR, f"{file_name}{ext}") |
|
break |
|
else: |
|
self.model_path = os.path.join(MDX_MODELS_DIR, f"{self.model_name}{ext}") |
|
|
|
        self.mixer_path = os.path.join(MDX_MODELS_DIR, 'mixer_val.ckpt')
|
|
|
def get_demucs_model_path(self): |
|
|
|
        demucs_newer = any(x in self.model_name for x in DEMUCS_NEWER_TAGS)
|
demucs_model_dir = DEMUCS_NEWER_REPO_DIR if demucs_newer else DEMUCS_MODELS_DIR |
|
|
|
for file_name, chosen_model in root.demucs_name_select_MAPPER.items(): |
|
if self.model_name in chosen_model: |
|
self.model_path = os.path.join(demucs_model_dir, file_name) |
|
break |
|
else: |
|
self.model_path = os.path.join(DEMUCS_NEWER_REPO_DIR, f'{self.model_name}.yaml') |
|
|
|
def get_demucs_model_data(self): |
|
|
|
self.demucs_version = DEMUCS_V4 |
|
|
|
for key, value in DEMUCS_VERSION_MAPPER.items(): |
|
if value in self.model_name: |
|
self.demucs_version = key |
|
|
|
self.demucs_source_list = DEMUCS_2_SOURCE if DEMUCS_UVR_MODEL in self.model_name else DEMUCS_4_SOURCE |
|
self.demucs_source_map = DEMUCS_2_SOURCE_MAPPER if DEMUCS_UVR_MODEL in self.model_name else DEMUCS_4_SOURCE_MAPPER |
|
self.demucs_stem_count = 2 if DEMUCS_UVR_MODEL in self.model_name else 4 |
|
|
|
if not self.is_ensemble_mode: |
|
self.primary_stem = PRIMARY_STEM if self.demucs_stems == ALL_STEMS else self.demucs_stems |
|
self.secondary_stem = STEM_PAIR_MAPPER[self.primary_stem] |
|
|
|
    def get_model_data(self, model_hash_dir, hash_mapper):
        model_settings_json = os.path.join(model_hash_dir, "{}.json".format(self.model_hash))

        if os.path.isfile(model_settings_json):
            with open(model_settings_json) as settings_file:
                return json.load(settings_file)

        for stored_hash, settings in hash_mapper.items():
            if self.model_hash in stored_hash:
                return settings

        return self.get_model_data_from_popup()
|
|
|
def get_model_data_from_popup(self): |
|
return None |
|
|
|
    def get_model_hash(self):
        self.model_hash = None

        if not os.path.isfile(self.model_path):
            self.model_status = False
        else:
            if model_hash_table:
                for (key, value) in model_hash_table.items():
                    if self.model_path == key:
                        self.model_hash = value
                        break

            if not self.model_hash:
                try:
                    # Hash only the last ~10 MB of the file so large models hash quickly.
                    with open(self.model_path, 'rb') as f:
                        f.seek(-10000 * 1024, 2)
                        self.model_hash = hashlib.md5(f.read()).hexdigest()
                except Exception:
                    # Files smaller than the seek offset cannot be seeked from the end; hash the whole file.
                    self.model_hash = hashlib.md5(open(self.model_path, 'rb').read()).hexdigest()

                table_entry = {self.model_path: self.model_hash}
                model_hash_table.update(table_entry)
|
|
|
|
|
class Ensembler(): |
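    """Combines the per-model outputs of an ensemble run into the final ensembled stems."""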
|
def __init__(self, is_manual_ensemble=False): |
|
self.is_save_all_outputs_ensemble = root.is_save_all_outputs_ensemble_var.get() |
|
chosen_ensemble_name = '{}'.format(root.chosen_ensemble_var.get().replace(" ", "_")) if not root.chosen_ensemble_var.get() == CHOOSE_ENSEMBLE_OPTION else 'Ensembled' |
|
ensemble_algorithm = root.ensemble_type_var.get().partition("/") |
|
ensemble_main_stem_pair = root.ensemble_main_stem_var.get().partition("/") |
|
time_stamp = round(time.time()) |
|
self.audio_tool = MANUAL_ENSEMBLE |
|
self.main_export_path = Path(root.export_path_var.get()) |
|
self.chosen_ensemble = f"_{chosen_ensemble_name}" if root.is_append_ensemble_name_var.get() else '' |
|
ensemble_folder_name = self.main_export_path if self.is_save_all_outputs_ensemble else ENSEMBLE_TEMP_PATH |
|
self.ensemble_folder_name = os.path.join(ensemble_folder_name, '{}_Outputs_{}'.format(chosen_ensemble_name, time_stamp)) |
|
self.is_testing_audio = f"{time_stamp}_" if root.is_testing_audio_var.get() else '' |
|
self.primary_algorithm = ensemble_algorithm[0] |
|
self.secondary_algorithm = ensemble_algorithm[2] |
|
self.ensemble_primary_stem = ensemble_main_stem_pair[0] |
|
self.ensemble_secondary_stem = ensemble_main_stem_pair[2] |
|
self.is_normalization = root.is_normalization_var.get() |
|
self.wav_type_set = root.wav_type_set |
|
self.mp3_bit_set = root.mp3_bit_set_var.get() |
|
self.save_format = root.save_format_var.get() |
|
if not is_manual_ensemble: |
|
os.mkdir(self.ensemble_folder_name) |
|
|
|
def ensemble_outputs(self, audio_file_base, export_path, stem, is_4_stem=False, is_inst_mix=False): |
|
"""Processes the given outputs and ensembles them with the chosen algorithm""" |
|
|
|
if is_4_stem: |
|
algorithm = root.ensemble_type_var.get() |
|
stem_tag = stem |
|
else: |
|
if is_inst_mix: |
|
algorithm = self.secondary_algorithm |
|
stem_tag = f"{self.ensemble_secondary_stem} {INST_STEM}" |
|
else: |
|
algorithm = self.primary_algorithm if stem == PRIMARY_STEM else self.secondary_algorithm |
|
stem_tag = self.ensemble_primary_stem if stem == PRIMARY_STEM else self.ensemble_secondary_stem |
|
|
|
stem_outputs = self.get_files_to_ensemble(folder=export_path, prefix=audio_file_base, suffix=f"_({stem_tag}).wav") |
|
audio_file_output = f"{self.is_testing_audio}{audio_file_base}{self.chosen_ensemble}_({stem_tag})" |
|
stem_save_path = os.path.join('{}'.format(self.main_export_path),'{}.wav'.format(audio_file_output)) |
|
|
|
if stem_outputs: |
|
spec_utils.ensemble_inputs(stem_outputs, algorithm, self.is_normalization, self.wav_type_set, stem_save_path) |
|
save_format(stem_save_path, self.save_format, self.mp3_bit_set) |
|
|
|
if self.is_save_all_outputs_ensemble: |
|
for i in stem_outputs: |
|
save_format(i, self.save_format, self.mp3_bit_set) |
|
else: |
|
for i in stem_outputs: |
|
try: |
|
os.remove(i) |
|
except Exception as e: |
|
print(e) |
|
|
|
def ensemble_manual(self, audio_inputs, audio_file_base, is_bulk=False): |
|
"""Processes the given outputs and ensembles them with the chosen algorithm""" |
|
|
|
is_mv_sep = True |
|
|
|
if is_bulk: |
|
number_list = list(set([os.path.basename(i).split("_")[0] for i in audio_inputs])) |
|
for n in number_list: |
|
current_list = [i for i in audio_inputs if os.path.basename(i).startswith(n)] |
|
audio_file_base = os.path.basename(current_list[0]).split('.wav')[0] |
|
stem_testing = "instrum" if "Instrumental" in audio_file_base else "vocals" |
|
if is_mv_sep: |
|
audio_file_base = audio_file_base.split("_") |
|
audio_file_base = f"{audio_file_base[1]}_{audio_file_base[2]}_{stem_testing}" |
|
self.ensemble_manual_process(current_list, audio_file_base, is_bulk) |
|
else: |
|
self.ensemble_manual_process(audio_inputs, audio_file_base, is_bulk) |
|
|
|
def ensemble_manual_process(self, audio_inputs, audio_file_base, is_bulk): |
|
|
|
algorithm = root.choose_algorithm_var.get() |
|
algorithm_text = "" if is_bulk else f"_({root.choose_algorithm_var.get()})" |
|
stem_save_path = os.path.join('{}'.format(self.main_export_path),'{}{}{}.wav'.format(self.is_testing_audio, audio_file_base, algorithm_text)) |
|
spec_utils.ensemble_inputs(audio_inputs, algorithm, self.is_normalization, self.wav_type_set, stem_save_path) |
|
save_format(stem_save_path, self.save_format, self.mp3_bit_set) |
|
|
|
def get_files_to_ensemble(self, folder="", prefix="", suffix=""): |
|
"""Grab all the files to be ensembled""" |
|
|
|
return [os.path.join(folder, i) for i in os.listdir(folder) if i.startswith(prefix) and i.endswith(suffix)] |
|
|
|
|
|
def secondary_stem(stem):
    """Determines the secondary stem paired with the given stem"""

    secondary_stem = None

    for key, value in STEM_PAIR_MAPPER.items():
        if stem in key:
            secondary_stem = value

    return secondary_stem
|
|
|
|
|
class UVRInterface: |
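    """Headless wrapper around the UVR separation pipeline (VR, MDX-Net, and Demucs)."""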
|
def __init__(self) -> None: |
|
pass |
|
|
|
def assemble_model_data(self, model=None, arch_type=ENSEMBLE_MODE, is_dry_check=False) -> List[ModelData]: |
|
if arch_type == ENSEMBLE_STEM_CHECK: |
|
model_data = self.model_data_table |
|
missing_models = [model.model_status for model in model_data if not model.model_status] |
|
|
|
if missing_models or not model_data: |
|
model_data: List[ModelData] = [ModelData(model_name, is_dry_check=is_dry_check) for model_name in self.ensemble_model_list] |
|
self.model_data_table = model_data |
|
|
|
if arch_type == ENSEMBLE_MODE: |
|
model_data: List[ModelData] = [ModelData(model_name) for model_name in self.ensemble_listbox_get_all_selected_models()] |
|
if arch_type == ENSEMBLE_CHECK: |
|
model_data: List[ModelData] = [ModelData(model)] |
|
if arch_type == VR_ARCH_TYPE or arch_type == VR_ARCH_PM: |
|
model_data: List[ModelData] = [ModelData(model, VR_ARCH_TYPE)] |
|
if arch_type == MDX_ARCH_TYPE: |
|
model_data: List[ModelData] = [ModelData(model, MDX_ARCH_TYPE)] |
|
if arch_type == DEMUCS_ARCH_TYPE: |
|
model_data: List[ModelData] = [ModelData(model, DEMUCS_ARCH_TYPE)] |
|
|
|
return model_data |
|
|
|
    def create_sample(self, audio_file, sample_path=SAMPLE_CLIP_PATH):
        try:
            with audioread.audio_open(audio_file) as f:
                track_length = int(f.duration)
        except Exception:
            print('Audioread failed to get duration. Trying Librosa...')
            y, sr = librosa.load(audio_file, mono=False, sr=44100)
            track_length = int(librosa.get_duration(y=y, sr=sr))

        clip_duration = int(root.model_sample_mode_duration_var.get())

        if track_length >= clip_duration:
            # Start the clip roughly one third of the way into the track.
            offset_cut = track_length // 3
            off_cut = offset_cut + track_length
            if not off_cut >= clip_duration:
                offset_cut = 0
            name_append = f'{clip_duration}_second_'
        else:
            # Track is shorter than the requested clip, so use the whole track.
            offset_cut, clip_duration = 0, track_length
            name_append = ''

        sample = librosa.load(audio_file, offset=offset_cut, duration=clip_duration, mono=False, sr=44100)[0].T
        audio_sample = os.path.join(sample_path, f'{os.path.splitext(os.path.basename(audio_file))[0]}_{name_append}sample.wav')
        sf.write(audio_sample, sample, 44100)

        return audio_sample
|
|
|
def verify_audio(self, audio_file, is_process=True, sample_path=None): |
|
is_good = False |
|
error_data = '' |
|
|
|
if os.path.isfile(audio_file): |
|
try: |
|
                if isinstance(sample_path, str):
                    self.create_sample(audio_file, sample_path)
                else:
                    librosa.load(audio_file, duration=3, mono=False, sr=44100)
|
is_good = True |
|
except Exception as e: |
|
error_name = f'{type(e).__name__}' |
|
traceback_text = ''.join(traceback.format_tb(e.__traceback__)) |
|
                message = f'{error_name}: "{e}"\n{traceback_text}'
|
if is_process: |
|
audio_base_name = os.path.basename(audio_file) |
|
self.error_log_var.set(f'Error Loading the Following File:\n\n\"{audio_base_name}\"\n\nRaw Error Details:\n\n{message}') |
|
else: |
|
error_data = AUDIO_VERIFICATION_CHECK(audio_file, message) |
|
|
|
if is_process: |
|
return is_good |
|
else: |
|
return is_good, error_data |
|
|
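    # The caches below hold already-separated sources per architecture so that later
    # models in the same run can reuse them instead of separating the input again.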
|
def cached_sources_clear(self): |
|
self.vr_cache_source_mapper = {} |
|
self.mdx_cache_source_mapper = {} |
|
self.demucs_cache_source_mapper = {} |
|
|
|
def cached_model_source_holder(self, process_method, sources, model_name=None): |
|
if process_method == VR_ARCH_TYPE: |
|
self.vr_cache_source_mapper = {**self.vr_cache_source_mapper, **{model_name: sources}} |
|
if process_method == MDX_ARCH_TYPE: |
|
self.mdx_cache_source_mapper = {**self.mdx_cache_source_mapper, **{model_name: sources}} |
|
if process_method == DEMUCS_ARCH_TYPE: |
|
self.demucs_cache_source_mapper = {**self.demucs_cache_source_mapper, **{model_name: sources}} |
|
|
|
def cached_source_callback(self, process_method, model_name=None): |
|
model, sources = None, None |
|
|
|
if process_method == VR_ARCH_TYPE: |
|
mapper = self.vr_cache_source_mapper |
|
if process_method == MDX_ARCH_TYPE: |
|
mapper = self.mdx_cache_source_mapper |
|
if process_method == DEMUCS_ARCH_TYPE: |
|
mapper = self.demucs_cache_source_mapper |
|
|
|
for key, value in mapper.items(): |
|
if model_name in key: |
|
model = key |
|
sources = value |
|
|
|
return model, sources |
|
|
|
def cached_source_model_list_check(self, model_list: List[ModelData]): |
|
model: ModelData |
|
primary_model_names = lambda process_method:[model.model_basename if model.process_method == process_method else None for model in model_list] |
|
secondary_model_names = lambda process_method:[model.secondary_model.model_basename if model.is_secondary_model_activated and model.process_method == process_method else None for model in model_list] |
|
|
|
self.vr_primary_model_names = primary_model_names(VR_ARCH_TYPE) |
|
self.mdx_primary_model_names = primary_model_names(MDX_ARCH_TYPE) |
|
self.demucs_primary_model_names = primary_model_names(DEMUCS_ARCH_TYPE) |
|
self.vr_secondary_model_names = secondary_model_names(VR_ARCH_TYPE) |
|
self.mdx_secondary_model_names = secondary_model_names(MDX_ARCH_TYPE) |
|
self.demucs_secondary_model_names = [model.secondary_model.model_basename if model.is_secondary_model_activated and model.process_method == DEMUCS_ARCH_TYPE and not model.secondary_model is None else None for model in model_list] |
|
self.demucs_pre_proc_model_name = [model.pre_proc_model.model_basename if model.pre_proc_model else None for model in model_list] |
|
|
|
for model in model_list: |
|
if model.process_method == DEMUCS_ARCH_TYPE and model.is_demucs_4_stem_secondaries: |
|
if not model.is_4_stem_ensemble: |
|
self.demucs_secondary_model_names = model.secondary_model_4_stem_model_names_list |
|
break |
|
else: |
|
for i in model.secondary_model_4_stem_model_names_list: |
|
self.demucs_secondary_model_names.append(i) |
|
|
|
self.all_models = self.vr_primary_model_names + self.mdx_primary_model_names + self.demucs_primary_model_names + self.vr_secondary_model_names + self.mdx_secondary_model_names + self.demucs_secondary_model_names + self.demucs_pre_proc_model_name |
|
|
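    # End-to-end separation: resolve the model(s), verify or sample the input audio,
    # run the matching separator, then ensemble the outputs and clean up if required.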
|
def process(self, model_name, arch_type, audio_file, export_path, is_model_sample_mode=False, is_4_stem_ensemble=False, set_progress_func=None, console_write=print) -> SeperateAttributes: |
|
stime = time.perf_counter() |
|
time_elapsed = lambda:f'Time Elapsed: {time.strftime("%H:%M:%S", time.gmtime(int(time.perf_counter() - stime)))}' |
|
|
|
if arch_type==ENSEMBLE_MODE: |
|
model_list, ensemble = self.assemble_model_data(), Ensembler() |
|
export_path = ensemble.ensemble_folder_name |
|
is_ensemble = True |
|
else: |
|
model_list = self.assemble_model_data(model_name, arch_type) |
|
is_ensemble = False |
|
self.cached_source_model_list_check(model_list) |
|
model = model_list[0] |
|
|
|
if self.verify_audio(audio_file): |
|
audio_file = self.create_sample(audio_file) if is_model_sample_mode else audio_file |
|
        else:
            print(f'"{os.path.basename(audio_file)}" is missing or corrupted.\n')
            sys.exit()
|
|
|
audio_file_base = f"{os.path.splitext(os.path.basename(audio_file))[0]}" |
|
audio_file_base = audio_file_base if is_ensemble else f"{round(time.time())}_{audio_file_base}" |
|
audio_file_base = audio_file_base if not is_ensemble else f"{audio_file_base}_{model.model_basename}" |
|
if not is_ensemble: |
|
audio_file_base = f"{audio_file_base}_{model.model_basename}" |
|
|
|
if not is_ensemble: |
|
export_path = os.path.join(Path(export_path), model.model_basename, os.path.splitext(os.path.basename(audio_file))[0]) |
|
if not os.path.isdir(export_path): |
|
os.makedirs(export_path) |
|
|
|
if set_progress_func is None: |
|
pbar = tqdm(total=1) |
|
self._progress = 0 |
|
def set_progress_func(step, inference_iterations=0): |
|
progress_curr = step + inference_iterations |
|
pbar.update(progress_curr-self._progress) |
|
self._progress = progress_curr |
|
|
|
def postprocess(): |
|
pbar.close() |
|
else: |
|
def postprocess(): |
|
pass |
|
|
|
process_data = { |
|
'model_data': model, |
|
'export_path': export_path, |
|
'audio_file_base': audio_file_base, |
|
'audio_file': audio_file, |
|
'set_progress_bar': set_progress_func, |
|
'write_to_console': lambda progress_text, base_text='': console_write(base_text + progress_text), |
|
'process_iteration': lambda:None, |
|
'cached_source_callback': self.cached_source_callback, |
|
'cached_model_source_holder': self.cached_model_source_holder, |
|
'list_all_models': self.all_models, |
|
'is_ensemble_master': is_ensemble, |
|
'is_4_stem_ensemble': is_ensemble and is_4_stem_ensemble |
|
} |
|
if model.process_method == VR_ARCH_TYPE: |
|
seperator = SeperateVR(model, process_data) |
|
if model.process_method == MDX_ARCH_TYPE: |
|
seperator = SeperateMDX(model, process_data) |
|
if model.process_method == DEMUCS_ARCH_TYPE: |
|
seperator = SeperateDemucs(model, process_data) |
|
|
|
seperator.seperate() |
|
postprocess() |
|
|
|
if is_ensemble: |
|
audio_file_base = audio_file_base.replace(f"_{model.model_basename}", "") |
|
console_write(ENSEMBLING_OUTPUTS) |
|
|
|
if is_4_stem_ensemble: |
|
for output_stem in DEMUCS_4_SOURCE_LIST: |
|
ensemble.ensemble_outputs(audio_file_base, export_path, output_stem, is_4_stem=True) |
|
else: |
|
if not root.is_secondary_stem_only_var.get(): |
|
ensemble.ensemble_outputs(audio_file_base, export_path, PRIMARY_STEM) |
|
if not root.is_primary_stem_only_var.get(): |
|
ensemble.ensemble_outputs(audio_file_base, export_path, SECONDARY_STEM) |
|
ensemble.ensemble_outputs(audio_file_base, export_path, SECONDARY_STEM, is_inst_mix=True) |
|
|
|
console_write(DONE) |
|
|
|
if is_model_sample_mode: |
|
if os.path.isfile(audio_file): |
|
os.remove(audio_file) |
|
|
|
torch.cuda.empty_cache() |
|
|
|
if is_ensemble and len(os.listdir(export_path)) == 0: |
|
shutil.rmtree(export_path) |
|
        console_write(f'Process complete. {time_elapsed()}\nOutput path: {export_path}')
|
self.cached_sources_clear() |
|
return seperator |
|
|
|
|
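# RootWrapper and FakeRoot stand in for the Tkinter GUI root and its variables so the
# separation code above can run without the GUI: any attribute not explicitly defined on
# FakeRoot is created on demand as a RootWrapper exposing get()/set().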
|
class RootWrapper:
    def __init__(self, var) -> None:
        self.var = var

    def set(self, val):
        self.var = val

    def get(self):
        return self.var
|
|
|
class FakeRoot: |
|
def __init__(self) -> None: |
|
self.wav_type_set = 'PCM_16' |
|
self.vr_hash_MAPPER = load_model_hash_data(VR_HASH_JSON) |
|
self.mdx_hash_MAPPER = load_model_hash_data(MDX_HASH_JSON) |
|
self.mdx_name_select_MAPPER = load_model_hash_data(MDX_MODEL_NAME_SELECT) |
|
self.demucs_name_select_MAPPER = load_model_hash_data(DEMUCS_MODEL_NAME_SELECT) |
|
|
|
def __getattribute__(self, __name: str): |
|
try: |
|
return super().__getattribute__(__name) |
|
except AttributeError: |
|
wrapped=RootWrapper(None) |
|
super().__setattr__(__name, wrapped) |
|
return wrapped |
|
|
|
def load_saved_settings(self, loaded_setting: dict, process_method=None): |
|
"""Loads user saved application settings or resets to default""" |
|
|
|
        for key, value in DEFAULT_DATA.items():
            if key not in loaded_setting.keys():
                loaded_setting = {**loaded_setting, **{key: value}}
                loaded_setting['batch_size'] = DEF_OPT
|
|
|
        is_ensemble = process_method == ENSEMBLE_MODE
|
|
|
if not process_method or process_method == VR_ARCH_PM or is_ensemble: |
|
self.vr_model_var.set(loaded_setting['vr_model']) |
|
self.aggression_setting_var.set(loaded_setting['aggression_setting']) |
|
self.window_size_var.set(loaded_setting['window_size']) |
|
self.batch_size_var.set(loaded_setting['batch_size']) |
|
self.crop_size_var.set(loaded_setting['crop_size']) |
|
self.is_tta_var.set(loaded_setting['is_tta']) |
|
self.is_output_image_var.set(loaded_setting['is_output_image']) |
|
self.is_post_process_var.set(loaded_setting['is_post_process']) |
|
self.is_high_end_process_var.set(loaded_setting['is_high_end_process']) |
|
self.post_process_threshold_var.set(loaded_setting['post_process_threshold']) |
|
self.vr_voc_inst_secondary_model_var.set(loaded_setting['vr_voc_inst_secondary_model']) |
|
self.vr_other_secondary_model_var.set(loaded_setting['vr_other_secondary_model']) |
|
self.vr_bass_secondary_model_var.set(loaded_setting['vr_bass_secondary_model']) |
|
self.vr_drums_secondary_model_var.set(loaded_setting['vr_drums_secondary_model']) |
|
self.vr_is_secondary_model_activate_var.set(loaded_setting['vr_is_secondary_model_activate']) |
|
self.vr_voc_inst_secondary_model_scale_var.set(loaded_setting['vr_voc_inst_secondary_model_scale']) |
|
self.vr_other_secondary_model_scale_var.set(loaded_setting['vr_other_secondary_model_scale']) |
|
self.vr_bass_secondary_model_scale_var.set(loaded_setting['vr_bass_secondary_model_scale']) |
|
self.vr_drums_secondary_model_scale_var.set(loaded_setting['vr_drums_secondary_model_scale']) |
|
|
|
if not process_method or process_method == DEMUCS_ARCH_TYPE or is_ensemble: |
|
self.demucs_model_var.set(loaded_setting['demucs_model']) |
|
self.segment_var.set(loaded_setting['segment']) |
|
self.overlap_var.set(loaded_setting['overlap']) |
|
self.shifts_var.set(loaded_setting['shifts']) |
|
self.chunks_demucs_var.set(loaded_setting['chunks_demucs']) |
|
self.margin_demucs_var.set(loaded_setting['margin_demucs']) |
|
self.is_chunk_demucs_var.set(loaded_setting['is_chunk_demucs']) |
|
self.is_chunk_mdxnet_var.set(loaded_setting['is_chunk_mdxnet']) |
|
self.is_primary_stem_only_Demucs_var.set(loaded_setting['is_primary_stem_only_Demucs']) |
|
self.is_secondary_stem_only_Demucs_var.set(loaded_setting['is_secondary_stem_only_Demucs']) |
|
self.is_split_mode_var.set(loaded_setting['is_split_mode']) |
|
self.is_demucs_combine_stems_var.set(loaded_setting['is_demucs_combine_stems']) |
|
self.demucs_voc_inst_secondary_model_var.set(loaded_setting['demucs_voc_inst_secondary_model']) |
|
self.demucs_other_secondary_model_var.set(loaded_setting['demucs_other_secondary_model']) |
|
self.demucs_bass_secondary_model_var.set(loaded_setting['demucs_bass_secondary_model']) |
|
self.demucs_drums_secondary_model_var.set(loaded_setting['demucs_drums_secondary_model']) |
|
self.demucs_is_secondary_model_activate_var.set(loaded_setting['demucs_is_secondary_model_activate']) |
|
self.demucs_voc_inst_secondary_model_scale_var.set(loaded_setting['demucs_voc_inst_secondary_model_scale']) |
|
self.demucs_other_secondary_model_scale_var.set(loaded_setting['demucs_other_secondary_model_scale']) |
|
self.demucs_bass_secondary_model_scale_var.set(loaded_setting['demucs_bass_secondary_model_scale']) |
|
self.demucs_drums_secondary_model_scale_var.set(loaded_setting['demucs_drums_secondary_model_scale']) |
|
self.demucs_stems_var.set(loaded_setting['demucs_stems']) |
|
|
|
self.demucs_pre_proc_model_var.set(data['demucs_pre_proc_model']) |
|
self.is_demucs_pre_proc_model_activate_var.set(data['is_demucs_pre_proc_model_activate']) |
|
self.is_demucs_pre_proc_model_inst_mix_var.set(data['is_demucs_pre_proc_model_inst_mix']) |
|
|
|
if not process_method or process_method == MDX_ARCH_TYPE or is_ensemble: |
|
self.mdx_net_model_var.set(loaded_setting['mdx_net_model']) |
|
self.chunks_var.set(loaded_setting['chunks']) |
|
self.margin_var.set(loaded_setting['margin']) |
|
self.compensate_var.set(loaded_setting['compensate']) |
|
self.is_denoise_var.set(loaded_setting['is_denoise']) |
|
self.is_invert_spec_var.set(loaded_setting['is_invert_spec']) |
|
self.is_mixer_mode_var.set(loaded_setting['is_mixer_mode']) |
|
self.mdx_batch_size_var.set(loaded_setting['mdx_batch_size']) |
|
self.mdx_voc_inst_secondary_model_var.set(loaded_setting['mdx_voc_inst_secondary_model']) |
|
self.mdx_other_secondary_model_var.set(loaded_setting['mdx_other_secondary_model']) |
|
self.mdx_bass_secondary_model_var.set(loaded_setting['mdx_bass_secondary_model']) |
|
self.mdx_drums_secondary_model_var.set(loaded_setting['mdx_drums_secondary_model']) |
|
self.mdx_is_secondary_model_activate_var.set(loaded_setting['mdx_is_secondary_model_activate']) |
|
self.mdx_voc_inst_secondary_model_scale_var.set(loaded_setting['mdx_voc_inst_secondary_model_scale']) |
|
self.mdx_other_secondary_model_scale_var.set(loaded_setting['mdx_other_secondary_model_scale']) |
|
self.mdx_bass_secondary_model_scale_var.set(loaded_setting['mdx_bass_secondary_model_scale']) |
|
self.mdx_drums_secondary_model_scale_var.set(loaded_setting['mdx_drums_secondary_model_scale']) |
|
|
|
if not process_method or is_ensemble: |
|
self.is_save_all_outputs_ensemble_var.set(loaded_setting['is_save_all_outputs_ensemble']) |
|
self.is_append_ensemble_name_var.set(loaded_setting['is_append_ensemble_name']) |
|
self.chosen_audio_tool_var.set(loaded_setting['chosen_audio_tool']) |
|
self.choose_algorithm_var.set(loaded_setting['choose_algorithm']) |
|
self.time_stretch_rate_var.set(loaded_setting['time_stretch_rate']) |
|
self.pitch_rate_var.set(loaded_setting['pitch_rate']) |
|
self.is_primary_stem_only_var.set(loaded_setting['is_primary_stem_only']) |
|
self.is_secondary_stem_only_var.set(loaded_setting['is_secondary_stem_only']) |
|
self.is_testing_audio_var.set(loaded_setting['is_testing_audio']) |
|
self.is_add_model_name_var.set(loaded_setting['is_add_model_name']) |
|
self.is_accept_any_input_var.set(loaded_setting["is_accept_any_input"]) |
|
self.is_task_complete_var.set(loaded_setting['is_task_complete']) |
|
self.is_create_model_folder_var.set(loaded_setting['is_create_model_folder']) |
|
self.mp3_bit_set_var.set(loaded_setting['mp3_bit_set']) |
|
self.save_format_var.set(loaded_setting['save_format']) |
|
self.wav_type_set_var.set(loaded_setting['wav_type_set']) |
|
self.user_code_var.set(loaded_setting['user_code']) |
|
|
|
self.is_gpu_conversion_var.set(loaded_setting['is_gpu_conversion']) |
|
self.is_normalization_var.set(loaded_setting['is_normalization']) |
|
self.help_hints_var.set(loaded_setting['help_hints_var']) |
|
|
|
self.model_sample_mode_var.set(loaded_setting['model_sample_mode']) |
|
self.model_sample_mode_duration_var.set(loaded_setting['model_sample_mode_duration']) |
|
|
|
|
|
root = FakeRoot() |
|
root.load_saved_settings(DEFAULT_DATA) |
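

# --- Minimal usage sketch (illustrative only; names below are placeholders, not part of the original module) ---
# Assumes the named MDX-Net model file is already installed under models/MDX_Net_Models
# and that 'example_song.wav' exists; adjust the model, architecture type, and paths to your setup.
if __name__ == "__main__":
    uvr = UVRInterface()
    # Separate one track with a single MDX-Net model; outputs land in ./separated/<model>/<track>/
    uvr.process(
        model_name='UVR-MDX-NET Inst HQ 3',  # placeholder: any installed MDX-Net model name
        arch_type=MDX_ARCH_TYPE,
        audio_file='example_song.wav',       # placeholder input file
        export_path='separated',
    )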