Spaces:
Sleeping
Sleeping
import json | |
import os | |
import asyncio | |
from vosk import SetLogLevel, Model, KaldiRecognizer | |
from multiprocessing import Process, Queue | |
from queue import Empty | |
SetLogLevel(-1) # mutes vosk verbosity | |
class SpeechToTextVosk: | |
def __init__(self, model='small', audio_bit_rate=16000) -> None: | |
self.model = model | |
self.audio_bit_rate = audio_bit_rate | |
# Create a Queue for inter-process communication | |
self.queue = Queue() | |
self.result_queue = Queue() | |
# Create and start a new Process with the worker function | |
self.process = Process(target=self.worker) | |
self.process.start() | |
def worker(self): | |
# load vosk model | |
# get path of current file | |
current_file_path = os.path.abspath(__file__) | |
current_directory = os.path.dirname(current_file_path) | |
_path = os.path.join(current_directory, 'models', 'vosk', self.model) | |
model_voice = Model(_path) | |
vosk = KaldiRecognizer(model_voice, self.audio_bit_rate) | |
while True: | |
try: | |
# Get the next item from the queue. Blocks for 1s if necessary. | |
data = self.queue.get(timeout=1) | |
# Stop the worker if the sentinel None is received | |
if data is None: | |
break | |
text, speaker_finished = self._process_speech(vosk, data) | |
# put the result into result_queue | |
self.result_queue.put((text, speaker_finished)) | |
except Empty: | |
pass | |
def add_speech_bytes(self, data: bytearray): | |
self.queue.put(data) | |
def _process_speech(self, vosk: KaldiRecognizer, data: bytearray) -> tuple[str, bool]: | |
text = '' | |
speaker_finished = False | |
if vosk.AcceptWaveform(data): | |
result = vosk.Result() | |
result_json = json.loads(result) | |
text = result_json['text'] | |
speaker_finished = True | |
else: | |
result = vosk.PartialResult() | |
result_json = json.loads(result) | |
text = result_json['partial'] | |
return text, speaker_finished | |
def get_text(self): | |
text = '' | |
speaker_finished = False | |
while not self.result_queue.empty(): | |
result, speaker_finished = self.result_queue.get() | |
text += result | |
if speaker_finished: | |
break | |
return (text, speaker_finished) | |
def get_audio_bit_rate(self): | |
return self.audio_bit_rate | |
def shutdown(self): | |
# Send sentinel value to stop the worker | |
self.queue.put(None) | |
# Wait for the worker process to finish | |
self.process.join() | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
self.shutdown() | |
def __del__(self): | |
self.shutdown() | |