File size: 10,494 Bytes
ad67495
 
 
 
9ed41df
90a9891
2bb91de
149eeaf
09ede70
2d6aefc
ad67495
0df6dd9
ad67495
 
149eeaf
ad67495
9ed41df
ad67495
 
361f9d4
 
 
 
 
 
 
 
ad67495
 
361f9d4
28b5e08
 
2d6aefc
28b5e08
d91a673
90a9891
 
361f9d4
e6bce41
361f9d4
 
 
 
2bb91de
 
ad67495
361f9d4
98ec0ec
5ea3bc3
 
3e4f32c
361f9d4
3e4f32c
 
cf5e7f4
361f9d4
cf5e7f4
 
 
ad67495
361f9d4
4df5a8a
 
a642a9f
 
 
 
 
 
ad67495
 
 
 
 
149eeaf
5a17040
c490c32
5a17040
 
 
 
 
c490c32
5a17040
c490c32
149eeaf
 
 
c490c32
ad67495
361f9d4
ad67495
9ed41df
ad67495
 
9ed41df
 
98ec0ec
 
6130167
795c382
 
 
 
6130167
e6bce41
90a9891
28b5e08
 
149eeaf
ad67495
 
 
 
98ec0ec
 
 
 
149eeaf
 
 
 
 
 
 
 
 
 
98ec0ec
 
 
b6ba8eb
98ec0ec
 
5ea3bc3
 
 
98ec0ec
eb6999f
 
 
6130167
795c382
 
c490c32
e6bce41
4df5a8a
90a9891
2bb91de
795c382
 
 
 
90a9891
149eeaf
795c382
82ab66c
795c382
 
 
4df5a8a
90a9891
795c382
 
149eeaf
a642a9f
4df5a8a
149eeaf
14a183e
90a9891
361f9d4
6130167
5a17040
eb6999f
 
5a17040
eb6999f
 
149eeaf
5a17040
 
 
c490c32
6130167
 
2bb91de
cf5e7f4
 
aec6f97
cf5e7f4
795c382
cf5e7f4
 
28b5e08
cf5e7f4
ad67495
361f9d4
 
 
 
 
 
 
ad67495
09ede70
 
88fcdcc
09ede70
 
 
 
 
ad67495
 
 
 
 
 
09ede70
 
 
 
0df6dd9
c490c32
149eeaf
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import ray
import time
import asyncio
import os
from clip_transform import CLIPTransform
from response_state_manager import ResponseStateManager
from respond_to_prompt_async import RespondToPromptAsync
import asyncio
import subprocess
import pid_helper

class CharlesApp:
    """Main application driver for the "Charles" voice/vision assistant.

    Owns the speech-to-text actor, LLM prompt/response plumbing, CLIP
    image embeddings, and the face animator, and runs them all from a
    single asyncio polling loop in start().
    """

    def __init__(self):
        # Flipped to False once _initialize_resources() completes.
        self._needs_init = True
        self._charles_actor_debug_output = ""
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()

    def set_state(self, state, skip_print=False):
        """Record the current status string and mirror it to the UI actor.

        Args:
            state: Human-readable status message.
            skip_print: Suppress echoing to stdout (used by the per-loop
                stats update, which would otherwise flood the console).
        """
        self._state = state
        if not skip_print:
            print(state)
        # The interface actor only exists after initialization; guard so
        # set_state is safe to call during early startup.
        if hasattr(self, '_app_interface_actor'):
            self._app_interface_actor.set_state.remote(self._state)

    async def _initialize_resources(self):
        """Create the actors and helpers the main loop depends on.

        Imports are deliberately local so that merely constructing
        CharlesApp does not drag in these heavier modules.
        """
        self.set_state("001 - creating AppInterfaceActor")
        from app_interface_actor import AppInterfaceActor
        self._app_interface_actor = AppInterfaceActor.get_singleton()
        await self._app_interface_actor.add_charles_app_pid.remote(pid_helper.get_current_pid())
        self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()

        self.set_state("002 - creating ResponseStateManager")
        self._response_state_manager = ResponseStateManager()

        self.set_state("003 - creating PromptManager")
        from prompt_manager import PromptManager
        self._prompt_manager = PromptManager()

        self.set_state("004 - creating RespondToPromptAsync")
        # Created lazily, once per user prompt, inside start().
        self._respond_to_prompt = None
        self._respond_to_prompt_task = None

        self.set_state("005 - create SpeechToTextVoskActor")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
        # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")

        self.set_state("006 - create Prototypes")
        from prototypes import Prototypes
        self._prototypes = Prototypes()

        self.set_state("007 - create animator")
        from charles_animator import CharlesAnimator
        self._animator = CharlesAnimator()

        # BUG FIX: this was `True`, which would have made a second call to
        # start() re-initialize everything from scratch.
        self._needs_init = False
        self.set_state("010 - Initialized")

    # Backward-compatible alias for the old misspelled method name.
    _initalize_resources = _initialize_resources

    async def cancel_response_task(self):
        """Stop any in-flight LLM/TTS response task.

        No-op when there is no active response task. Cycles the audio
        output queue so stale, already-queued audio is dropped.
        """
        if self._respond_to_prompt_task is not None:
            await self._respond_to_prompt.terminate()
            self._respond_to_prompt_task.cancel()
            self._respond_to_prompt_task = None
            self._respond_to_prompt = None
            self._audio_output_queue = await self._app_interface_actor.cycle_output_queue.remote()

    async def start(self):
        """Run the main perception/response loop forever.

        Each iteration drains audio/video input frames, feeds audio to the
        speech-to-text actor, embeds the newest video frame with CLIP,
        dispatches finished prompts to the LLM responder, and pushes debug
        output plus an animated face frame back to the interface actor.
        """
        if self._needs_init:
            await self._initialize_resources()

        debug_output_history = []

        async def render_debug_output(list_of_strings):
            # Newest entries first, rendered as a markdown chat history.
            table_content = "##### Chat history\n"
            for item in reversed(list_of_strings):
                # table_content += f"\n```markdown\n{item}\n```\n"
                table_content += f"\n{item}\n"
            self._charles_actor_debug_output = table_content
            await self._app_interface_actor.set_debug_output.remote(self._charles_actor_debug_output)

        async def add_debug_output(output):
            # Keep only the 10 most recent entries.
            debug_output_history.append(output)
            if len(debug_output_history) > 10:
                debug_output_history.pop(0)
            await render_debug_output(debug_output_history)

        self.set_state("Waiting for input")
        total_video_frames = 0
        skipped_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"

        process_speech_to_text_future = []
        human_preview_text = ""
        additional_prompt = None
        previous_prompt = ""
        is_talking = False
        has_spoken_for_this_prompt = False

        while True:
            response_step_obs, response_state = self._response_state_manager.begin_next_step()
            audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()
            video_frames = await self._app_interface_actor.dequeue_video_input_frames_async.remote()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)

            if len(video_frames) > 0:
                vector_debug = f"found {len(video_frames)} video frames"
                # Only the newest frame is embedded; older ones are skipped.
                total_video_frames += 1
                skipped_video_frames += (len(video_frames) - 1)
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                distances, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            if len(process_speech_to_text_future) > 0:
                # Poll (timeout=0) the oldest pending transcription result.
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, raw_json = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]

                    # Filler words that should never trigger a response.
                    prompts_to_ignore = ["um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm"]

                    if speaker_finished and len(prompt) > 0 and prompt not in prompts_to_ignore:
                        print(f"Prompt: {prompt}")
                        response_preview_text = self._response_state_manager.pretty_print_current_responses()
                        if len(response_preview_text) > 0:
                            await add_debug_output(response_preview_text)
                        human_preview_text = ""
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        await add_debug_output(f"👨 {prompt}")
                        self._prompt_manager.replace_or_append_user_message(prompt)
                        # Abort any response still playing, then kick off a
                        # new async LLM response task for this prompt.
                        await self.cancel_response_task()
                        self._respond_to_prompt = RespondToPromptAsync(self._response_state_manager, self._audio_output_queue)
                        self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run(prompt, self._prompt_manager.messages))
                        additional_prompt = None
                        previous_prompt = prompt
                        is_talking = False
                        has_spoken_for_this_prompt = False
                        response_step_obs, response_state = self._response_state_manager.reset_episode()
                    elif len(prompt) > 0 and prompt not in prompts_to_ignore:
                        # sometimes we get a false signal of speaker_finished
                        # in which case we get new prompts before we have spoken
                        if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
                            additional_prompt = previous_prompt
                            has_spoken_for_this_prompt = True
                            await self.cancel_response_task()
                            response_step_obs, response_state = self._response_state_manager.reset_episode()
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        human_preview_text = f"👨❓ {prompt}"
                        await self.cancel_response_task()


            # i choose to add each line of response one at a time as they come in
            for new_response in response_step_obs.llm_responses:
                self._prompt_manager.append_assistant_message(new_response)

            # Build the debug panel: history + current robot/human previews.
            list_of_strings = debug_output_history.copy()
            robot_preview_text = self._response_state_manager.pretty_print_preview_text()
            response_preview_text = self._response_state_manager.pretty_print_current_responses()
            if len(robot_preview_text) > 0:
                response_preview_text += robot_preview_text+"  \n"
            list_of_strings.append(response_preview_text)
            if len(human_preview_text) > 0:
                list_of_strings.append(human_preview_text)
            if len(list_of_strings) > 10:
                list_of_strings.pop(0)
            await render_debug_output(list_of_strings)

            # Yield to the event loop so other tasks (e.g. the response
            # task) get scheduled.
            await asyncio.sleep(0.001)

            # add observations to the environment state
            count = len(self._audio_output_queue)
            is_talking = bool(count > 0)
            has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
            frame = self._animator.update(is_talking)
            frame_ref = ray.put(frame)
            await self._app_interface_actor.enqueue_video_output_frame.remote(frame_ref)

            loops += 1
            self.set_state(
                f"Processed {total_video_frames} video frames \
                    and {total_audio_frames} audio frames, \
                    loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. \
                    Is speaking: {is_talking}({count}). \
                    {vector_debug}\
                    ", skip_print=True)

def init_ray():
    """Start a local Ray head node (best effort) and connect to it.

    The `ray start` subprocess call may fail (e.g. a head node is already
    running); that error is printed and ignored, then we retry connecting
    until `ray.is_initialized()` reports success. Honors the RAY_ADDRESS
    environment variable when set.
    """
    try:
        subprocess.check_output(["ray", "start", "--include-dashboard=True", "--head"])
    except Exception as err:
        print(f"charles_actor.py init_ray: {err}")
    # Poll until the connection to the running cluster is established.
    while not ray.is_initialized():
        time.sleep(0.1)
        init_kwargs = {"namespace": "project_charles"}
        address = os.getenv('RAY_ADDRESS')
        if address:
            init_kwargs["address"] = address
        ray.init(**init_kwargs)

async def main():
    """Entry coroutine: ensure Ray is connected, then run the app forever."""
    if not ray.is_initialized():
        init_ray()
    app = CharlesApp()
    await app.start()
    

if __name__ == "__main__":
    # asyncio.run() replaces the deprecated get_event_loop() /
    # run_until_complete() pattern; the file already requires Python 3.7+
    # (it uses asyncio.create_task), so this is safe.
    asyncio.run(main())