ChatTTS-Forge / modules /api /impl /google_api.py
zhzluke96
update
01e655b
raw
history blame
4.72 kB
import base64
from fastapi import HTTPException
import io
import soundfile as sf
from pydantic import BaseModel
from modules.api.Api import APIManager
from modules.utils.audio import apply_prosody_to_audio_data
from modules.normalization import text_normalize
from modules import generate_audio as generate
from modules.ssml import parse_ssml
from modules.SynthesizeSegments import (
SynthesizeSegments,
combine_audio_segments,
synthesize_segment,
)
from modules.api import utils as api_utils
class SynthesisInput(BaseModel):
text: str = ""
ssml: str = ""
class VoiceSelectionParams(BaseModel):
languageCode: str = "ZH-CN"
name: str = "female2"
style: str = ""
temperature: float = 0.3
topP: float = 0.7
topK: int = 20
seed: int = 42
class AudioConfig(BaseModel):
audioEncoding: api_utils.AudioFormat = "mp3"
speakingRate: float = 1
pitch: float = 0
volumeGainDb: float = 0
sampleRateHertz: int
batchSize: int = 1
spliterThreshold: int = 100
class GoogleTextSynthesizeRequest(BaseModel):
input: SynthesisInput
voice: VoiceSelectionParams
audioConfig: dict
class GoogleTextSynthesizeResponse(BaseModel):
audioContent: str
async def google_text_synthesize(request: GoogleTextSynthesizeRequest):
input = request.input
voice = request.voice
audioConfig = request.audioConfig
# 提取参数
language_code = voice.languageCode
voice_name = voice.name
infer_seed = voice.seed or 42
audio_format = audioConfig.get("audioEncoding", "mp3")
speaking_rate = audioConfig.get("speakingRate", 1)
pitch = audioConfig.get("pitch", 0)
volume_gain_db = audioConfig.get("volumeGainDb", 0)
batch_size = audioConfig.get("batchSize", 1)
spliter_threshold = audioConfig.get("spliterThreshold", 100)
# TODO sample_rate
sample_rate_hertz = audioConfig.get("sampleRateHertz", 24000)
params = api_utils.calc_spk_style(spk=voice.name, style=voice.style)
# TODO maybe need to change the sample rate
sample_rate = 24000
try:
if input.text:
# 处理文本合成逻辑
text = text_normalize(input.text, is_end=True)
sample_rate, audio_data = generate.generate_audio(
text,
temperature=(
voice.temperature
if voice.temperature
else params.get("temperature", 0.3)
),
top_P=voice.topP if voice.topP else params.get("top_p", 0.7),
top_K=voice.topK if voice.topK else params.get("top_k", 20),
spk=params.get("spk", -1),
infer_seed=infer_seed,
prompt1=params.get("prompt1", ""),
prompt2=params.get("prompt2", ""),
prefix=params.get("prefix", ""),
)
elif input.ssml:
# 处理SSML合成逻辑
segments = parse_ssml(input.ssml)
for seg in segments:
seg["text"] = text_normalize(seg["text"], is_end=True)
if len(segments) == 0:
raise HTTPException(
status_code=400, detail="The SSML text is empty or parsing failed."
)
synthesize = SynthesizeSegments(batch_size=batch_size)
audio_segments = synthesize.synthesize_segments(segments)
combined_audio = combine_audio_segments(audio_segments)
buffer = io.BytesIO()
combined_audio.export(buffer, format="wav")
buffer.seek(0)
audio_data = buffer.read()
else:
raise HTTPException(
status_code=400, detail="Either text or SSML input must be provided."
)
audio_data = apply_prosody_to_audio_data(
audio_data,
rate=speaking_rate,
pitch=pitch,
volume=volume_gain_db,
sr=sample_rate,
)
buffer = io.BytesIO()
sf.write(buffer, audio_data, sample_rate, format="wav")
buffer.seek(0)
if audio_format == "mp3":
buffer = api_utils.wav_to_mp3(buffer)
base64_encoded = base64.b64encode(buffer.read())
base64_string = base64_encoded.decode("utf-8")
return {
"audioContent": f"data:audio/{audio_format.lower()};base64,{base64_string}"
}
except Exception as e:
import logging
logging.exception(e)
raise HTTPException(status_code=500, detail=str(e))
def setup(app: APIManager):
app.post("/v1/google/text:synthesize", response_model=GoogleTextSynthesizeResponse)(
google_text_synthesize
)