sohojoe committed
Commit ae52b65
1 Parent(s): 6f108ca

add flush_output_queue

ffmpeg_converter_actor.py CHANGED
@@ -5,8 +5,8 @@ from ray.util.queue import Queue
 
 @ray.remote
 class FFMpegConverterActor:
-    def __init__(self, queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
-        self.queue = queue
+    def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
+        self.output_queue = output_queue
         self.buffer_size = buffer_size
         self.output_format = output_format
 
@@ -19,7 +19,7 @@ class FFMpegConverterActor:
         while True:
             chunk = await self.output_pipe.readexactly(self.buffer_size)
             # print(f"FFMpegConverterActor: read {len(chunk)} bytes")
-            await self.queue.put_async(chunk)
+            await self.output_queue.put_async(chunk)
 
     async def start_process(self):
         cmd = [
@@ -57,6 +57,10 @@ class FFMpegConverterActor:
     # def has_processed_all_data(self):
     #     return self.process.poll() is not None
 
+    def flush_output_queue(self):
+        while not self.output_queue.empty():
+            self.output_queue.get()
+
     def close(self):
         self.input_pipe.close()
         self.output_pipe.close()
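
The new flush_output_queue() drains any converted audio chunks still sitting in the shared ray.util.queue.Queue, so a cancelled response does not leave stale audio waiting to be played. A minimal usage sketch follows; the ray.init call, the actor construction, and the final assert are illustrative scaffolding, not part of this commit:

import ray
from ray.util.queue import Queue
from ffmpeg_converter_actor import FFMpegConverterActor

ray.init(ignore_reinit_error=True)

# Shared queue that FFMpegConverterActor fills with converted audio chunks.
output_queue = Queue(maxsize=100)
converter = FFMpegConverterActor.remote(output_queue)

# ... conversion runs elsewhere and chunks accumulate in output_queue ...

# On cancel/interrupt, drop whatever has not been consumed yet.
ray.get(converter.flush_output_queue.remote())
assert output_queue.empty()
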
respond_to_prompt_actor.py CHANGED
@@ -103,6 +103,7 @@ class RespondToPromptActor:
         self.prompt_queue = Queue(maxsize=100)
         self.llm_sentence_queue = Queue(maxsize=100)
         self.speech_chunk_queue = Queue(maxsize=100)
+        self.ffmepg_converter_actor = ffmpeg_converter_actor
 
         self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue, voice_id)
         self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
@@ -119,10 +120,12 @@ class RespondToPromptActor:
         prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
         llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
         speech_to_speaker_future = self.speech_output.cancel.remote()
+        ffmpeg_converter_future = self.ffmepg_converter_actor.flush_output_queue.remote()
         ray.get([
             prompt_to_llm_future,
             llm_sentence_to_speech_future,
             speech_to_speaker_future,
+            ffmpeg_converter_future,
         ])
         self.prompt_queue.put(prompt)
         print("Enqueued prompt")