|
'''
To enable Tencent translation, add the following parameters to the YAML config:

ng_voice_translate_on : True
tencentcloud_common_region : "ap-shanghai"
tencentcloud_common_secretid : "xxxxx"
tencentcloud_common_secretkey : "xxxxx"
ng_voice_tar : 'ja'
'''
|
|
|
import asyncio
import base64
import os
import re
import urllib
import urllib.parse
import uuid
from binascii import b2a_base64
from hashlib import sha1
from hmac import new
from random import randint
from sys import maxsize, version_info
from time import time

import requests
from aiohttp import request
from loguru import logger
from nonebot import get_driver
from nonebot.exception import ActionFailed

from .Extension import Extension
|
|
|
|
|
# Prefer the faster ujson parser when it is installed; otherwise fall back to
# the standard-library json module. Only a failed import triggers the
# fallback, so unrelated errors (e.g. KeyboardInterrupt) are not swallowed.
try:
    from ujson import loads as loadJsonS
except ImportError:
    from json import loads as loadJsonS
|
|
|
|
|
# Static metadata describing this extension; a copy is handed to the
# Extension base class in CustomExtension.__init__.
ext_config: dict = {
    # Extension name, as used in the "/#voice&...#/" invocation shown below.
    "name": "voice",
    # Argument name -> type name of the arguments the extension accepts.
    "arguments": {
        'sentence': 'str',
    },
    # Usage description (presumably shown to the AI model -- confirm with the
    # Extension base class).
    "description": "Send a voice sentence. (usage in response: /#voice&你好#/)",
    # Reference words; empty for this extension.
    "refer_word": [],
    "author": "KroMiose",
    "version": "0.0.2",
    # Short introduction text (runtime string, kept verbatim).
    "intro": "发送语音消息(支持翻译)",
}
|
|
|
|
|
|
|
class CustomExtension(Extension):
    """Extension that turns a sentence into a voice message.

    The sentence is optionally translated through the Tencent Cloud machine
    translation endpoint, sent to a remote TTS endpoint, and the returned
    audio is cached as a file under ``voice_cache/``.
    """

    async def call(self, arg_dict: dict, ctx_data: dict) -> dict:
        """Synthesize ``arg_dict['sentence']`` into a cached audio file.

        Args:
            arg_dict: Arguments parsed by the AI, ``{name: value}``. Only the
                ``'sentence'`` entry is read.
            ctx_data: Context data supplied by the caller (not used here).

        Returns:
            ``{'voice': <file URL>, 'text': <display text>}`` on success, or
            an empty dict when no sentence was given or the TTS endpoint
            answered with an error status.
        """
        custom_config: dict = self.get_custom_config()

        ng_voice_translate_on = custom_config.get('ng_voice_translate_on', False)
        tencentcloud_common_region = custom_config.get('tencentcloud_common_region', "ap-shanghai")
        tencentcloud_common_secretid = custom_config.get('tencentcloud_common_secretid', "xxxxx")
        tencentcloud_common_secretkey = custom_config.get('tencentcloud_common_secretkey', "xxxxx")
        # The documented key is 'ng_voice_tar'. The misspelled 'g_voice_tar'
        # that used to be read is still honoured as a fallback so existing
        # configurations keep working.
        ng_voice_tar = custom_config.get(
            'ng_voice_tar', custom_config.get('g_voice_tar', 'ja')
        )
        is_base64 = custom_config.get('is_base64', False)

        raw_text = arg_dict.get('sentence', None)
        if raw_text is None:
            # Nothing to synthesize.
            return {}

        # Make sure the cache directory exists before any audio is written.
        voice_path = 'voice_cache/'
        os.makedirs(voice_path, exist_ok=True)

        def _sign_request(params: dict) -> str:
            """Add the common request fields to *params* and return its signature.

            *params* is updated in place because the common fields must be
            sent with the request as well as being signed. The signature is
            the base64-encoded HMAC-SHA1 of the sorted ``key=value`` pairs.
            """
            params.update({
                "Action": "TextTranslate",
                "Region": f"{tencentcloud_common_region}",
                "Timestamp": int(time()),
                "Nonce": randint(1, maxsize),
                "SecretId": f"{tencentcloud_common_secretid}",
                "Version": "2018-03-21",
            })
            sign_str = "POSTtmt.tencentcloudapi.com/?" + "&".join(
                "%s=%s" % (k, params[k]) for k in sorted(params)
            )
            hashed = new(
                tencentcloud_common_secretkey.encode("utf-8"),
                sign_str.encode("utf-8"),
                sha1,
            )
            # b2a_base64 appends a trailing newline; drop it.
            return b2a_base64(hashed.digest())[:-1].decode()

        async def _translate(message: str) -> str:
            """Translate *message* to ``ng_voice_tar``; return it unchanged on failure."""
            endpoint = "https://tmt.tencentcloudapi.com"
            params = {
                "Source": "auto",
                "SourceText": message,
                "Target": ng_voice_tar,
                "ProjectId": 0,
            }
            try:
                params["Signature"] = _sign_request(params)
                async with request("POST", endpoint, data=params) as resp:
                    payload = loadJsonS(
                        await asyncio.wait_for(resp.read(), timeout=30)
                    )
                response = payload.get("Response", {})
                if "TargetText" in response:
                    return response["TargetText"]
                # The API reported a failure instead of returning a translation.
                logger.warning(
                    f"Tencent translate returned no TargetText: {response.get('Error')}"
                )
            except ActionFailed as e:
                logger.warning(
                    f"ActionFailed {e.info['retcode']} {e.info['msg'].lower()} {e.info['wording']}"
                )
            except (asyncio.TimeoutError, TimeoutError) as e:
                # asyncio.wait_for raises asyncio.TimeoutError, which is only
                # an alias of the builtin TimeoutError from Python 3.11 on.
                logger.warning(
                    f"TimeoutError {e}"
                )
            return message

        if ng_voice_translate_on:
            t_result = await _translate(raw_text)
        else:
            t_result = raw_text

        text = urllib.parse.quote(t_result + '~')
        url = f"https://lintonxue00-vits-umamusume-voice-synthesizer.hf.space/api/tts?text={text}&speaker_id=0"

        # requests is a blocking client; run it in the default executor so the
        # event loop stays responsive while the audio is generated.
        loop = asyncio.get_running_loop()
        r = await loop.run_in_executor(None, requests.get, url)
        if not r.ok:
            # Do not cache an HTTP error page as if it were audio.
            logger.warning(f"TTS request failed with HTTP status {r.status_code}")
            return {}

        if is_base64:
            # Drop every character outside the base64 alphabet, then restore
            # any missing '=' padding before decoding.
            content = re.sub('[^a-zA-Z0-9+/=]', '', r.content.decode())
            audio_data = base64.b64decode(
                content + '=' * (-len(content) % 4), validate=True
            )
        else:
            audio_data = r.content

        file_name = f"{voice_path}{uuid.uuid1()}.ogg"
        with open(file_name, "wb") as f:
            f.write(audio_data)

        local_url = f"file:///{os.path.abspath(file_name)}"

        return {
            'voice': local_url,
            'text': f"[语音消息] {raw_text}",
        }

    def __init__(self, custom_config: dict):
        """Initialise the extension with a copy of ``ext_config`` and the user config."""
        super().__init__(ext_config.copy(), custom_config)
|
|