Spaces:
Runtime error
Runtime error
File size: 3,583 Bytes
5958f7e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import io
import json
import threading
from queue import Queue
from subprocess import PIPE, Popen
class RPC(object):
def __init__(self, cmd):
self.cmd = cmd
self.start()
super(RPC, self).__init__()
def start(self):
self._proc: Popen = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
assert self._proc.stdin
assert self._proc.stdout
assert self._proc.stderr
self.stdin = io.open(self._proc.stdin.fileno(), "wb")
self.stdout = io.open(self._proc.stdout.fileno(), "rb")
self.stderr = io.open(self._proc.stderr.fileno(), "rb")
self._queue = Queue()
self.watchdog_stdout = Watchdog(self, name="stdout")
self.watchdog_stderr = Watchdog(self, name="stderr")
def stop(self):
if self.stdin and not self.stdin.closed:
self.stdin.close()
if self.stdout and not self.stdout.closed:
self.stdout.close()
if self.stderr and not self.stderr.closed:
self.stderr.close()
self.watchdog_stdout.stop()
self.watchdog_stderr.stop()
self._proc.terminate()
self._proc.wait()
self._proc.kill()
def restart(self):
self.stop()
self.start()
def request(self, cmd):
self._write(cmd)
return self._queue.get()
def _handle_stdout(self, resp):
try:
self._queue.put((None, json.loads(resp)))
except:
self._queue.put((None, resp))
def _handle_stderr(self, resp):
self._queue.put((resp, None))
def _write(self, s):
req = json.dumps(s)
req = req + "\0"
try:
self.stdin.write(bytearray(req, "utf-8"))
self.stdin.flush()
except:
pass
class Watchdog(threading.Thread):
def __init__(self, rpc, name="watchdog", interval=0.1):
super(Watchdog, self).__init__()
if name == "stderr":
self.stream = rpc.stderr
self.handle = rpc._handle_stderr
elif name == "stdout":
self.stream = rpc.stdout
self.handle = rpc._handle_stdout
# store attributes
self.rpc = rpc
self.name = name
self.interval = interval
self.daemon = True
# register a stop event
self._stop = threading.Event()
self.start()
def start(self):
super(Watchdog, self).start()
def stop(self):
self._stop.set()
def run(self):
# reset the stop event
self._stop.clear()
# stop here when stream is not set or closed
if not self.stream or self.stream.closed:
return
# read new incoming lines
while not self._stop.is_set():
resp = None
# stop when stream is closed
if self.stream.closed:
break
try:
resp = ""
while True:
c = self.stream.read(1).decode("utf-8")
if c == "\x00" and self.name == "stdout":
break
elif c == "\n" and self.name == "stderr":
break
elif not c:
# EOF
break
else:
resp = resp + c
except IOError:
# prevent residual race conditions occurring when stream is closed externally
pass
self.handle(resp)
self._stop.wait(self.interval)
|