File size: 3,583 Bytes
5958f7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import io
import json
import threading
from queue import Queue
from subprocess import PIPE, Popen


class RPC(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.start()
        super(RPC, self).__init__()

    def start(self):
        self._proc: Popen = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)

        assert self._proc.stdin
        assert self._proc.stdout
        assert self._proc.stderr

        self.stdin = io.open(self._proc.stdin.fileno(), "wb")
        self.stdout = io.open(self._proc.stdout.fileno(), "rb")
        self.stderr = io.open(self._proc.stderr.fileno(), "rb")

        self._queue = Queue()

        self.watchdog_stdout = Watchdog(self, name="stdout")
        self.watchdog_stderr = Watchdog(self, name="stderr")

    def stop(self):
        if self.stdin and not self.stdin.closed:
            self.stdin.close()
        if self.stdout and not self.stdout.closed:
            self.stdout.close()
        if self.stderr and not self.stderr.closed:
            self.stderr.close()
        self.watchdog_stdout.stop()
        self.watchdog_stderr.stop()
        self._proc.terminate()
        self._proc.wait()
        self._proc.kill()

    def restart(self):
        self.stop()
        self.start()

    def request(self, cmd):
        self._write(cmd)
        return self._queue.get()

    def _handle_stdout(self, resp):
        try:
            self._queue.put((None, json.loads(resp)))
        except:
            self._queue.put((None, resp))

    def _handle_stderr(self, resp):
        self._queue.put((resp, None))

    def _write(self, s):
        req = json.dumps(s)
        req = req + "\0"
        try:
            self.stdin.write(bytearray(req, "utf-8"))
            self.stdin.flush()
        except:
            pass


class Watchdog(threading.Thread):
    def __init__(self, rpc, name="watchdog", interval=0.1):
        super(Watchdog, self).__init__()

        if name == "stderr":
            self.stream = rpc.stderr
            self.handle = rpc._handle_stderr
        elif name == "stdout":
            self.stream = rpc.stdout
            self.handle = rpc._handle_stdout

        # store attributes
        self.rpc = rpc
        self.name = name
        self.interval = interval
        self.daemon = True

        # register a stop event
        self._stop = threading.Event()

        self.start()

    def start(self):
        super(Watchdog, self).start()

    def stop(self):
        self._stop.set()

    def run(self):
        # reset the stop event
        self._stop.clear()

        # stop here when stream is not set or closed
        if not self.stream or self.stream.closed:
            return

        # read new incoming lines
        while not self._stop.is_set():
            resp = None

            # stop when stream is closed
            if self.stream.closed:
                break

            try:
                resp = ""
                while True:
                    c = self.stream.read(1).decode("utf-8")
                    if c == "\x00" and self.name == "stdout":
                        break
                    elif c == "\n" and self.name == "stderr":
                        break
                    elif not c:
                        # EOF
                        break
                    else:
                        resp = resp + c
            except IOError:
                # prevent residual race conditions occurring when stream is closed externally
                pass
            self.handle(resp)
            self._stop.wait(self.interval)