--- /dev/null
+
+# Originally taken from:
+# http://code.activestate.com/recipes/552751/
+# thanks to david decotigny
+
+# Heavily based on the XML-RPC implementation in python.
+# Based on the json-rpc specs: http://json-rpc.org/wiki/specification
+# The main deviation is on the error treatment. The official spec
+# would set the 'error' attribute to a string. This implementation
+# sets it to a dictionary with keys: message/traceback/type
+
+import cjson
+import SocketServer
+import SimpleAppHTTPServer
+#import BaseHTTPServer
+import sys
+import traceback
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
+
+
+###
+### Server code
+###
+import SimpleXMLRPCServer
+
+
+class SimpleJSONRPCRequestHandler(SimpleAppHTTPServer.SimpleAppHTTPRequestHandler):
+ """Simple JSONRPC request handler class and HTTP GET Server
+
+ Handles all HTTP POST requests and attempts to decode them as
+ JSONRPC requests.
+
+ Handles all HTTP GET requests and serves the content from the
+ current directory.
+
+ """
+
+ # Class attribute listing the accessible path components;
+ # paths not on this list will result in a 404 error.
+ rpc_paths = ('/', '/JSON')
+
+ def __init__(self):
+
+ self.funcs = {}
+
+ def is_rpc_path_valid(self):
+ return True
+ if self.rpc_paths:
+ return self.path in self.rpc_paths
+ else:
+ # If .rpc_paths is empty, just assume all paths are legal
+ return True
+
+ def onPOST(self, client, *args):
+ """Handles the HTTP POST request.
+
+ Attempts to interpret all HTTP POST requests as XML-RPC calls,
+ which are forwarded to the server's _dispatch method for handling.
+ """
+ print "onPost", client, args
+ self.client = client
+ self.hr = args[0]
+
+ # Check that the path is legal
+ if not self.is_rpc_path_valid():
+ self.report_404()
+ return
+
+ print "about to read data"
+ try:
+ # Get arguments by reading body of request.
+ # We read this in chunks to avoid straining
+ # socket.read(); around the 10 or 15Mb mark, some platforms
+ # begin to have problems (bug #792570).
+ max_chunk_size = 10*1024*1024
+ size_remaining = int(self.hr.headers["content-length"])
+ L = []
+ print "size_remaining", size_remaining
+ while size_remaining:
+ chunk_size = min(size_remaining, max_chunk_size)
+ data = self.hr.rfile.read(chunk_size)
+ L.append(data)
+ size_remaining -= len(L[-1])
+ data = ''.join(L)
+
+ # In previous versions of SimpleXMLRPCServer, _dispatch
+ # could be overridden in this class, instead of in
+ # SimpleXMLRPCDispatcher. To maintain backwards compatibility,
+ # check to see if a subclass implements _dispatch and dispatch
+ # using that method if present.
+ response = self._marshaled_dispatch(
+ data, getattr(self, '_dispatch', None)
+ )
+ except: # This should only happen if the module is buggy
+ # internal error, report as HTTP server error
+ self.hr.send_response(500)
+ self.hr.end_headers()
+ else:
+ # got a valid JSONRPC response
+ self.hr.send_response(200)
+ self.hr.send_header("Content-type", "text/x-json")
+ self.hr.send_header("Content-length", str(len(response)))
+ self.hr.end_headers()
+ self.hr.wfile.write(response)
+
+ # shut down the connection
+ #self.wfile.flush()
+ #self.connection.shutdown(1)
+
+ def report_404 (self):
+ # Report a 404 error
+ self.hr.send_response(404)
+ response = 'No such page'
+ self.hr.send_header("Content-type", "text/plain")
+ self.hr.send_header("Content-length", str(len(response)))
+ self.hr.end_headers()
+ self.hr.wfile.write(response)
+ # shut down the connection
+ self.hr.wfile.flush()
+ self.hr.connection.shutdown(1)
+
+ def register_function(self, function, name = None):
+ """Registers a function to respond to XML-RPC requests.
+
+ The optional name argument can be used to set a Unicode name
+ for the function.
+ """
+
+ if name is None:
+ name = function.__name__
+ self.funcs[name] = function
+
+
+ def _marshaled_dispatch(self, data, dispatch_method = None):
+ id = None
+ try:
+ req = cjson.decode(data)
+ method = req['method']
+ params = req['params']
+ id = req['id']
+
+ if dispatch_method is not None:
+ result = dispatch_method(method, params)
+ else:
+ result = self._dispatch(method, params)
+ response = dict(id=id, result=result, error=None)
+ except:
+ extpe, exv, extrc = sys.exc_info()
+ err = dict(type=str(extpe),
+ message=str(exv),
+ traceback=''.join(traceback.format_tb(extrc)))
+ response = dict(id=id, result=None, error=err)
+ try:
+ return cjson.encode(response)
+ except:
+ extpe, exv, extrc = sys.exc_info()
+ err = dict(type=str(extpe),
+ message=str(exv),
+ traceback=''.join(traceback.format_tb(extrc)))
+ response = dict(id=id, result=None, error=err)
+ return cjson.encode(response)
+
+ def _dispatch(self, method, params):
+ """Dispatches the XML-RPC method.
+
+ XML-RPC calls are forwarded to a registered function that
+ matches the called XML-RPC method name. If no such function
+ exists then the call is forwarded to the registered instance,
+ if available.
+
+ If the registered instance has a _dispatch method then that
+ method will be called with the name of the XML-RPC method and
+ its parameters as a tuple
+ e.g. instance._dispatch('add',(2,3))
+
+ If the registered instance does not have a _dispatch method
+ then the instance will be searched to find a matching method
+ and, if found, will be called.
+
+ Methods beginning with an '_' are considered private and will
+ not be called.
+ """
+
+ func = self.funcs.get(method, None)
+
+ if func is not None:
+ print "params", params
+ return func(*params)
+ else:
+ raise Exception('method "%s" is not supported' % method)
+
+
+ #def log_request(self, code='-', size='-'):
+ # """Selectively log an accepted request."""
+
+ # if self.server.logRequests:
+ # BaseHTTPServer.BaseHTTPRequestHandler.log_request(self, code, size)
+
+
+
+class SimpleJSONRPCServer:
+ """Simple JSON-RPC server.
+
+ Simple JSON-RPC server that allows functions and a single instance
+ to be installed to handle requests. The default implementation
+ attempts to dispatch JSON-RPC calls to the functions or instance
+ installed in the server. Override the _dispatch method inhereted
+ from SimpleJSONRPCDispatcher to change this behavior.
+ """
+
+ allow_reuse_address = True
+
+ def __init__(self, addr, requestHandler=SimpleJSONRPCRequestHandler,
+ logRequests=True):
+ self.logRequests = logRequests
+
+ # [Bug #1222790] If possible, set close-on-exec flag; if a
+ # method spawns a subprocess, the subprocess shouldn't have
+ # the listening socket open.
+ if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
+ flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
+ flags |= fcntl.FD_CLOEXEC
+ fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
+
+
+###
+### Client code
+###
+import xmlrpclib
+
+class ResponseError(xmlrpclib.ResponseError):
+ pass
+class Fault(xmlrpclib.ResponseError):
+ pass
+
+def _get_response(file, sock):
+ data = ""
+ while 1:
+ if sock:
+ response = sock.recv(1024)
+ else:
+ response = file.read(1024)
+ if not response:
+ break
+ data += response
+
+ file.close()
+
+ return data
+
+class Transport(xmlrpclib.Transport):
+ def _parse_response(self, file, sock):
+ return _get_response(file, sock)
+
+class SafeTransport(xmlrpclib.SafeTransport):
+ def _parse_response(self, file, sock):
+ return _get_response(file, sock)
+
+class ServerProxy:
+ def __init__(self, uri, id=None, transport=None, use_datetime=0):
+ # establish a "logical" server connection
+
+ # get the url
+ import urllib
+ type, uri = urllib.splittype(uri)
+ if type not in ("http", "https"):
+ raise IOError, "unsupported JSON-RPC protocol"
+ self.__host, self.__handler = urllib.splithost(uri)
+ if not self.__handler:
+ self.__handler = "/JSON"
+
+ if transport is None:
+ if type == "https":
+ transport = SafeTransport(use_datetime=use_datetime)
+ else:
+ transport = Transport(use_datetime=use_datetime)
+
+ self.__transport = transport
+ self.__id = id
+
+ def __request(self, methodname, params):
+ # call a method on the remote server
+
+ request = cjson.encode(dict(id=self.__id, method=methodname,
+ params=params))
+
+ data = self.__transport.request(
+ self.__host,
+ self.__handler,
+ request,
+ verbose=False
+ )
+
+ response = cjson.decode(data)
+
+ if response["id"] != self.__id:
+ raise ResponseError("Invalid request id (is: %s, expected: %s)" \
+ % (response["id"], self.__id))
+ if response["error"] is not None:
+ raise Fault("JSON Error", response["error"])
+ return response["result"]
+
+ def __repr__(self):
+ return (
+ "<ServerProxy for %s%s>" %
+ (self.__host, self.__handler)
+ )
+
+ __str__ = __repr__
+
+ def __getattr__(self, name):
+ # magic method dispatcher
+ return xmlrpclib._Method(self.__request, name)
+
+
+def jsonremote(service):
+ """Make JSONRPCService a decorator so that you can write :
+
+ chatservice = SimpleJSONRPCServer()
+
+ @jsonremote(chatservice, 'login')
+ def login(request, user_name):
+ (...)
+ """
+ def remotify(func):
+ if isinstance(service, SimpleJSONRPCServer):
+ service.register_function(func, func.__name__)
+ else:
+ emsg = 'Service "%s" not found' % str(service.__name__)
+ raise NotImplementedError, emsg
+ return func
+ return remotify
+
+
+if __name__ == '__main__':
+ if not len(sys.argv) > 1:
+ import socket
+ print 'Running JSON-RPC server on port 8000'
+ server = SimpleJSONRPCServer(("localhost", 8000))
+ server.register_function(pow)
+ server.register_function(lambda x,y: x+y, 'add')
+ server.register_function(lambda x: x, 'echo')
+ server.serve_forever()
+ else:
+ remote = ServerProxy(sys.argv[1])
+ print 'Using connection', remote
+
+ print repr(remote.add(1, 2))
+ aaa = remote.add
+ print repr(remote.pow(2, 4))
+ print aaa(5, 6)
+
+ try:
+ # Invalid parameters
+ aaa(5, "toto")
+ print "Successful execution of invalid code"
+ except Fault:
+ pass
+
+ try:
+ # Invalid parameters
+ aaa(5, 6, 7)
+ print "Successful execution of invalid code"
+ except Fault:
+ pass
+
+ try:
+ # Invalid method name
+ print repr(remote.powx(2, 4))
+ print "Successful execution of invalid code"
+ except Fault:
+ pass
from cookielib import parse_ns_headers, CookieJar
from Cookie import SimpleCookie
+global _debug
_debug = False
+def set_debug(dbg):
+ global _debug
+ _debug = dbg
class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
self.bytesWritten = self.bytesRead = 0
def close(self):
- print "sock close"
- fno = self.sock.fileno()
- self.sock.close()
- print "writable?", self.sock, fno
- yield multitask.writable(fno, timeout=0.1)
- print "is it?"
- del self.sock
+ self.sock.shutdown(1) # can't just close it.
self.sock = None
def readline(self):
def parseRequests(self):
'''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
- self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote)
+ self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
self.hr.close_connection = 1
self.cookies = CookieJar()
msg.response_cookies['session']['path'] = '/'
msg.response_cookies['session']['domain'] = self.remote[0]
msg.response_cookies['session']['version'] = 0
+
+ if msg.headers.has_key('content-length'):
+ max_chunk_size = 10*1024*1024
+ size_remaining = int(msg.headers["content-length"])
+ L = []
+ while size_remaining:
+ chunk_size = min(size_remaining, max_chunk_size)
+ data = (yield self.stream.read(chunk_size))
+ L.append(data)
+ size_remaining -= len(L[-1])
+
+ pos = msg.rfile.tell()
+ msg.rfile.write(''.join(L))
+ msg.rfile.seek(pos)
+
yield self.server.queue.put((self, msg)) # new connection
def accept(self):
yield self.queue.put((None, None))
self.queue = None
-class App(SimpleAppHTTPRequestHandler):
+class BaseApp(object):
'''An application instance containing any number of streams. Except for constructor all methods are generators.'''
count = 0
def __init__(self):
def clients(self):
'''everytime this property is accessed it returns a new list of clients connected to this instance.'''
return self._clients[1:] if self._clients is not None else []
- def onConnect(self, client, *args):
- if _debug: print self.name, 'onConnect', client.path
- return True
- def onDisconnect(self, client):
- if _debug: print self.name, 'onDisconnect', client.path
-class HttpServer(object):
+class App(BaseApp, SimpleAppHTTPRequestHandler):
+ pass
+
+class HTTPServer(object):
'''A RTMP server to record and stream Flash video.'''
def __init__(self):
'''Construct a new HttpServer. It initializes the local members.'''
# TODO: we should reject non-localhost client connections.
if not client: # if the server aborted abnormally,
break # hence close the listener.
- if _debug: print 'client connection received', client, args
+ if _debug: print 'client connection received', client, msg
# if client.objectEncoding != 0 and client.objectEncoding != 3:
if client.objectEncoding != 0:
yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
else:
print "cookies", str(msg.response_cookies)
session = msg.response_cookies['session'].value
- name, ignore, scope = msg.path.partition('/')
+ name = msg.path
+ print "serverlistener", name
if '*' not in self.apps and name not in self.apps:
yield client.rejectConnection(reason='Application not found: ' + name)
else: # create application instance as needed and add in our list
if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
app = self.apps[name] if name in self.apps else self.apps['*'] # application class
print "clients", self.clients.keys()
- if session in self.clients: inst = self.clients[session][0]
- else: inst = app()
+ if session in self.clients:
+ inst = self.clients[session][0]
+ else:
+ inst = app()
+ msg.server = inst # whew! just in time!
try:
methodname = "on%s" % msg.command
- method = getattr(inst, methodname, None)
+ print methodname, dir(inst)
+ method = getattr(inst, methodname, None)
result = method(client, msg)
close_connection = msg.close_connection
print "close connection", close_connection
_debug = options.verbose
try:
if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
- agent = HttpServer()
+ agent = HTTPServer()
agent.root = options.root
agent.start(options.host, options.port)
if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)