import json
import os
import asyncio
from multiprocessing import Process, Queue
from queue import Empty

from vosk import SetLogLevel, Model, KaldiRecognizer

SetLogLevel(-1)  # mutes vosk verbosity


class SpeechToTextVosk:
    """Speech-to-text front end that runs a Vosk recognizer in a worker process.

    Audio chunks handed to :meth:`add_speech_bytes` are sent to the worker over
    ``self.queue``; the worker feeds them to a ``KaldiRecognizer`` and posts
    ``(text, speaker_finished)`` tuples on ``self.result_queue``, which
    :meth:`get_text` drains without blocking.

    The instance is a context manager; leaving the ``with`` block calls
    :meth:`shutdown`, which stops and joins the worker.
    """

    def __init__(self, model='small', audio_bit_rate=16000) -> None:
        """Create the queues and start the worker process.

        Args:
            model: Name of the model directory, resolved by the worker as
                ``<directory of this file>/models/vosk/<model>``.
            audio_bit_rate: Value passed as the second argument of
                ``KaldiRecognizer`` (the sample rate of the incoming audio).
        """
        # Lifecycle bookkeeping is set first so that shutdown()/__del__ are
        # harmless if anything below raises. The instance counts as "closed"
        # until the worker has actually been started.
        self._closed = True
        self._owner_pid = os.getpid()

        self.model = model
        self.audio_bit_rate = audio_bit_rate

        # Create the queues for inter-process communication.
        self.queue = Queue()          # parent -> worker: audio chunks / None sentinel
        self.result_queue = Queue()   # worker -> parent: (text, speaker_finished)

        # daemon=True: if the owner never calls shutdown(), the worker is
        # terminated at interpreter exit instead of blocking it forever
        # (non-daemon children are joined by multiprocessing at exit and this
        # worker only stops when it receives the sentinel).
        self.process = Process(target=self.worker, daemon=True)
        self.process.start()
        self._closed = False

    def worker(self):
        """Worker-process main loop: load the model and transcribe queued audio.

        Runs until ``None`` is received on ``self.queue``.
        """
        # The model lives next to this file: <this dir>/models/vosk/<model>.
        current_directory = os.path.dirname(os.path.abspath(__file__))
        model_path = os.path.join(current_directory, 'models', 'vosk', self.model)
        recognizer = KaldiRecognizer(Model(model_path), self.audio_bit_rate)

        while True:
            try:
                # Wait up to 1s for the next item, then poll again.
                data = self.queue.get(timeout=1)
            except Empty:
                continue

            # The sentinel None stops the worker.
            if data is None:
                break

            self.result_queue.put(self._process_speech(recognizer, data))

    def add_speech_bytes(self, data: bytearray):
        """Queue a chunk of audio bytes for the worker to transcribe."""
        self.queue.put(data)

    def _process_speech(self, vosk: KaldiRecognizer, data: bytearray) -> tuple[str, bool]:
        """Feed one audio chunk to the recognizer and return its current text.

        Returns:
            ``(text, speaker_finished)``. When ``AcceptWaveform`` returns true,
            ``text`` is the ``'text'`` field of ``Result()`` and
            ``speaker_finished`` is ``True``; otherwise ``text`` is the
            ``'partial'`` field of ``PartialResult()`` and
            ``speaker_finished`` is ``False``.
        """
        if vosk.AcceptWaveform(data):
            return json.loads(vosk.Result())['text'], True
        return json.loads(vosk.PartialResult())['partial'], False

    def get_text(self):
        """Collect the results currently available, without blocking.

        Results are concatenated in arrival order until the queue is empty or
        a result flagged ``speaker_finished`` has been consumed.

        Returns:
            ``(text, speaker_finished)``; ``('', False)`` when nothing is
            pending.
        """
        # NOTE(review): texts are concatenated with no separator. If Vosk
        # partial results are cumulative, several partials drained in one call
        # will repeat words - confirm what callers expect before changing.
        text = ''
        speaker_finished = False
        while not speaker_finished:
            try:
                # get_nowait() instead of empty()+get(): empty() is only a
                # snapshot, and a plain get() could block.
                result, speaker_finished = self.result_queue.get_nowait()
            except Empty:
                break
            text += result
        return (text, speaker_finished)

    def get_audio_bit_rate(self):
        """Return the ``audio_bit_rate`` given at construction."""
        return self.audio_bit_rate

    def shutdown(self):
        """Stop the worker process and wait for it to finish.

        Safe to call more than once (``__exit__`` and ``__del__`` both call
        it), on an instance whose ``__init__`` did not complete, and from a
        copy of the object living in another process - in those cases it does
        nothing.
        """
        if getattr(self, '_closed', True):
            return
        if os.getpid() != self._owner_pid:
            # Copy of the object inside the worker (or another forked/spawned
            # process): only the creating process may join the worker.
            return
        self._closed = True

        # Send the sentinel value to stop the worker, then wait for it.
        self.queue.put(None)
        self.process.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.shutdown()

    def __del__(self):
        # Best-effort cleanup for instances not used as a context manager.
        self.shutdown()