project_charles / speech_to_text_vosk.py
sohojoe's picture
basic POC
162d5c8
raw
history blame
2.89 kB
import json
import os
import asyncio
from vosk import SetLogLevel, Model, KaldiRecognizer
from multiprocessing import Process, Queue
from queue import Empty
SetLogLevel(-1) # mutes vosk verbosity
class SpeechToTextVosk:
def __init__(self, model='small', audio_bit_rate=16000) -> None:
self.model = model
self.audio_bit_rate = audio_bit_rate
# Create a Queue for inter-process communication
self.queue = Queue()
self.result_queue = Queue()
# Create and start a new Process with the worker function
self.process = Process(target=self.worker)
self.process.start()
def worker(self):
# load vosk model
# get path of current file
current_file_path = os.path.abspath(__file__)
current_directory = os.path.dirname(current_file_path)
_path = os.path.join(current_directory, 'models', 'vosk', self.model)
model_voice = Model(_path)
vosk = KaldiRecognizer(model_voice, self.audio_bit_rate)
while True:
try:
# Get the next item from the queue. Blocks for 1s if necessary.
data = self.queue.get(timeout=1)
# Stop the worker if the sentinel None is received
if data is None:
break
text, speaker_finished = self._process_speech(vosk, data)
# put the result into result_queue
self.result_queue.put((text, speaker_finished))
except Empty:
pass
def add_speech_bytes(self, data: bytearray):
self.queue.put(data)
def _process_speech(self, vosk: KaldiRecognizer, data: bytearray) -> tuple[str, bool]:
text = ''
speaker_finished = False
if vosk.AcceptWaveform(data):
result = vosk.Result()
result_json = json.loads(result)
text = result_json['text']
speaker_finished = True
else:
result = vosk.PartialResult()
result_json = json.loads(result)
text = result_json['partial']
return text, speaker_finished
def get_text(self):
text = ''
speaker_finished = False
while not self.result_queue.empty():
result, speaker_finished = self.result_queue.get()
text += result
if speaker_finished:
break
return (text, speaker_finished)
def get_audio_bit_rate(self):
return self.audio_bit_rate
def shutdown(self):
# Send sentinel value to stop the worker
self.queue.put(None)
# Wait for the worker process to finish
self.process.join()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.shutdown()
def __del__(self):
self.shutdown()