whitespace cleanup and remove unneeded code
authorlkcl <lkcl@teenymac.(none)>
Thu, 15 Jul 2010 20:06:13 +0000 (21:06 +0100)
committerlkcl <lkcl@teenymac.(none)>
Thu, 15 Jul 2010 20:06:13 +0000 (21:06 +0100)
httpd.py

index a5f1dd7da7547db2fe62370e7f79bbbdc2c86d2f..e2dc6fe9953731998cd07276b6d38ce9169012aa 100644 (file)
--- a/httpd.py
+++ b/httpd.py
@@ -1,38 +1,10 @@
 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
 
 '''
-This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
-1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
-   It also uses the App abstract class to implement the applications.
-2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
-   derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
-   various streams from the client, represented using the Stream class.
-3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
-   and write of FLV file format.
-
-
-Typically an application can launch this server as follows:
-$ python rtmp.py
-
-To know the command line options use the -h option:
-$ python rtmp.py -h
-
-To start the server with a different directory for recording and playing FLV files from, use the following command.
-$ python rtmp.py -r some-other-directory/
-Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
-
-A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
-from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
-local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
-same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
-from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
-it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
-see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
-your played video will start appearing after some initial delay.
 
 
 If an application wants to use this module as a library, it can launch the server as follows:
->>> agent = FlashServer()   # a new RTMP server instance
+>>> agent = HTTPServer()   # a new RTMP server instance
 >>> agent.root = 'flvs/'    # set the document root to be 'flvs' directory. Default is current './' directory.
 >>> agent.start()           # start the server
 >>> multitask.run()         # this is needed somewhere in the application to actually start the co-operative multitasking.
@@ -102,6 +74,7 @@ class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
             val = v.OutputString()
             self.send_header("Set-Cookie", val)
 
+
 def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
     ch = headers.getheaders(cookie_key)
     if _debug:
@@ -137,9 +110,11 @@ def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
         #response_cookies['session']['version'] = 0
     return response_cookies
 
+
 class ConnectionClosed:
     'raised when the client closed the connection'
 
+
 class SockStream(object):
     '''A class that represents a socket as a stream'''
     def __init__(self, sock):
@@ -192,87 +167,16 @@ class SockStream(object):
         if _debug: print 'socket.written'
 
 
-class Header(object):
-    FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
-    
-    def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
-        self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
-        self.extendedtime = 0
-        if channel<64: self.hdrdata = chr(channel)
-        elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
-        else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256) 
-    
-    def _appendExtendedTimestamp(self, data):
-        if self.time == 0xFFFFFF:
-            data += struct.pack('>I', self.extendedtime)
-        return data
-                    
-    def toBytes(self, control):
-        data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
-        if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
-        
-        data += struct.pack('>I', self.time & 0xFFFFFF)[1:]  # add time
-        if control == Header.TIME: return self._appendExtendedTimestamp(data)
-        
-        data += struct.pack('>I', self.size)[1:]  # size
-        data += chr(self.type)                    # type
-        if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
-        
-        data += struct.pack('<I', self.streamId)  # add streamId
-        return self._appendExtendedTimestamp(data)
-
-    def __repr__(self):
-        return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
-            % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
-
-class Message(object):
-    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
-    RPC,  RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK,  CHUNK_SIZE = \
-    0x14, 0x11, 0x12, 0x0F,  0x13,      0x10,       0x08,  0x09,  0x03, 0x01
-    
-    def __init__(self, hdr=None, data=''):
-        if hdr is None: hdr = Header()
-        self.header, self.data = hdr, data
-    
-    # define properties type, streamId and time to access self.header.(property)
-    for p in ['type', 'streamId', 'time']:
-        exec 'def _g%s(self): return self.header.%s'%(p, p)
-        exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
-        exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
-    @property
-    def size(self): return len(self.data)
-            
-    def __repr__(self):
-        return ("<Message header=%r data=%r>"% (self.header, self.data))
-                
 class Protocol(object):
-    # constants
-    PING_SIZE           = 1536
-    DEFAULT_CHUNK_SIZE  = 128
-    MIN_CHANNEL_ID      = 3
-    PROTOCOL_CHANNEL_ID = 2
     
     def __init__(self, sock):
         self.stream = SockStream(sock)
-        self.lastReadHeaders = dict() # indexed by channelId
-        self.incompletePackets = dict() #indexed by channelId
-        self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
-        self.lastWriteHeaders = dict() # indexed by streamId
-        self.nextChannelId = Protocol.MIN_CHANNEL_ID
         self.writeLock = threading.Lock()
         self.writeQueue = Queue.Queue()
             
     def messageReceived(self, msg):
         yield
             
-    def protocolMessage(self, msg):
-        if msg.type == Message.ACK: # respond to ACK requests
-            response = Message()
-            response.type, response.data = msg.type, msg.data
-            self.writeMessage(response)
-        elif msg.type == Message.CHUNK_SIZE:
-            self.readChunkSize = struct.unpack('>L', msg.data)[0]
-            
     def connectionClosed(self):
         yield
                             
@@ -296,13 +200,6 @@ class Protocol(object):
             print traceback.print_exc()
         #self.writeQueue.put(message)
             
-    def parseHandshake(self):
-        '''Parses the rtmp handshake'''
-        data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
-        yield self.stream.write(data)
-        data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
-        yield self.stream.write(data)
-    
     def parseRequests(self):
         '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
 
@@ -404,84 +301,6 @@ class Protocol(object):
                 print traceback.print_exc()
         print "ending protocol write loop", repr(self)
 
-class Command(object):
-    ''' Class for command / data messages'''
-    def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
-        '''Create a new command with given type, name, id, cmdData and args list.'''
-        self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
-        
-    def __repr__(self):
-        return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
-    
-    def setArg(self, arg):
-        self.args.append(arg)
-    
-    def getArg(self, index):
-        return self.args[index]
-    
-    @classmethod
-    def fromMessage(cls, message):
-        ''' initialize from a parsed RTMP message'''
-        assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
-
-        length = len(message.data)
-        if length == 0: raise ValueError('zero length message data')
-        
-        if message.type == Message.RPC3 or message.type == Message.DATA3:
-            assert message.data[0] == '\x00' # must be 0 in AMD3
-            data = message.data[1:]
-        else:
-            data = message.data
-        
-        amfReader = amf.AMF0(data)
-
-        inst = cls()
-        inst.name = amfReader.read() # first field is command name
-
-        try:
-            if message.type == Message.RPC:
-                inst.id = amfReader.read() # second field *may* be message id
-                inst.cmdData = amfReader.read() # third is command data
-            else:
-                inst.id = 0
-            inst.args = [] # others are optional
-            while True:
-                inst.args.append(amfReader.read())
-        except EOFError:
-            pass
-        return inst
-    
-    def toMessage(self):
-        msg = Message()
-        assert self.type
-        msg.type = self.type
-        output = amf.BytesIO()
-        amfWriter = amf.AMF0(output)
-        amfWriter.write(self.name)
-        if msg.type == Message.RPC or msg.type == Message.RPC3:
-            amfWriter.write(self.id)
-            amfWriter.write(self.cmdData)
-        for arg in self.args:
-            amfWriter.write(arg)
-        output.seek(0)
-        #hexdump.hexdump(output)
-        #output.seek(0)
-        if msg.type == Message.RPC3 or msg.type == Message.DATA3:
-            data = '\x00' + output.read()
-        else:
-            data = output.read()
-        msg.data = data
-        output.close()
-        return msg
-
-def getfilename(path, name, root):
-    '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
-    the the path present in the path variable.'''
-    ignore, ignore, scope = path.partition('/')
-    if scope: scope = scope + '/'
-    result = root + scope + name + '.flv'
-    if _debug: print 'filename=', result
-    return result
 
 class Stream(object):
     '''The stream object that is used for RTMP stream.'''
@@ -565,16 +384,6 @@ class Client(Protocol):
 
         yield self.server.queue.put((self, msg)) # new connection
 
-    def accept(self):
-        '''Method to accept an incoming client.'''
-        response = Command()
-        response.id, response.name, response.type = 1, '_result', Message.RPC
-        if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
-        response.setArg(dict(level='status', code='NetConnection.Connect.Success',
-                        description='Connection succeeded.', details=None,
-                        objectEncoding=self.objectEncoding))
-        self.writeMessage(response.toMessage())
-            
     def rejectConnection(self, reason=''):
         '''Method to reject an incoming client.'''
         response = Command()
@@ -592,14 +401,6 @@ class Client(Protocol):
         if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
         self.writeMessage(cmd.toMessage())
             
-    def createStream(self):
-        ''' Create a stream on the server side'''
-        stream = Stream(self)
-        stream.id = self._nextStreamId
-        self.streams[stream.id] = stream
-        self._nextStreamId += 1
-        return stream
-
 
 class Server(object):
     '''A RTMP server listens for incoming connections and informs the app.'''
@@ -650,11 +451,13 @@ class BaseApp(object):
         '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
         return self._clients[1:] if self._clients is not None else []
     
+
 class App(BaseApp, SimpleAppHTTPRequestHandler):
     pass
     
+
 class HTTPServer(object):
-    '''A RTMP server to record and stream Flash video.'''
+    '''A RTMP server to record and stream HTTP.'''
     def __init__(self):
         '''Construct a new HttpServer. It initializes the local members.'''
         self.sock = self.server = None
@@ -675,7 +478,7 @@ class HTTPServer(object):
             multitask.add(self.serverlistener())
     
     def stop(self):
-        if _debug: print 'stopping Flash server'
+        if _debug: print 'stopping HTTP server'
         if self.server and self.sock:
             try: self.sock.close(); self.sock = None
             except: pass
@@ -763,161 +566,6 @@ class HTTPServer(object):
                 (sys and sys.exc_info() or None)
                 traceback.print_exc()
             
-    def clientlistener(self, client):
-        '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
-        try:
-            while True:
-                msg, arg = (yield client.recv())   # receive new message from client
-                if not msg:                   # if the client disconnected,
-                    if _debug: print 'connection closed from client'
-                    break                     #    come out of listening loop.
-                if msg == 'command':          # handle a new command
-                    multitask.add(self.clienthandler(client, arg))
-                elif msg == 'stream':         # a new stream is created, handle the stream.
-                    arg.client = client
-                    multitask.add(self.streamlistener(arg))
-        except StopIteration: raise
-        except:
-            if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
-            traceback.print_exc()
-            
-        # client is disconnected, clear our state for application instance.
-        if _debug: print 'cleaning up client', client.path
-        inst = None
-        if client.path in self.clients:
-            inst = self.clients[client.path][0]
-            self.clients[client.path].remove(client)
-        for stream in client.streams.values(): # for all streams of this client
-            self.closehandler(stream)
-        client.streams.clear() # and clear the collection of streams
-        if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
-            if _debug: print 'removing the application instance'
-            inst = self.clients[client.path][0]
-            inst._clients = None
-            del self.clients[client.path]
-        if inst is not None: inst.onDisconnect(client)
-        
-    def closehandler(self, stream):
-        '''A stream is closed explicitly when a closeStream command is received from given client.'''
-        if stream.client is not None:
-            inst = self.clients[stream.client.path][0]
-            if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
-                inst.onClose(stream.client, stream)
-                del inst.publishers[stream.name]
-            if stream.name in inst.players and stream in inst.players[stream.name]:
-                inst.onStop(stream.client, stream)
-                inst.players[stream.name].remove(stream)
-                if len(inst.players[stream.name]) == 0:
-                    del inst.players[stream.name]
-            stream.close()
-        
-    def clienthandler(self, client, cmd):
-        '''A generator to handle a single command on the client.'''
-        inst = self.clients[client.path][0]
-        if inst:
-            if cmd.name == '_error':
-                if hasattr(inst, 'onStatus'):
-                    result = inst.onStatus(client, cmd.args[0])
-            elif cmd.name == '_result':
-                if hasattr(inst, 'onResult'):
-                    result = inst.onResult(client, cmd.args[0])
-            else:
-                res, code, args = Command(), '_result', dict()
-                try: result = inst.onCommand(client, cmd.name, *cmd.args)
-                except:
-                    if _debug: print 'Client.call exception', (sys and sys.exc_info() or None) 
-                    code, args = '_error', dict()
-                res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
-                res.args, res.cmdData = args, None
-                if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
-                client.writeMessage(res.toMessage())
-                # TODO return result to caller
-        yield
-        
-    def streamlistener(self, stream):
-        '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
-        stream.recordfile = None # so that it doesn't complain about missing attribute
-        while True:
-            msg = (yield stream.recv())
-            if not msg:
-                if _debug: print 'stream closed'
-                self.closehandler(stream)
-                break
-            # if _debug: msg
-            multitask.add(self.streamhandler(stream, msg))
-            
-    def streamhandler(self, stream, message):
-        '''A generator to handle a single message on the stream.'''
-        try:
-            if message.type == Message.RPC:
-                cmd = Command.fromMessage(message)
-                if _debug: print 'streamhandler received cmd=', cmd
-                if cmd.name == 'publish':
-                    yield self.publishhandler(stream, cmd)
-                elif cmd.name == 'play':
-                    yield self.playhandler(stream, cmd)
-                elif cmd.name == 'closeStream':
-                    self.closehandler(stream)
-            else: # audio or video message
-                yield self.mediahandler(stream, message)
-        except GeneratorExit: pass
-        except StopIteration: pass
-        except: 
-            if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
-    
-    def publishhandler(self, stream, cmd):
-        '''A new stream is published. Store the information in the application instance.'''
-        try:
-            stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
-            stream.name = cmd.args[0]
-            if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
-            inst = self.clients[stream.client.path][0]
-            if (stream.name in inst.publishers):
-                raise ValueError, 'Stream name already in use'
-            inst.publishers[stream.name] = stream # store the client for publisher
-            result = inst.onPublish(stream.client, stream)
-            
-            if stream.mode == 'record' or stream.mode == 'append':
-                stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
-            response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
-            yield stream.send(response)
-        except ValueError, E: # some error occurred. inform the app.
-            if _debug: print 'error in publishing stream', str(E)
-            response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
-            yield stream.send(response)
-
-    def playhandler(self, stream, cmd):
-        '''A new stream is being played. Just updated the players list with this stream.'''
-        inst = self.clients[stream.client.path][0]
-        name = stream.name = cmd.args[0]  # store the stream's name
-        start = cmd.args[1] if len(cmd.args) >= 2 else -2
-        if name not in inst.players:
-            inst.players[name] = [] # initialize the players for this stream name
-        if stream not in inst.players[name]: # store the stream as players of this name
-            inst.players[name].append(stream)
-        path = getfilename(stream.client.path, stream.name, self.root)
-        if os.path.exists(path):
-            stream.playfile = FLV().open(path)
-            multitask.add(stream.playfile.reader(stream))
-        if _debug: print 'playing stream=', name, 'start=', start
-        result = inst.onPlay(stream.client, stream)
-        response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
-        yield stream.send(response)
-            
-    def mediahandler(self, stream, message):
-        '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
-        if stream.client is not None:
-            inst = self.clients[stream.client.path][0]
-            result = inst.onPublishData(stream.client, stream, message)
-            if result:
-                client = stream.client
-                for s in (inst.players.get(stream.name, [])):
-                    # if _debug: print 'D', stream.name, s.name
-                    result = inst.onPlayData(s.client, s, message)
-                    if result:
-                        yield s.send(message)
-                if stream.recordfile is not None:
-                    stream.recordfile.write(message)
 
 # The main routine to start, run and stop the service
 if __name__ == '__main__':
@@ -936,7 +584,7 @@ if __name__ == '__main__':
         agent = HTTPServer()
         agent.root = options.root
         agent.start(options.host, options.port)
-        if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
+        if _debug: print time.asctime(), 'HTTP Server Starts - %s:%d' % (options.host, options.port)
         multitask.run()
     except KeyboardInterrupt:
         print traceback.print_exc()
@@ -944,4 +592,4 @@ if __name__ == '__main__':
     except:
         print "exception"
         print traceback.print_exc()
-    if _debug: print time.asctime(), 'Flash Server Stops'
+    if _debug: print time.asctime(), 'HTTP Server Stops'