import zipfile
import hashlib
from utils.model import model_downloader, get_model
import requests
import json
import torch
import os
import re
import urllib.request
from inference import Inference
import gradio as gr
from constants import VOICE_METHODS, BARK_VOICES, EDGE_VOICES, zips_folder, unzips_folder
from tts.conversion import tts_infer, ELEVENLABS_VOICES_RAW, ELEVENLABS_VOICES_NAMES
api_url = "https://rvc-models-api.onrender.com/uploadfile/"
if not os.path.exists(zips_folder):
os.mkdir(zips_folder)
if not os.path.exists(unzips_folder):
os.mkdir(unzips_folder)
def get_info(path):
path = os.path.join(unzips_folder, path)
try:
a = torch.load(path, map_location="cpu")
return a
except Exception as e:
print("*****************eeeeeeeeeeeeeeeeeeeerrrrrrrrrrrrrrrrrr*****")
print(e)
return {
}
def calculate_md5(file_path):
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def compress(modelname, files):
file_path = os.path.join(zips_folder, f"{modelname}.zip")
# Select the compression mode ZIP_DEFLATED for compression
# or zipfile.ZIP_STORED to just store the file
compression = zipfile.ZIP_DEFLATED
# Comprueba si el archivo ZIP ya existe
if not os.path.exists(file_path):
# Si no existe, crea el archivo ZIP
with zipfile.ZipFile(file_path, mode="w") as zf:
try:
for file in files:
if file:
# Agrega el archivo al archivo ZIP
zf.write(unzips_folder if ".index" in file else os.path.join(unzips_folder, file), compress_type=compression)
except FileNotFoundError as fnf:
print("An error occurred", fnf)
else:
# Si el archivo ZIP ya existe, agrega los archivos a un archivo ZIP existente
with zipfile.ZipFile(file_path, mode="a") as zf:
try:
for file in files:
if file:
# Agrega el archivo al archivo ZIP
zf.write(unzips_folder if ".index" in file else os.path.join(unzips_folder, file), compress_type=compression)
except FileNotFoundError as fnf:
print("An error occurred", fnf)
return file_path
# download the model only
def get_username(url):
match_username = re.search(r'models/(.*?)/', url)
if match_username:
result = match_username.group(1)
return result
def get_username_hf(url):
match_username = re.search(r'huggingface.co/(.*?)/', url)
if match_username:
result = match_username.group(1)
return result
pattern_zip = r"/([^/]+)\.zip$"
def get_file_name(url):
match = re.search(pattern_zip, url)
if match:
extracted_string = match.group(1)
return extracted_string
else:
raise Exception("没有找到AI歌手模型的zip压缩包。")
def download_online_model(url):
url = url.strip()
if url.startswith('https://download.openxlab.org.cn/models/'):
zip_path = get_username(url) + "-" + get_file_name(url)
elif url.startswith('https://huggingface.co/'):
zip_path = get_username_hf(url) + "-" + get_file_name(url)
else:
zip_path = get_file_name(url)
if not os.path.exists(zip_path + ".zip"):
print("P.S. AI歌手模型还未下载")
try:
zip_name = url.split('/')[-1]
if 'pixeldrain.com' in url:
url = f'https://pixeldrain.com/api/file/{zip_name}'
urllib.request.urlretrieve(url, zip_path + ".zip")
#return f'[√] {dir_name} Model successfully downloaded!'
except Exception as e:
raise Exception(str(e))
else:
print("P.S. AI歌手模型之前已经下载")
return zip_path + ".zip"
def infer(model, f0_method, audio_file, index_rate, vc_transform0, protect0, resample_sr1, filter_radius1):
if not model:
return "No model url specified, please specify a model url.", None
if not audio_file:
return "No audio file specified, please load an audio file.", None
inference = Inference(
model_name=model,
f0_method=f0_method,
source_audio_path=audio_file,
feature_ratio=index_rate,
transposition=vc_transform0,
protection_amnt=protect0,
resample=resample_sr1,
harvest_median_filter=filter_radius1,
output_file_name=os.path.join("./audio-outputs", os.path.basename(audio_file))
)
output = inference.run()
if 'success' in output and output['success']:
print("Inferencia realizada exitosamente...")
return output, output['file']
else:
print("Fallo en la inferencia...", output)
return output, None
def post_model(name, model_url, version, creator):
modelname = model_downloader(model_url, zips_folder, unzips_folder)
print(f"下载的模型.zip文件名:{modelname}")
if not modelname:
return "抱歉!无法从您提供的链接下载模型,请尝试其他链接或稍后再试。"
model_files = get_model(unzips_folder, modelname)
if not model_files:
return "抱歉!无法找到模型文件,请检查链接对应的模型文件并稍后重试。"
if not model_files.get('pth'):
return "抱歉!无法找到.pth模型文件,请检查链接对应的模型文件并稍后重试。"
else:
print("已找到.pth模型文件")
if not model_files.get('index'):
return "抱歉!无法找到.index模型文件,请检查链接对应的模型文件并稍后重试。"
else:
print("已找到.index模型文件")
md5_hash = calculate_md5(os.path.join(unzips_folder,model_files['pth']))
zipfile = download_online_model(model_url) #compress(modelname, list(model_files.values()))
print(f"已打包模型文件:{model_files.values()}")
a = get_info(model_files.get('pth'))
file_to_upload = open(zipfile, "rb")
info = a.get("info", "None"),
sr = a.get("sr", "None"),
f0 = a.get("f0", "None"),
print(f"正在上传的模型:{file_to_upload}")
data = {
"name": name,
"version": version,
"creator": creator,
"hash": md5_hash,
"info": info,
"sr": sr,
"f0": f0
}
print("正在上传模型文件...")
# Realizar la solicitud POST
response = requests.post(api_url, files={"file": file_to_upload}, data=data)
result = response.json()
# Comprobar la respuesta
if response.status_code == 200:
result = response.json()
return json.dumps(result, indent=4)
else:
print("加载模型文件时出错:", response.status_code)
return result
def search_model(name):
web_service_url = "https://script.google.com/macros/s/AKfycbyRaNxtcuN8CxUrcA_nHW6Sq9G2QJor8Z2-BJUGnQ2F_CB8klF4kQL--U2r2MhLFZ5J/exec"
response = requests.post(web_service_url, json={
'type': 'search_by_filename',
'name': name
})
result = []
response.raise_for_status() # Lanza una excepción en caso de error
json_response = response.json()
cont = 0
result.append("""| AI歌手名 | 模型下载链接(可直接复制到AI翻唱页面使用) | 训练步数 Epoch | 模型采样率 |
| ---------------- | -------------- |:------:|:-----------:|
""")
yield "
".join(result)
if json_response.get('ok', None):
for model in json_response['ocurrences']:
if cont < 20:
model_name = str(model.get('name', 'N/A')).strip()
model_url = model.get('url', 'N/A')
epoch = model.get('epoch', 'N/A')
sr = model.get('sr', 'N/A')
line = f"""|{model_name}|{model_url}|{epoch}|{sr}|
"""
result.append(line)
yield "".join(result)
cont += 1
def update_tts_methods_voice(select_value):
if select_value == "Edge-tts":
return gr.Dropdown.update(choices=EDGE_VOICES, visible=True, value="es-CO-GonzaloNeural-Male"), gr.Markdown.update(visible=False), gr.Textbox.update(visible=False),gr.Radio.update(visible=False)
elif select_value == "Bark-tts":
return gr.Dropdown.update(choices=BARK_VOICES, visible=True), gr.Markdown.update(visible=False), gr.Textbox.update(visible=False),gr.Radio.update(visible=False)
elif select_value == 'ElevenLabs':
return gr.Dropdown.update(choices=ELEVENLABS_VOICES_NAMES, visible=True, value="Bella"), gr.Markdown.update(visible=True), gr.Textbox.update(visible=True), gr.Radio.update(visible=False)
elif select_value == 'CoquiTTS':
return gr.Dropdown.update(visible=False), gr.Markdown.update(visible=False), gr.Textbox.update(visible=False), gr.Radio.update(visible=True)