from fastapi import FastAPI # Create an instance of the FastAPI class app = FastAPI() # Define a route for the root endpoint @app.get("/") def read_root(): return {"message": "Hello, World!"} def detect_onnx_models(path): onnx_models = glob.glob(path + '/*.onnx') if len(onnx_models) > 1: return onnx_models elif len(onnx_models) == 1: return onnx_models[0] else: return None def main(): """Main entry point""" models_path = "/content/piper/src/python" logging.basicConfig(level=logging.DEBUG) providers = [ "CPUExecutionProvider" if use_gpu is False else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"}) ] sess_options = onnxruntime.SessionOptions() model = None onnx_models = detect_onnx_models(models_path) speaker_selection = widgets.Dropdown( options=[], description=f'{lan.translate(lang, "Select speaker")}:', layout={'visibility': 'hidden'} ) if onnx_models is None: if enhanced_accessibility: playaudio("novoices") raise Exception(lan.translate(lang, "No downloaded voice packages!")) elif isinstance(onnx_models, str): onnx_model = onnx_models model, config = load_onnx(onnx_model, sess_options, providers) if config["num_speakers"] > 1: speaker_selection.options = config["speaker_id_map"].values() speaker_selection.layout.visibility = 'visible' preview_sid = 0 if enhanced_accessibility: playaudio("multispeaker") else: speaker_selection.layout.visibility = 'hidden' preview_sid = None if enhanced_accessibility: inferencing( model, config, preview_sid, lan.translate( config["espeak"]["voice"][:2], "Interface openned. Write your texts, configure the different synthesis options or download all the voices you want. Enjoy!" ) ) else: voice_model_names = [] for current in onnx_models: voice_struct = current.split("/")[5] voice_model_names.append(voice_struct) if enhanced_accessibility: playaudio("selectmodel") selection = widgets.Dropdown( options=voice_model_names, description=f'{lan.translate(lang, "Select voice package")}:', ) load_btn = widgets.Button( description=lan.translate(lang, "Load it!") ) config = None def load_model(button): nonlocal config global onnx_model nonlocal model nonlocal models_path selected_voice = selection.value onnx_model = f"{models_path}/{selected_voice}" model, config = load_onnx(onnx_model, sess_options, providers) if enhanced_accessibility: playaudio("loaded") if config["num_speakers"] > 1: speaker_selection.options = config["speaker_id_map"].values() speaker_selection.layout.visibility = 'visible' if enhanced_accessibility: playaudio("multispeaker") else: speaker_selection.layout.visibility = 'hidden' load_btn.on_click(load_model) display(selection, load_btn) display(speaker_selection) speed_slider = widgets.FloatSlider( value=1, min=0.25, max=4, step=0.1, description=lan.translate(lang, "Rate scale"), orientation='horizontal', ) noise_scale_slider = widgets.FloatSlider( value=0.667, min=0.25, max=4, step=0.1, description=lan.translate(lang, "Phoneme noise scale"), orientation='horizontal', ) noise_scale_w_slider = widgets.FloatSlider( value=1, min=0.25, max=4, step=0.1, description=lan.translate(lang, "Phoneme stressing scale"), orientation='horizontal', ) play = widgets.Checkbox( value=True, description=lan.translate(lang, "Auto-play"), disabled=False ) text_input = widgets.Text( value='', placeholder=f'{lan.translate(lang, "Enter your text here")}:', description=lan.translate(lang, "Text to synthesize"), layout=widgets.Layout(width='80%') ) synthesize_button = widgets.Button( description=lan.translate(lang, "Synthesize"), button_style='success', # 'success', 'info', 'warning', 'danger' or '' tooltip=lan.translate(lang, "Click here to synthesize the text."), icon='check' ) close_button = widgets.Button( description=lan.translate(lang, "Exit"), tooltip=lan.translate(lang, "Closes this GUI."), icon='check' ) def on_synthesize_button_clicked(b): if model is None: if enhanced_accessibility: playaudio("nomodel") raise Exception(lan.translate(lang, "You have not loaded any model from the list!")) text = text_input.value if config["num_speakers"] > 1: sid = speaker_selection.value else: sid = None rate = speed_slider.value noise_scale = noise_scale_slider.value noise_scale_w = noise_scale_w_slider.value auto_play = play.value inferencing(model, config, sid, text, rate, noise_scale, noise_scale_w, auto_play) def on_close_button_clicked(b): clear_output() if enhanced_accessibility: playaudio("exit") synthesize_button.on_click(on_synthesize_button_clicked) close_button.on_click(on_close_button_clicked) display(text_input) display(speed_slider) display(noise_scale_slider) display(noise_scale_w_slider) display(play) display(synthesize_button) display(close_button) def load_onnx(model, sess_options, providers = ["CPUExecutionProvider"]): _LOGGER.debug("Loading model from %s", model) config = load_config(model) model = onnxruntime.InferenceSession( str(model), sess_options=sess_options, providers= providers ) _LOGGER.info("Loaded model from %s", model) return model, config def load_config(model): with open(f"{model}.json", "r") as file: config = json.load(file) return config PAD = "_" # padding (0) BOS = "^" # beginning of sentence EOS = "$" # end of sentence class PhonemeType(str, Enum): ESPEAK = "espeak" TEXT = "text" def phonemize(config, text: str) -> List[List[str]]: """Text to phonemes grouped by sentence.""" if config["phoneme_type"] == PhonemeType.ESPEAK: if config["espeak"]["voice"] == "ar": # Arabic diacritization # https://github.com/mush42/libtashkeel/ text = tashkeel_run(text) return phonemize_espeak(text, config["espeak"]["voice"]) if config["phoneme_type"] == PhonemeType.TEXT: return phonemize_codepoints(text) raise ValueError(f'Unexpected phoneme type: {config["phoneme_type"]}') def phonemes_to_ids(config, phonemes: List[str]) -> List[int]: """Phonemes to ids.""" id_map = config["phoneme_id_map"] ids: List[int] = list(id_map[BOS]) for phoneme in phonemes: if phoneme not in id_map: print("Missing phoneme from id map: %s", phoneme) continue ids.extend(id_map[phoneme]) ids.extend(id_map[PAD]) ids.extend(id_map[EOS]) return ids def inferencing(model, config, sid, line, length_scale = 1, noise_scale = 0.667, noise_scale_w = 0.8, auto_play=True): audios = [] if config["phoneme_type"] == "PhonemeType.ESPEAK": config["phoneme_type"] = "espeak" text = phonemize(config, line) for phonemes in text: phoneme_ids = phonemes_to_ids(config, phonemes) num_speakers = config["num_speakers"] if num_speakers == 1: speaker_id = None # for now else: speaker_id = sid text = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0) text_lengths = np.array([text.shape[1]], dtype=np.int64) scales = np.array( [noise_scale, length_scale, noise_scale_w], dtype=np.float32, ) sid = None if speaker_id is not None: sid = np.array([speaker_id], dtype=np.int64) audio = model.run( None, { "input": text, "input_lengths": text_lengths, "scales": scales, "sid": sid, }, )[0].squeeze((0, 1)) audio = audio_float_to_int16(audio.squeeze()) audios.append(audio) merged_audio = np.concatenate(audios) sample_rate = config["audio"]["sample_rate"] display(Markdown(f"{line}")) display(Audio(merged_audio, rate=sample_rate, autoplay=auto_play)) def denoise( audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float ) -> np.ndarray: audio_spec, audio_angles = transform(audio) a = bias_spec.shape[-1] b = audio_spec.shape[-1] repeats = max(1, math.ceil(b / a)) bias_spec_repeat = np.repeat(bias_spec, repeats, axis=-1)[..., :b] audio_spec_denoised = audio_spec - (bias_spec_repeat * denoiser_strength) audio_spec_denoised = np.clip(audio_spec_denoised, a_min=0.0, a_max=None) audio_denoised = inverse(audio_spec_denoised, audio_angles) return audio_denoised def stft(x, fft_size, hopsamp): """Compute and return the STFT of the supplied time domain signal x. Args: x (1-dim Numpy array): A time domain signal. fft_size (int): FFT size. Should be a power of 2, otherwise DFT will be used. hopsamp (int): Returns: The STFT. The rows are the time slices and columns are the frequency bins. """ window = np.hanning(fft_size) fft_size = int(fft_size) hopsamp = int(hopsamp) return np.array( [ np.fft.rfft(window * x[i : i + fft_size]) for i in range(0, len(x) - fft_size, hopsamp) ] ) def istft(X, fft_size, hopsamp): """Invert a STFT into a time domain signal. Args: X (2-dim Numpy array): Input spectrogram. The rows are the time slices and columns are the frequency bins. fft_size (int): hopsamp (int): The hop size, in samples. Returns: The inverse STFT. """ fft_size = int(fft_size) hopsamp = int(hopsamp) window = np.hanning(fft_size) time_slices = X.shape[0] len_samples = int(time_slices * hopsamp + fft_size) x = np.zeros(len_samples) for n, i in enumerate(range(0, len(x) - fft_size, hopsamp)): x[i : i + fft_size] += window * np.real(np.fft.irfft(X[n])) return x def inverse(magnitude, phase): recombine_magnitude_phase = np.concatenate( [magnitude * np.cos(phase), magnitude * np.sin(phase)], axis=1 ) x_org = recombine_magnitude_phase n_b, n_f, n_t = x_org.shape # pylint: disable=unpacking-non-sequence x = np.empty([n_b, n_f // 2, n_t], dtype=np.complex64) x.real = x_org[:, : n_f // 2] x.imag = x_org[:, n_f // 2 :] inverse_transform = [] for y in x: y_ = istft(y.T, fft_size=1024, hopsamp=256) inverse_transform.append(y_[None, :]) inverse_transform = np.concatenate(inverse_transform, 0) return inverse_transform def transform(input_data): x = input_data real_part = [] imag_part = [] for y in x: y_ = stft(y, fft_size=1024, hopsamp=256).T real_part.append(y_.real[None, :, :]) # pylint: disable=unsubscriptable-object imag_part.append(y_.imag[None, :, :]) # pylint: disable=unsubscriptable-object real_part = np.concatenate(real_part, 0) imag_part = np.concatenate(imag_part, 0) magnitude = np.sqrt(real_part**2 + imag_part**2) phase = np.arctan2(imag_part.data, real_part.data) return magnitude, phase