"""ChatTTS text-to-speech handler.

Converts LLM sentences into fixed-size 16 kHz int16 audio chunks,
optionally streaming chunks as they are generated.
"""

import logging

import ChatTTS
import librosa
import numpy as np
import torch
from rich.console import Console

from baseHandler import BaseHandler

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

console = Console()


class ChatTTSHandler(BaseHandler):
    def setup(
        self,
        should_listen,
        device="cuda",
        gen_kwargs={},  # not used by ChatTTS; kept for pipeline API compatibility
        stream=True,
        chunk_size=512,
    ):
        self.should_listen = should_listen
        self.device = device
        self.model = ChatTTS.Chat()
        self.model.load(compile=False)
        self.chunk_size = chunk_size
        self.stream = stream
        # Condition generation on a randomly sampled speaker embedding so the
        # voice stays consistent across calls to process().
        rnd_spk_emb = self.model.sample_random_speaker()
        self.params_infer_code = ChatTTS.Chat.InferCodeParams(
            spk_emb=rnd_spk_emb,
        )
        self.warmup()

    def warmup(self):
        # Run one throwaway inference so weights and kernels are loaded
        # before the first real request.
        logger.info(f"Warming up {self.__class__.__name__}")
        _ = self.model.infer("text")

    def process(self, llm_sentence):
        console.print(f"[green]ASSISTANT: {llm_sentence}")
        if self.device == "mps":
            import time

            start = time.time()
            torch.mps.synchronize()  # wait for all kernels on the MPS device to complete
            torch.mps.empty_cache()  # free all memory allocated by the MPS device
            _ = time.time() - start  # sync duration, currently unused

        wavs_gen = self.model.infer(
            llm_sentence, params_infer_code=self.params_infer_code, stream=self.stream
        )

        if self.stream:
            for gen in wavs_gen:
                if gen[0] is None or len(gen[0]) == 0:
                    self.should_listen.set()
                    return
                # ChatTTS emits 24 kHz float audio; the pipeline expects
                # 16 kHz int16, so resample and rescale each chunk.
                audio_chunk = librosa.resample(gen[0], orig_sr=24000, target_sr=16000)
                audio_chunk = (audio_chunk * 32768).astype(np.int16)[0]
                while len(audio_chunk) > self.chunk_size:
                    yield audio_chunk[: self.chunk_size]  # emit the first chunk_size samples
                    audio_chunk = audio_chunk[self.chunk_size :]  # drop what was emitted
                # pad the tail so every yielded chunk has the same length
                yield np.pad(audio_chunk, (0, self.chunk_size - len(audio_chunk)))
        else:
            wavs = wavs_gen
            if len(wavs[0]) == 0:
                self.should_listen.set()
                return
            audio_chunk = librosa.resample(wavs[0], orig_sr=24000, target_sr=16000)
            audio_chunk = (audio_chunk * 32768).astype(np.int16)
            for i in range(0, len(audio_chunk), self.chunk_size):
                # pad the final slice so every yielded chunk has the same length
                yield np.pad(
                    audio_chunk[i : i + self.chunk_size],
                    (0, self.chunk_size - len(audio_chunk[i : i + self.chunk_size])),
                )
        self.should_listen.set()
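

# Hedged usage sketch (not part of the original handler): one way to drive
# ChatTTSHandler standalone. It assumes `should_listen` is a threading.Event
# and sidesteps BaseHandler.__init__ (whose signature this file does not show)
# via __new__; both are assumptions for illustration, not the pipeline's API.
if __name__ == "__main__":
    import threading

    should_listen = threading.Event()
    handler = ChatTTSHandler.__new__(ChatTTSHandler)  # skip BaseHandler.__init__
    handler.setup(should_listen, device="cpu", stream=True, chunk_size=512)
    for chunk in handler.process("Hello from ChatTTS."):
        # each chunk is a fixed-length int16 numpy array at 16 kHz
        print(f"got {chunk.shape[0]} samples")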