sohojoe committed on
Commit
7a1cd88
•
1 Parent(s): f4e57e4

refactor ffmpeg_converter_actor -> ffmpeg_converter

Browse files
ffmpeg_converter_actor.py → ffmpeg_converter.py RENAMED
@@ -3,7 +3,7 @@ import asyncio
3
  import ray
4
  from ray.util.queue import Queue
5
 
6
- class FFMpegConverterActor:
7
  def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
8
  self.output_queue = output_queue
9
  self.buffer_size = buffer_size
@@ -24,7 +24,7 @@ class FFMpegConverterActor:
24
  # If the pipe is broken, restart the process.
25
  await self.start_process()
26
  continue
27
- # print(f"FFMpegConverterActor: read {len(chunk)} bytes")
28
  chunk_ref = ray.put(chunk)
29
  await self.output_queue.put_async(chunk_ref)
30
 
 
3
  import ray
4
  from ray.util.queue import Queue
5
 
6
+ class FFMpegConverter:
7
  def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
8
  self.output_queue = output_queue
9
  self.buffer_size = buffer_size
 
24
  # If the pipe is broken, restart the process.
25
  await self.start_process()
26
  continue
27
+ # print(f"FFMpegConverter: read {len(chunk)} bytes")
28
  chunk_ref = ray.put(chunk)
29
  await self.output_queue.put_async(chunk_ref)
30
 
respond_to_prompt_async.py CHANGED
@@ -7,7 +7,7 @@ from chat_service import ChatService
7
  # from local_speaker_service import LocalSpeakerService
8
  from text_to_speech_service import TextToSpeechService
9
  from environment_state_actor import EnvironmentStateActor
10
- from ffmpeg_converter_actor import FFMpegConverterActor
11
  from agent_response import AgentResponse
12
  import json
13
 
@@ -24,7 +24,7 @@ class RespondToPromptAsync:
24
  self.environment_state_actor = environment_state_actor
25
  self.sentence_queues = []
26
  self.sentence_tasks = []
27
- # self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)
28
 
29
  async def prompt_to_llm(self, prompt:str, messages:[str]):
30
  chat_service = ChatService()
@@ -69,9 +69,9 @@ class RespondToPromptAsync:
69
  chunk_count += 1
70
 
71
  async def speech_to_converter(self):
72
- self.ffmpeg_converter_actor = FFMpegConverterActor(self.audio_output_queue)
73
- await self.ffmpeg_converter_actor.start_process()
74
- self.ffmpeg_converter_actor_task = asyncio.create_task(self.ffmpeg_converter_actor.run())
75
 
76
  while True:
77
  for i, task in enumerate(self.sentence_tasks):
@@ -83,7 +83,7 @@ class RespondToPromptAsync:
83
  chunk_response = await queue.get()
84
  audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
85
  audio_chunk = ray.get(audio_chunk_ref)
86
- await self.ffmpeg_converter_actor.push_chunk(audio_chunk)
87
  break
88
 
89
  await asyncio.sleep(0.01)
@@ -104,10 +104,10 @@ class RespondToPromptAsync:
104
  task.cancel()
105
 
106
  # Close FFmpeg converter actor
107
- if self.ffmpeg_converter_actor_task:
108
- self.ffmpeg_converter_actor_task.cancel()
109
- await self.ffmpeg_converter_actor.close()
110
- # ray.kill(self.ffmpeg_converter_actor)
111
 
112
  # Flush all queues
113
  while not self.llm_sentence_queue.empty():
 
7
  # from local_speaker_service import LocalSpeakerService
8
  from text_to_speech_service import TextToSpeechService
9
  from environment_state_actor import EnvironmentStateActor
10
+ from ffmpeg_converter import FFMpegConverter
11
  from agent_response import AgentResponse
12
  import json
13
 
 
24
  self.environment_state_actor = environment_state_actor
25
  self.sentence_queues = []
26
  self.sentence_tasks = []
27
+ # self.ffmpeg_converter = FFMpegConverter.remote(audio_output_queue)
28
 
29
  async def prompt_to_llm(self, prompt:str, messages:[str]):
30
  chat_service = ChatService()
 
69
  chunk_count += 1
70
 
71
  async def speech_to_converter(self):
72
+ self.ffmpeg_converter = FFMpegConverter(self.audio_output_queue)
73
+ await self.ffmpeg_converter.start_process()
74
+ self.ffmpeg_converter_task = asyncio.create_task(self.ffmpeg_converter.run())
75
 
76
  while True:
77
  for i, task in enumerate(self.sentence_tasks):
 
83
  chunk_response = await queue.get()
84
  audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
85
  audio_chunk = ray.get(audio_chunk_ref)
86
+ await self.ffmpeg_converter.push_chunk(audio_chunk)
87
  break
88
 
89
  await asyncio.sleep(0.01)
 
104
  task.cancel()
105
 
106
  # Close FFmpeg converter actor
107
+ if self.ffmpeg_converter_task:
108
+ self.ffmpeg_converter_task.cancel()
109
+ await self.ffmpeg_converter.close()
110
+ # ray.kill(self.ffmpeg_converter)
111
 
112
  # Flush all queues
113
  while not self.llm_sentence_queue.empty():