Spaces:
Sleeping
Sleeping
import subprocess | |
from threading import Thread | |
from queue import Queue, Empty | |
from typing import Iterator | |
import threading | |
import time | |
class AudioStreamProcessor: | |
def __init__(self): | |
self.queue = Queue() | |
self._is_running = threading.Event() | |
self._is_running.set() | |
self.thread = Thread(target=self._process_audio_streams) | |
self.thread.start() | |
def add_audio_stream(self, audio_stream: Iterator[bytes]): | |
if self._is_running.is_set(): | |
self.queue.put(audio_stream) | |
def _process_audio_streams(self): | |
while self._is_running.is_set() or not self.queue.empty(): | |
try: | |
audio_stream = self.queue.get(timeout=1) | |
except Empty: | |
continue | |
self._stream(audio_stream) | |
def _stream(self, audio_stream: Iterator[bytes]): | |
mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"] | |
mpv_process = subprocess.Popen( | |
mpv_command, | |
stdin=subprocess.PIPE, | |
stdout=subprocess.DEVNULL, | |
stderr=subprocess.DEVNULL, | |
) | |
for chunk in audio_stream: | |
if chunk is not None: | |
mpv_process.stdin.write(chunk) | |
mpv_process.stdin.flush() | |
if mpv_process.stdin: | |
mpv_process.stdin.close() | |
mpv_process.wait() | |
def close(self): | |
self._is_running.clear() | |
while not self.queue.empty(): # Wait until the queue is empty | |
time.sleep(0.1) | |
self.thread.join() | |