File size: 11,628 Bytes
b2b3dca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
'''
To enable Tencent translation, add the following parameters to the YAML config:
ng_voice_translate_on : True
tencentcloud_common_region : "ap-shanghai"
tencentcloud_common_secretid : "xxxxx"
tencentcloud_common_secretkey : "xxxxx"
ng_voice_tar : 'ja'
'''

from .Extension import Extension
import urllib
import requests
import uuid
import os
import base64
from aiohttp import request
from binascii import b2a_base64
from hashlib import sha1
from urllib.parse import urlencode
from hmac import new
import random
from sys import maxsize, version_info
from time import time
from nonebot import get_driver
from aiohttp import request
from loguru import logger
from nonebot.exception import ActionFailed
import asyncio


# Prefer the faster ujson parser when it is installed; otherwise fall back to
# the standard library. Only ImportError is caught so that unrelated failures
# (KeyboardInterrupt, SystemExit, ...) are not silently swallowed.
try:
    from ujson import loads as loadJsonS
except ImportError:
    from json import loads as loadJsonS


# Extension metadata; it lets the AI understand what this extension does (required).
ext_config: dict = dict(
    # Extension name, used to identify the extension.
    name="voice",
    arguments=dict(
        sentence='str',  # text to be converted to speech
        emotion='str',   # emotion
    ),
    # Description that tells the AI what the extension does (required).
    # Keep it short; English text uses fewer tokens.
    description="Send a voice sentence. The emotional parameter must be one of \"normal,sweet,tsundere,sexy,whisper,murmur\" (usage in response: /#voice&hello&sweet#/) ",
    # Reference words used for context matching; when empty the extension is
    # referenced on every request (which consumes tokens).
    refer_word=[],
    # Author.
    author="恋如雨止",
    # Version.
    version="0.0.2",
    # Short introduction of the extension.
    intro="发送语音消息(支持翻译)",
)

# Emotion parameter table: English emotion key -> synthesis attributes plus the
# Japanese style name. Each table row below is
# (key, style name, speed, volume, intonation, pre-phoneme, post-phoneme).
emotion_rate_dict = {
    key: {
        'custom_attributes': {
            "speed_scale": speed,
            "volume_scale": volume,
            "intonation_scale": intonation,
            "pre_phoneme_length": pre_length,
            "post_phoneme_length": post_length,
        },
        'name': style_name,
    }
    for key, style_name, speed, volume, intonation, pre_length, post_length in (
        ('normal', 'ノーマル', 1, 1, 1, 0.1, 0.1),
        ('sweet', 'あまあま', 1.1, 0.9, 1.3, 0.2, 0.2),
        ('tsundere', 'ツンツン', 1.0, 1.1, 1.2, 0.3, 0.3),
        ('sexy', 'セクシー', 0.9, 1.2, 1.1, 0.4, 0.4),
        ('whisper', 'ささやき', 0.8, 1.3, 1.0, 0.5, 0.5),
        ('murmur', 'ヒソヒソ', 0.7, 1.4, 0.9, 0.6, 0.6),
    )
}

# Emotion name mapping: Japanese style name -> English emotion key.
emotion_translate_jp2en = {
    "ノーマル": "normal",
    "あまあま": "sweet",
    "ツンツン": "tsundere",
    "セクシー": "sexy",
    "ささやき": "whisper",
    "ヒソヒソ": "murmur",
}
# Reverse mapping: English emotion key -> Japanese style name.
emotion_translate_en2jp = dict(
    zip(emotion_translate_jp2en.values(), emotion_translate_jp2en.keys())
)

class CustomExtension(Extension):
    """Voice extension: synthesizes a sentence through a VOICEVOX-style HTTP API.

    On construction the list of available characters and their styles is
    fetched from the speech server. Each call optionally translates the
    sentence with Tencent Cloud machine translation, synthesizes it, stores
    the audio under ``voice_cache/`` and returns a ``file://`` URL to it.
    """

    def __init__(self, custom_config: dict):
        """Load the character/style table from the speech server.

        Args:
            custom_config: Extension settings from the YAML config. ``api_url``
                is the speech server address (default ``127.0.0.1:50021``).

        Raises:
            Exception: If ``api_url`` is empty, or the speaker list could not
                be fetched after three attempts.
        """
        super().__init__(ext_config.copy(), custom_config)

        url = custom_config.get('api_url', '127.0.0.1:50021')
        if not url:
            raise Exception("未配置语音服务器url")
        url = self._normalize_url(url)

        # Fetch the available characters from the API, retrying up to 3 times.
        last_error = None
        for _ in range(3):
            try:
                res = requests.get(url + "speakers", timeout=10)
                break
            except requests.exceptions.RequestException as e:
                last_error = e
        else:
            # Keep the original exception type; chain the cause for debugging.
            raise Exception("获取语音服务器角色列表失败") from last_error

        # {character name: {style name: {"speaker": style id, "name": style name}}}
        self.character_emotion_dict = {
            character["name"]: {
                style["name"]: {
                    "speaker": style["id"],
                    "name": style["name"],
                }
                for style in character["styles"]
            }
            for character in res.json()
        }

        print(f"[ext_VOICEVOX] 共加载了 {len(self.character_emotion_dict)} 个角色")

    @staticmethod
    def _normalize_url(url: str) -> str:
        """Return *url* with an ``http`` scheme prefix and a trailing slash."""
        if not url.startswith('http'):
            url = f'http://{url}'
        if not url.endswith('/'):
            url = f'{url}/'
        return url

    @staticmethod
    def _sign_request(params: dict, region: str, secret_id: str, secret_key: str) -> str:
        """Add the Tencent Cloud common parameters to *params* and sign them.

        *params* is updated in place because the common parameters must be
        sent together with the signature. The signature is the base64 encoded
        HMAC-SHA1 of the sorted ``key=value`` request string.
        """
        params.update({
            "Action": "TextTranslate",
            "Region": f"{region}",
            "Timestamp": int(time()),
            "Nonce": random.randint(1, maxsize),
            "SecretId": f"{secret_id}",
            "Version": "2018-03-21",
        })
        sign_str = "POSTtmt.tencentcloudapi.com/?" + "&".join(
            f"{key}={params[key]}" for key in sorted(params))
        hashed = new(str(secret_key).encode("utf-8"), sign_str.encode("utf-8"), sha1)
        # b2a_base64 appends a newline; strip it.
        return b2a_base64(hashed.digest())[:-1].decode()

    async def _translate(self, message: str, target: str, region: str,
                         secret_id: str, secret_key: str) -> str:
        """Translate *message* into *target* with Tencent Cloud TextTranslate.

        Best effort: on timeout, network failure or an unexpected/error
        response a warning is logged and the untranslated *message* is
        returned.
        """
        # aiohttp is already a dependency of this module (see ``request``).
        from aiohttp import ClientError

        params = {
            "Source": "auto",
            "SourceText": message,
            "Target": target,
            "ProjectId": 0,
        }
        params["Signature"] = self._sign_request(params, region, secret_id, secret_key)

        async def post_request():
            async with request("POST", "https://tmt.tencentcloudapi.com", data=params) as resp:
                return loadJsonS(await resp.read())

        try:
            # The timeout covers the whole request, not only reading the body.
            payload = await asyncio.wait_for(post_request(), timeout=30)
            return payload["Response"]["TargetText"]
        except asyncio.TimeoutError:
            # asyncio.TimeoutError is only an alias of TimeoutError on 3.11+.
            logger.warning("[ext_VOICEVOX] translation request timed out")
        except ClientError as e:
            logger.warning(f"[ext_VOICEVOX] translation request failed: {e}")
        except (KeyError, TypeError, ValueError) as e:
            # Error responses carry no "TargetText"; invalid JSON raises ValueError.
            logger.warning(f"[ext_VOICEVOX] unexpected translation response: {e!r}")
        return message

    @staticmethod
    def _synthesize(url: str, text: str, speaker, custom_attributes: dict) -> bytes:
        """Run audio_query + synthesis against the speech server (blocking).

        Raises:
            requests.exceptions.RequestException: On network errors or a
                non-success HTTP status.
        """
        # Build the audio query for the text.
        query = urlencode({"text": text, "speaker": speaker})
        res = requests.post(url + "audio_query?" + query, timeout=120)
        res.raise_for_status()
        query_json = res.json()

        # Apply the emotion specific attributes to the query.
        query_json['speedScale'] = custom_attributes["speed_scale"]
        query_json['volumeScale'] = custom_attributes["volume_scale"]
        query_json['intonationScale'] = custom_attributes["intonation_scale"]
        query_json['prePhonemeLength'] = custom_attributes["pre_phoneme_length"]
        query_json['postPhonemeLength'] = custom_attributes["post_phoneme_length"]

        # Synthesize the audio.
        speaker_query = urlencode({"speaker": speaker})
        res = requests.post(f"{url}synthesis?{speaker_query}", json=query_json, timeout=120)
        # Without this check an error body would be saved as a .wav file.
        res.raise_for_status()
        return res.content

    async def call(self, arg_dict: dict, ctx_data: dict) -> dict:
        """Synthesize the requested sentence; run when the extension is invoked.

        Args:
            arg_dict: Arguments parsed by the AI, ``{name: value}``. Uses
                ``sentence`` (text to speak) and ``emotion`` (one of the keys
                of ``emotion_rate_dict``; unknown values fall back to
                ``normal``).
            ctx_data: Call context (unused here).

        Returns:
            ``{'voice': <file url>, 'text': <caption>}`` on success, ``{}``
            when no sentence was given, or ``{'text': <error message>}`` when
            the server url or the character is not available.
        """
        custom_config: dict = self.get_custom_config()  # settings from the YAML config

        translate_on = custom_config.get('ng_voice_translate_on', False)  # enable translation
        region = custom_config.get('tencentcloud_common_region', "ap-shanghai")
        secret_id = custom_config.get('tencentcloud_common_secretid', "xxxxx")
        secret_key = custom_config.get('tencentcloud_common_secretkey', "xxxxx")
        # The documented key is ``ng_voice_tar``; the misspelled ``g_voice_tar``
        # that earlier versions read is still honored as a fallback.
        target_lang = custom_config.get(
            'ng_voice_tar', custom_config.get('g_voice_tar', 'ja'))
        is_base64 = custom_config.get('is_base64', False)  # response is base64 encoded
        character = custom_config.get('character', 'もち子さん')
        url = custom_config.get('api_url', '127.0.0.1:50021')

        if not url:
            return {'text': f"[ext_VOICEVOX] 未配置语音服务器url"}
        url = self._normalize_url(url)

        raw_text = arg_dict.get('sentence', None)
        if not raw_text:
            # Nothing to say.
            return {}

        # Accept a Japanese style name as well, then validate the English key.
        emotion_key = arg_dict.get('emotion', 'normal')
        emotion_key = emotion_translate_jp2en.get(emotion_key, emotion_key)
        if emotion_key not in emotion_rate_dict:
            emotion_key = 'normal'

        styles = self.character_emotion_dict.get(character)
        if not styles:
            return {'text': f"[ext_VOICEVOX] 未找到角色 {character}"}
        # Styles are keyed by their Japanese name; when the character lacks
        # the requested style, use its first one.
        style = styles.get(emotion_translate_en2jp.get(emotion_key))
        if style is None:
            style = next(iter(styles.values()))
        speaker = style['speaker']
        custom_attributes = emotion_rate_dict[emotion_key]['custom_attributes']

        # Only an explicit boolean true (or 1) enables translation, so a
        # string such as "False" in the config does not switch it on.
        if translate_on == True:  # noqa: E712
            spoken = await self._translate(
                raw_text, target_lang, region, secret_id, secret_key)
        else:
            spoken = raw_text
        text = spoken + '~'  # extra character so the synthesized audio does not lose its ending

        # ``requests`` is blocking; run it in a worker thread so the event
        # loop keeps serving other handlers while the audio is synthesized.
        loop = asyncio.get_running_loop()
        audio_data = await loop.run_in_executor(
            None, self._synthesize, url, text, speaker, custom_attributes)

        if is_base64:
            audio_data = base64.b64decode(audio_data)

        # Audio cache folder.
        voice_path = 'voice_cache/'
        os.makedirs(voice_path, exist_ok=True)
        file_name = f"{voice_path}{uuid.uuid1()}.wav"
        with open(file_name, "wb") as f:
            f.write(audio_data)

        local_url = f"file:///{os.path.abspath(file_name)}"
        return {
            'voice': local_url,             # voice url
            'text': f"[语音] {raw_text}",    # caption text
        }