|
import threading |
|
from itertools import chain |
|
from typing import List, Optional, Tuple |
|
|
|
import numpy |
|
from scipy.signal import resample |
|
|
|
from ..acoustic_feature_extractor import OjtPhoneme |
|
from ..model import AccentPhrase, AudioQuery, Mora |
|
from .core_wrapper import CoreWrapper, OldCoreError |
|
from .synthesis_engine_base import SynthesisEngineBase |
|
|
|
# Mora-final phonemes that are treated as unvoiced; replace_mora_pitch forces
# the pitch of any mora ending in one of these to 0.
unvoiced_mora_phoneme_list = ["A", "I", "U", "E", "O", "cl", "pau"]

# Every phoneme that can close a mora: the voiced ones first, followed by the
# unvoiced ones above. split_mora uses this list to locate mora boundaries.
mora_phoneme_list = ["a", "i", "u", "e", "o", "N", *unvoiced_mora_phoneme_list]
|
|
|
|
|
|
|
def to_flatten_moras(accent_phrases: List[AccentPhrase]) -> List[Mora]:
    """
    Flatten the moras of all accent phrases into a single list.

    For each accent phrase, its moras are emitted in order; when the phrase
    has a pause_mora (i.e. it is not None), that pause mora follows them.

    Parameters
    ----------
    accent_phrases : List[AccentPhrase]
        Accent phrases to flatten.

    Returns
    -------
    moras : List[Mora]
        A new list holding every mora (pause moras included) in order.
    """
    flattened = []
    for phrase in accent_phrases:
        flattened.extend(phrase.moras)
        trailing_pause = phrase.pause_mora
        if trailing_pause is not None:
            flattened.append(trailing_pause)
    return flattened
|
|
|
|
|
def to_phoneme_data_list(phoneme_str_list: List[str]):
    """
    Convert a list of phoneme strings into a list of OjtPhoneme objects.

    The string at position i becomes an OjtPhoneme with start=i and
    end=i + 1; the resulting list is then passed through OjtPhoneme.convert.

    Parameters
    ----------
    phoneme_str_list : List[str]
        Phoneme strings.

    Returns
    -------
    phoneme_list : List[OjtPhoneme]
        The value returned by OjtPhoneme.convert for the constructed list.
    """
    indexed_phonemes = []
    for position, symbol in enumerate(phoneme_str_list):
        indexed_phonemes.append(
            OjtPhoneme(phoneme=symbol, start=position, end=position + 1)
        )
    return OjtPhoneme.convert(indexed_phonemes)
|
|
|
|
|
def split_mora(phoneme_list: List[OjtPhoneme]):
    """
    Split a phoneme sequence into per-mora vowel and consonant sequences.

    A phoneme counts as a "vowel" here when its `phoneme` string is contained
    in mora_phoneme_list.

    Parameters
    ----------
    phoneme_list : List[OjtPhoneme]
        Phoneme objects to split.

    Returns
    -------
    consonant_phoneme_list : List[Optional[OjtPhoneme]]
        One entry per vowel: the phoneme directly before the vowel, or None
        when the vowel directly follows the previous vowel. The first entry
        is always None.
    vowel_phoneme_list : List[OjtPhoneme]
        The vowel phonemes, in order.
    vowel_indexes : List[int]
        Positions of the vowels inside phoneme_list.
    """
    vowel_indexes = []
    vowel_phoneme_list = []
    for index, item in enumerate(phoneme_list):
        if item.phoneme in mora_phoneme_list:
            vowel_indexes.append(index)
            vowel_phoneme_list.append(item)

    # The first vowel never gets a consonant; every later vowel gets the
    # phoneme right before it unless the two vowels are adjacent.
    consonant_phoneme_list: List[Optional[OjtPhoneme]] = [None]
    for previous, current in zip(vowel_indexes, vowel_indexes[1:]):
        if current - previous == 1:
            consonant_phoneme_list.append(None)
        else:
            consonant_phoneme_list.append(phoneme_list[current - 1])

    return consonant_phoneme_list, vowel_phoneme_list, vowel_indexes
|
|
|
|
|
def pre_process(
    accent_phrases: List[AccentPhrase],
) -> Tuple[List[Mora], List[OjtPhoneme]]:
    """
    Prepare the base data the engine needs from a list of accent phrases.

    Parameters
    ----------
    accent_phrases : List[AccentPhrase]
        Accent phrase models.

    Returns
    -------
    flatten_moras : List[Mora]
        Every mora contained in the accent phrases, as one flat list.
    phoneme_data_list : List[OjtPhoneme]
        The phonemes of flatten_moras, wrapped in a leading and a trailing
        "pau", converted with to_phoneme_data_list.
    """
    flatten_moras = to_flatten_moras(accent_phrases)

    # A "pau" is placed at both ends of the utterance; in between, each mora
    # contributes its consonant (when it has one) followed by its vowel.
    phoneme_str_list = ["pau"]
    for mora in flatten_moras:
        if mora.consonant is not None:
            phoneme_str_list.append(mora.consonant)
        phoneme_str_list.append(mora.vowel)
    phoneme_str_list.append("pau")

    return flatten_moras, to_phoneme_data_list(phoneme_str_list)
|
|
|
|
|
class SynthesisEngine(SynthesisEngineBase):
    """
    Synthesis engine that delegates inference to a CoreWrapper.

    Model loading and the yukarin_s / yukarin_sa / decode forward calls are
    made while holding self.mutex.
    """

    def __init__(
        self,
        core: CoreWrapper,
    ):
        """
        Parameters
        ----------
        core : CoreWrapper
            Wrapper around the inference core. The engine uses:

            core.yukarin_s_forward: computes the length of each phoneme from a
            phoneme sequence
                length: length of the phoneme sequence
                phoneme_list: phoneme sequence
                speaker_id: speaker number
                return: length of each phoneme

            core.yukarin_sa_forward: computes the pitch of each mora from the
            per-mora phoneme sequences and the accent information
                length: length of the mora sequence
                vowel_phoneme_list: vowel phoneme sequence
                consonant_phoneme_list: consonant phoneme sequence
                start_accent_list: accent start positions
                end_accent_list: accent end positions
                start_accent_phrase_list: accent phrase start positions
                end_accent_phrase_list: accent phrase end positions
                speaker_id: speaker number
                return: pitch of each mora

            core.decode_forward: computes the waveform from per-frame phonemes
            and pitch
                length: number of frames
                phoneme_size: number of phoneme kinds
                f0: pitch of each frame
                phoneme: phoneme of each frame
                speaker_id: speaker number
                return: audio waveform

        Attributes set here
        -------------------
        speakers: JSON string about the speakers, obtained from the core
        supported_devices: JSON string about the supported devices, obtained
            from the core. None means the core cannot report this
            information, so the supported devices are unknown.
        """
        super().__init__()
        self.core = core
        self._speakers = self.core.metas()
        self.mutex = threading.Lock()
        try:
            self._supported_devices = self.core.supported_devices()
        except OldCoreError:
            # The core cannot report its devices; leave them unknown.
            self._supported_devices = None
        self.default_sampling_rate = 24000

    @property
    def speakers(self) -> str:
        """Speaker metadata as returned by core.metas()."""
        return self._speakers

    @property
    def supported_devices(self) -> Optional[str]:
        """Supported-device information from the core, or None if unknown."""
        return self._supported_devices

    def initialize_speaker_synthesis(self, speaker_id: int, skip_reinit: bool):
        """
        Load the model of the given speaker into the core.

        Parameters
        ----------
        speaker_id : int
            Speaker ID.
        skip_reinit : bool
            When True, a speaker whose model is already loaded is not loaded
            again. When False, the model is always (re)loaded.
        """
        try:
            with self.mutex:
                # Load when re-initialization is requested, or when the
                # speaker's model has not been loaded yet.
                if (not skip_reinit) or (not self.core.is_model_loaded(speaker_id)):
                    self.core.load_model(speaker_id)
        except OldCoreError:
            # Deliberate best-effort: presumably an old core has no
            # per-speaker loading API, so there is nothing to do -- confirm
            # against CoreWrapper.
            pass

    def is_initialized_speaker_synthesis(self, speaker_id: int) -> bool:
        """
        Report whether the model of the given speaker is loaded.

        Returns True when the core raises OldCoreError for this query.
        """
        try:
            return self.core.is_model_loaded(speaker_id)
        except OldCoreError:
            return True

    def replace_phoneme_length(
        self, accent_phrases: List[AccentPhrase], speaker_id: int
    ) -> List[AccentPhrase]:
        """
        Set the vowel and consonant lengths of accent_phrases.

        Parameters
        ----------
        accent_phrases : List[AccentPhrase]
            Accent phrase models.
        speaker_id : int
            Speaker ID.

        Returns
        -------
        accent_phrases : List[AccentPhrase]
            The same accent phrase list, with the consonant_length and
            vowel_length of every mora updated in place.
        """
        # Make sure the speaker's model is loaded before running inference.
        self.initialize_speaker_synthesis(speaker_id, skip_reinit=True)

        # Flatten the moras and build the phoneme sequence (with "pau" at
        # both ends).
        flatten_moras, phoneme_data_list = pre_process(accent_phrases)

        # Only the vowel positions are needed here.
        _, _, vowel_indexes_data = split_mora(phoneme_data_list)

        # yukarin_s: predict the length of every phoneme from the phoneme IDs.
        phoneme_list_s = numpy.array(
            [p.phoneme_id for p in phoneme_data_list], dtype=numpy.int64
        )

        with self.mutex:
            phoneme_length = self.core.yukarin_s_forward(
                length=len(phoneme_list_s),
                phoneme_list=phoneme_list_s,
                speaker_id=numpy.array(speaker_id, dtype=numpy.int64).reshape(-1),
            )

        # vowel_indexes_data[0] is the leading "pau", so mora i corresponds to
        # vowel_indexes_data[i + 1]; its consonant, if any, sits right before.
        for i, mora in enumerate(flatten_moras):
            mora.consonant_length = (
                phoneme_length[vowel_indexes_data[i + 1] - 1]
                if mora.consonant is not None
                else None
            )
            mora.vowel_length = phoneme_length[vowel_indexes_data[i + 1]]

        return accent_phrases

    def replace_mora_pitch(
        self, accent_phrases: List[AccentPhrase], speaker_id: int
    ) -> List[AccentPhrase]:
        """
        Set the pitch of accent_phrases.

        Parameters
        ----------
        accent_phrases : List[AccentPhrase]
            Accent phrase models.
        speaker_id : int
            Speaker ID.

        Returns
        -------
        accent_phrases : List[AccentPhrase]
            The same accent phrase list, with the pitch of every mora updated
            in place. An empty list is returned for empty input.
        """
        # Make sure the speaker's model is loaded before running inference.
        self.initialize_speaker_synthesis(speaker_id, skip_reinit=True)

        # Nothing to predict for an empty input.
        if len(accent_phrases) == 0:
            return []

        # Flatten the moras and build the phoneme sequence (with "pau" at
        # both ends).
        flatten_moras, phoneme_data_list = pre_process(accent_phrases)

        def _create_one_hot(accent_phrase: AccentPhrase, position: int):
            """
            Build a one-hot array over the moras of accent_phrase by taking a
            row of the identity matrix (numpy.eye).

            For example, with 12 moras and position 1:
                [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
            with 12 moras and position -1:
                [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]
            When accent_phrase has a pause_mora, one more 0 is appended.

            Parameters
            ----------
            accent_phrase : AccentPhrase
                Accent phrase model.
            position : int
                Index to set to 1.

            Returns
            -------
            one_hot : numpy.ndarray
                The one-hot array.
            """
            return numpy.r_[
                numpy.eye(len(accent_phrase.moras))[position],
                (0 if accent_phrase.pause_mora is not None else []),
            ]

        # Accent start: index 0 when the accent is on the first mora,
        # otherwise index 1.
        start_accent_list = numpy.concatenate(
            [
                _create_one_hot(accent_phrase, 0 if accent_phrase.accent == 1 else 1)
                for accent_phrase in accent_phrases
            ]
        )

        # Accent end: accent is 1-based, so the index is accent - 1.
        end_accent_list = numpy.concatenate(
            [
                _create_one_hot(accent_phrase, accent_phrase.accent - 1)
                for accent_phrase in accent_phrases
            ]
        )

        # Accent phrase start: always the first mora of the phrase.
        start_accent_phrase_list = numpy.concatenate(
            [_create_one_hot(accent_phrase, 0) for accent_phrase in accent_phrases]
        )

        # Accent phrase end: always the last mora of the phrase.
        end_accent_phrase_list = numpy.concatenate(
            [_create_one_hot(accent_phrase, -1) for accent_phrase in accent_phrases]
        )

        # Pad with a 0 at both ends for the leading/trailing "pau" that
        # pre_process adds around the utterance.
        start_accent_list = numpy.r_[0, start_accent_list, 0]
        end_accent_list = numpy.r_[0, end_accent_list, 0]
        start_accent_phrase_list = numpy.r_[0, start_accent_phrase_list, 0]
        end_accent_phrase_list = numpy.r_[0, end_accent_phrase_list, 0]

        # The arrays above are floating point (numpy.eye); convert to int64
        # before handing them to the core.
        start_accent_list = numpy.array(start_accent_list, dtype=numpy.int64)
        end_accent_list = numpy.array(end_accent_list, dtype=numpy.int64)
        start_accent_phrase_list = numpy.array(
            start_accent_phrase_list, dtype=numpy.int64
        )
        end_accent_phrase_list = numpy.array(end_accent_phrase_list, dtype=numpy.int64)

        # Split the phoneme sequence into per-mora consonants and vowels.
        (
            consonant_phoneme_data_list,
            vowel_phoneme_data_list,
            _,
        ) = split_mora(phoneme_data_list)

        # yukarin_sa: build the phoneme ID arrays; a mora without a consonant
        # is encoded as -1.
        vowel_phoneme_list = numpy.array(
            [p.phoneme_id for p in vowel_phoneme_data_list], dtype=numpy.int64
        )
        consonant_phoneme_list = numpy.array(
            [
                p.phoneme_id if p is not None else -1
                for p in consonant_phoneme_data_list
            ],
            dtype=numpy.int64,
        )

        # Every sequence gets a leading axis of size 1 for the core call; the
        # first element of the result is taken.
        with self.mutex:
            f0_list = self.core.yukarin_sa_forward(
                length=vowel_phoneme_list.shape[0],
                vowel_phoneme_list=vowel_phoneme_list[numpy.newaxis],
                consonant_phoneme_list=consonant_phoneme_list[numpy.newaxis],
                start_accent_list=start_accent_list[numpy.newaxis],
                end_accent_list=end_accent_list[numpy.newaxis],
                start_accent_phrase_list=start_accent_phrase_list[numpy.newaxis],
                end_accent_phrase_list=end_accent_phrase_list[numpy.newaxis],
                speaker_id=numpy.array(speaker_id, dtype=numpy.int64).reshape(-1),
            )[0]

        # Unvoiced moras carry no pitch: force their f0 to 0.
        for i, p in enumerate(vowel_phoneme_data_list):
            if p.phoneme in unvoiced_mora_phoneme_list:
                f0_list[i] = 0

        # f0_list[0] belongs to the leading "pau", so mora i takes
        # f0_list[i + 1].
        for i, mora in enumerate(flatten_moras):
            mora.pitch = f0_list[i + 1]

        return accent_phrases

    def _synthesis_impl(self, query: AudioQuery, speaker_id: int):
        """
        Build the decoder inputs from an audio query and run the synthesis.

        Parameters
        ----------
        query : AudioQuery
            Audio synthesis query.
        speaker_id : int
            Speaker ID.

        Returns
        -------
        wave : numpy.ndarray
            Synthesized waveform. When query.outputStereo is set, the mono
            waveform is duplicated into two columns.
        """
        # Make sure the speaker's model is loaded before running inference.
        self.initialize_speaker_synthesis(speaker_id, skip_reinit=True)

        # Flatten the moras and build the phoneme sequence (with "pau" at
        # both ends).
        flatten_moras, phoneme_data_list = pre_process(query.accent_phrases)

        # Phoneme IDs of the whole sequence.
        phoneme_list_s = numpy.array(
            [p.phoneme_id for p in phoneme_data_list], dtype=numpy.int64
        )

        # Length of every phoneme, in the same order as phoneme_list_s:
        # leading silence, then consonant (if any) and vowel of each mora,
        # then trailing silence.
        phoneme_length_list = (
            [query.prePhonemeLength]
            + [
                length
                for mora in flatten_moras
                for length in (
                    [mora.consonant_length] if mora.consonant is not None else []
                )
                + [mora.vowel_length]
            ]
            + [query.postPhonemeLength]
        )
        phoneme_length = numpy.array(phoneme_length_list, dtype=numpy.float32)

        # A larger speedScale shortens every phoneme.
        phoneme_length /= query.speedScale

        # Pitch of every mora, with 0 for the silence at both ends.
        f0_list = [0] + [mora.pitch for mora in flatten_moras] + [0]
        f0 = numpy.array(f0_list, dtype=numpy.float32)

        # pitchScale shifts the pitch by a factor of 2 ** pitchScale.
        f0 *= 2**query.pitchScale

        # Scale the intonation of the voiced (f0 > 0) moras around their mean.
        # The mean is only computed when at least one mora is voiced, so that
        # numpy never takes the mean of an empty selection (which would yield
        # NaN together with RuntimeWarnings).
        voiced = f0 > 0
        if voiced.any():
            mean_f0 = f0[voiced].mean()
            f0[voiced] = (f0[voiced] - mean_f0) * query.intonationScale + mean_f0

        # Positions of the vowels inside the phoneme sequence.
        _, _, vowel_indexes_data = split_mora(phoneme_data_list)
        vowel_indexes = numpy.array(vowel_indexes_data)

        # Number of frames per unit of phoneme length. Presumably 24000
        # samples/s with 256 samples per frame, phoneme lengths being in
        # seconds -- confirm against the core.
        rate = 24000 / 256
        phoneme_bin_num = numpy.round(phoneme_length * rate).astype(numpy.int32)

        # Expand the phoneme IDs to frame resolution.
        phoneme = numpy.repeat(phoneme_list_s, phoneme_bin_num)

        # Expand f0 to frame resolution: splitting after every vowel (except
        # the last) groups each mora's consonant and vowel frames together.
        f0 = numpy.repeat(
            f0,
            [a.sum() for a in numpy.split(phoneme_bin_num, vowel_indexes[:-1] + 1)],
        )

        # One-hot encode the per-frame phoneme IDs.
        array = numpy.zeros((len(phoneme), OjtPhoneme.num_phoneme), dtype=numpy.float32)
        array[numpy.arange(len(phoneme)), phoneme] = 1
        phoneme = array

        # Generate the waveform from the per-frame phonemes and pitch.
        with self.mutex:
            wave = self.core.decode_forward(
                length=phoneme.shape[0],
                phoneme_size=phoneme.shape[1],
                f0=f0[:, numpy.newaxis],
                phoneme=phoneme,
                speaker_id=numpy.array(speaker_id, dtype=numpy.int64).reshape(-1),
            )

        # Apply the volume.
        wave *= query.volumeScale

        # Resample when the requested rate differs from the engine default.
        if query.outputSamplingRate != self.default_sampling_rate:
            wave = resample(
                wave,
                query.outputSamplingRate * len(wave) // self.default_sampling_rate,
            )

        # Stereo output duplicates the mono signal into two columns.
        if query.outputStereo:
            wave = numpy.array([wave, wave]).T

        return wave
|
|