2ndelement's picture
init
f1f433f
raw
history blame
21.5 kB
import threading
from itertools import chain
from typing import List, Optional, Tuple
import numpy
from scipy.signal import resample
from ..acoustic_feature_extractor import OjtPhoneme
from ..model import AccentPhrase, AudioQuery, Mora
from .core_wrapper import CoreWrapper, OldCoreError
from .synthesis_engine_base import SynthesisEngineBase
unvoiced_mora_phoneme_list = ["A", "I", "U", "E", "O", "cl", "pau"]
mora_phoneme_list = ["a", "i", "u", "e", "o", "N"] + unvoiced_mora_phoneme_list
# TODO: move mora utility to mora module
def to_flatten_moras(accent_phrases: List[AccentPhrase]) -> List[Mora]:
"""
accent_phrasesに含まれるMora(とpause_moraがあればそれも)を
すべて一つのリストに結合する
Parameters
----------
accent_phrases : List[AccentPhrase]
AccentPhraseのリスト
Returns
-------
moras : List[Mora]
結合されたMoraのリストを返す
"""
return list(
chain.from_iterable(
accent_phrase.moras
+ (
[accent_phrase.pause_mora]
if accent_phrase.pause_mora is not None
else []
)
for accent_phrase in accent_phrases
)
)
def to_phoneme_data_list(phoneme_str_list: List[str]):
"""
phoneme文字列のリストを、OjtPhonemeクラスのリストに変換する
Parameters
----------
phoneme_str_list : List[str]
phoneme文字列のリスト
Returns
-------
phoneme_list : List[OjtPhoneme]
変換されたOjtPhonemeクラスのリスト
"""
phoneme_data_list = [
OjtPhoneme(phoneme=p, start=i, end=i + 1)
for i, p in enumerate(phoneme_str_list)
]
phoneme_data_list = OjtPhoneme.convert(phoneme_data_list)
return phoneme_data_list
def split_mora(phoneme_list: List[OjtPhoneme]):
"""
OjtPhonemeのリストから、
母音の位置(vowel_indexes)
母音の音素列(vowel_phoneme_list)
子音の音素列(consonant_phoneme_list)
を生成し、返す
Parameters
----------
phoneme_list : List[OjtPhoneme]
phonemeクラスのリスト
Returns
-------
consonant_phoneme_list : List[OjtPhoneme]
子音の音素列
vowel_phoneme_list : List[OjtPhoneme]
母音の音素列
vowel_indexes : : List[int]
母音の位置
"""
vowel_indexes = [
i for i, p in enumerate(phoneme_list) if p.phoneme in mora_phoneme_list
]
vowel_phoneme_list = [phoneme_list[i] for i in vowel_indexes]
# postとprevのvowel_indexの差として考えられる値は1か2
# 理由としてはphoneme_listは、consonant、vowelの組み合わせか、vowel一つの連続であるから
# 1の場合はconsonant(子音)が存在しない=母音のみ(a/i/u/e/o/N/cl/pau)で構成されるモーラ(音)である
# 2の場合はconsonantが存在するモーラである
# なので、2の場合(else)でphonemeを取り出している
consonant_phoneme_list: List[Optional[OjtPhoneme]] = [None] + [
None if post - prev == 1 else phoneme_list[post - 1]
for prev, post in zip(vowel_indexes[:-1], vowel_indexes[1:])
]
return consonant_phoneme_list, vowel_phoneme_list, vowel_indexes
def pre_process(
accent_phrases: List[AccentPhrase],
) -> Tuple[List[Mora], List[OjtPhoneme]]:
"""
AccentPhraseモデルのリストを整形し、処理に必要なデータの原型を作り出す
Parameters
----------
accent_phrases : List[AccentPhrase]
AccentPhraseモデルのリスト
Returns
-------
flatten_moras : List[Mora]
AccentPhraseモデルのリスト内に含まれるすべてのMoraをリスト化したものを返す
phoneme_data_list : List[OjtPhoneme]
flatten_morasから取り出したすべてのPhonemeをOjtPhonemeに変換したものを返す
"""
flatten_moras = to_flatten_moras(accent_phrases)
phoneme_each_mora = [
([mora.consonant] if mora.consonant is not None else []) + [mora.vowel]
for mora in flatten_moras
]
phoneme_str_list = list(chain.from_iterable(phoneme_each_mora))
phoneme_str_list = ["pau"] + phoneme_str_list + ["pau"]
phoneme_data_list = to_phoneme_data_list(phoneme_str_list)
return flatten_moras, phoneme_data_list
class SynthesisEngine(SynthesisEngineBase):
def __init__(
self,
core: CoreWrapper,
):
"""
core.yukarin_s_forward: 音素列から、音素ごとの長さを求める関数
length: 音素列の長さ
phoneme_list: 音素列
speaker_id: 話者番号
return: 音素ごとの長さ
core.yukarin_sa_forward: モーラごとの音素列とアクセント情報から、モーラごとの音高を求める関数
length: モーラ列の長さ
vowel_phoneme_list: 母音の音素列
consonant_phoneme_list: 子音の音素列
start_accent_list: アクセントの開始位置
end_accent_list: アクセントの終了位置
start_accent_phrase_list: アクセント句の開始位置
end_accent_phrase_list: アクセント句の終了位置
speaker_id: 話者番号
return: モーラごとの音高
core.decode_forward: フレームごとの音素と音高から波形を求める関数
length: フレームの長さ
phoneme_size: 音素の種類数
f0: フレームごとの音高
phoneme: フレームごとの音素
speaker_id: 話者番号
return: 音声波形
speakers: coreから取得したspeakersに関するjsonデータの文字列
supported_devices:
coreから取得した対応デバイスに関するjsonデータの文字列
Noneの場合はコアが情報の取得に対応していないため、対応デバイスは不明
"""
super().__init__()
self.core = core
self._speakers = self.core.metas()
self.mutex = threading.Lock()
try:
self._supported_devices = self.core.supported_devices()
except OldCoreError:
self._supported_devices = None
self.default_sampling_rate = 24000
@property
def speakers(self) -> str:
return self._speakers
@property
def supported_devices(self) -> Optional[str]:
return self._supported_devices
def initialize_speaker_synthesis(self, speaker_id: int, skip_reinit: bool):
try:
with self.mutex:
# 以下の条件のいずれかを満たす場合, 初期化を実行する
# 1. 引数 skip_reinit が False の場合
# 2. 話者が初期化されていない場合
if (not skip_reinit) or (not self.core.is_model_loaded(speaker_id)):
self.core.load_model(speaker_id)
except OldCoreError:
pass # コアが古い場合はどうしようもないので何もしない
def is_initialized_speaker_synthesis(self, speaker_id: int) -> bool:
try:
return self.core.is_model_loaded(speaker_id)
except OldCoreError:
return True # コアが古い場合はどうしようもないのでTrueを返す
def replace_phoneme_length(
self, accent_phrases: List[AccentPhrase], speaker_id: int
) -> List[AccentPhrase]:
"""
accent_phrasesの母音・子音の長さを設定する
Parameters
----------
accent_phrases : List[AccentPhrase]
アクセント句モデルのリスト
speaker_id : int
話者ID
Returns
-------
accent_phrases : List[AccentPhrase]
母音・子音の長さが設定されたアクセント句モデルのリスト
"""
# モデルがロードされていない場合はロードする
self.initialize_speaker_synthesis(speaker_id, skip_reinit=True)
# phoneme
# AccentPhraseをすべてMoraおよびOjtPhonemeの形に分解し、処理可能な形にする
flatten_moras, phoneme_data_list = pre_process(accent_phrases)
# OjtPhonemeの形に分解されたもの(phoneme_data_list)から、vowel(母音)の位置を抜き出す
_, _, vowel_indexes_data = split_mora(phoneme_data_list)
# yukarin_s
# OjtPhonemeのリストからOjtPhonemeのPhoneme ID(OpenJTalkにおける音素のID)のリストを作る
phoneme_list_s = numpy.array(
[p.phoneme_id for p in phoneme_data_list], dtype=numpy.int64
)
# Phoneme IDのリスト(phoneme_list_s)をyukarin_s_forwardにかけ、推論器によって適切な音素の長さを割り当てる
with self.mutex:
phoneme_length = self.core.yukarin_s_forward(
length=len(phoneme_list_s),
phoneme_list=phoneme_list_s,
speaker_id=numpy.array(speaker_id, dtype=numpy.int64).reshape(-1),
)
# yukarin_s_forwarderの結果をaccent_phrasesに反映する
# flatten_moras変数に展開された値を変更することでコード量を削減しつつaccent_phrases内のデータを書き換えている
for i, mora in enumerate(flatten_moras):
mora.consonant_length = (
phoneme_length[vowel_indexes_data[i + 1] - 1]
if mora.consonant is not None
else None
)
mora.vowel_length = phoneme_length[vowel_indexes_data[i + 1]]
return accent_phrases
def replace_mora_pitch(
self, accent_phrases: List[AccentPhrase], speaker_id: int
) -> List[AccentPhrase]:
"""
accent_phrasesの音高(ピッチ)を設定する
Parameters
----------
accent_phrases : List[AccentPhrase]
アクセント句モデルのリスト
speaker_id : int
話者ID
Returns
-------
accent_phrases : List[AccentPhrase]
音高(ピッチ)が設定されたアクセント句モデルのリスト
"""
# モデルがロードされていない場合はロードする
self.initialize_speaker_synthesis(speaker_id, skip_reinit=True)
# numpy.concatenateが空リストだとエラーを返すのでチェック
if len(accent_phrases) == 0:
return []
# phoneme
# AccentPhraseをすべてMoraおよびOjtPhonemeの形に分解し、処理可能な形にする
flatten_moras, phoneme_data_list = pre_process(accent_phrases)
# accent
def _create_one_hot(accent_phrase: AccentPhrase, position: int):
"""
単位行列(numpy.eye)を応用し、accent_phrase内でone hotな配列(リスト)を作る
例えば、accent_phraseのmorasの長さが12、positionが1なら
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
morasの長さが同じく12、positionが-1なら
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]
のような配列を生成する
accent_phraseがpause_moraを含む場合はさらに後ろに0が足される
Parameters
----------
accent_phrase : AccentPhrase
アクセント句モデル
position : int
one hotにするindex
Returns
-------
one_hot : numpy.ndarray
one hotな配列(リスト)
"""
return numpy.r_[
numpy.eye(len(accent_phrase.moras))[position],
(0 if accent_phrase.pause_mora is not None else []),
]
# accent_phrasesから、アクセントの開始位置のリストを作る
start_accent_list = numpy.concatenate(
[
# accentはプログラミング言語におけるindexのように0始まりではなく1始まりなので、
# accentが1の場合は0番目を指定している
# accentが1ではない場合、accentはend_accent_listに用いられる
_create_one_hot(accent_phrase, 0 if accent_phrase.accent == 1 else 1)
for accent_phrase in accent_phrases
]
)
# accent_phrasesから、アクセントの終了位置のリストを作る
end_accent_list = numpy.concatenate(
[
# accentはプログラミング言語におけるindexのように0始まりではなく1始まりなので、1を引いている
_create_one_hot(accent_phrase, accent_phrase.accent - 1)
for accent_phrase in accent_phrases
]
)
# accent_phrasesから、アクセント句の開始位置のリストを作る
# これによって、yukarin_sa_forwarder内でアクセント句を区別できる
start_accent_phrase_list = numpy.concatenate(
[_create_one_hot(accent_phrase, 0) for accent_phrase in accent_phrases]
)
# accent_phrasesから、アクセント句の終了位置のリストを作る
end_accent_phrase_list = numpy.concatenate(
[_create_one_hot(accent_phrase, -1) for accent_phrase in accent_phrases]
)
# 最初と最後に0を付け加える。これによってpau(前後の無音のためのもの)を付け加えたことになる
start_accent_list = numpy.r_[0, start_accent_list, 0]
end_accent_list = numpy.r_[0, end_accent_list, 0]
start_accent_phrase_list = numpy.r_[0, start_accent_phrase_list, 0]
end_accent_phrase_list = numpy.r_[0, end_accent_phrase_list, 0]
# アクセント・アクセント句関連のデータをyukarin_sa_forwarderに渡すための最終処理、リスト内のデータをint64に変換する
start_accent_list = numpy.array(start_accent_list, dtype=numpy.int64)
end_accent_list = numpy.array(end_accent_list, dtype=numpy.int64)
start_accent_phrase_list = numpy.array(
start_accent_phrase_list, dtype=numpy.int64
)
end_accent_phrase_list = numpy.array(end_accent_phrase_list, dtype=numpy.int64)
# phonemeに関するデータを取得(変換)する
(
consonant_phoneme_data_list,
vowel_phoneme_data_list,
_,
) = split_mora(phoneme_data_list)
# yukarin_sa
# Phoneme関連のデータをyukarin_sa_forwarderに渡すための最終処理、リスト内のデータをint64に変換する
vowel_phoneme_list = numpy.array(
[p.phoneme_id for p in vowel_phoneme_data_list], dtype=numpy.int64
)
consonant_phoneme_list = numpy.array(
[
p.phoneme_id if p is not None else -1
for p in consonant_phoneme_data_list
],
dtype=numpy.int64,
)
# 今までに生成された情報をyukarin_sa_forwardにかけ、推論器によってモーラごとに適切な音高(ピッチ)を割り当てる
with self.mutex:
f0_list = self.core.yukarin_sa_forward(
length=vowel_phoneme_list.shape[0],
vowel_phoneme_list=vowel_phoneme_list[numpy.newaxis],
consonant_phoneme_list=consonant_phoneme_list[numpy.newaxis],
start_accent_list=start_accent_list[numpy.newaxis],
end_accent_list=end_accent_list[numpy.newaxis],
start_accent_phrase_list=start_accent_phrase_list[numpy.newaxis],
end_accent_phrase_list=end_accent_phrase_list[numpy.newaxis],
speaker_id=numpy.array(speaker_id, dtype=numpy.int64).reshape(-1),
)[0]
# 無声母音を含むMoraに関しては、音高(ピッチ)を0にする
for i, p in enumerate(vowel_phoneme_data_list):
if p.phoneme in unvoiced_mora_phoneme_list:
f0_list[i] = 0
# yukarin_sa_forwarderの結果をaccent_phrasesに反映する
# flatten_moras変数に展開された値を変更することでコード量を削減しつつaccent_phrases内のデータを書き換えている
for i, mora in enumerate(flatten_moras):
mora.pitch = f0_list[i + 1]
return accent_phrases
def _synthesis_impl(self, query: AudioQuery, speaker_id: int):
"""
音声合成クエリから音声合成に必要な情報を構成し、実際に音声合成を行う
Parameters
----------
query : AudioQuery
音声合成クエリ
speaker_id : int
話者ID
Returns
-------
wave : numpy.ndarray
音声合成結果
"""
# モデルがロードされていない場合はロードする
self.initialize_speaker_synthesis(speaker_id, skip_reinit=True)
# phoneme
# AccentPhraseをすべてMoraおよびOjtPhonemeの形に分解し、処理可能な形にする
flatten_moras, phoneme_data_list = pre_process(query.accent_phrases)
# OjtPhonemeのリストからOjtPhonemeのPhoneme ID(OpenJTalkにおける音素のID)のリストを作る
phoneme_list_s = numpy.array(
[p.phoneme_id for p in phoneme_data_list], dtype=numpy.int64
)
# length
# 音素の長さをリストに展開・結合する。ここには前後の無音時間も含まれる
phoneme_length_list = (
[query.prePhonemeLength]
+ [
length
for mora in flatten_moras
for length in (
[mora.consonant_length] if mora.consonant is not None else []
)
+ [mora.vowel_length]
]
+ [query.postPhonemeLength]
)
# floatにキャスト
phoneme_length = numpy.array(phoneme_length_list, dtype=numpy.float32)
# lengthにSpeed Scale(話速)を適用する
phoneme_length /= query.speedScale
# pitch
# モーラの音高(ピッチ)を展開・結合し、floatにキャストする
f0_list = [0] + [mora.pitch for mora in flatten_moras] + [0]
f0 = numpy.array(f0_list, dtype=numpy.float32)
# 音高(ピッチ)の調節を適用する(2のPitch Scale乗を掛ける)
f0 *= 2**query.pitchScale
# 有声音素(音高(ピッチ)が0より大きいもの)か否かを抽出する
voiced = f0 > 0
# 有声音素の音高(ピッチ)の平均値を求める
mean_f0 = f0[voiced].mean()
# 平均値がNaNではないとき、抑揚を適用する
# 抑揚は音高と音高の平均値の差に抑揚を掛けたもの((f0 - mean_f0) * Intonation Scale)に抑揚の平均値(mean_f0)を足したもの
if not numpy.isnan(mean_f0):
f0[voiced] = (f0[voiced] - mean_f0) * query.intonationScale + mean_f0
# OjtPhonemeの形に分解された音素リストから、vowel(母音)の位置を抜き出し、numpyのarrayにする
_, _, vowel_indexes_data = split_mora(phoneme_data_list)
vowel_indexes = numpy.array(vowel_indexes_data)
# forward decode
# 音素の長さにrateを掛け、intにキャストする
rate = 24000 / 256
phoneme_bin_num = numpy.round(phoneme_length * rate).astype(numpy.int32)
# Phoneme IDを音素の長さ分繰り返す
phoneme = numpy.repeat(phoneme_list_s, phoneme_bin_num)
# f0を母音と子音の長さの合計分繰り返す
f0 = numpy.repeat(
f0,
[a.sum() for a in numpy.split(phoneme_bin_num, vowel_indexes[:-1] + 1)],
)
# phonemeの長さとOjtPhonemeのnum_phoneme(45)分の0で初期化された2次元配列を用意する
array = numpy.zeros((len(phoneme), OjtPhoneme.num_phoneme), dtype=numpy.float32)
# 初期化された2次元配列の各行をone hotにする
array[numpy.arange(len(phoneme)), phoneme] = 1
phoneme = array
# 今まで生成された情報をdecode_forwardにかけ、推論器によって音声波形を生成する
with self.mutex:
wave = self.core.decode_forward(
length=phoneme.shape[0],
phoneme_size=phoneme.shape[1],
f0=f0[:, numpy.newaxis],
phoneme=phoneme,
speaker_id=numpy.array(speaker_id, dtype=numpy.int64).reshape(-1),
)
# volume: ゲイン適用
wave *= query.volumeScale
# 出力サンプリングレートがデフォルト(decode forwarderによるもの、24kHz)でなければ、それを適用する
if query.outputSamplingRate != self.default_sampling_rate:
wave = resample(
wave,
query.outputSamplingRate * len(wave) // self.default_sampling_rate,
)
# ステレオ変換
# 出力設定がステレオなのであれば、ステレオ化する
if query.outputStereo:
wave = numpy.array([wave, wave]).T
return wave