File size: 6,597 Bytes
0d00307
 
 
 
 
 
 
 
 
 
 
 
c77f359
0d00307
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a5a5b3
0d00307
 
 
 
 
 
 
9a5a5b3
0d00307
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a5a5b3
13a1a62
 
 
c77f359
 
 
96c4257
0d00307
9a5a5b3
96c4257
9a5a5b3
 
8745348
 
9a5a5b3
 
 
 
0d00307
9a5a5b3
 
 
0d00307
9a5a5b3
0d00307
 
 
 
 
 
 
 
9a5a5b3
 
 
 
0d00307
9a5a5b3
 
0d00307
 
9a5a5b3
 
 
 
0d00307
8745348
 
 
 
 
 
96c4257
 
8745348
0d00307
 
9a5a5b3
0d00307
 
 
 
 
9a5a5b3
 
 
 
 
 
 
0d00307
9a5a5b3
0d00307
 
 
9a5a5b3
0d00307
9a5a5b3
0d00307
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import threading
from queue import Queue
import sounddevice as sd
import numpy as np
import requests
import base64
import time
from dataclasses import dataclass, field

@dataclass
class AudioStreamingClientArguments:
    sample_rate: int = field(default=16000, metadata={"help": "Audio sample rate in Hz. Default is 16000."})
    chunk_size: int = field(default=2048, metadata={"help": "The size of audio chunks in samples. Default is 1024."})
    api_url: str = field(default="https://yxfmjcvuzgi123sw.us-east-1.aws.endpoints.huggingface.cloud", metadata={"help": "The URL of the API endpoint."})
    auth_token: str = field(default="your_auth_token", metadata={"help": "Authentication token for the API."})

class AudioStreamingClient:
    def __init__(self, args: AudioStreamingClientArguments):
        self.args = args
        self.stop_event = threading.Event()
        self.send_queue = Queue()
        self.recv_queue = Queue()
        self.session_id = None
        self.headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.args.auth_token}",
            "Content-Type": "application/json"
        }
        self.session_state = "idle"  # Possible states: idle, sending, processing, waiting

    def start(self):
        print("Starting audio streaming...")
        
        send_thread = threading.Thread(target=self.send_audio)
        play_thread = threading.Thread(target=self.play_audio)

        with sd.InputStream(samplerate=self.args.sample_rate, channels=1, dtype='int16', callback=self.audio_callback, blocksize=self.args.chunk_size):
            send_thread.start()
            play_thread.start()

            try:
                input("Press Enter to stop streaming...")
            except KeyboardInterrupt:
                print("\nStreaming interrupted by user.")
            finally:
                self.stop_event.set()
                send_thread.join()
                play_thread.join()
                print("Audio streaming stopped.")

    def audio_callback(self, indata, frames, time, status):
        self.send_queue.put(indata.copy())

    def send_audio(self):
        buffer = b''
        while not self.stop_event.is_set():
            if self.session_state != "processing" and not self.send_queue.empty():
                while not self.send_queue.empty():  # Clear the send_queue
                    chunk = self.send_queue.get().tobytes()
                    buffer += chunk               
                if len(buffer) >= self.args.chunk_size * 2:  # * 2 because of int16
                    self.send_request(buffer)
                    buffer = b''
                    time.sleep(16*self.args.chunk_size/self.args.sample_rate)
            else:
                self.send_request()
                time.sleep(16*self.args.chunk_size/self.args.sample_rate)

    def send_request(self, audio_data=None):
        payload = {"input_type": "speech",
                   "inputs": ""}

        if audio_data is not None:
            print("Sending audio data")
            payload["inputs"] = base64.b64encode(audio_data).decode('utf-8')

        if self.session_id:
            payload["session_id"] = self.session_id
            payload["request_type"] = "continue"
        else:
            payload["request_type"] = "start"

        try:
            response = requests.post(self.args.api_url, headers=self.headers, json=payload)
            response_data = response.json()

            if "session_id" in response_data:
                self.session_id = response_data["session_id"]

            if "status" in response_data and response_data["status"] == "processing":
                print("Processing audio data")
                self.session_state = "processing"

            if "output" in response_data and response_data["output"]:
                print("Received audio data")
                self.session_state = "processing"  # Set state to processing when we start receiving audio
                audio_bytes = base64.b64decode(response_data["output"])
                audio_np = np.frombuffer(audio_bytes, dtype=np.int16)
                # Split the audio into smaller chunks for playback
                for i in range(0, len(audio_np), self.args.chunk_size):
                    chunk = audio_np[i:i+self.args.chunk_size]
                    self.recv_queue.put(chunk)

            if "status" in response_data and response_data["status"] == "completed":
                print("Completed audio processing")
                self.session_state = None
                self.session_id = None
                while not self.recv_queue.empty():
                    time.sleep(0.01)  # wait for the queue to empty
                with self.send_queue.mutex:
                    self.send_queue.queue.clear()  # Clear the queue

        except Exception as e:
            print(f"Error sending request: {e}")
            self.session_state = "idle"  # Reset state to idle in case of error

    def play_audio(self):
        def audio_callback(outdata, frames, time, status):
            if not self.recv_queue.empty():
                chunk = self.recv_queue.get()
                
                # Ensure chunk is int16 and clip to valid range
                chunk_int16 = np.clip(chunk, -32768, 32767).astype(np.int16)
                
                if len(chunk_int16) < len(outdata):
                    outdata[:len(chunk_int16), 0] = chunk_int16
                    outdata[len(chunk_int16):] = 0
                else:
                    outdata[:, 0] = chunk_int16[:len(outdata)]
            else:
                outdata[:] = 0

        with sd.OutputStream(samplerate=self.args.sample_rate, channels=1, dtype='int16', callback=audio_callback, blocksize=self.args.chunk_size):
            while not self.stop_event.is_set():
                time.sleep(0.01)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Audio Streaming Client")
    parser.add_argument("--sample_rate", type=int, default=16000, help="Audio sample rate in Hz. Default is 16000.")
    parser.add_argument("--chunk_size", type=int, default=1024, help="The size of audio chunks in samples. Default is 1024.")
    parser.add_argument("--api_url", type=str, required=True, help="The URL of the API endpoint.")
    parser.add_argument("--auth_token", type=str, required=True, help="Authentication token for the API.")

    args = parser.parse_args()
    client_args = AudioStreamingClientArguments(**vars(args))
    client = AudioStreamingClient(client_args)
    client.start()