From c274ddb0bcd81ab500abe48ddcbca7f9d4e96321 Mon Sep 17 00:00:00 2001 From: lkcl Date: Thu, 15 Jul 2010 21:14:38 +0100 Subject: [PATCH] whitespace cleanup --- httpd.py | 101 ++++++++++++++++++++++++++----------------------------- 1 file changed, 47 insertions(+), 54 deletions(-) diff --git a/httpd.py b/httpd.py index bb9be64..7daf054 100644 --- a/httpd.py +++ b/httpd.py @@ -120,11 +120,11 @@ class SockStream(object): def __init__(self, sock): self.sock, self.buffer = sock, '' self.bytesWritten = self.bytesRead = 0 - + def close(self): self.sock.shutdown(1) # can't just close it. self.sock = None - + def readline(self): try: while True: @@ -139,7 +139,7 @@ class SockStream(object): self.buffer += data except StopIteration: raise except: raise ConnectionClosed # anything else is treated as connection closed. - + def read(self, count): try: while True: @@ -153,10 +153,10 @@ class SockStream(object): self.buffer += data except StopIteration: raise except: raise ConnectionClosed # anything else is treated as connection closed. - + def unread(self, data): self.buffer = data + self.buffer - + def write(self, data): while len(data) > 0: # write in 4K chunks each time chunk, data = data[:4096], data[4096:] @@ -168,18 +168,18 @@ class SockStream(object): class Protocol(object): - + def __init__(self, sock): self.stream = SockStream(sock) self.writeLock = threading.Lock() self.writeQueue = Queue.Queue() - + def messageReceived(self, msg): yield - + def connectionClosed(self): yield - + def parse(self): try: yield self.parseRequests() # parse http requests @@ -187,7 +187,7 @@ class Protocol(object): #yield self.connectionClosed() if _debug: print 'parse connection closed' #yield self.server.queue.put((self, None)) # close connection - + def removeConnection(self): yield self.server.queue.put((self, None)) # close connection @@ -199,7 +199,7 @@ class Protocol(object): except: print traceback.print_exc() #self.writeQueue.put(message) - + def parseRequests(self): '''Parses complete messages until connection closed. Raises ConnectionLost exception.''' @@ -228,7 +228,7 @@ class Protocol(object): raw_requestline = (yield self.stream.readline()) if _debug: print "parseRequests, line", raw_requestline if not raw_requestline: - raise ConnectionClosed + raise ConnectionClosed data = '' try: while 1: @@ -244,7 +244,7 @@ class Protocol(object): print 'parseRequests', \ (traceback and traceback.print_exc() or None) - raise ConnectionClosed + raise ConnectionClosed self.hr.raw_requestline = raw_requestline #pos = self.hr.rfile.tell() @@ -258,7 +258,7 @@ class Protocol(object): print "parseRequests seek after" if not self.hr.parse_request(): - raise ConnectionClosed + raise ConnectionClosed if _debug: print "parseRequests parse_req after" print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers) @@ -268,7 +268,7 @@ class Protocol(object): if _debug: print 'parseRequests, ConnectionClosed ' raise StopIteration - + except: if _debug: print 'messageReceived', \ @@ -281,7 +281,7 @@ class Protocol(object): while self.writeQueue.empty(): (yield multitask.sleep(0.01)) data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait. if _debug: print "write to stream", repr(data) - if data is None: + if data is None: # just in case TCP socket is not closed, close it. try: if _debug: @@ -292,7 +292,7 @@ class Protocol(object): print "stream closed" except: pass break - + try: yield self.stream.write(data) except ConnectionClosed: @@ -304,47 +304,36 @@ class Protocol(object): class Stream(object): '''The stream object that is used for RTMP stream.''' - count = 0; + count = 0 def __init__(self, client): self.client, self.id, self.name = client, 0, '' self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute self.queue = multitask.Queue() self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1 if _debug: print self, 'created' - + def close(self): if _debug: print self, 'closing' if self.recordfile is not None: self.recordfile.close(); self.recordfile = None if self.playfile is not None: self.playfile.close(); self.playfile = None self.client = None # to clear the reference pass - + def __repr__(self): - return self._name; - + return self._name + def recv(self): '''Generator to receive new Message on this stream, or None if stream is closed.''' return self.queue.get() - - def send(self, msg): - '''Method to send a Message or Command on this stream.''' - if isinstance(msg, Command): - msg = msg.toMessage() - msg.streamId = self.id - # if _debug: print self,'send' - if self.client is not None: self.client.writeMessage(msg) - + + class Client(Protocol): '''The client object represents a single connected client to the server.''' def __init__(self, sock, server, remote): Protocol.__init__(self, sock) self.server = server self.remote = remote - self.agent = {} self.streams = {} - self._nextCallId = 2 - self._nextStreamId = 1 - self.objectEncoding = 0.0 self.queue = multitask.Queue() # receive queue used by application multitask.add(self.parse()) #multitask.add(self.write()) @@ -352,7 +341,7 @@ class Client(Protocol): def recv(self): '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.''' return self.queue.get() - + def connectionClosed(self): '''Called when the client drops the connection''' if _debug: 'Client.connectionClosed' @@ -362,12 +351,16 @@ class Client(Protocol): yield self.stream.close() else: yield None - + def messageReceived(self, msg): if _debug: print 'messageReceived cmd=', msg.command, msg.path msg.response_cookies = process_cookies(msg.headers, self.remote) - # slightly bad, this: read everything, put it into memory... + # slightly bad, this: read everything, put it into memory, + # but it helps to jump-start the project and enables use of + # standard http://python.org BaseHTTPRequestHandler. + # modification of mimetools.Message (actually rfc822) is + # otherwise required. if msg.headers.has_key('content-length'): max_chunk_size = 10*1024*1024 size_remaining = int(msg.headers["content-length"]) @@ -389,7 +382,7 @@ class Client(Protocol): TODO: report back an HTTP error with "reason" in it. ''' self.removeConnection() - + class Server(object): '''A RTMP server listens for incoming connections and informs the app.''' @@ -404,21 +397,21 @@ class Server(object): '''Generator to wait for incoming client connections on this server and return (client, args) or (None, None) if the socket is closed or some error.''' return self.queue.get() - + def run(self): try: while True: sock, remote = (yield multitask.accept(self.sock)) # receive client TCP if sock == None: - if _debug: print 'rtmp.Server accept(sock) returned None.' + if _debug: print 'rtmp.Server accept(sock) returned None.' break if _debug: print 'connection received from', remote sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block client = Client(sock, self, remote) - except: + except: if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None) traceback.print_exc() - + if (self.sock): try: self.sock.close(); self.sock = None except: pass @@ -439,11 +432,11 @@ class BaseApp(object): def clients(self): '''everytime this property is accessed it returns a new list of clients connected to this instance.''' return self._clients[1:] if self._clients is not None else [] - + class App(BaseApp, SimpleAppHTTPRequestHandler): pass - + class HTTPServer(object): '''A RTMP server to record and stream HTTP.''' @@ -453,7 +446,7 @@ class HTTPServer(object): self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App} self.clients = dict() # list of clients indexed by scope. First item in list is app instance. self.root = '' - + def start(self, host='0.0.0.0', port=8080): '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.''' if _debug: print 'start', host, port @@ -465,14 +458,14 @@ class HTTPServer(object): sock.listen(5) server = self.server = Server(sock) # start rtmp server on that socket multitask.add(self.serverlistener()) - + def stop(self): if _debug: print 'stopping HTTP server' if self.server and self.sock: try: self.sock.close(); self.sock = None except: pass self.server = None - + def serverlistener(self): '''Server listener (generator). It accepts all connections and invokes client listener''' try: @@ -509,7 +502,7 @@ class HTTPServer(object): inst = app() self.clients[session] = [inst]; inst._clients=self.clients[session] msg.server = inst # whew! just in time! - try: + try: methodname = "on%s" % msg.command if _debug: print methodname, dir(inst) @@ -518,7 +511,7 @@ class HTTPServer(object): close_connection = msg.close_connection if _debug: print "close connection", close_connection - except: + except: if _debug: traceback.print_exc() yield client.rejectConnection(reason='Exception on %s' % methodname) continue @@ -539,17 +532,17 @@ class HTTPServer(object): if _debug: print 'close_connection done' pass - else: + else: if _debug: print "result", result yield client.rejectConnection(reason='Rejected in onConnect') except StopIteration: raise - except: + except: if _debug: print 'serverlistener exception', \ (sys and sys.exc_info() or None) traceback.print_exc() - + # The main routine to start, run and stop the service if __name__ == '__main__': @@ -561,7 +554,7 @@ if __name__ == '__main__': parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'") parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace') (options, args) = parser.parse_args() - + _debug = options.verbose try: if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port) -- 2.30.2