File size: 2,435 Bytes
0156247 5210503 1c732f7 4953c74 1c732f7 7274bc5 1c732f7 7274bc5 1c732f7 7274bc5 1c732f7 7274bc5 1c732f7 7274bc5 2ca4eb5 1c732f7 5210503 1c732f7 5210503 1c732f7 5210503 1c732f7 73c9e39 1c732f7 5210503 1c732f7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import logging
from speechbrain.pretrained import EncoderClassifier
from typing import Dict, List, Any
import requests
from pydub import AudioSegment
from io import BytesIO
import tempfile
import os
def save_chunks_to_temp_files(url, chunk_length=5000, timeout=30):  # chunk_length in milliseconds
    """Load audio from ``url``, split it into chunks and save each chunk as a temp MP3.

    Args:
        url: Either an HTTP(S) URL pointing at an audio resource, or a
            ``file://``-prefixed local path.
        chunk_length: Length of each chunk in milliseconds (default 5000 = 5 s).
        timeout: Seconds to wait for the HTTP download before giving up;
            passed straight to ``requests.get``.

    Returns:
        A list of paths to temporary ``.mp3`` files, one per chunk, in order.
        The files are created with ``delete=False``, so the caller is
        responsible for removing them.

    Raises:
        ValueError: If ``chunk_length`` is not positive, or if the response's
            Content-Type header is missing or does not mention "audio".
        requests.RequestException: On download failure, timeout or a non-2xx
            status (via ``raise_for_status``).
    """
    if chunk_length <= 0:
        # Fail before downloading anything; a zero step would otherwise blow up
        # in range() and a negative one would silently yield no chunks.
        raise ValueError("chunk_length must be a positive number of milliseconds")

    # NOTE(review): ``url`` is used as-is for both remote fetches and local
    # ``file://`` reads. If it can come from untrusted callers, confirm that
    # arbitrary local-file access / internal-network requests are acceptable.
    if not url.startswith("file://"):
        # Download the audio file from the URL; the timeout keeps a stalled
        # server from blocking the caller indefinitely.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # Ensure the content type is audio (a missing header counts as "not audio"
        # instead of surfacing as an unrelated KeyError).
        content_type = response.headers.get("Content-Type", "")
        if "audio" not in content_type:
            raise ValueError("URL does not seem to be an audio file")
        # Wrap the downloaded bytes in a file-like object and decode them.
        audio_segment = AudioSegment.from_file(BytesIO(response.content))
    else:
        # Strip the "file://" scheme and load straight from disk.
        audio_segment = AudioSegment.from_file(url[len("file://"):])

    # Split audio into consecutive chunks of ``chunk_length`` milliseconds.
    chunks = [
        audio_segment[start : start + chunk_length]
        for start in range(0, len(audio_segment), chunk_length)
    ]
    if len(chunks) > 1:
        # The trailing chunk may be shorter than the rest; replace it with the
        # final ``chunk_length`` ms of the audio so every chunk has full length
        # (it then overlaps the previous chunk).
        chunks[-1] = audio_segment[-chunk_length:]

    # Save each chunk to a temporary file and store file paths in a list.
    temp_files = []
    try:
        for idx, chunk in enumerate(chunks):
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=f"_chunk{idx}.mp3"
            ) as temp_file:
                temp_path = temp_file.name
            # Register the path before exporting so a failed export is still
            # cleaned up below; export after the handle is closed so the file
            # is not opened twice at once.
            temp_files.append(temp_path)
            chunk.export(temp_path, format="mp3")
    except Exception:
        # Do not leak the files already written if a later export fails.
        for leftover in temp_files:
            try:
                os.remove(leftover)
            except OSError:
                logging.warning("could not remove temp file %s", leftover)
        raise
    return temp_files
class EndpointHandler:
    """Callable handler that runs spoken-language identification on an audio URL.

    The audio is split into chunks by ``save_chunks_to_temp_files`` and each
    chunk is classified separately with the
    ``speechbrain/lang-id-voxlingua107-ecapa`` model.
    """

    def __init__(self, path=""):
        """Load the language-identification model.

        Args:
            path: Accepted for interface compatibility; not used here, the
                model is always loaded from the fixed identifier below.
        """
        self.model = EncoderClassifier.from_hparams(
            "speechbrain/lang-id-voxlingua107-ecapa"
        )
        print("model loaded")
        logging.info("model loaded")

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Classify the language of every chunk of the referenced audio.

        Args:
            data: Request payload. The audio location is taken from (and
                removed from) its ``"inputs"`` key; if that key is absent the
                payload itself is used as the location.

        Returns:
            One dict per chunk, in order, with keys ``"prediction"`` (float)
            and ``"language"`` (the label reported by the model).
        """
        url = data.pop("inputs", data)
        print("audio_url", url)
        logging.info(f"audio_url {url}")

        response = []
        temp_filepaths = save_chunks_to_temp_files(url)
        total = len(temp_filepaths)
        try:
            for i, path in enumerate(temp_filepaths):
                logging.info(f"processing chunk {i} / {total}")
                output = self.model.classify_file(path)
                # presumably output is (out_prob, score, index, text_lab) as in
                # SpeechBrain's EncoderClassifier — TODO confirm. Index 1 is
                # exponentiated (log-score -> probability, assumed) and index 3
                # supplies the label.
                response.append(
                    {
                        "prediction": float(output[1].exp()[0]),
                        "language": output[3][0],
                    }
                )
        finally:
            # Always delete the chunk files, including those never reached
            # because classification raised part-way through.
            for path in temp_filepaths:
                try:
                    os.remove(path)
                except OSError:
                    logging.warning("could not remove temp file %s", path)
        return response
|