import subprocess from threading import Thread from queue import Queue from typing import Iterator class AudioStreamProcessor: def __init__(self): self.queue = Queue() self.thread = Thread(target=self._process_audio_streams) self.thread.start() def add_audio_stream(self, audio_stream: Iterator[bytes]): self.queue.put(audio_stream) def _process_audio_streams(self): while True: audio_stream = self.queue.get() if audio_stream is None: # We'll use None as a sentinel to mark the end break self._stream(audio_stream) def _stream(self, audio_stream: Iterator[bytes]): mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"] mpv_process = subprocess.Popen( mpv_command, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) for chunk in audio_stream: if chunk is not None: mpv_process.stdin.write(chunk) mpv_process.stdin.flush() if mpv_process.stdin: mpv_process.stdin.close() mpv_process.wait() def close(self): self.queue.put(None) # Signal the processing thread to terminate self.thread.join()