import subprocess from threading import Thread from queue import Queue, Empty from typing import Iterator import threading import time class AudioStreamProcessor: def __init__(self): self.queue = Queue() self._is_running = threading.Event() self._is_running.set() self.thread = Thread(target=self._process_audio_streams) self.thread.start() def add_audio_stream(self, audio_stream: Iterator[bytes]): if self._is_running.is_set(): self.queue.put(audio_stream) def _process_audio_streams(self): while self._is_running.is_set() or not self.queue.empty(): try: audio_stream = self.queue.get(timeout=1) except Empty: continue self._stream(audio_stream) def _stream(self, audio_stream: Iterator[bytes]): mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"] mpv_process = subprocess.Popen( mpv_command, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) for chunk in audio_stream: if chunk is not None: mpv_process.stdin.write(chunk) mpv_process.stdin.flush() if mpv_process.stdin: mpv_process.stdin.close() mpv_process.wait() def close(self): self._is_running.clear() while not self.queue.empty(): # Wait until the queue is empty time.sleep(0.1) self.thread.join()