import os
import sys
import logging
import uuid

import numpy as np
import requests
import torch
from flask import Flask, request, render_template_string
from flask_cors import CORS
from scipy.io.wavfile import write

import commons
import utils
from clap_wrapper import get_clap_audio_feature, get_clap_text_feature
from models import SynthesizerTrn
from text import cleaned_text_to_sequence, get_bert
from text.cleaner import clean_text
from text.symbols import symbols

# Silence noisy third-party loggers.
logging.getLogger("numba").setLevel(logging.WARNING)
logging.getLogger("markdown_it").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("matplotlib").setLevel(logging.WARNING)

logging.basicConfig(
    level=logging.INFO, format="| %(name)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)

net_g = None

device = (
    "cuda:0"
    if torch.cuda.is_available()
    else (
        "mps"
        if sys.platform == "darwin" and torch.backends.mps.is_available()
        else "cpu"
    )
)
# device = "cpu"


def get_net_g(model_path: str, device: str, hps):
    """Build the synthesizer and load weights from a checkpoint."""
    net_g = SynthesizerTrn(
        len(symbols),
        hps.data.filter_length // 2 + 1,
        hps.train.segment_size // hps.data.hop_length,
        n_speakers=hps.data.n_speakers,
        **hps.model,
    ).to(device)
    _ = net_g.eval()
    _ = utils.load_checkpoint(model_path, net_g, None, skip_optimizer=True)
    return net_g


def get_text(text, language_str, hps, device):
    """Convert raw text into phone/tone/language tensors plus BERT features."""
    norm_text, phone, tone, word2ph = clean_text(text, language_str)
    phone, tone, language = cleaned_text_to_sequence(phone, tone, language_str)

    if hps.data.add_blank:
        phone = commons.intersperse(phone, 0)
        tone = commons.intersperse(tone, 0)
        language = commons.intersperse(language, 0)
        for i in range(len(word2ph)):
            word2ph[i] = word2ph[i] * 2
        word2ph[0] += 1

    bert_ori = get_bert(norm_text, word2ph, language_str, device)
    del word2ph
    assert bert_ori.shape[-1] == len(phone), phone

    # Only the BERT slot matching the language carries real features;
    # the other slots are zero-filled placeholders.
    if language_str == "ZH":
        bert = bert_ori
        ja_bert = torch.zeros(1024, len(phone))
        en_bert = torch.zeros(1024, len(phone))
    elif language_str == "JP":
        bert = torch.zeros(1024, len(phone))
        ja_bert = bert_ori
        en_bert = torch.zeros(1024, len(phone))
    else:
        raise ValueError("language_str should be ZH or JP")

    assert bert.shape[-1] == len(
        phone
    ), f"Bert seq len {bert.shape[-1]} != {len(phone)}"

    phone = torch.LongTensor(phone)
    tone = torch.LongTensor(tone)
    language = torch.LongTensor(language)
    return bert, ja_bert, en_bert, phone, tone, language


def infer(
    text,
    sdp_ratio,
    noise_scale,
    noise_scale_w,
    length_scale,
    sid,
    reference_audio=None,
    emotion="Happy",
):
    language = "JP" if is_japanese(text) else "ZH"

    # Emotion embedding: from a reference waveform if provided, otherwise
    # from the emotion keyword via the CLAP text encoder.
    if isinstance(reference_audio, np.ndarray):
        emo = get_clap_audio_feature(reference_audio, device)
    else:
        emo = get_clap_text_feature(emotion, device)
    emo = torch.squeeze(emo, dim=1)

    bert, ja_bert, en_bert, phones, tones, lang_ids = get_text(
        text, language, hps, device
    )
    with torch.no_grad():
        x_tst = phones.to(device).unsqueeze(0)
        tones = tones.to(device).unsqueeze(0)
        lang_ids = lang_ids.to(device).unsqueeze(0)
        bert = bert.to(device).unsqueeze(0)
        ja_bert = ja_bert.to(device).unsqueeze(0)
        en_bert = en_bert.to(device).unsqueeze(0)
        x_tst_lengths = torch.LongTensor([phones.size(0)]).to(device)
        emo = emo.to(device).unsqueeze(0)
        del phones
        speakers = torch.LongTensor([hps.data.spk2id[sid]]).to(device)
        audio = (
            net_g.infer(
                x_tst,
                x_tst_lengths,
                speakers,
                tones,
                lang_ids,
                bert,
                ja_bert,
                en_bert,
                emo,
                sdp_ratio=sdp_ratio,
                noise_scale=noise_scale,
                noise_scale_w=noise_scale_w,
                length_scale=length_scale,
            )[0][0, 0]
            .data.cpu()
            .float()
            .numpy()
        )
        del x_tst, tones, lang_ids, bert, x_tst_lengths, speakers, ja_bert, en_bert, emo
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    # Write to a unique temp file so concurrent requests don't clobber each other.
    unique_filename = f"temp{uuid.uuid4()}.wav"
    write(unique_filename, 44100, audio)
    return unique_filename


def is_japanese(string):
    """Heuristic: treat text containing kana (U+3040..U+30FF) as Japanese."""
    for ch in string:
        if 0x3040 < ord(ch) < 0x30FF:
            return True
    return False


def loadmodel(model):
    """Swap the loaded checkpoint in place; returns 'success' or 'error'."""
    try:
        _ = net_g.eval()
        _ = utils.load_checkpoint(model, net_g, None, skip_optimizer=True)
        return "success"
    except Exception:
        return "error"


def send_audio_to_server(audio_path, text):
    """Forward the generated audio and its text to a companion service."""
    url = "http://127.0.0.1:3000/response"
    data = {"text": text}
    try:
        with open(audio_path, "rb") as f:
            response = requests.post(url, files={"file": f}, data=data)
        return response.status_code, response.text
    except Exception as e:
        return 500, str(e)


app = Flask(__name__)
CORS(app)


@app.route("/")
def tts():
    global last_text, last_model
    speaker = request.args.get("speaker")
    sdp_ratio = float(request.args.get("sdp_ratio", 0.2))
    noise_scale = float(request.args.get("noise_scale", 0.6))
    noise_scale_w = float(request.args.get("noise_scale_w", 0.8))
    length_scale = float(request.args.get("length_scale", 1))
    emotion = request.args.get("emotion", "happy")
    text = request.args.get("text")
    is_chat = request.args.get("is_chat", "false").lower() == "true"
    model = request.args.get("model", modelPaths[-1])

    if not speaker or not text:
        return render_template_string(""" TTS API Documentation """)

    # Hot-swap the checkpoint when a different model is requested.
    if model != last_model:
        loadmodel(model)
        last_model = model

    if is_chat and text == last_text:
        # Repeated chat request: return one second of silence instead of
        # re-synthesizing the same line.
        unique_filename = "blank.wav"
        silence = np.zeros(44100, dtype=np.int16)
        write(unique_filename, 44100, silence)
    else:
        last_text = text
        unique_filename = infer(
            text,
            sdp_ratio=sdp_ratio,
            noise_scale=noise_scale,
            noise_scale_w=noise_scale_w,
            length_scale=length_scale,
            sid=speaker,
            reference_audio=None,
            emotion=emotion,
        )
        status_code, response_text = send_audio_to_server(unique_filename, text)
        print(f"Response from server: {response_text} (Status code: {status_code})")

    with open(unique_filename, "rb") as bit:
        wav_bytes = bit.read()
    os.remove(unique_filename)
    # The filename is ASCII-safe, so a plain str header value is fine.
    headers = {"Content-Type": "audio/wav", "Text": unique_filename}
    return wav_bytes, 200, headers


if __name__ == "__main__":
    modelPaths = []
    for dirpath, dirnames, filenames in os.walk("Data/BangDreamV22/models/"):
        for filename in filenames:
            modelPaths.append(os.path.join(dirpath, filename))
    hps = utils.get_hparams_from_file("Data/BangDreamV22/configs/config.json")
    net_g = get_net_g(model_path=modelPaths[-1], device=device, hps=hps)
    speaker_ids = hps.data.spk2id
    speakers = list(speaker_ids.keys())
    last_text = ""
    last_model = modelPaths[-1]
    app.run(host="0.0.0.0", port=5000)
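
# --- Example client (illustrative sketch, not part of the server) ---
# A minimal way to exercise the "/" route, assuming the server is running
# locally on the port configured above and that the speaker name is a key
# present in hps.data.spk2id for the loaded model. "example_speaker" is a
# placeholder, not a name shipped with the project.
#
#   import requests
#
#   resp = requests.get(
#       "http://127.0.0.1:5000/",
#       params={
#           "speaker": "example_speaker",  # hypothetical speaker id
#           "text": "こんにちは",
#           "sdp_ratio": 0.2,
#           "emotion": "happy",
#       },
#   )
#   with open("out.wav", "wb") as f:
#       f.write(resp.content)  # route returns raw WAV bytes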