File size: 2,594 Bytes
4ae0b03
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio
from datetime import datetime
import json
from naptha_sdk.client.node import Node
from naptha_sdk.utils import get_logger
import os
import pytz
import requests
import time
import traceback

logger = get_logger(__name__)

async def run_mas(multi_agent_service, mas_run) -> None:
    mas_engine = MASEngine(multi_agent_service, mas_run)
    await mas_engine.start_run()


class MASEngine:
    def __init__(self, multi_agent_service, mas_run):
        self.mas = multi_agent_service
        self.mas_run = mas_run
        self.mas_name = multi_agent_service.module_name
        self.parameters = mas_run.module_params
        self.orchestrator_node = Node(self.mas_run.orchestrator_node)
        logger.info(f"Orchestrator node: {self.orchestrator_node.node_url}")

        if mas_run.worker_nodes is not None:
            self.worker_nodes = [Node(worker_node) for worker_node in mas_run.worker_nodes]
        else:
            self.worker_nodes = None

        logger.info(f"Worker Nodes: {self.worker_nodes}")

        self.consumer = {
            "public_key": mas_run.consumer_id.split(':')[1],
            'id': mas_run.consumer_id,
        }

    async def start_run(self):
        logger.info(f"Starting MAS run: {self.mas_run}")
        logger.info(f"Checking user: {self.consumer}")
        consumer = await self.orchestrator_node.check_user(user_input=self.consumer)
        if consumer["is_registered"] == True:
            logger.info("Found user...", consumer)
        elif consumer["is_registered"] == False:
            logger.info("No user found. Registering user...")
            consumer = await self.orchestrator_node.register_user(user_input=consumer)
            logger.info(f"User registered: {consumer}.")

        logger.info(f"Running multi agent service on orchestrator node {self.orchestrator_node.node_url}: {self.mas_run}")
        mas_run = await self.orchestrator_node.run_task(module_run_input=self.mas_run)
        logger.info(f"Created multi agent service run on orchestrator node {self.orchestrator_node.node_url}: {mas_run}")

        while True:
            mas_run = await self.orchestrator_node.check_task(mas_run)
            logger.info(mas_run.status)  

            if mas_run.status in ["completed", "error"]:
                break
            time.sleep(3)

        if mas_run.status == 'completed':
            logger.info(mas_run.results)
            self.agent_service_result = mas_run.results
            return mas_run.results
        else:
            logger.info(mas_run.error_message)
            return mas_run.error_message