project_charles / app_interface_actor.py
sohojoe's picture
fix audio issue when starting a new response
a642a9f
raw
history blame contribute delete
No virus
5.29 kB
import time
import ray
from ray.util.queue import Queue, Empty
from ray.actor import ActorHandle
import numpy as np
import pid_helper
# Ray Queues take ~0.5 seconds to spin up;
# this class keeps a pool of pre-created queues to cycle through.
class QueueFactory:
    """Pool of pre-created Ray queues that are handed out one at a time.

    A fixed number of queues is created up front. Every call to
    :meth:`get_queue` removes the oldest queue from the pool and immediately
    creates a replacement, so the pool stays at ``queue_size`` entries.

    Args:
        max_size: ``maxsize`` passed to every ``Queue`` that is created.
        queue_size: Number of queues kept in the pool. Defaults to 5, the
            value that was previously hard-coded.

    Raises:
        ValueError: If ``queue_size`` is smaller than 1; an empty pool could
            never satisfy ``get_queue``.
    """

    def __init__(self, max_size: int, queue_size: int = 5):
        if queue_size < 1:
            raise ValueError("queue_size must be at least 1")
        self.max_size = max_size
        self.queue_size = queue_size
        self.queues: "list[Queue]" = [
            Queue(maxsize=max_size) for _ in range(queue_size)
        ]

    def get_queue(self) -> Queue:
        """Return the oldest pooled queue and top the pool back up."""
        queue = self.queues.pop(0)
        # Create the replacement right away so the pool stays at queue_size.
        self.queues.append(Queue(maxsize=self.max_size))
        return queue
@ray.remote
class AppInterfaceActor:
    """Ray actor that passes audio/video frames between the UI and the app.

    The actor owns four queues: audio and video input (filled by the UI,
    drained by the application) and audio and video output (filled by the
    application, drained by the UI). It also keeps a free-form debug string,
    a state string and the list of registered application PIDs.
    """

    def __init__(self):
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue_factory = QueueFactory(max_size=50)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        """Return a handle to the actor named "AppInterfaceActor".

        ``get_if_exists=True`` asks Ray to reuse the already-running named
        actor when there is one instead of creating a second instance.
        """
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

    # internal queue helpers

    @staticmethod
    async def _put_evicting_oldest(queue, item):
        """Put ``item`` on ``queue``, dropping the oldest entry if it is full."""
        if queue.full():
            # Evict without blocking: another coroutine may have drained the
            # queue since the full() check, and a blocking get would then wait
            # for (and throw away) a fresh item.
            try:
                await queue.get_async(block=False)
            except Empty:
                pass
        await queue.put_async(item)

    @staticmethod
    async def _drain(queue):
        """Return every item currently on ``queue`` (oldest first)."""
        items = []
        while True:
            # Non-blocking get: stop as soon as the queue reports Empty rather
            # than relying on a separate empty() check that can go stale.
            try:
                items.append(await queue.get_async(block=False))
            except Empty:
                return items

    # functions for UI to enqueue input, dequeue output

    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Queue a video input frame, evicting the oldest one when full."""
        await self._put_evicting_oldest(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Queue an audio input frame, evicting the oldest one when full."""
        await self._put_evicting_oldest(self.audio_input_queue, shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        """Return the next audio output frame, or ``None`` if none is queued.

        Prints a diagnostic line when the lookup takes longer than 0.1 s.
        """
        start_time = time.monotonic()
        try:
            frame = await self.audio_output_queue.get_async(block=False)
        except Empty:
            frame = None
        elapsed_time = time.monotonic() - start_time
        if elapsed_time > 0.1:
            # Compare against None explicitly: truth-testing the frame itself
            # misreports falsy frames and raises for multi-element arrays.
            print (f"dequeue_audio_output_frame_async time: {elapsed_time}. was empty: {frame is None}. frame type: {type(frame) if frame is not None else str(0)}")
        return frame

    async def dequeue_video_output_frames_async(self):
        """Return all queued video output frames (possibly an empty list)."""
        return await self._drain(self.video_output_queue)

    # functions for application to dequeue input, enqueue output

    def get_audio_output_queue(self)->Queue:
        """Return the audio output queue currently in use."""
        return self.audio_output_queue

    async def cycle_output_queue(self)->Queue:
        """Shut down the current audio output queue and switch to a new one.

        The old queue is shut down with a 0.1 s grace period; the replacement
        comes from the pre-created pool in ``audio_output_queue_factory``.

        Returns:
            The new audio output queue.
        """
        self.audio_output_queue.shutdown(grace_period_s=0.1)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        return self.audio_output_queue

    async def enqueue_video_output_frame(self, shared_tensor_ref):
        """Queue a video output frame, evicting the oldest one when full."""
        await self._put_evicting_oldest(self.video_output_queue, shared_tensor_ref)

    async def dequeue_audio_input_frames_async(self):
        """Return all queued audio input frames (possibly an empty list)."""
        return await self._drain(self.audio_input_queue)

    async def dequeue_video_input_frames_async(self):
        """Return all queued video input frames (possibly an empty list)."""
        return await self._drain(self.video_input_queue)

    # pid helpers

    def _prune_dead_pids(self):
        """Drop PIDs that are no longer running and return the survivors."""
        self.charles_app_pids = [
            pid for pid in self.charles_app_pids if pid_helper.is_pid_running(pid)
        ]
        return self.charles_app_pids

    async def add_charles_app_pid(self, pid:int):
        """Register ``pid`` as an application process."""
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self)->[int]:
        """Return the registered PIDs that are still running."""
        return self._prune_dead_pids()

    async def is_charles_app_running(self)->bool:
        """Return ``True`` if at least one registered PID is still running."""
        return len(self._prune_dead_pids()) > 0

    # debug helpers

    async def get_debug_output(self)->str:
        """Return the stored debug string."""
        return self.debug_str

    async def set_debug_output(self, debug_str:str):
        """Replace the stored debug string."""
        self.debug_str = debug_str

    async def get_state(self)->str:
        """Return the stored state string (initially "Initializing")."""
        return self.state

    async def set_state(self, state:str):
        """Replace the stored state string."""
        self.state = state