Spaces:
Running
Running
import gradio as gr | |
import random | |
import os | |
import shutil | |
import pandas as pd | |
import sqlite3 | |
from datasets import load_dataset | |
import threading | |
import time | |
from huggingface_hub import HfApi | |
DESCR = """ | |
# TTS Arena | |
Vote on different speech synthesis models! | |
""".strip() | |
INSTR = """ | |
## Instructions | |
* Listen to two anonymous models | |
* Vote on which one is more natural and realistic | |
* If there's a tie, click Skip | |
*IMPORTANT: Do not only rank the outputs based on naturalness. Also rank based on intelligibility (can you actually tell what they're saying?) and other factors (does it sound like a human?).* | |
**When you're ready to begin, click the Start button below!** The model names will be revealed once you vote. | |
""".strip() | |
request = '' | |
if os.getenv('HF_ID'): | |
request = f""" | |
### Request Model | |
Please fill out [this form](https://huggingface.co/spaces/{os.getenv('HF_ID')}/discussions/new?title=%5BModel+Request%5D+&description=%23%23%20Model%20Request%0A%0A%2A%2AModel%20website%2Fpaper%20%28if%20applicable%29%2A%2A%3A%0A%2A%2AModel%20available%20on%2A%2A%3A%20%28coqui%7CHF%20pipeline%7Ccustom%20code%29%0A%2A%2AWhy%20do%20you%20want%20this%20model%20added%3F%2A%2A%0A%2A%2AComments%3A%2A%2A) to request a model. | |
""" | |
ABOUT = f""" | |
## About | |
TTS Arena is a project created to evaluate leading speech synthesis models. It is inspired by the [Chatbot Arena](https://chat.lmsys.org/) by LMSys. | |
{request} | |
""".strip() | |
LDESC = """ | |
## Leaderboard | |
A list of the models, based on how highly they are ranked! | |
""".strip() | |
dataset = load_dataset("ttseval/tts-arena", token=os.getenv('HF_TOKEN')) | |
theme = gr.themes.Base( | |
font=[gr.themes.GoogleFont('Libre Franklin'), gr.themes.GoogleFont('Public Sans'), 'system-ui', 'sans-serif'], | |
) | |
model_names = { | |
'styletts2': 'StyleTTS 2', | |
'tacotron': 'Tacotron', | |
'tacotronph': 'Tacotron Phoneme', | |
'tacotrondca': 'Tacotron DCA', | |
'speedyspeech': 'Speedy Speech', | |
'overflow': 'Overflow TTS', | |
'vits': 'VITS', | |
'vitsneon': 'VITS Neon', | |
'neuralhmm': 'Neural HMM', | |
'glow': 'Glow TTS', | |
'fastpitch': 'FastPitch', | |
'jenny': 'Jenny', | |
'tortoise': 'Tortoise TTS', | |
'xtts2': 'XTTSv2', | |
'xtts': 'XTTS', | |
'elevenlabs': 'ElevenLabs', | |
'speecht5': 'SpeechT5', | |
} | |
def get_random_split(existing_split=None): | |
choice = random.choice(list(dataset.keys())) | |
if existing_split and choice == existing_split: | |
return get_random_split(choice) | |
else: | |
return choice | |
def get_db(): | |
return sqlite3.connect('database.db') | |
def create_db(): | |
conn = get_db() | |
cursor = conn.cursor() | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS model ( | |
name TEXT UNIQUE, | |
upvote INTEGER, | |
downvote INTEGER | |
); | |
''') | |
def get_data(): | |
conn = get_db() | |
cursor = conn.cursor() | |
cursor.execute('SELECT name, upvote, downvote FROM model') | |
data = cursor.fetchall() | |
df = pd.DataFrame(data, columns=['name', 'upvote', 'downvote']) | |
df['name'] = df['name'].replace(model_names) | |
df['votes'] = df['upvote'] + df['downvote'] | |
# df['score'] = round((df['upvote'] / df['votes']) * 100, 2) # Percentage score | |
## ELO SCORE | |
df['score'] = 1200 | |
for i in range(len(df)): | |
for j in range(len(df)): | |
if i != j: | |
expected_a = 1 / (1 + 10 ** ((df['score'][j] - df['score'][i]) / 400)) | |
expected_b = 1 / (1 + 10 ** ((df['score'][i] - df['score'][j]) / 400)) | |
actual_a = df['upvote'][i] / df['votes'][i] | |
actual_b = df['upvote'][j] / df['votes'][j] | |
df.at[i, 'score'] += 32 * (actual_a - expected_a) | |
df.at[j, 'score'] += 32 * (actual_b - expected_b) | |
if df['votes'][j] < 3: | |
df.at[j, 'score'] -= (3 - df['votes'][j]) * 5 | |
df['score'] = round(df['score']) | |
## ELO SCORE | |
df = df.sort_values(by='score', ascending=False) | |
# df = df[['name', 'score', 'upvote', 'votes']] | |
df = df[['name', 'score', 'votes']] | |
return df | |
def get_random_splits(): | |
choice1 = get_random_split() | |
choice2 = get_random_split(choice1) | |
return (choice1, choice2) | |
def upvote_model(model): | |
conn = get_db() | |
cursor = conn.cursor() | |
cursor.execute('UPDATE model SET upvote = upvote + 1 WHERE name = ?', (model,)) | |
if cursor.rowcount == 0: | |
cursor.execute('INSERT OR REPLACE INTO model (name, upvote, downvote) VALUES (?, 1, 0)', (model,)) | |
conn.commit() | |
cursor.close() | |
def downvote_model(model): | |
conn = get_db() | |
cursor = conn.cursor() | |
cursor.execute('UPDATE model SET downvote = downvote + 1 WHERE name = ?', (model,)) | |
if cursor.rowcount == 0: | |
cursor.execute('INSERT OR REPLACE INTO model (name, upvote, downvote) VALUES (?, 0, 1)', (model,)) | |
conn.commit() | |
cursor.close() | |
def a_is_better(model1, model2): | |
if model1 and model2: | |
upvote_model(model1) | |
downvote_model(model2) | |
return reload(model1, model2) | |
def b_is_better(model1, model2): | |
if model1 and model2: | |
upvote_model(model2) | |
downvote_model(model1) | |
return reload(model1, model2) | |
def both_bad(model1, model2): | |
if model1 and model2: | |
downvote_model(model1) | |
downvote_model(model2) | |
return reload(model1, model2) | |
def both_good(model1, model2): | |
if model1 and model2: | |
upvote_model(model1) | |
upvote_model(model2) | |
return reload(model1, model2) | |
def reload(chosenmodel1=None, chosenmodel2=None): | |
# Select random splits | |
split1, split2 = get_random_splits() | |
d1, d2 = (dataset[split1], dataset[split2]) | |
choice1, choice2 = (d1.shuffle()[0]['audio'], d2.shuffle()[0]['audio']) | |
if chosenmodel1 in model_names: | |
chosenmodel1 = model_names[chosenmodel1] | |
if chosenmodel2 in model_names: | |
chosenmodel2 = model_names[chosenmodel2] | |
out = [ | |
(choice1['sampling_rate'], choice1['array']), | |
(choice2['sampling_rate'], choice2['array']), | |
split1, | |
split2 | |
] | |
if chosenmodel1: out.append(f'This model was {chosenmodel1}') | |
if chosenmodel2: out.append(f'This model was {chosenmodel2}') | |
return out | |
with gr.Blocks() as leaderboard: | |
gr.Markdown(LDESC) | |
# df = gr.Dataframe(interactive=False, value=get_data()) | |
df = gr.Dataframe(interactive=False, min_width=0, wrap=True, column_widths=[200, 50, 50]) | |
leaderboard.load(get_data, outputs=[df]) | |
with gr.Blocks() as vote: | |
gr.Markdown(INSTR) | |
with gr.Row(): | |
gr.HTML('<div align="left"><h3>Model A</h3></div>') | |
gr.HTML('<div align="right"><h3>Model B</h3></div>') | |
model1 = gr.Textbox(interactive=False, visible=False) | |
model2 = gr.Textbox(interactive=False, visible=False) | |
# with gr.Group(): | |
# with gr.Row(): | |
# prevmodel1 = gr.Textbox(interactive=False, show_label=False, container=False, value="Vote to reveal model A") | |
# prevmodel2 = gr.Textbox(interactive=False, show_label=False, container=False, value="Vote to reveal model B", text_align="right") | |
# with gr.Row(): | |
# aud1 = gr.Audio(interactive=False, show_label=False, show_download_button=False, show_share_button=False, waveform_options={'waveform_progress_color': '#3C82F6'}) | |
# aud2 = gr.Audio(interactive=False, show_label=False, show_download_button=False, show_share_button=False, waveform_options={'waveform_progress_color': '#3C82F6'}) | |
with gr.Group(): | |
with gr.Row(): | |
with gr.Column(): | |
with gr.Group(): | |
prevmodel1 = gr.Textbox(interactive=False, show_label=False, container=False, value="Vote to reveal model A") | |
aud1 = gr.Audio(interactive=False, show_label=False, show_download_button=False, show_share_button=False, waveform_options={'waveform_progress_color': '#3C82F6'}) | |
with gr.Column(): | |
with gr.Group(): | |
prevmodel2 = gr.Textbox(interactive=False, show_label=False, container=False, value="Vote to reveal model B", text_align="right") | |
aud2 = gr.Audio(interactive=False, show_label=False, show_download_button=False, show_share_button=False, waveform_options={'waveform_progress_color': '#3C82F6'}) | |
with gr.Row(): | |
abetter = gr.Button("A is Better", variant='primary') | |
bbetter = gr.Button("B is Better", variant='primary') | |
with gr.Row(): | |
bothbad = gr.Button("Both are Bad", scale=2) | |
skipbtn = gr.Button("Skip", scale=1) | |
bothgood = gr.Button("Both are Good", scale=2) | |
outputs = [aud1, aud2, model1, model2, prevmodel1, prevmodel2] | |
abetter.click(a_is_better, outputs=outputs, inputs=[model1, model2]) | |
bbetter.click(b_is_better, outputs=outputs, inputs=[model1, model2]) | |
skipbtn.click(b_is_better, outputs=outputs, inputs=[model1, model2]) | |
bothbad.click(both_bad, outputs=outputs, inputs=[model1, model2]) | |
bothgood.click(both_good, outputs=outputs, inputs=[model1, model2]) | |
vote.load(reload, outputs=[aud1, aud2, model1, model2]) | |
with gr.Blocks() as about: | |
gr.Markdown(ABOUT) | |
pass | |
with gr.Blocks(theme=theme, css="footer {visibility: hidden}", title="TTS Leaderboard") as demo: | |
gr.Markdown(DESCR) | |
gr.TabbedInterface([vote, leaderboard, about], ['Vote', 'Leaderboard', 'About']) | |
def restart_space(): | |
api = HfApi( | |
token=os.getenv('HF_TOKEN') | |
) | |
time.sleep(60 * 60) # Every hour | |
print("Syncing DB before restarting space") | |
api.upload_file( | |
path_or_fileobj='database.db', | |
path_in_repo='database.db', | |
repo_id=os.getenv('DATASET_ID'), | |
repo_type='dataset' | |
) | |
print("Restarting space") | |
api.restart_space(repo_id=os.getenv('HF_ID')) | |
def sync_db(): | |
api = HfApi( | |
token=os.getenv('HF_TOKEN') | |
) | |
while True: | |
time.sleep(60 * 5) | |
print("Uploading DB") | |
api.upload_file( | |
path_or_fileobj='database.db', | |
path_in_repo='database.db', | |
repo_id=os.getenv('DATASET_ID'), | |
repo_type='dataset' | |
) | |
if os.getenv('HF_ID'): | |
restart_thread = threading.Thread(target=restart_space) | |
restart_thread.daemon = True | |
restart_thread.start() | |
if os.getenv('DATASET_ID'): | |
# Fetch DB | |
api = HfApi( | |
token=os.getenv('HF_TOKEN') | |
) | |
print("Downloading DB...") | |
try: | |
path = api.hf_hub_download( | |
repo_id=os.getenv('DATASET_ID'), | |
repo_type='dataset', | |
filename='database.db', | |
cache_dir='./' | |
) | |
shutil.copyfile(path, 'database.db') | |
print("Downloaded DB") | |
except: | |
pass | |
# Update DB | |
db_thread = threading.Thread(target=sync_db) | |
db_thread.daemon = True | |
db_thread.start() | |
create_db() | |
demo.queue(api_open=False).launch(show_api=False) |