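"""Driver for the "Charles" conversational agent.

Wires together the Streamlit audio/video queues, Vosk speech-to-text, the
prompt-response pipeline, and the animator, then runs the main
perception/response loop as a Ray actor.
"""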
import asyncio
import json
import os
import subprocess
import time

import ray

from clip_transform import CLIPTransform
from environment_state_actor import EnvironmentStateActor, EnvironmentState
from agent_state_actor import AgentStateActor

@ray.remote
class CharlesActor:
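    """Ray actor that owns the agent's resources and runs its main loop.

    Heavy resources (AV queues, speech-to-text, prototypes, animator) are
    created lazily in _initialize_resources() the first time start() runs.
    """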
    def __init__(self):
        self._needs_init = True
        self._charles_actor_debug_output = ""
        self._environment_state: EnvironmentState = EnvironmentState(episode=0, step=0)
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()
        
    def get_state(self):
        return self._state
    
    def get_charles_actor_debug_output(self):
        return self._charles_actor_debug_output
    
    def get_environment_state(self) -> EnvironmentState:
        return self._environment_state
    
    async def _initialize_resources(self):
        print("000 - create StreamlitAVQueue")
        self._state = "000 - creating StreamlitAVQueue"
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()
        self._out_audio_queue = await self._streamlit_av_queue.get_out_audio_queue()
        self._out_video_queue = await self._streamlit_av_queue.get_out_video_queue()

        print("001 - create RespondToPromptActor")
        self._state = "001 - creating RespondToPromptActor"
        from respond_to_prompt_actor import RespondToPromptActor
        self._environment_state_actor = EnvironmentStateActor.remote()
        self._agent_state_actor = AgentStateActor.remote()
        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._environment_state_actor, self._out_audio_queue)

        print("002 - create SpeechToTextVoskActor")
        self._state = "002 - creating SpeechToTextVoskActor"
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
        # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")
        
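        # Optional scripted prompts; the main loop pops and replays these at
        # startup, which is useful for debugging without a microphone.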
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]

        print("003 - create Prototypes")
        self._state = "003 - creating Prototypes"
        from prototypes import Prototypes
        self._prototypes = Prototypes()

        print("004 - create animator")
        self._state = "004 - creating animator"
        from charles_animator import CharlesAnimator
        self._animator = CharlesAnimator()

        print("010")
        self._needs_init = True
        self._state = "Initialized"
        
    async def start(self):
        if self._needs_init:
            await self._initialize_resources()
            
        debug_output_history = []

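        # Keep a rolling window of the last 10 chat lines and re-render the
        # markdown debug panel whenever it changes.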
        def render_debug_output(list_of_strings):
            table_content = "##### Chat history\n"
            for item in reversed(list_of_strings):
                # table_content += f"\n```markdown\n{item}\n```\n"
                table_content += f"\n{item}\n"
            self._charles_actor_debug_output = table_content

        def add_debug_output(output):
            debug_output_history.append(output)
            if len(debug_output_history) > 10:
                debug_output_history.pop(0)
            render_debug_output(debug_output_history)
        
        self._state = "Waiting for input"
        total_video_frames = 0
        skipped_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"
        
        process_speech_to_text_future = []
        current_responses = []
        speech_chunks_per_response = []
        human_preview_text = ""
        robot_preview_text = ""
        additional_prompt = None
        previous_prompt = ""
        is_talking = False
        has_spoken_for_this_prompt = False


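        # Main loop: one environment step per iteration.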
        while True:
            if len(self._debug_queue) > 0:
                prompt = self._debug_queue.pop(0)
                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
            
            env_state = await self._environment_state_actor.begin_next_step.remote()
            self._environment_state = env_state
            self._agent_state_actor.begin_step.remote()
            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()    
            video_frames = await self._streamlit_av_queue.get_video_frames_async()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)
            # audio_frames_task = None

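            # Embed the most recent video frame with CLIP and report the nearest
            # prototype; older frames in this batch are counted as skipped.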
            if len(video_frames) > 0:
                vector_debug = f"found {len(video_frames)} video frames"
                total_video_frames += 1
                skipped_video_frames += (len(video_frames) -1)
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                distances, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

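            # Poll the oldest transcription future without blocking (timeout=0);
            # at most one result is consumed per loop iteration.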
            if len(process_speech_to_text_future) > 0:
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, raw_json = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]

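                    # Filler utterances that should not be treated as prompts.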
                    prompts_to_ignore = ["um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm"]

                    if speaker_finished and len(prompt) > 0 and prompt not in prompts_to_ignore:
                        print(f"Prompt: {prompt}")
                        line = ""
                        for i, response in enumerate(current_responses):
                            line += "πŸ€– " if len(line) == 0 else ""
                            # line += f"{response} [{speech_chunks_per_response[i]}]  \n"
                            line += f"[{speech_chunks_per_response[i]}] {response}  \n"
                        if len(line) > 0:
                            add_debug_output(line)
                        current_responses = []
                        speech_chunks_per_response = []
                        env_state.llm_preview = ""
                        env_state.llm_responses = []
                        env_state.tts_raw_chunk_ids = []
                        human_preview_text = ""
                        robot_preview_text = ""
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        add_debug_output(f"πŸ‘¨ {prompt}")
                        await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
                        additional_prompt = None
                        previous_prompt = prompt
                        is_talking = False
                        has_spoken_for_this_prompt = False
                    elif len(prompt) > 0 and prompt not in prompts_to_ignore:
                        # Sometimes we get a false speaker_finished signal, in
                        # which case new prompts arrive before we have spoken.
                        if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
                            additional_prompt = previous_prompt
                            has_spoken_for_this_prompt = True
                            await self._respond_to_prompt_actor.enqueue_prompt.remote("")
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt                        
                        human_preview_text = f"πŸ‘¨β“ {prompt}"

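            # Fold completed LLM responses from this step into the running list,
            # each starting with zero spoken TTS chunks.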
            for new_response in env_state.llm_responses:
                # add_debug_output(f"πŸ€– {new_response}")
                current_responses.append(new_response)
                speech_chunks_per_response.append(0)
                robot_preview_text = ""
            if len(env_state.llm_preview):
                robot_preview_text = f"πŸ€–β“ {env_state.llm_preview}"

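            # Each TTS chunk names the LLM sentence it belongs to; tally them so
            # the transcript can show per-sentence speech progress.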
            for chunk in env_state.tts_raw_chunk_ids:
                chunk = json.loads(chunk)
                # prompt = chunk['prompt']
                response_id = chunk['llm_sentence_id']
                speech_chunks_per_response[response_id] += 1

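            # Rebuild the debug view, appending in-progress robot/human previews.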
            list_of_strings = debug_output_history.copy()
            line = ""
            for i, response in enumerate(current_responses):
                line += "πŸ€– " if len(line) == 0 else ""
                line += f"[{speech_chunks_per_response[i]}] {response}  \n"
                # line += f"{response} [{speech_chunks_per_response[i]}]  \n"
            if len(robot_preview_text) > 0:
                line += robot_preview_text+"  \n"
            list_of_strings.append(line)
            if len(human_preview_text) > 0:
                list_of_strings.append(human_preview_text)
            if len(list_of_strings) > 10:
                list_of_strings.pop(0)
            render_debug_output(list_of_strings)


            await asyncio.sleep(0.01)

            # Derive the talking state from the pending output-audio queue and
            # push the next animation frame (evicting the oldest if the queue is full).
            count = len(self._out_audio_queue)
            is_talking = bool(count > 0)
            has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
            frame = self._animator.update(is_talking)
            if self._out_video_queue.full():
                evicted_item = await self._out_video_queue.get_async()
                del evicted_item
            frame_ref = ray.put(frame)
            await self._out_video_queue.put_async(frame_ref)                

            loops += 1
            self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. Is speaking: {is_talking}({count}). {vector_debug}"

def init_ray():
    try:
        # Start a local head node; this fails harmlessly if one is already running.
        subprocess.check_output(["ray", "start", "--include-dashboard=True", "--head"])
    except Exception as e:
        print(f"charles_actor.py init_ray: {e}")
    # Connect to a running Ray cluster
    while not ray.is_initialized():
        time.sleep(0.1)
        ray_address = os.getenv('RAY_ADDRESS')
        if ray_address:
            ray.init(ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")

async def main():
    if not ray.is_initialized():
        init_ray()

    charles_actor = CharlesActor.options(
        name="CharlesActor",
        get_if_exists=True,
    ).remote()
    future = charles_actor.start.remote()

    last_step = -1
    last_episode = -1
    try:
        while True:
            ready, _ = ray.wait([future], timeout=0)
            if ready:
                # The start method has terminated. You can fetch the result (if any) with ray.get().
                # If the method raised an exception, it will be re-raised here.
                try:
                    result = ray.get(future)
                    print(f"The start method has terminated with result: {result}")
                except Exception as e:
                    print(f"The start method raised an exception: {e}")
                break
            else:
                # The start method is still running. You can poll for debug information here.
                await asyncio.sleep(1)
                state = await charles_actor.get_state.remote()
                env_state = await charles_actor.get_environment_state.remote()
                if (env_state.episode != last_episode) or (env_state.step != last_step):
                    last_episode = env_state.episode
                    last_step = env_state.step
                    print(f"Charles is in state: {state}")
                    # if len(env_state.llm_preview):
                    #     print (f"llm_preview: {env_state.llm_preview}")
                    # if len(env_state.llm_responses):
                    #     print (f"llm_responses: {env_state.llm_responses}")
                    # if len(env_state.tts_raw_chunk_ids):
                    #     for chunk_json in env_state.tts_raw_chunk_ids:
                    #         chunk = json.loads(chunk_json)
                    #         prompt = chunk['prompt']
                    #         line = chunk['llm_sentence_id']
                    #         chunk_id = chunk['chunk_count']
                    #         print(f"Prompt: {prompt}, Line: {line}, Chunk: {chunk_id}")                            

    except KeyboardInterrupt:
        print("Script was manually terminated")
        raise
    

if __name__ == "__main__":
    asyncio.run(main())