def __init__(self, sock):
self.sock, self.buffer = sock, ''
self.bytesWritten = self.bytesRead = 0
-
+
def close(self):
self.sock.shutdown(1) # can't just close it.
self.sock = None
-
+
def readline(self):
try:
while True:
self.buffer += data
except StopIteration: raise
except: raise ConnectionClosed # anything else is treated as connection closed.
-
+
def read(self, count):
try:
while True:
self.buffer += data
except StopIteration: raise
except: raise ConnectionClosed # anything else is treated as connection closed.
-
+
def unread(self, data):
self.buffer = data + self.buffer
-
+
def write(self, data):
while len(data) > 0: # write in 4K chunks each time
chunk, data = data[:4096], data[4096:]
class Protocol(object):
-
+
def __init__(self, sock):
self.stream = SockStream(sock)
self.writeLock = threading.Lock()
self.writeQueue = Queue.Queue()
-
+
def messageReceived(self, msg):
yield
-
+
def connectionClosed(self):
yield
-
+
def parse(self):
try:
yield self.parseRequests() # parse http requests
#yield self.connectionClosed()
if _debug: print 'parse connection closed'
#yield self.server.queue.put((self, None)) # close connection
-
+
def removeConnection(self):
yield self.server.queue.put((self, None)) # close connection
except:
print traceback.print_exc()
#self.writeQueue.put(message)
-
+
def parseRequests(self):
'''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
raw_requestline = (yield self.stream.readline())
if _debug: print "parseRequests, line", raw_requestline
if not raw_requestline:
- raise ConnectionClosed
+ raise ConnectionClosed
data = ''
try:
while 1:
print 'parseRequests', \
(traceback and traceback.print_exc() or None)
- raise ConnectionClosed
+ raise ConnectionClosed
self.hr.raw_requestline = raw_requestline
#pos = self.hr.rfile.tell()
print "parseRequests seek after"
if not self.hr.parse_request():
- raise ConnectionClosed
+ raise ConnectionClosed
if _debug:
print "parseRequests parse_req after"
print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers)
if _debug:
print 'parseRequests, ConnectionClosed '
raise StopIteration
-
+
except:
if _debug:
print 'messageReceived', \
while self.writeQueue.empty(): (yield multitask.sleep(0.01))
data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
if _debug: print "write to stream", repr(data)
- if data is None:
+ if data is None:
# just in case TCP socket is not closed, close it.
try:
if _debug:
print "stream closed"
except: pass
break
-
+
try:
yield self.stream.write(data)
except ConnectionClosed:
class Stream(object):
'''The stream object that is used for RTMP stream.'''
- count = 0;
+ count = 0
def __init__(self, client):
self.client, self.id, self.name = client, 0, ''
self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
self.queue = multitask.Queue()
self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
if _debug: print self, 'created'
-
+
def close(self):
if _debug: print self, 'closing'
if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
if self.playfile is not None: self.playfile.close(); self.playfile = None
self.client = None # to clear the reference
pass
-
+
def __repr__(self):
- return self._name;
-
+ return self._name
+
def recv(self):
'''Generator to receive new Message on this stream, or None if stream is closed.'''
return self.queue.get()
-
- def send(self, msg):
- '''Method to send a Message or Command on this stream.'''
- if isinstance(msg, Command):
- msg = msg.toMessage()
- msg.streamId = self.id
- # if _debug: print self,'send'
- if self.client is not None: self.client.writeMessage(msg)
-
+
+
class Client(Protocol):
'''The client object represents a single connected client to the server.'''
def __init__(self, sock, server, remote):
Protocol.__init__(self, sock)
self.server = server
self.remote = remote
- self.agent = {}
self.streams = {}
- self._nextCallId = 2
- self._nextStreamId = 1
- self.objectEncoding = 0.0
self.queue = multitask.Queue() # receive queue used by application
multitask.add(self.parse())
#multitask.add(self.write())
def recv(self):
'''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
return self.queue.get()
-
+
def connectionClosed(self):
'''Called when the client drops the connection'''
if _debug: 'Client.connectionClosed'
yield self.stream.close()
else:
yield None
-
+
def messageReceived(self, msg):
if _debug: print 'messageReceived cmd=', msg.command, msg.path
msg.response_cookies = process_cookies(msg.headers, self.remote)
- # slightly bad, this: read everything, put it into memory...
+ # slightly bad, this: read everything, put it into memory,
+ # but it helps to jump-start the project and enables use of
+ # standard http://python.org BaseHTTPRequestHandler.
+ # modification of mimetools.Message (actually rfc822) is
+ # otherwise required.
if msg.headers.has_key('content-length'):
max_chunk_size = 10*1024*1024
size_remaining = int(msg.headers["content-length"])
TODO: report back an HTTP error with "reason" in it.
'''
self.removeConnection()
-
+
class Server(object):
'''A RTMP server listens for incoming connections and informs the app.'''
'''Generator to wait for incoming client connections on this server and return
(client, args) or (None, None) if the socket is closed or some error.'''
return self.queue.get()
-
+
def run(self):
try:
while True:
sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
if sock == None:
- if _debug: print 'rtmp.Server accept(sock) returned None.'
+ if _debug: print 'rtmp.Server accept(sock) returned None.'
break
if _debug: print 'connection received from', remote
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
client = Client(sock, self, remote)
- except:
+ except:
if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
traceback.print_exc()
-
+
if (self.sock):
try: self.sock.close(); self.sock = None
except: pass
def clients(self):
'''everytime this property is accessed it returns a new list of clients connected to this instance.'''
return self._clients[1:] if self._clients is not None else []
-
+
class App(BaseApp, SimpleAppHTTPRequestHandler):
pass
-
+
class HTTPServer(object):
'''A RTMP server to record and stream HTTP.'''
self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
self.root = ''
-
+
def start(self, host='0.0.0.0', port=8080):
'''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
if _debug: print 'start', host, port
sock.listen(5)
server = self.server = Server(sock) # start rtmp server on that socket
multitask.add(self.serverlistener())
-
+
def stop(self):
if _debug: print 'stopping HTTP server'
if self.server and self.sock:
try: self.sock.close(); self.sock = None
except: pass
self.server = None
-
+
def serverlistener(self):
'''Server listener (generator). It accepts all connections and invokes client listener'''
try:
inst = app()
self.clients[session] = [inst]; inst._clients=self.clients[session]
msg.server = inst # whew! just in time!
- try:
+ try:
methodname = "on%s" % msg.command
if _debug:
print methodname, dir(inst)
close_connection = msg.close_connection
if _debug:
print "close connection", close_connection
- except:
+ except:
if _debug: traceback.print_exc()
yield client.rejectConnection(reason='Exception on %s' % methodname)
continue
if _debug:
print 'close_connection done'
pass
- else:
+ else:
if _debug:
print "result", result
yield client.rejectConnection(reason='Rejected in onConnect')
except StopIteration: raise
- except:
+ except:
if _debug:
print 'serverlistener exception', \
(sys and sys.exc_info() or None)
traceback.print_exc()
-
+
# The main routine to start, run and stop the service
if __name__ == '__main__':
parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
(options, args) = parser.parse_args()
-
+
_debug = options.verbose
try:
if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)