Commit 149eeaf
Author: sohojoe
Parent: 87dcd10

refactor: use more of an MDP-style structure

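The refactor reorganizes the pipeline around an MDP-style loop: EnvironmentStateActor owns one EnvironmentState per (episode, step) and hands back the completed previous state each time begin_next_step is called, AgentStateActor starts a fresh per-step AgentState alongside it, and CharlesActor drives one step per loop iteration while the LLM/TTS actors write their previews, responses, and chunk ids into the current step's state. Below is a minimal sketch of that loop using only the two actors added in this commit; the run_steps wrapper is illustrative, and queue wiring, Streamlit I/O, and the LLM/TTS pipeline are omitted.

import asyncio
import ray

from environment_state_actor import EnvironmentStateActor
from agent_state_actor import AgentStateActor

async def run_steps(num_steps=3):
    # One state-tracking actor pair, as created in CharlesActor._initalize_resources
    env_actor = EnvironmentStateActor.remote()
    agent_actor = AgentStateActor.remote()

    for _ in range(num_steps):
        # Swap in a fresh EnvironmentState; the awaited result is the
        # *previous* step's state, now complete and safe to inspect.
        prev_state = await env_actor.begin_next_step.remote()
        agent_actor.begin_step.remote()  # fire-and-forget, as in CharlesActor.start

        # During a real step the pipeline actors report back through calls like:
        #   env_actor.set_llm_preview.remote("partial sentence ...")
        #   env_actor.add_llm_response_and_clear_llm_preview.remote("Full sentence.")
        #   env_actor.add_reward.remote(1)

        print(prev_state)  # episode, step, timestamp, reward, llm_responses, ...
        await asyncio.sleep(0.01)

if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)
    asyncio.get_event_loop().run_until_complete(run_steps())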
agent_state_actor.py ADDED
@@ -0,0 +1,34 @@
+from datetime import datetime
+import ray
+from copy import deepcopy
+
+class AgentState:
+    def __init__(self):
+        self.timestamp = datetime.utcnow()
+        self.input_stt_preview = ''
+        self.prompts = []
+        self.video_input = None
+
+@ray.remote
+class AgentStateActor:
+    def __init__(self):
+        self.begin_step()
+
+    def begin_step(self):
+        self.state = AgentState()
+        # self.state = deepcopy(self.state)
+        # self.state.timestamp = datetime.utcnow()
+
+    def set_input_stt_preview(self, input_stt_preview):
+        self.state.input_stt_preview = input_stt_preview
+
+    def add_input_stt_prompt(self, prompt):
+        self.state.prompts.append(prompt)
+
+    def add_video_input(self, video_input):
+        self.state.video_input = video_input
+
+    def get_state(self):
+        return self.state
+
+
app.py CHANGED
@@ -53,7 +53,9 @@ async def main():
 
     with col1:
         listening = st.checkbox("Listen", value=True)
-        system_one_audio_history_output = st.empty()
+        looking = st.checkbox("Look", value=False)
+        charles_actor_debug_output = st.empty()
+        environment_state_ouput = st.empty()
 
     # Initialize resources if not already done
     system_one_audio_status.write("Initializing streaming")
@@ -98,7 +100,7 @@
     try:
         while True:
             if "streamlit_av_queue" in st.session_state:
-                st.session_state.streamlit_av_queue.set_listening(listening)
+                st.session_state.streamlit_av_queue.set_looking_listening(looking, listening)
             if not webrtc_ctx.state.playing:
                 system_one_audio_status.write("Stopped.")
                 await asyncio.sleep(0.1)
@@ -112,8 +114,10 @@
                 pass
            if charles_actor is not None:
                try:
-                    audio_history = await charles_actor.get_system_one_audio_history_output.remote()
-                    system_one_audio_history_output.markdown(audio_history)
+                    new_environment_state = await charles_actor.get_environment_state.remote()
+                    environment_state_ouput.markdown(f"{new_environment_state}")
+                    charles_debug_str = await charles_actor.get_charles_actor_debug_output.remote()
+                    charles_actor_debug_output.markdown(charles_debug_str)
                except Exception as e:
                    # assume we disconnected
                    charles_actor = None
charles_actor.py CHANGED
@@ -1,23 +1,30 @@
+import json
 import ray
 import time
 import asyncio
 import os
 from clip_transform import CLIPTransform
+from environment_state_actor import EnvironmentStateActor, EnvironmentState
+from agent_state_actor import AgentStateActor
+import asyncio
 
 @ray.remote
 class CharlesActor:
     def __init__(self):
         self._needs_init = True
-        self._system_one_audio_history_output = ""
+        self._charles_actor_debug_output = ""
+        self._environment_state:EnvironmentState = EnvironmentState(episode=0, step=0) # Initialize as EnvironmentState
         self._state = "Initializing"
         self._clip_transform = CLIPTransform()
 
     def get_state(self):
         return self._state
 
-    def get_system_one_audio_history_output(self):
-        return self._system_one_audio_history_output
+    def get_charles_actor_debug_output(self):
+        return self._charles_actor_debug_output
 
+    def get_environment_state(self)->EnvironmentState:
+        return self._environment_state
 
     async def _initalize_resources(self):
         # Initialize resources
@@ -28,7 +35,9 @@ class CharlesActor:
 
         print("001 - create RespondToPromptActor")
         from respond_to_prompt_actor import RespondToPromptActor
-        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._out_audio_queue)
+        self._environment_state_actor = EnvironmentStateActor.remote()
+        self._agent_state_actor = AgentStateActor.remote()
+        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._environment_state_actor, self._out_audio_queue)
 
         print("002 - create SpeechToTextVoskActor")
         from speech_to_text_vosk_actor import SpeechToTextVoskActor
@@ -51,7 +60,20 @@ class CharlesActor:
         if self._needs_init:
             await self._initalize_resources()
 
-        system_one_audio_history = []
+        debug_output_history = []
+        def add_debug_output(output):
+            debug_output_history.append(output)
+            if len(debug_output_history) > 10:
+                debug_output_history.pop(0)
+            table_content = "| Charles Actor debug history |\n| --- |\n"
+            table_content += "\n".join([f"| {item} |" for item in reversed(debug_output_history)])
+            self._charles_actor_debug_output = table_content
+        def preview_debug_output(output):
+            table_content = "| Charles Actor debug history |\n| --- |\n"
+            debug_output_history_copy = debug_output_history.copy()
+            debug_output_history_copy.append(output)
+            table_content += "\n".join([f"| {item} |" for item in reversed(debug_output_history_copy)])
+            self._charles_actor_debug_output = table_content
 
         self._state = "Waiting for input"
         total_video_frames = 0
@@ -67,7 +89,13 @@ class CharlesActor:
             if len(self._debug_queue) > 0:
                 prompt = self._debug_queue.pop(0)
                 await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
+
+            env_state = await self._environment_state_actor.begin_next_step.remote()
+            self._environment_state = env_state
+            self._agent_state_actor.begin_step.remote()
             audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
+            video_frames = await self._streamlit_av_queue.get_video_frames_async()
+
             if len(audio_frames) > 0:
                 total_audio_frames += len(audio_frames)
                 # Concatenate all audio frames into a single buffer
@@ -76,6 +104,20 @@ class CharlesActor:
                 process_speech_to_text_future.append(future)
                 # audio_frames_task = None
 
+            if len(video_frames) > 0:
+                vector_debug = f"found {len(video_frames)} video frames"
+                total_video_frames += 1
+                skipped_video_frames += (len(video_frames) -1)
+                image_as_array = video_frames[-1]
+                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
+                image_vector = image_vector[0]
+                distances, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
+                vector_debug = f"{closest_item_key} {distance_debug_str}"
+
+
+            human_preview_text = ""
+            robot_preview_text = ""
+
             if len(process_speech_to_text_future) > 0:
                 ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                 if ready:
@@ -87,32 +129,26 @@
                     if speaker_finished and len(prompt) > 0 and prompt not in prompts_to_ignore:
                         print(f"Prompt: {prompt}")
                         # system_one_audio_history.append("... " + str(raw_json))
-                        system_one_audio_history.append(prompt)
-                        while len(system_one_audio_history) > 10:
-                            system_one_audio_history = system_one_audio_history[-10:]
-                        table_content = "| System 1 Audio History |\n| --- |\n"
-                        table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
-                        self._system_one_audio_history_output = table_content
+                        add_debug_output(f"👨 {prompt}")
                         await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
-                    # else:
-                    #     print(f"not ready... " + str(raw_json))
-
-            video_frames = await self._streamlit_av_queue.get_video_frames_async()
-            if len(video_frames) > 0:
-                vector_debug = f"found {len(video_frames)} video frames"
-                total_video_frames += 1
-                skipped_video_frames += (len(video_frames) -1)
-                image_as_array = video_frames[-1]
-                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
-                image_vector = image_vector[0]
-                distances, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
-                vector_debug = f"{closest_item_key} {distance_debug_str}"
+                    elif len(prompt) > 0 and prompt not in prompts_to_ignore:
+                        human_preview_text = f"👨❓ {prompt}"
+
+            for new_response in env_state.llm_responses:
+                add_debug_output(f"🤖 {new_response}")
+            if len(env_state.llm_preview):
+                robot_preview_text = f"🤖❓ {env_state.llm_preview}"
+
+            if len(human_preview_text) > 0:
+                preview_debug_output(human_preview_text)
+            elif len(robot_preview_text) > 0:
+                preview_debug_output(robot_preview_text)
 
             await asyncio.sleep(0.01)
             loops+=1
             self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. {vector_debug}"
 
-if __name__ == "__main__":
+async def main():
     if not ray.is_initialized():
         # Try to connect to a running Ray cluster
         ray_address = os.getenv('RAY_ADDRESS')
@@ -127,6 +163,8 @@ if __name__ == "__main__":
     ).remote()
     future = charles_actor.start.remote()
 
+    last_step = -1
+    last_episode = -1
    try:
        while True:
            ready, _ = ray.wait([future], timeout=0)
@@ -141,9 +179,30 @@ if __name__ == "__main__":
                break
            else:
                # The start method is still running. You can poll for debug information here.
-                time.sleep(1)
-                state = charles_actor.get_state.remote()
-                print(f"Charles is in state: {ray.get(state)}")
+                await asyncio.sleep(1)
+                state = await charles_actor.get_state.remote()
+                env_state = await charles_actor.get_environment_state.remote()
+                if (env_state.episode != last_episode) or (env_state.step != last_step):
+                    last_episode = env_state.episode
+                    last_step = env_state.step
+                    print(f"Charles is in state: {state}")
+                    # if len(env_state.llm_preview):
+                    #     print (f"llm_preview: {env_state.llm_preview}")
+                    # if len(env_state.llm_responses):
+                    #     print (f"llm_responses: {env_state.llm_responses}")
+                    # if len(env_state.tts_raw_chunk_ids):
+                    #     for chunk_json in env_state.tts_raw_chunk_ids:
+                    #         chunk = json.loads(chunk_json)
+                    #         prompt = chunk['prompt']
+                    #         line = chunk['llm_sentence_id']
+                    #         chunk_id = chunk['chunk_count']
+                    #         print(f"Prompt: {prompt}, Line: {line}, Chunk: {chunk_id}")
+
    except KeyboardInterrupt as e:
        print("Script was manually terminated")
        raise(e)
+
+
+if __name__ == "__main__":
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main())
chat_service.py CHANGED
@@ -4,7 +4,6 @@ import json
 import os
 import torch
 import openai
-from agent_response import AgentResponse
 
 class ChatService:
     def __init__(self, api="openai", model_id = "gpt-3.5-turbo"):
@@ -102,8 +101,8 @@ I fell off the pink step, and I had an accident.
             return True
         return False
 
-    async def get_responses_as_sentances_async(self, agent_response:AgentResponse, cancel_event):
-        self._messages.append({"role": "user", "content": agent_response['prompt']})
+    async def get_responses_as_sentances_async(self, prompt, cancel_event):
+        self._messages.append({"role": "user", "content": prompt})
         llm_response = ""
         current_sentence = ""
         delay = 0.1
@@ -125,21 +124,17 @@ I fell off the pink step, and I had an accident.
                 chunk_text = chunk_message['content']
                 current_sentence += chunk_text
                 llm_response += chunk_text
-                agent_response['llm_preview'] = current_sentence
                 text_to_speak = self._should_we_send_to_voice(current_sentence)
                 if text_to_speak:
                     current_sentence = current_sentence[len(text_to_speak):]
-                    agent_response['llm_preview'] = ''
-                    agent_response['llm_sentence'] = text_to_speak
-                    agent_response['llm_sentences'].append(text_to_speak)
-                    yield agent_response
-                    agent_response['llm_sentence_id'] += 1
+                    yield text_to_speak, True
+                else:
+                    yield current_sentence, False
 
             if cancel_event.is_set():
                 return
             if len(current_sentence) > 0:
-                agent_response['llm_sentence'] = current_sentence
-                yield agent_response
+                yield current_sentence, True
             self._messages.append({"role": "assistant", "content": llm_response})
             return
 
environment_state_actor.py ADDED
@@ -0,0 +1,60 @@
+import ray
+from datetime import datetime
+
+from agent_state_actor import AgentState
+
+class EnvironmentState:
+    def __init__(self, episode, step):
+
+        self.agent_state = None
+        self.timestamp = datetime.utcnow()
+        self.episode = episode
+        self.step = step
+        self.reward = 0
+        self.llm_preview = ''
+        self.llm_responses = []
+        self.tts_raw_chunk_ids = []
+
+    def __str__(self):
+        state = ', '.join(f'{k}={v}' for k, v in self.__dict__.items() if k not in {'episode', 'step', 'timestamp', 'reward'})
+        return f'episode={self.episode}, step={self.step}, timestamp={self.timestamp}, \nreward={self.reward}\nstate=({state})'
+
+
+@ray.remote
+class EnvironmentStateActor:
+    def __init__(self):
+        self.episode = 0
+        self.step = 0
+        self.state = None
+        self.reset_episode()
+
+    def reset_episode(self):
+        self.episode += 1
+        self.step = 0
+        self.state = EnvironmentState(self.episode, self.step)
+        return self.state
+
+    def begin_next_step(self)->EnvironmentState:
+        previous_state = self.state
+        self.step += 1
+        self.state = EnvironmentState(self.episode, self.step)
+        return previous_state
+
+    def add_reward(self, reward):
+        self.state.reward += reward
+
+    def set_llm_preview(self, llm_preview):
+        self.state.llm_preview = llm_preview
+
+    def add_llm_response_and_clear_llm_preview(self, llm_response):
+        self.state.llm_responses.append(llm_response)
+        self.state.llm_preview = ''
+
+    def add_tts_raw_chunk_id(self, chunk_id):
+        self.state.tts_raw_chunk_ids.append(chunk_id)
+
+    def add_agent_state(self, agent_state:AgentState):
+        self.state.agent_state = agent_state
+
+    def get_state(self)->EnvironmentState:
+        return self.state
prototypes.pt ADDED
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:d59c424a695d82b6346742f635d1c4b469d5ad72c8dd47b590886d745681fd44
+size 6900
prototypes.py CHANGED
@@ -10,8 +10,7 @@ from torch.nn import functional as F
 class Prototypes:
     def __init__(self):
         self._clip_transform = CLIPTransform()
-        self._prepare_prototypes()
-
+        self._load_prototypes()
 
     def _prepare_prototypes(self):
         image_embeddings = self.load_images_from_folder('prototypes')
@@ -32,6 +31,15 @@ class Prototypes:
 
         self.prototype_keys = ["person", "no_person"]
         self.prototypes = torch.stack([person_embedding, no_person_embedding])
+        # save prototypes to file
+        torch.save(self.prototypes, 'prototypes.pt')
+
+    def _load_prototypes(self):
+        # check if file exists
+        if not os.path.exists('prototypes.pt'):
+            self._prepare_prototypes()
+        self.prototypes = torch.load('prototypes.pt')
+        self.prototype_keys = ["person", "no_person"]
 
 
     def load_images_from_folder(self, folder):
respond_to_prompt_actor.py CHANGED
@@ -8,28 +8,45 @@ import asyncio
 # from ray.actor import ActorHandle
 from ffmpeg_converter_actor import FFMpegConverterActor
 from agent_response import AgentResponse
-
+from environment_state_actor import EnvironmentStateActor
+from agent_state_actor import AgentStateActor
+from agent_state_actor import AgentState
+import json
 
 @ray.remote
 class PromptToLLMActor:
-    def __init__(self, input_queue:Queue, output_queue:Queue):
+    def __init__(
+            self,
+            environment_state_actor:EnvironmentStateActor,
+            input_queue:Queue,
+            output_queue:Queue):
         load_dotenv()
         self.input_queue = input_queue
         self.output_queue = output_queue
         self.chat_service = ChatService()
         self.cancel_event = None
+        self.environment_state_actor = environment_state_actor
 
     async def run(self):
         while True:
             prompt = await self.input_queue.get_async()
             self.cancel_event = asyncio.Event()
             agent_response = AgentResponse(prompt)
-            async for sentence_response in self.chat_service.get_responses_as_sentances_async(agent_response, self.cancel_event):
-                if self.chat_service.ignore_sentence(sentence_response['llm_sentence']):
+            async for text, is_complete_sentance in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event):
+                if self.chat_service.ignore_sentence(text):
+                    is_complete_sentance = False
+                if not is_complete_sentance:
+                    agent_response['llm_preview'] = text
+                    await self.environment_state_actor.set_llm_preview.remote(text)
                     continue
-                print(f"{sentence_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
-                sentence_response = sentence_response.make_copy()
+                agent_response['llm_preview'] = ''
+                agent_response['llm_sentence'] = text
+                agent_response['llm_sentences'].append(text)
+                await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
+                print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
+                sentence_response = agent_response.make_copy()
                 await self.output_queue.put_async(sentence_response)
+                agent_response['llm_sentence_id'] += 1
 
     async def cancel(self):
         if self.cancel_event:
@@ -41,20 +58,35 @@ class PromptToLLMActor:
 
 @ray.remote
 class LLMSentanceToSpeechActor:
-    def __init__(self, input_queue, output_queue, voice_id):
+    def __init__(
+            self,
+            environment_state_actor:EnvironmentStateActor,
+            input_queue,
+            output_queue,
+            voice_id):
         load_dotenv()
         self.input_queue = input_queue
         self.output_queue = output_queue
         self.tts_service = TextToSpeechService(voice_id=voice_id)
         self.cancel_event = None
+        self.environment_state_actor = environment_state_actor
 
     async def run(self):
         while True:
             sentence_response = await self.input_queue.get_async()
             self.cancel_event = asyncio.Event()
+            chunk_count = 0
             async for chunk_response in self.tts_service.get_speech_chunks_async(sentence_response, self.cancel_event):
                 chunk_response = chunk_response.make_copy()
                 await self.output_queue.put_async(chunk_response)
+                chunk_response = {
+                    'prompt': sentence_response['prompt'],
+                    'llm_sentence_id': sentence_response['llm_sentence_id'],
+                    'chunk_count': chunk_count,
+                }
+                chunk_id_json = json.dumps(chunk_response)
+                await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
+                chunk_count += 1
 
     async def cancel(self):
         if self.cancel_event:
@@ -85,7 +117,10 @@ class LLMSentanceToSpeechActor:
 
 @ray.remote
 class SpeechToConverterActor:
-    def __init__(self, input_queue:Queue, ffmpeg_converter_actor:FFMpegConverterActor):
+    def __init__(
+            self,
+            input_queue:Queue,
+            ffmpeg_converter_actor:FFMpegConverterActor):
         load_dotenv()
         self.input_queue = input_queue
         self.ffmpeg_converter_actor = ffmpeg_converter_actor
@@ -105,18 +140,31 @@ class SpeechToConverterActor:
 
 @ray.remote
 class RespondToPromptActor:
-    def __init__(self, out_audio_queue):
+    def __init__(
+            self,
+            environment_state_actor:EnvironmentStateActor,
+            out_audio_queue):
         voice_id="2OviOUQc1JsQRQgNkVBj"
         self.prompt_queue = Queue(maxsize=100)
         self.llm_sentence_queue = Queue(maxsize=100)
         self.speech_chunk_queue = Queue(maxsize=100)
+        self.environment_state_actor = environment_state_actor
 
         self.ffmpeg_converter_actor = FFMpegConverterActor.remote(out_audio_queue)
 
-        self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue)
-        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
+        self.prompt_to_llm = PromptToLLMActor.remote(
+            self.environment_state_actor,
+            self.prompt_queue,
+            self.llm_sentence_queue)
+        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(
+            self.environment_state_actor,
+            self.llm_sentence_queue,
+            self.speech_chunk_queue,
+            voice_id)
         # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
-        self.speech_output = SpeechToConverterActor.remote(self.speech_chunk_queue, self.ffmpeg_converter_actor)
+        self.speech_output = SpeechToConverterActor.remote(
+            self.speech_chunk_queue,
+            self.ffmpeg_converter_actor)
 
         # Start the pipeline components.
         self.prompt_to_llm.run.remote()
streamlit_av_queue.py CHANGED
@@ -15,14 +15,16 @@ class StreamlitAVQueue:
         self._output_channels = 2
         self._audio_bit_rate = audio_bit_rate
         self._listening = True
+        self._looking = True
         self._lock = threading.Lock()
         self.queue_actor = WebRtcAVQueueActor.options(
             name="WebRtcAVQueueActor",
             get_if_exists=True,
             ).remote()
 
-    def set_listening(self, listening: bool):
+    def set_looking_listening(self, looking, listening: bool):
         with self._lock:
+            self._looking = looking
             self._listening = listening
 
     async def queued_video_frames_callback(
@@ -30,10 +32,13 @@ class StreamlitAVQueue:
         frames: List[av.VideoFrame],
     ) -> av.VideoFrame:
         try:
-            for frame in frames:
-                shared_tensor = frame.to_ndarray(format="rgb24")
-                shared_tensor_ref = ray.put(shared_tensor)
-                await self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
+            with self._lock:
+                should_look = self._looking
+            if len(frames) > 0 and should_look:
+                for frame in frames:
+                    shared_tensor = frame.to_ndarray(format="rgb24")
+                    shared_tensor_ref = ray.put(shared_tensor)
+                    await self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
             # print (f"tesnor len: {len(shared_tensor)}, tensor shape: {shared_tensor.shape}, tensor type:{shared_tensor.dtype} tensor ref: {shared_tensor_ref}")
         except Exception as e:
             print (e)