From a6f4093cc54459b8d0f53044d4a559782fa58cb5 Mon Sep 17 00:00:00 2001 From: Luke Kenneth Casson Leighton Date: Mon, 12 Jul 2010 23:29:56 +0100 Subject: [PATCH] add jsonrpc service and test --- SimpleJSONRPCServer.py | 375 +++++++++++++++++++++++++++++++++++++++++ httpd.py | 58 ++++--- jsonapp.py | 23 +++ testjsonrpc.py | 13 ++ 4 files changed, 448 insertions(+), 21 deletions(-) create mode 100644 SimpleJSONRPCServer.py create mode 100644 jsonapp.py create mode 100644 testjsonrpc.py diff --git a/SimpleJSONRPCServer.py b/SimpleJSONRPCServer.py new file mode 100644 index 0000000..987503d --- /dev/null +++ b/SimpleJSONRPCServer.py @@ -0,0 +1,375 @@ + +# Originally taken from: +# http://code.activestate.com/recipes/552751/ +# thanks to david decotigny + +# Heavily based on the XML-RPC implementation in python. +# Based on the json-rpc specs: http://json-rpc.org/wiki/specification +# The main deviation is on the error treatment. The official spec +# would set the 'error' attribute to a string. This implementation +# sets it to a dictionary with keys: message/traceback/type + +import cjson +import SocketServer +import SimpleAppHTTPServer +#import BaseHTTPServer +import sys +import traceback +try: + import fcntl +except ImportError: + fcntl = None + + +### +### Server code +### +import SimpleXMLRPCServer + + +class SimpleJSONRPCRequestHandler(SimpleAppHTTPServer.SimpleAppHTTPRequestHandler): + """Simple JSONRPC request handler class and HTTP GET Server + + Handles all HTTP POST requests and attempts to decode them as + JSONRPC requests. + + Handles all HTTP GET requests and serves the content from the + current directory. + + """ + + # Class attribute listing the accessible path components; + # paths not on this list will result in a 404 error. + rpc_paths = ('/', '/JSON') + + def __init__(self): + + self.funcs = {} + + def is_rpc_path_valid(self): + return True + if self.rpc_paths: + return self.path in self.rpc_paths + else: + # If .rpc_paths is empty, just assume all paths are legal + return True + + def onPOST(self, client, *args): + """Handles the HTTP POST request. + + Attempts to interpret all HTTP POST requests as XML-RPC calls, + which are forwarded to the server's _dispatch method for handling. + """ + print "onPost", client, args + self.client = client + self.hr = args[0] + + # Check that the path is legal + if not self.is_rpc_path_valid(): + self.report_404() + return + + print "about to read data" + try: + # Get arguments by reading body of request. + # We read this in chunks to avoid straining + # socket.read(); around the 10 or 15Mb mark, some platforms + # begin to have problems (bug #792570). + max_chunk_size = 10*1024*1024 + size_remaining = int(self.hr.headers["content-length"]) + L = [] + print "size_remaining", size_remaining + while size_remaining: + chunk_size = min(size_remaining, max_chunk_size) + data = self.hr.rfile.read(chunk_size) + L.append(data) + size_remaining -= len(L[-1]) + data = ''.join(L) + + # In previous versions of SimpleXMLRPCServer, _dispatch + # could be overridden in this class, instead of in + # SimpleXMLRPCDispatcher. To maintain backwards compatibility, + # check to see if a subclass implements _dispatch and dispatch + # using that method if present. + response = self._marshaled_dispatch( + data, getattr(self, '_dispatch', None) + ) + except: # This should only happen if the module is buggy + # internal error, report as HTTP server error + self.hr.send_response(500) + self.hr.end_headers() + else: + # got a valid JSONRPC response + self.hr.send_response(200) + self.hr.send_header("Content-type", "text/x-json") + self.hr.send_header("Content-length", str(len(response))) + self.hr.end_headers() + self.hr.wfile.write(response) + + # shut down the connection + #self.wfile.flush() + #self.connection.shutdown(1) + + def report_404 (self): + # Report a 404 error + self.hr.send_response(404) + response = 'No such page' + self.hr.send_header("Content-type", "text/plain") + self.hr.send_header("Content-length", str(len(response))) + self.hr.end_headers() + self.hr.wfile.write(response) + # shut down the connection + self.hr.wfile.flush() + self.hr.connection.shutdown(1) + + def register_function(self, function, name = None): + """Registers a function to respond to XML-RPC requests. + + The optional name argument can be used to set a Unicode name + for the function. + """ + + if name is None: + name = function.__name__ + self.funcs[name] = function + + + def _marshaled_dispatch(self, data, dispatch_method = None): + id = None + try: + req = cjson.decode(data) + method = req['method'] + params = req['params'] + id = req['id'] + + if dispatch_method is not None: + result = dispatch_method(method, params) + else: + result = self._dispatch(method, params) + response = dict(id=id, result=result, error=None) + except: + extpe, exv, extrc = sys.exc_info() + err = dict(type=str(extpe), + message=str(exv), + traceback=''.join(traceback.format_tb(extrc))) + response = dict(id=id, result=None, error=err) + try: + return cjson.encode(response) + except: + extpe, exv, extrc = sys.exc_info() + err = dict(type=str(extpe), + message=str(exv), + traceback=''.join(traceback.format_tb(extrc))) + response = dict(id=id, result=None, error=err) + return cjson.encode(response) + + def _dispatch(self, method, params): + """Dispatches the XML-RPC method. + + XML-RPC calls are forwarded to a registered function that + matches the called XML-RPC method name. If no such function + exists then the call is forwarded to the registered instance, + if available. + + If the registered instance has a _dispatch method then that + method will be called with the name of the XML-RPC method and + its parameters as a tuple + e.g. instance._dispatch('add',(2,3)) + + If the registered instance does not have a _dispatch method + then the instance will be searched to find a matching method + and, if found, will be called. + + Methods beginning with an '_' are considered private and will + not be called. + """ + + func = self.funcs.get(method, None) + + if func is not None: + print "params", params + return func(*params) + else: + raise Exception('method "%s" is not supported' % method) + + + #def log_request(self, code='-', size='-'): + # """Selectively log an accepted request.""" + + # if self.server.logRequests: + # BaseHTTPServer.BaseHTTPRequestHandler.log_request(self, code, size) + + + +class SimpleJSONRPCServer: + """Simple JSON-RPC server. + + Simple JSON-RPC server that allows functions and a single instance + to be installed to handle requests. The default implementation + attempts to dispatch JSON-RPC calls to the functions or instance + installed in the server. Override the _dispatch method inhereted + from SimpleJSONRPCDispatcher to change this behavior. + """ + + allow_reuse_address = True + + def __init__(self, addr, requestHandler=SimpleJSONRPCRequestHandler, + logRequests=True): + self.logRequests = logRequests + + # [Bug #1222790] If possible, set close-on-exec flag; if a + # method spawns a subprocess, the subprocess shouldn't have + # the listening socket open. + if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'): + flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD) + flags |= fcntl.FD_CLOEXEC + fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) + + +### +### Client code +### +import xmlrpclib + +class ResponseError(xmlrpclib.ResponseError): + pass +class Fault(xmlrpclib.ResponseError): + pass + +def _get_response(file, sock): + data = "" + while 1: + if sock: + response = sock.recv(1024) + else: + response = file.read(1024) + if not response: + break + data += response + + file.close() + + return data + +class Transport(xmlrpclib.Transport): + def _parse_response(self, file, sock): + return _get_response(file, sock) + +class SafeTransport(xmlrpclib.SafeTransport): + def _parse_response(self, file, sock): + return _get_response(file, sock) + +class ServerProxy: + def __init__(self, uri, id=None, transport=None, use_datetime=0): + # establish a "logical" server connection + + # get the url + import urllib + type, uri = urllib.splittype(uri) + if type not in ("http", "https"): + raise IOError, "unsupported JSON-RPC protocol" + self.__host, self.__handler = urllib.splithost(uri) + if not self.__handler: + self.__handler = "/JSON" + + if transport is None: + if type == "https": + transport = SafeTransport(use_datetime=use_datetime) + else: + transport = Transport(use_datetime=use_datetime) + + self.__transport = transport + self.__id = id + + def __request(self, methodname, params): + # call a method on the remote server + + request = cjson.encode(dict(id=self.__id, method=methodname, + params=params)) + + data = self.__transport.request( + self.__host, + self.__handler, + request, + verbose=False + ) + + response = cjson.decode(data) + + if response["id"] != self.__id: + raise ResponseError("Invalid request id (is: %s, expected: %s)" \ + % (response["id"], self.__id)) + if response["error"] is not None: + raise Fault("JSON Error", response["error"]) + return response["result"] + + def __repr__(self): + return ( + "" % + (self.__host, self.__handler) + ) + + __str__ = __repr__ + + def __getattr__(self, name): + # magic method dispatcher + return xmlrpclib._Method(self.__request, name) + + +def jsonremote(service): + """Make JSONRPCService a decorator so that you can write : + + chatservice = SimpleJSONRPCServer() + + @jsonremote(chatservice, 'login') + def login(request, user_name): + (...) + """ + def remotify(func): + if isinstance(service, SimpleJSONRPCServer): + service.register_function(func, func.__name__) + else: + emsg = 'Service "%s" not found' % str(service.__name__) + raise NotImplementedError, emsg + return func + return remotify + + +if __name__ == '__main__': + if not len(sys.argv) > 1: + import socket + print 'Running JSON-RPC server on port 8000' + server = SimpleJSONRPCServer(("localhost", 8000)) + server.register_function(pow) + server.register_function(lambda x,y: x+y, 'add') + server.register_function(lambda x: x, 'echo') + server.serve_forever() + else: + remote = ServerProxy(sys.argv[1]) + print 'Using connection', remote + + print repr(remote.add(1, 2)) + aaa = remote.add + print repr(remote.pow(2, 4)) + print aaa(5, 6) + + try: + # Invalid parameters + aaa(5, "toto") + print "Successful execution of invalid code" + except Fault: + pass + + try: + # Invalid parameters + aaa(5, 6, 7) + print "Successful execution of invalid code" + except Fault: + pass + + try: + # Invalid method name + print repr(remote.powx(2, 4)) + print "Successful execution of invalid code" + except Fault: + pass diff --git a/httpd.py b/httpd.py index e85058e..6537ee0 100644 --- a/httpd.py +++ b/httpd.py @@ -71,7 +71,11 @@ from cStringIO import StringIO from cookielib import parse_ns_headers, CookieJar from Cookie import SimpleCookie +global _debug _debug = False +def set_debug(dbg): + global _debug + _debug = dbg class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler): @@ -107,13 +111,7 @@ class SockStream(object): self.bytesWritten = self.bytesRead = 0 def close(self): - print "sock close" - fno = self.sock.fileno() - self.sock.close() - print "writable?", self.sock, fno - yield multitask.writable(fno, timeout=0.1) - print "is it?" - del self.sock + self.sock.shutdown(1) # can't just close it. self.sock = None def readline(self): @@ -261,7 +259,7 @@ class Protocol(object): def parseRequests(self): '''Parses complete messages until connection closed. Raises ConnectionLost exception.''' - self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote) + self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None) self.hr.close_connection = 1 self.cookies = CookieJar() @@ -494,6 +492,21 @@ class Client(Protocol): msg.response_cookies['session']['path'] = '/' msg.response_cookies['session']['domain'] = self.remote[0] msg.response_cookies['session']['version'] = 0 + + if msg.headers.has_key('content-length'): + max_chunk_size = 10*1024*1024 + size_remaining = int(msg.headers["content-length"]) + L = [] + while size_remaining: + chunk_size = min(size_remaining, max_chunk_size) + data = (yield self.stream.read(chunk_size)) + L.append(data) + size_remaining -= len(L[-1]) + + pos = msg.rfile.tell() + msg.rfile.write(''.join(L)) + msg.rfile.seek(pos) + yield self.server.queue.put((self, msg)) # new connection def accept(self): @@ -567,7 +580,7 @@ class Server(object): yield self.queue.put((None, None)) self.queue = None -class App(SimpleAppHTTPRequestHandler): +class BaseApp(object): '''An application instance containing any number of streams. Except for constructor all methods are generators.''' count = 0 def __init__(self): @@ -580,13 +593,11 @@ class App(SimpleAppHTTPRequestHandler): def clients(self): '''everytime this property is accessed it returns a new list of clients connected to this instance.''' return self._clients[1:] if self._clients is not None else [] - def onConnect(self, client, *args): - if _debug: print self.name, 'onConnect', client.path - return True - def onDisconnect(self, client): - if _debug: print self.name, 'onDisconnect', client.path -class HttpServer(object): +class App(BaseApp, SimpleAppHTTPRequestHandler): + pass + +class HTTPServer(object): '''A RTMP server to record and stream Flash video.''' def __init__(self): '''Construct a new HttpServer. It initializes the local members.''' @@ -622,7 +633,7 @@ class HttpServer(object): # TODO: we should reject non-localhost client connections. if not client: # if the server aborted abnormally, break # hence close the listener. - if _debug: print 'client connection received', client, args + if _debug: print 'client connection received', client, msg # if client.objectEncoding != 0 and client.objectEncoding != 3: if client.objectEncoding != 0: yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0') @@ -630,18 +641,23 @@ class HttpServer(object): else: print "cookies", str(msg.response_cookies) session = msg.response_cookies['session'].value - name, ignore, scope = msg.path.partition('/') + name = msg.path + print "serverlistener", name if '*' not in self.apps and name not in self.apps: yield client.rejectConnection(reason='Application not found: ' + name) else: # create application instance as needed and add in our list if _debug: print 'name=', name, 'name in apps', str(name in self.apps) app = self.apps[name] if name in self.apps else self.apps['*'] # application class print "clients", self.clients.keys() - if session in self.clients: inst = self.clients[session][0] - else: inst = app() + if session in self.clients: + inst = self.clients[session][0] + else: + inst = app() + msg.server = inst # whew! just in time! try: methodname = "on%s" % msg.command - method = getattr(inst, methodname, None) + print methodname, dir(inst) + method = getattr(inst, methodname, None) result = method(client, msg) close_connection = msg.close_connection print "close connection", close_connection @@ -846,7 +862,7 @@ if __name__ == '__main__': _debug = options.verbose try: if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port) - agent = HttpServer() + agent = HTTPServer() agent.root = options.root agent.start(options.host, options.port) if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port) diff --git a/jsonapp.py b/jsonapp.py new file mode 100644 index 0000000..92f13e1 --- /dev/null +++ b/jsonapp.py @@ -0,0 +1,23 @@ +import multitask +import httpd +from SimpleJSONRPCServer import SimpleJSONRPCRequestHandler +from httpd import HTTPServer, App, BaseApp + +class MyApp(SimpleJSONRPCRequestHandler, BaseApp): + '''An application instance containing any number of streams. Except for constructor all methods are generators.''' + count = 0 + def __init__(self): + BaseApp.__init__(self) + SimpleJSONRPCRequestHandler.__init__(self) + + self.register_function(self.echo) + + def echo(self, parm): + return parm + +httpd.set_debug(True) +agent = HTTPServer() +agent.apps = dict({'/json': MyApp, '*': App}) +agent.start() +multitask.run() + diff --git a/testjsonrpc.py b/testjsonrpc.py new file mode 100644 index 0000000..47eae40 --- /dev/null +++ b/testjsonrpc.py @@ -0,0 +1,13 @@ +import unittest +import jsonrpclib + +class TestJsolait(unittest.TestCase): + def test_echo(self): + s = jsonrpclib.ServerProxy("http://127.0.0.1:8080/json", verbose=0) + reply = s.echo("hello") + print reply + #self.assert_(reply["result"] == "foo bar") + + +if __name__=="__main__": + unittest.main() -- 2.30.2