|
|
|
import csv
import io
import json
import os
import shutil
import sys
import time
import typing
import wave
from pathlib import Path

import audiofile
import audinterface
import audresample
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import transformers

import msinference
from mimic3_tts.__main__ import (
    CommandLineInterfaceState,
    OutputNaming,
    get_args,
    initialize_args,
    initialize_tts,
    process_line,
    shutdown_tts,
)
|
|
|
|
|
|
|
ROOT_DIR = '/data/dkounadis/mimic3-voices/'

# Discover all mimic3 voices and split them into English vs. foreign.
# Multi-speaker voices list their speakers in speakers.txt; single-speaker
# voices lack that file and are appended as-is.
foreign_voices = []
english_voices = []
for lang in os.listdir(ROOT_DIR + 'voices'):
    for voice in os.listdir(ROOT_DIR + 'voices/' + lang):
        target = english_voices if 'en_' in lang else foreign_voices
        try:
            with open(ROOT_DIR + 'voices/' + lang + '/' + voice + '/speakers.txt', 'r') as f:
                for spk in f:
                    target.append(lang + '/' + voice + '#' + spk.rstrip())
        except FileNotFoundError:
            target.append(lang + '/' + voice)

for v in foreign_voices:
    print(v)
print('\n_______________________________\n')
for v in english_voices:
    print(v)
|
|
|
list_voices = [
    'en_US/m-ailabs_low#mary_ann',
    'en_UK/apope_low',
    'de_DE/thorsten-emotion_low#neutral',
    'fr_FR/m-ailabs_low#gilles_g_le_blanc',
]
|
# Output order of process_function(): 3 dimensional attributes (A/D/V)
# followed by 8 categorical emotions.
LABELS = [
    'arousal', 'dominance', 'valence',
    'Angry',
    'Sad',
    'Happy',
    'Surprise',
    'Fear',
    'Disgust',
    'Contempt',
    'Neutral',
]
|
|
|
|
|
# A spare Wav2Vec2Config is used only as a namespace for the device handles.
config = transformers.Wav2Vec2Config()
config.dev = torch.device('cuda:0')
config.dev2 = torch.device('cuda:0')


def _softmax(x):
    '''Numerically stable softmax for x : (batch, num_class); modifies x in place.'''
    x -= x.max(1, keepdims=True)  # shift so the largest logit is 0
    x = np.maximum(-100, x)       # clamp to avoid exp() underflow
    x = np.exp(x)
    x /= x.sum(1, keepdims=True)
    return x
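
# Minimal sanity check for _softmax (the shapes here are illustrative
# assumptions): every row of a softmax output must sum to 1.
_demo = _softmax(np.random.randn(2, 11))
assert np.allclose(_demo.sum(1), 1.0)
del _demo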
|
|
|
|
|
from transformers import AutoModelForAudioClassification
import types


def _infer(self, x):
    '''x: (batch, audio-samples-16KHz) -> (batch, 8) categorical logits'''
    x = (x + self.config.mean) / self.config.std  # shift/scale by the model's stored statistics
    x = self.ssl_model(x, attention_mask=None).last_hidden_state
    # Attentive statistics pooling: attention-weighted mean and std over time.
    h = self.pool_model.sap_linear(x).tanh()
    w = torch.matmul(h, self.pool_model.attention)
    w = w.softmax(1)
    mu = (x * w).sum(1)
    x = torch.cat(
        [
            mu,
            ((x * x * w).sum(1) - mu * mu).clamp(min=1e-7).sqrt()
        ], 1)
    return self.ser_model(x)


teacher_cat = AutoModelForAudioClassification.from_pretrained(
    '3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes',
    trust_remote_code=True
).to(config.dev2).eval()
# Replace the stock forward with the stripped-down inference above.
teacher_cat.forward = types.MethodType(_infer, teacher_cat)
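
# Example call (kept commented out; it assumes CUDA and the downloaded weights):
#
#   with torch.no_grad():
#       wav = torch.zeros(1, 16000, device=config.dev2)  # 1 s of silence @ 16 kHz
#       logits = teacher_cat(wav)  # expected shape (1, 8): categorical logits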
|
|
def _prenorm(x, attention_mask=None):
    '''Normalise x to zero mean / unit variance along the time axis.'''
    if attention_mask is not None:
        N = attention_mask.sum(1, keepdim=True)
        x -= x.sum(1, keepdim=True) / N
        var = (x * x).sum(1, keepdim=True) / N
    else:
        x -= x.mean(1, keepdim=True)
        var = (x * x).mean(1, keepdim=True)
    return x / torch.sqrt(var + 1e-7)
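
# Minimal sanity check for _prenorm (no mask): per-example mean ~ 0, var ~ 1.
_z = _prenorm(torch.randn(2, 16000))
assert torch.allclose(_z.mean(1), torch.zeros(2), atol=1e-4)
assert torch.allclose(_z.var(1), torch.ones(2), atol=1e-2)
del _z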
|
|
|
from torch import nn
from transformers.models.wav2vec2.modeling_wav2vec2 import Wav2Vec2PreTrainedModel, Wav2Vec2Model


class RegressionHead(nn.Module):
    r"""Regression head mapping pooled features to A/D/V."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class Dawn(Wav2Vec2PreTrainedModel):
    r"""Dimensional speech-emotion model (arousal, dominance, valence)."""

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = RegressionHead(config)
        self.init_weights()

    def forward(self, input_values, attention_mask=None):
        x = _prenorm(input_values, attention_mask=attention_mask)
        outputs = self.wav2vec2(x, attention_mask=attention_mask)
        hidden_states = outputs[0]
        hidden_states = torch.mean(hidden_states, dim=1)  # average pool over time
        logits = self.classifier(hidden_states)
        return logits


dawn = Dawn.from_pretrained('audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim').to(config.dev).eval()
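
# Example call (kept commented out; a sketch assuming CUDA is available):
#
#   with torch.no_grad():
#       wav = torch.zeros(1, 16000, device=config.dev)
#       adv = dawn(wav)  # expected shape (1, 3): arousal/dominance/valence,
#                        # roughly in [0, 1] for this model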
|
|
def process_function(x, sampling_rate, idx):
    '''Run both teachers on a (batch, samples) array of 16 kHz audio.

    Returns (batch, 11) columns matching LABELS: 3 dimensional attributes
    (arousal, dominance, valence) from dawn, then 8 categorical
    probabilities from teacher_cat.
    '''
    logits_cat = teacher_cat(torch.from_numpy(x).to(config.dev)).cpu().detach().numpy()
    logits_adv = dawn(torch.from_numpy(x).to(config.dev)).cpu().detach().numpy()
    cat = np.concatenate([logits_adv, _softmax(logits_cat)], 1)
    print(cat)
    return cat


interface = audinterface.Feature(
    feature_names=LABELS,
    process_func=process_function,
    # the interface applies the sliding window; process_function sees one window at a time
    process_func_applies_sliding_window=False,
    # sparse sampling: a 7 s window every 40 s
    win_dur=7.0,
    hop_dur=40.0,
    sampling_rate=16000,
    resample=True,
    verbose=True,
)
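
# interface.process_file() (used further below) returns a DataFrame indexed
# by (file, start, end) segments with one column per LABELS entry; the
# windowing above means one row per 7 s window sampled every 40 s.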
|
|
def process_lines(state: CommandLineInterfaceState, wav_path=None):
    '''Adapted from mimic3's internal CLI loop: synthesise every line in
    state.texts and write the accumulated audio to wav_path.'''

    args = state.args

    result_idx = 0
    for line in state.texts:
        line_voice: typing.Optional[str] = None
        line_id = ""
        line = line.strip()

        if args.output_naming == OutputNaming.ID:
            # CSV input: first column is the id, last column the text,
            # optionally a voice column in between
            with io.StringIO(line) as line_io:
                reader = csv.reader(line_io, delimiter=args.csv_delimiter)
                row = next(reader)
                line_id, line = row[0], row[-1]
                if args.csv_voice:
                    line_voice = row[1]

        process_line(line, state, line_id=line_id, line_voice=line_voice)
        result_idx += 1
        time.sleep(4)  # give the synthesis threads time to finish

    if state.all_audio:
        if sys.stdout.isatty() and (not state.args.stdout):
            with io.BytesIO() as wav_io:
                wav_file_play: wave.Wave_write = wave.open(wav_io, "wb")
                with wav_file_play:
                    wav_file_play.setframerate(state.sample_rate_hz)
                    wav_file_play.setsampwidth(state.sample_width_bytes)
                    wav_file_play.setnchannels(state.num_channels)
                    wav_file_play.writeframes(state.all_audio)

                with open(wav_path, 'wb') as wav_file:
                    wav_file.write(wav_io.getvalue())
                print('\nwrote', wav_path)
    else:
        print('\nno audio was synthesised; consider a longer time.sleep() ->', wav_path)
|
|
out_dir = 'out_dir/' |
|
Path(out_dir).mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
for _id, _voice in enumerate(list_voices):
    _str = _voice.replace('/', '_').replace('#', '_').replace('_low', '')
    if 'cmu-arctic' in _str:
        _str = _str.replace('cmu-arctic', 'cmu_arctic')

    print('\n\nExecuting', _voice, '\n\n')

    # only synthesise if either engine's output is missing
    if (
        not os.path.isfile(out_dir + 'mimic3__' + _str + '.wav') or
        not os.path.isfile(out_dir + 'styletts2__' + _str + '.wav')
    ):
|
        # mimic3 resolves voices from the mycroft data dir; copy the
        # generator there if it is missing or truncated (< 500 bytes).
        if 'human' not in _voice:
            home_voice_dir = f'/home/audeering.local/dkounadis/.local/share/mycroft/mimic3/voices/{_voice.split("#")[0]}/'
            Path(home_voice_dir).mkdir(parents=True, exist_ok=True)
            speaker_free_voice_name = _voice.split("#")[0] if '#' in _voice else _voice

            if (
                (not os.path.isfile(home_voice_dir + 'generator.onnx')) or
                (os.path.getsize(home_voice_dir + 'generator.onnx') < 500)
            ):
                shutil.copyfile(
                    f'/data/dkounadis/mimic3-voices/voices/{speaker_free_voice_name}/generator.onnx',
                    home_voice_dir + 'generator.onnx')
|
        # Harvard sentences, grouped in lists of 10 (only the first 4 groups
        # are synthesised here).
        with open('harvard.json', 'r') as f:
            harvard_individual_sentences = json.load(f)['sentences']
        total_audio_mimic3 = []
        total_audio_styletts2 = []
        ix = 0
        for list_of_10 in harvard_individual_sentences[:4]:
            text = ' '.join(list_of_10['sentences'])
            print(ix, text)
            ix += 1
|
|
|
|
|
|
|
|
|
|
|
            if 'human' not in _voice:
                # synthesise this Harvard group with mimic3 via SSML
                rate = 1
                _ssml = (
                    '<speak>'
                    '<prosody volume=\'64\'>'
                    f'<prosody rate=\'{rate}\'>'
                    f'<voice name=\'{_voice}\'>'
                    '<s>'
                    f'{text[:-1] + ", .. !!!"}'
                    '</s>'
                    '</voice>'
                    '</prosody>'
                    '</prosody>'
                    '</speak>'
                )
                with open('_tmp_ssml.txt', 'w') as f:
                    f.write(_ssml)

                args = get_args()
                args.ssml = True
                args.text = [_ssml]
                args.interactive = False

                state = CommandLineInterfaceState(args=args)
                initialize_args(state)
                initialize_tts(state)

                style_path = 'tmp1.wav'
                process_lines(state, wav_path=style_path)
                shutdown_tts(state)
                x, fs = audiofile.read(style_path)
|
|
|
            else:
                # human reference: a LibriSpeech test-clean utterance (16 kHz)
                style_path = '/cache/audb/librispeech/3.1.0/fe182b91/test-clean/3575/170457/3575-170457-0024.wav'
                x, fs = audiofile.read(style_path)
                print(x.shape, ' human')
            total_audio_mimic3.append(x)
            print(f'{len(total_audio_mimic3)=}')
            print(fs, text, 'mimic3')
|
|
|
|
|
|
|
            # overwrite the style clip with a precomputed '4x' reference where available
            if 'en_US' in _str:
                style_path = 'mimic3_english_4x/' + _str + '.wav'
            elif ('de_DE' in _str) or ('fr_FR' in _str):
                style_path = 'mimic3_foreign_4x/' + _str + '.wav'
            else:
                print(f'use human / generated style for {_str}')
|
|
|
style_vec = msinference.compute_style(style_path) |
|
|
|
|
|
|
|
x = msinference.inference(text, |
|
style_vec, |
|
alpha=0.3, |
|
beta=0.7, |
|
diffusion_steps=7, |
|
embedding_scale=1) |
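            # StyleTTS2 knobs, per the upstream msinference conventions (treat
            # the exact semantics as assumptions): alpha/beta blend reference
            # vs. text-predicted style, diffusion_steps drives the style
            # sampler, embedding_scale is the classifier-free-guidance strength.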
|
|
|
total_audio_styletts2.append(x) |
|
|
|
|
|
|
|
total_audio_styletts2 = np.concatenate(total_audio_styletts2) |
|
total_audio_styletts2 = audresample.resample(total_audio_styletts2, |
|
original_rate=24000, |
|
target_rate=16000)[0] |
|
print('RESAMPLEstyletts2', total_audio_styletts2.shape) |
|
audiofile.write(out_dir + 'styletts2__' + _str + '.wav', total_audio_styletts2, 16000) |
|
|
|
|
|
|
|
|
|
total_audio_mimic3 = np.concatenate(total_audio_mimic3) |
|
if 'human' not in _str: |
|
total_audio_mimic3 = audresample.resample(total_audio_mimic3, |
|
original_rate=24000, |
|
target_rate=16000)[0] |
|
else: |
|
            print('human reference is already 16 kHz')
|
print('RESAMPLEmimic3', total_audio_mimic3.shape) |
|
audiofile.write(out_dir + 'mimic3__' + _str + '.wav', total_audio_mimic3, 16000) |
|
|
|
print(total_audio_mimic3.shape, total_audio_styletts2.shape, 'LEN OF TOTAL\n') |
|
|
for engine in ['mimic3', |
|
'styletts2']: |
|
harvard_of_voice = f'{out_dir}{engine}__{_str}' |
|
if not os.path.exists(harvard_of_voice + '.pkl'): |
|
df = interface.process_file(harvard_of_voice + '.wav') |
|
df.to_pickle(harvard_of_voice + '.pkl') |
|
print('\n\n', harvard_of_voice, df,'\n___________________________\n') |
|
|
print('\nVisuals\n') |
|
|
|
|
|
|
|
|
|
|
|
voice_pairs = [ |
|
[list_voices[0], list_voices[1]], |
|
[list_voices[2], list_voices[3]] |
|
] |
|
|
|
|
|
|
|
|
|
for vox1, vox2 in voice_pairs: |
|
|
|
    def _fmt(v):
        s = v.replace('/', '_').replace('#', '_').replace('_low', '')
        return s.replace('cmu-arctic', 'cmu_arctic')

    _str1 = _fmt(vox1)
    _str2 = _fmt(vox2)
|
|
|
|
|
    vis_df = {
        f'mimic3_{_str1}': pd.read_pickle(out_dir + 'mimic3__' + _str1 + '.pkl'),
        f'mimic3_{_str2}': pd.read_pickle(out_dir + 'mimic3__' + _str2 + '.pkl'),
        f'styletts2_{_str1}': pd.read_pickle(out_dir + 'styletts2__' + _str1 + '.pkl'),
        f'styletts2_{_str2}': pd.read_pickle(out_dir + 'styletts2__' + _str2 + '.pkl'),
    }
|
|
|
|
|
|
|
|
|
    # align all four predictions to the shortest one and index by segment end (s)
    SHORT_LEN = min([len(v) for k, v in vis_df.items()])
    for k, v in vis_df.items():
        p = v[:SHORT_LEN]
        print('\n\n\n\n', k, p)
        p.reset_index(inplace=True)
        p.drop(columns=['file', 'start'], inplace=True)
        p.set_index('end', inplace=True)
        p.index = p.index.map(mapper=(lambda x: x.total_seconds()))
        vis_df[k] = p
    preds = vis_df
|
fig, ax = plt.subplots(nrows=8, ncols=2, figsize=(24, 19.2), gridspec_kw={'hspace': 0, 'wspace': .04}) |
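    # 8 rows = 3 dimensional attributes + 5 categorical emotions;
    # 2 columns = the two voices of the pair.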
|
|
|
|
|
|
|
|
|
time_stamp = preds[f'mimic3_{_str1}'].index.to_numpy() |
|
for j, dim in enumerate(['arousal', |
|
'dominance', |
|
'valence']): |
|
|
|
|
|
|
|
        ax[j, 0].plot(time_stamp,
                      preds[f'styletts2_{_str1}'][dim],
                      color=(0, 104/255, 139/255),
                      label='mean_1',
                      linewidth=2)
        ax[j, 0].fill_between(time_stamp,
                              preds[f'styletts2_{_str1}'][dim],
                              preds[f'mimic3_{_str1}'][dim],
                              color=(.5, .5, .5),
                              alpha=.4
                              )
|
if j == 0: |
|
ax[j, 0].legend([f'StyleTTS2 using {_str1}', |
|
f'mimic3_{_str1}'], |
|
prop={'size': 10}, |
|
|
|
) |
|
ax[j, 0].set_ylabel(dim.lower(), color=(.4, .4, .4), fontsize=14) |
|
|
|
|
|
ax[j, 0].set_ylim([1e-7, .9999]) |
|
ax[j, 0].set_xticklabels(['' for _ in ax[j, 0].get_xticklabels()]) |
|
ax[j, 0].set_xlim([time_stamp[0], time_stamp[-1]]) |
|
|
|
|
|
|
|
|
|
|
|
ax[j, 1].plot(time_stamp, preds[f'styletts2_{_str2}'][dim], |
|
color=(0,104/255,139/255), |
|
label='mean_1', |
|
linewidth=2) |
|
ax[j, 1].fill_between(time_stamp, |
|
preds[f'mimic3_{_str2}'][dim], |
|
preds[f'styletts2_{_str2}'][dim], |
|
color=(.5,.5,.5), |
|
alpha=.4) |
|
if j == 0: |
|
ax[j, 1].legend([ |
|
f'StyleTTS2 using {_str2}', |
|
f'mimic3_{_str2}'], |
|
prop={'size': 10}, |
|
|
|
) |
|
|
|
|
|
ax[j, 1].set_xlabel('767 Harvard Sentences (seconds)') |
|
|
|
|
|
|
|
|
|
ax[j, 1].set_ylim([1e-7, .9999]) |
|
|
|
        ax[j, 1].set_xticklabels(['' for _ in ax[j, 1].get_xticklabels()])
|
ax[j, 1].set_xlim([time_stamp[0], time_stamp[-1]]) |
|
|
|
|
|
|
|
|
|
ax[j, 0].grid() |
|
ax[j, 1].grid() |
|
|
    time_stamp = preds[f'mimic3_{_str1}'].index.to_numpy()
    for j, dim in enumerate(['Angry',
                             'Sad',
                             'Happy',
                             'Fear',
                             'Disgust',
                             ]):
        j = j + 3  # rows 3..7: categorical emotions below the A/D/V rows
|
|
|
|
|
|
|
        ax[j, 0].plot(time_stamp, preds[f'styletts2_{_str1}'][dim],
                      color=(0, 104/255, 139/255),
                      label='mean_1',
                      linewidth=2)
        ax[j, 0].fill_between(time_stamp,
                              preds[f'styletts2_{_str1}'][dim],
                              preds[f'mimic3_{_str1}'][dim],
                              color=(.5, .5, .5),
                              alpha=.4)
|
ax[j, 0].set_ylabel(dim.lower(), color=(.4, .4, .4), fontsize=14) |
|
|
|
|
|
ax[j, 0].set_ylim([1e-7, .9999]) |
|
ax[j, 0].set_xlim([time_stamp[0], time_stamp[-1]]) |
|
ax[j, 0].set_xticklabels(['' for _ in ax[j, 0].get_xticklabels()]) |
|
ax[j, 0].set_xlabel('767 Harvard Sentences (seconds)', fontsize=16, color=(.4,.4,.4)) |
|
|
|
|
|
|
|
|
|
|
|
ax[j, 1].plot(time_stamp, preds[f'styletts2_{_str2}'][dim], |
|
color=(0,104/255,139/255), |
|
label='mean_1', |
|
linewidth=2) |
|
ax[j, 1].fill_between(time_stamp, |
|
|
|
preds[f'mimic3_{_str2}'][dim], |
|
preds[f'styletts2_{_str2}'][dim], |
|
color=(.5,.5,.5), |
|
alpha=.4) |
|
|
|
|
|
|
|
|
|
|
|
ax[j, 1].set_xlabel('767 Harvard Sentences (seconds)', fontsize=16, color=(.4,.4,.4)) |
|
        ax[j, 1].set_ylim([1e-7, .9999])
|
|
|
ax[j, 1].set_xticklabels(['' for _ in ax[j, 1].get_xticklabels()]) |
|
ax[j, 1].set_xlim([time_stamp[0], time_stamp[-1]]) |
|
ax[j, 0].grid() |
|
ax[j, 1].grid() |
|
plt.savefig(f'pair_{_str1}_{_str2}.png', bbox_inches='tight') |
|
plt.close() |