File size: 1,721 Bytes
4385b66
 
245e7f9
4385b66
245e7f9
 
4385b66
 
 
 
245e7f9
 
730fe87
4385b66
 
 
 
245e7f9
 
4385b66
 
730fe87
245e7f9
 
 
 
 
4385b66
730fe87
4385b66
 
730fe87
 
 
 
 
 
4385b66
730fe87
4385b66
 
 
 
 
 
730fe87
 
 
 
4385b66
 
245e7f9
 
 
4385b66
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import subprocess
from threading import Thread
from queue import Queue, Empty
from typing import Iterator
import threading
import time

class AudioStreamProcessor:
    def __init__(self):
        self.queue = Queue()
        self._is_running = threading.Event()
        self._is_running.set()
        self.mpv_process = None
        self.thread = Thread(target=self._process_audio_streams)
        self.thread.start()

    def add_audio_stream(self, audio_stream: Iterator[bytes]):
        if self._is_running.is_set():
            self.queue.put(audio_stream)

    def _process_audio_streams(self):
        self._start_mpv()
        while self._is_running.is_set() or not self.queue.empty():
            try:
                audio_stream = self.queue.get(timeout=1)
            except Empty:
                continue
            self._stream(audio_stream)
        self._close_mpv()

    def _stream(self, audio_stream: Iterator[bytes]):
        for chunk in audio_stream:
            if chunk is not None:
                self.mpv_process.stdin.write(chunk)
                self.mpv_process.stdin.flush()

    def _start_mpv(self):
        mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
        self.mpv_process = subprocess.Popen(
            mpv_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _close_mpv(self):
        if self.mpv_process.stdin:
            self.mpv_process.stdin.close()
        self.mpv_process.wait()

    def close(self):
        self._is_running.clear()
        while not self.queue.empty():  # Wait until the queue is empty
            time.sleep(0.1)
        self.thread.join()