Spaces:
Runtime error
Runtime error
# coding: utf-8 | |
""" | |
Minimal python RPC implementation in a single file based on the JSON-RPC 2.0 specs from | |
http://www.jsonrpc.org/specification. | |
""" | |
__author__ = "Marcel Rieger" | |
__email__ = "[email protected]" | |
__copyright__ = "Copyright 2016-2021, Marcel Rieger" | |
__credits__ = ["Marcel Rieger"] | |
__contact__ = "https://github.com/riga/jsonrpyc" | |
__license__ = "BSD-3-Clause" | |
__status__ = "Development" | |
__version__ = "1.1.1" | |
__all__ = ["RPC"] | |
import io | |
import json | |
import logging | |
import sys | |
import threading | |
import time | |
from queue import Queue | |
class Spec(object): | |
""" | |
This class wraps methods that create JSON-RPC 2.0 compatible string representations of | |
request, response and error objects. All methods are class members, so you might never want to | |
create an instance of this class, but rather use the methods directly: | |
.. code-block:: python | |
Spec.request("my_method", 18) # the id is optional | |
# => '{"jsonrpc":"2.0","method":"my_method","id": 18}' | |
Spec.response(18, "some_result") | |
# => '{"jsonrpc":"2.0","id":18,"result":"some_result"}' | |
Spec.error(18, -32603) | |
# => '{"jsonrpc":"2.0","id":18,"error":{"code":-32603,"message":"Internal error"}}' | |
""" | |
def check_id(cls, id, allow_empty=False): | |
""" | |
Value check for *id* entries. When *allow_empty* is *True*, *id* is allowed to be *None*. | |
Raises a *TypeError* when *id* is neither an integer nor a string. | |
""" | |
if (id is not None or not allow_empty) and not isinstance(id, (int, str)): | |
raise TypeError( | |
"id must be an integer or string, got {} ({})".format(id, type(id)) | |
) | |
def check_method(cls, method): | |
""" | |
Value check for *method* entries. Raises a *TypeError* when *method* is not a string. | |
""" | |
if not isinstance(method, str): | |
raise TypeError( | |
"method must be a string, got {} ({})".format(method, type(method)) | |
) | |
def check_code(cls, code): | |
""" | |
Value check for *code* entries. Raises a *TypeError* when *code* is not an integer, or a | |
*KeyError* when there is no :py:class:`RPCError` subclass registered for that *code*. | |
""" | |
if not isinstance(code, int): | |
raise TypeError("code must be an integer, got {} ({})".format(id, type(id))) | |
if not get_error(code): | |
raise ValueError("unknown code, got {} ({})".format(code, type(code))) | |
def request(cls, method, id=None, params=None): | |
""" | |
Creates the string representation of a request that calls *method* with optional *params* | |
which are encoded by ``json.dumps``. When *id* is *None*, the request is considered a | |
notification. | |
""" | |
try: | |
cls.check_method(method) | |
cls.check_id(id, allow_empty=True) | |
except Exception as e: | |
raise RPCInvalidRequest(str(e)) | |
# start building the request string | |
req = '{{"jsonrpc":"2.0","method":"{}"'.format(method) | |
# add the id when given | |
if id is not None: | |
# encode string ids | |
if isinstance(id, str): | |
id = json.dumps(id) | |
req += ',"id":{}'.format(id) | |
# add parameters when given | |
if params is not None: | |
try: | |
req += ',"params":{}'.format(json.dumps(params)) | |
except Exception as e: | |
raise RPCParseError(str(e)) | |
# end the request string | |
req += "}" | |
return req | |
def response(cls, id, result): | |
""" | |
Creates the string representation of a respone that was triggered by a request with *id*. | |
A *result* is required, even if it is *None*. | |
""" | |
try: | |
cls.check_id(id) | |
except Exception as e: | |
raise RPCInvalidRequest(str(e)) | |
# encode string ids | |
if isinstance(id, str): | |
id = json.dumps(id) | |
# build the response string | |
try: | |
res = '{{"jsonrpc":"2.0","id":{},"result":{}}}'.format( | |
id, json.dumps(result) | |
) | |
except Exception as e: | |
raise RPCParseError(str(e)) | |
return res | |
def error(cls, id, code, data=None): | |
""" | |
Creates the string representation of an error that occured while processing a request with | |
*id*. *code* must lead to a registered :py:class:`RPCError`. *data* might contain | |
additional, detailed error information and is encoded by ``json.dumps`` when set. | |
""" | |
try: | |
cls.check_id(id) | |
cls.check_code(code) | |
except Exception as e: | |
raise RPCInvalidRequest(str(e)) | |
# build the inner error data | |
message = get_error(code).title | |
err_data = '{{"code":{},"message":"{}"'.format(code, message) | |
# insert data when given | |
if data is not None: | |
try: | |
err_data += ',"data":{}}}'.format(json.dumps(data)) | |
except Exception as e: | |
raise RPCParseError(str(e)) | |
else: | |
err_data += "}" | |
# encode string ids | |
if isinstance(id, str): | |
id = json.dumps(id) | |
# start building the error string | |
err = '{{"jsonrpc":"2.0","id":{},"error":{}}}'.format(id, err_data) | |
return err | |
class RPC(object): | |
""" | |
The main class of *jsonrpyc*. Instances of this class wrap an input stream *stdin* and an output | |
stream *stdout* in order to communicate with other services. A service is not even forced to be | |
written in Python as long as it strictly implements the JSON-RPC 2.0 specification. RPC | |
instances may wrap a *target* object. By means of a :py:class:`Watchdog` instance, incoming | |
requests are routed to methods of this object whose result might be sent back as a response. | |
The watchdog instance is created but not started yet, when *watch* is not *True*. | |
Example implementation: | |
*server.py* | |
.. code-block:: python | |
import jsonrpyc | |
class MyTarget(object): | |
def greet(self, name): | |
return f"Hi, {name}!" | |
jsonrpc.RPC(MyTarget()) | |
*client.py* | |
.. code-block:: python | |
import jsonrpyc | |
from subprocess import Popen, PIPE | |
p = Popen(["python", "server.py"], stdin=PIPE, stdout=PIPE) | |
rpc = jsonrpyc.RPC(stdout=p.stdin, stdin=p.stdout) | |
# non-blocking remote procedure call with callback and js-like signature | |
def cb(err, res=None): | |
if err: | |
throw err | |
print(f"callback got: {res}") | |
rpc("greet", args=("John",), callback=cb) | |
# cb is called asynchronously which prints | |
# => "callback got: Hi, John!" | |
# blocking remote procedure call with 0.1s polling | |
print(rpc("greet", args=("John",), block=0.1)) | |
# => "Hi, John!" | |
# shutdown the process | |
p.stdin.close() | |
p.stdout.close() | |
p.terminate() | |
p.wait() | |
.. py:attribute:: target | |
The wrapped target object. Might be *None* when no object is wrapped, e.g. for the *client* | |
RPC instance. | |
.. py:attribute:: stdin | |
The input stream, re-opened with ``"rb"``. | |
.. py:attribute:: stdout | |
The output stream, re-opened with ``"wb"``. | |
.. py:attribute:: watch | |
The :py:class:`Watchdog` instance that optionally watches *stdin* and dispatches incoming | |
requests. | |
""" | |
EMPTY_RESULT = object() | |
def __init__(self, handlers=None, stdin=None, stdout=None, watch=True, **kwargs): | |
super(RPC, self).__init__() | |
self.handlers = handlers | |
# open streams | |
stdin = sys.stdin if stdin is None else stdin | |
stdout = sys.stdout if stdout is None else stdout | |
self.stdin = io.open(stdin.fileno(), "rb") | |
self.stdout = io.open(stdout.fileno(), "wb") | |
# other attributes | |
self._i = 0 | |
self._callbacks = {} | |
self._results = {} | |
# create and optional start the watchdog | |
kwargs["start"] = watch | |
# kwargs.setdefault("daemon", handlers is None) | |
self.watchdog = Watchdog(self, **kwargs) | |
def __del__(self): | |
watchdog = getattr(self, "watchdog", None) | |
if watchdog: | |
watchdog.stop() | |
def __call__(self, *args, **kwargs): | |
""" | |
Shorthand for :py:meth:`call`. | |
""" | |
return self.call(*args, **kwargs) | |
def request( | |
self, | |
cmd, | |
args=(), | |
): | |
""" | |
Sends a request to the remote service and waits for the response | |
""" | |
q = Queue() | |
def cb(err, resp): | |
q.put((err, resp)) | |
self.call(cmd, args=args, callback=cb) | |
return q.get() | |
def call(self, method, args=(), kwargs=None, callback=None, block=0): | |
""" | |
Performs an actual remote procedure call by writing a request representation (a string) to | |
the output stream. The remote RPC instance uses *method* to route to the actual method to | |
call with *args* and *kwargs*. When *callback* is set, it will be called with the result of | |
the remote call. When *block* is larger than *0*, the calling thread is blocked until the | |
result is received. In this case, *block* will be the poll interval, emulating synchronuous | |
return value behavior. When both *callback* is *None* and *block* is *0* or smaller, the | |
request is considered a notification and the remote RPC instance will not send a response. | |
""" | |
# default kwargs | |
if kwargs is None: | |
kwargs = {} | |
# check if the call is a notification | |
is_notification = callback is None and block <= 0 | |
# create a new id for requests expecting a response | |
id = None | |
if not is_notification: | |
self._i += 1 | |
id = self._i | |
# register the callback | |
if callback is not None: | |
self._callbacks[id] = callback | |
# store an empty result for the meantime | |
if block > 0: | |
self._results[id] = self.EMPTY_RESULT | |
# create the request | |
req = Spec.request(method, id=id, params=args) | |
self._write(req) | |
# blocking return value behavior | |
if block > 0: | |
while True: | |
if self._results[id] != self.EMPTY_RESULT: | |
result = self._results[id] | |
del self._results[id] | |
if isinstance(result, Exception): | |
raise result | |
else: | |
return result | |
time.sleep(block) | |
def _handle(self, line): | |
""" | |
Handles an incoming *line* and dispatches the parsed object to the request, response, or | |
error handlers. | |
""" | |
obj = json.loads(line) | |
# dispatch to the correct handler | |
if "method" in obj: | |
# request | |
self._handle_request(obj) | |
elif "error" not in obj: | |
# response | |
self._handle_response(obj) | |
else: | |
# error | |
self._handle_error(obj) | |
def _handle_request(self, req): | |
""" | |
Handles an incoming request *req*. When it containes an id, a response or error is sent | |
back. | |
""" | |
logging.debug(f"Handling request to {req['method']}") | |
try: | |
method = self._route(req["method"]) | |
result = method(req["params"]) | |
if "id" in req: | |
res = Spec.response(req["id"], result) | |
self._write(res) | |
except Exception as e: | |
if "id" in req: | |
if isinstance(e, RPCError): | |
err = Spec.error(req["id"], e.code, e.data) | |
else: | |
err = Spec.error(req["id"], -32603, str(e)) | |
self._write(err) | |
def _handle_response(self, res): | |
""" | |
Handles an incoming successful response *res*. Blocking calls are resolved and registered | |
callbacks are invoked with the first error argument being set to *None*. | |
""" | |
logging.debug(f"Handling response for {res['id']}: {res}") | |
# set the result | |
if res["id"] in self._results: | |
self._results[res["id"]] = res["result"] | |
# lookup and invoke the callback | |
if res["id"] in self._callbacks: | |
callback = self._callbacks[res["id"]] | |
del self._callbacks[res["id"]] | |
callback(None, res["result"]) | |
def _handle_error(self, res): | |
""" | |
Handles an incoming failed response *res*. Blocking calls throw an exception and | |
registered callbacks are invoked with an exception and the second result argument set to | |
*None*. | |
""" | |
logging.debug(f"Handling error {res}") | |
# extract the error and create an actual error instance to raise | |
err = res["error"] | |
error = get_error(err["code"])(err.get("data", err["message"])) | |
# set the error | |
if res["id"] in self._results: | |
self._results[res["id"]] = error | |
# lookup and invoke the callback | |
if res["id"] in self._callbacks: | |
callback = self._callbacks[res["id"]] | |
del self._callbacks[res["id"]] | |
callback(error, None) | |
def _route(self, method): | |
if method in self.handlers.keys(): | |
return self.handlers[method] | |
else: | |
raise RPCMethodNotFound(data=method) | |
def _write(self, s): | |
""" | |
Writes a string *s* to the output stream. | |
""" | |
msg = f"Content-Length: {len(s)}\n\n{s}" | |
logging.debug("SENT: \n" + str(msg) + "\n\n") | |
self.stdout.write(bytearray(msg, "utf-8")) | |
self.stdout.flush() | |
class Watchdog(threading.Thread): | |
""" | |
This class represents a thread that watches the input stream of an :py:class:`RPC` instance for | |
incoming content and dispatches requests to it. | |
.. py:attribute:: rpc | |
The :py:class:`RPC` instance. | |
.. py:attribute:: name | |
The thread's name. | |
.. py:attribute:: interval | |
The polling interval of the run loop. | |
.. py:attribute:: daemon | |
The thread's daemon flag. | |
""" | |
def __init__(self, rpc, name="watchdog", interval=0.1, daemon=True, start=True): | |
super(Watchdog, self).__init__() | |
# store attributes | |
self.rpc = rpc | |
self.name = name | |
self.interval = interval | |
self.daemon = daemon | |
# register a stop event | |
self._stop = threading.Event() | |
if start: | |
self.start() | |
def start(self): | |
""" | |
Starts with thread's activity. | |
""" | |
super(Watchdog, self).start() | |
def stop(self): | |
""" | |
Stops with thread's activity. | |
""" | |
self._stop.set() | |
def run(self): | |
# reset the stop event | |
self._stop.clear() | |
# stop here when stdin is not set or closed | |
if not self.rpc.stdin or self.rpc.stdin.closed: | |
return | |
# read new incoming lines | |
last_pos = 0 | |
while not self._stop.is_set(): | |
lines = None | |
# stop when stdin is closed | |
if self.rpc.stdin.closed: | |
break | |
# read from stdin depending on whether it is a tty or not | |
if self.rpc.stdin.isatty(): | |
cur_pos = self.rpc.stdin.tell() | |
if cur_pos != last_pos: | |
self.rpc.stdin.seek(last_pos) | |
lines = self.rpc.stdin.readlines() | |
last_pos = self.rpc.stdin.tell() | |
self.rpc.stdin.seek(cur_pos) | |
else: | |
try: | |
header = self.rpc.stdin.readline() | |
header = header.decode("utf-8").strip() | |
if header.startswith("Content-Length:"): | |
length = 2 + int(header[len("Content-Length:") :]) | |
lines = [self.rpc.stdin.read(length)] | |
except Exception: | |
# prevent residual race conditions occurring when stdin is closed externally | |
pass | |
# handle new lines if any | |
if lines: | |
for line in lines: | |
line = line.decode("utf-8").strip() | |
if line: | |
self.rpc._handle(line) | |
else: | |
self._stop.wait(self.interval) | |
class RPCError(Exception): | |
""" | |
Base class for RPC errors. | |
.. py:attribute:: message | |
The message of this error, i.e., ``"<title> (<code>)[, data: <data>]"``. | |
.. py:attribute:: data | |
Additional data of this error. Setting the data attribute will also change the message | |
attribute. | |
""" | |
def __init__(self, data=None): | |
# build the error message | |
message = "{} ({})".format(self.title, self.code) | |
if data is not None: | |
message += ", data: {}".format(data) | |
self.message = message | |
super(RPCError, self).__init__(message) | |
self.data = data | |
def __str__(self): | |
return self.message | |
error_map_distinct = {} | |
error_map_range = {} | |
def is_range(code): | |
return ( | |
isinstance(code, tuple) | |
and len(code) == 2 | |
and all(isinstance(i, int) for i in code) | |
and code[0] < code[1] | |
) | |
def register_error(cls): | |
""" | |
Decorator that registers a new RPC error derived from :py:class:`RPCError`. The purpose of | |
error registration is to have a mapping of error codes/code ranges to error classes for faster | |
lookups during error creation. | |
.. code-block:: python | |
@register_error | |
class MyCustomRPCError(RPCError): | |
code = ... | |
title = "My custom error" | |
""" | |
# it would be much cleaner to add a meta class to RPCError as a registry for codes | |
# but in CPython 2 exceptions aren't types, so simply provide a registry mechanism here | |
if not issubclass(cls, RPCError): | |
raise TypeError("'{}' is not a subclass of RPCError".format(cls)) | |
code = cls.code | |
if isinstance(code, int): | |
error_map = error_map_distinct | |
elif is_range(code): | |
error_map = error_map_range | |
else: | |
raise TypeError("invalid RPC error code {}".format(code)) | |
if code in error_map: | |
raise AttributeError("duplicate RPC error code {}".format(code)) | |
error_map[code] = cls | |
return cls | |
def get_error(code): | |
""" | |
Returns the RPC error class that was previously registered to *code*. *None* is returned when no | |
class could be found. | |
""" | |
if code in error_map_distinct: | |
return error_map_distinct[code] | |
for (lower, upper), cls in error_map_range.items(): | |
if lower <= code <= upper: | |
return cls | |
return None | |
class RPCParseError(RPCError): | |
code = -32700 | |
title = "Parse error" | |
class RPCInvalidRequest(RPCError): | |
code = -32600 | |
title = "Invalid Request" | |
class RPCMethodNotFound(RPCError): | |
code = -32601 | |
title = "Method not found" | |
class RPCInvalidParams(RPCError): | |
code = -32602 | |
title = "Invalid params" | |
class RPCInternalError(RPCError): | |
code = -32603 | |
title = "Internal error" | |
class RPCServerError(RPCError): | |
code = (-32099, -32000) | |
title = "Server error" | |