import io
import json
import threading
from queue import Queue
from subprocess import PIPE, Popen, TimeoutExpired


class RPC:
    """JSON request/response client that talks to a child process over pipes.

    A request is serialized to JSON, terminated with a NUL byte and written
    to the child's stdin.  Two :class:`Watchdog` threads read the child's
    stdout (NUL-terminated messages) and stderr (newline-terminated
    messages) and push what they read onto one shared queue as
    ``(error, result)`` tuples.
    """

    def __init__(self, cmd):
        """Store *cmd* (passed verbatim to ``Popen``) and spawn the child."""
        super().__init__()
        self.cmd = cmd
        self.start()

    def start(self):
        """Spawn the child process and start one reader thread per stream."""
        self._proc: Popen = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        # PIPE was requested for all three streams; these only narrow the
        # Optional types for static checkers.
        assert self._proc.stdin
        assert self._proc.stdout
        assert self._proc.stderr
        # Use the stream objects Popen already owns.  Wrapping the same
        # descriptors in a second set of owning file objects gives every
        # descriptor two owners; when the stale Popen is finalized after a
        # restart it can close descriptor numbers that have since been
        # reused by the new process's pipes.
        self.stdin = self._proc.stdin
        self.stdout = self._proc.stdout
        self.stderr = self._proc.stderr
        self._queue = Queue()
        self.watchdog_stdout = Watchdog(self, name="stdout")
        self.watchdog_stderr = Watchdog(self, name="stderr")

    def stop(self, timeout=5.0):
        """Shut the child process down and release its pipes.

        Args:
            timeout: Seconds to wait after ``terminate()`` before falling
                back to ``kill()``.
        """
        # Ask the readers to finish before anything is torn down.
        self.watchdog_stdout.stop()
        self.watchdog_stderr.stop()

        # Closing stdin signals end-of-input to the child.
        self._close_quietly(self.stdin)

        self._proc.terminate()
        try:
            self._proc.wait(timeout=timeout)
        except TimeoutExpired:
            # The child ignored the termination request.
            self._proc.kill()
            self._proc.wait()

        # Close the output pipes only once the child is gone: a reader
        # blocked in read() holds the buffered stream's lock, so closing
        # earlier could block until the child produced output.
        self._close_quietly(self.stdout)
        self._close_quietly(self.stderr)

    def restart(self):
        """Stop the current child process and spawn a fresh one."""
        self.stop()
        self.start()

    def request(self, cmd):
        """Send *cmd* and block until the next queued response is available.

        Returns:
            An ``(error, result)`` tuple: ``(None, payload)`` for a message
            read from stdout, ``(text, None)`` for a line read from stderr.
        """
        self._write(cmd)
        return self._queue.get()

    def _handle_stdout(self, resp):
        """Queue a stdout message, JSON-decoded when it parses."""
        try:
            payload = json.loads(resp)
        except (ValueError, TypeError, RecursionError):
            # Not valid JSON (or not text at all): hand the raw value back.
            payload = resp
        self._queue.put((None, payload))

    def _handle_stderr(self, resp):
        """Queue a stderr line as the error half of a response tuple."""
        self._queue.put((resp, None))

    def _write(self, s):
        """Serialize *s* to JSON and send it, NUL-terminated, to the child."""
        req = json.dumps(s) + "\0"
        try:
            self.stdin.write(req.encode("utf-8"))
            self.stdin.flush()
        except (OSError, ValueError):
            # Best effort: the pipe is broken (OSError) or already closed
            # (ValueError) because the child went away.
            pass

    @staticmethod
    def _close_quietly(stream):
        """Close *stream* if it is open, ignoring pipe errors on flush."""
        if stream and not stream.closed:
            try:
                stream.close()
            except OSError:
                # Flushing buffered data to a dead child raises; there is
                # nothing left to deliver it to.
                pass


class Watchdog(threading.Thread):
    """Daemon thread that reads delimited messages from one RPC stream.

    ``name`` selects the stream: ``"stdout"`` reads NUL-terminated messages
    and passes them to ``rpc._handle_stdout``; ``"stderr"`` reads
    newline-terminated messages and passes them to ``rpc._handle_stderr``.
    Any other name leaves the stream unset, so the thread exits at once.

    NOTE(review): at end-of-file an empty message is delivered every
    ``interval`` seconds until :meth:`stop` is called.  This is kept from
    the original because a caller blocked in ``RPC.request`` is woken by it.
    """

    # Byte that ends one message on each stream.
    _TERMINATORS = {"stdout": b"\x00", "stderr": b"\n"}

    def __init__(self, rpc, name="watchdog", interval=0.1):
        super().__init__()
        if name == "stderr":
            self.stream = rpc.stderr
            self.handle = rpc._handle_stderr
        elif name == "stdout":
            self.stream = rpc.stdout
            self.handle = rpc._handle_stdout
        else:
            # Unknown stream name: run() returns immediately.
            self.stream = None
            self.handle = None

        # store attributes
        self.rpc = rpc
        self.name = name
        self.interval = interval
        self.daemon = True

        # Stop flag.  It must not be called ``_stop``: on CPython releases
        # where Thread defines a private ``_stop()`` method, shadowing it
        # with an Event makes join() and is_alive() raise TypeError.
        self._stop_event = threading.Event()

        self.start()

    def stop(self):
        """Ask the thread to exit after the read in progress completes."""
        self._stop_event.set()

    def run(self):
        """Read messages and dispatch them until stopped or the stream closes."""
        # stop here when stream is not set or closed
        if not self.stream or self.stream.closed:
            return

        terminator = self._TERMINATORS.get(self.name)

        while not self._stop_event.is_set():
            # stop when stream is closed
            if self.stream.closed:
                break

            # Collect raw bytes and decode the whole message once: decoding
            # byte by byte fails on any multi-byte UTF-8 character.
            message = bytearray()
            try:
                while True:
                    byte = self.stream.read(1)
                    if not byte or byte == terminator:  # EOF or end of message
                        break
                    message += byte
            except ValueError:
                # The stream was closed by another thread mid-read.
                break
            except OSError:
                # Transient read failure: deliver what was collected so far.
                pass

            self.handle(message.decode("utf-8", errors="replace"))
            self._stop_event.wait(self.interval)