# Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
'''
-This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
-1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
- It also uses the App abstract class to implement the applications.
-2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
- derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
- various streams from the client, represented using the Stream class.
-3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
- and write of FLV file format.
-
-
-Typically an application can launch this server as follows:
-$ python rtmp.py
-
-To know the command line options use the -h option:
-$ python rtmp.py -h
-
-To start the server with a different directory for recording and playing FLV files from, use the following command.
-$ python rtmp.py -r some-other-directory/
-Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
-
-A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
-from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
-local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
-same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
-from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
-it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
-see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
-your played video will start appearing after some initial delay.
If an application wants to use this module as a library, it can launch the server as follows:
->>> agent = FlashServer() # a new RTMP server instance
+>>> agent = HTTPServer() # a new RTMP server instance
>>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
>>> agent.start() # start the server
>>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
val = v.OutputString()
self.send_header("Set-Cookie", val)
+
def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
ch = headers.getheaders(cookie_key)
if _debug:
#response_cookies['session']['version'] = 0
return response_cookies
+
class ConnectionClosed:
'raised when the client closed the connection'
+
class SockStream(object):
'''A class that represents a socket as a stream'''
def __init__(self, sock):
if _debug: print 'socket.written'
-class Header(object):
- FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
-
- def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
- self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
- self.extendedtime = 0
- if channel<64: self.hdrdata = chr(channel)
- elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
- else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
-
- def _appendExtendedTimestamp(self, data):
- if self.time == 0xFFFFFF:
- data += struct.pack('>I', self.extendedtime)
- return data
-
- def toBytes(self, control):
- data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
- if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
-
- data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
- if control == Header.TIME: return self._appendExtendedTimestamp(data)
-
- data += struct.pack('>I', self.size)[1:] # size
- data += chr(self.type) # type
- if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
-
- data += struct.pack('<I', self.streamId) # add streamId
- return self._appendExtendedTimestamp(data)
-
- def __repr__(self):
- return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
- % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
-
-class Message(object):
- # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
- RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
- 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
-
- def __init__(self, hdr=None, data=''):
- if hdr is None: hdr = Header()
- self.header, self.data = hdr, data
-
- # define properties type, streamId and time to access self.header.(property)
- for p in ['type', 'streamId', 'time']:
- exec 'def _g%s(self): return self.header.%s'%(p, p)
- exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
- exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
- @property
- def size(self): return len(self.data)
-
- def __repr__(self):
- return ("<Message header=%r data=%r>"% (self.header, self.data))
-
class Protocol(object):
- # constants
- PING_SIZE = 1536
- DEFAULT_CHUNK_SIZE = 128
- MIN_CHANNEL_ID = 3
- PROTOCOL_CHANNEL_ID = 2
def __init__(self, sock):
self.stream = SockStream(sock)
- self.lastReadHeaders = dict() # indexed by channelId
- self.incompletePackets = dict() #indexed by channelId
- self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
- self.lastWriteHeaders = dict() # indexed by streamId
- self.nextChannelId = Protocol.MIN_CHANNEL_ID
self.writeLock = threading.Lock()
self.writeQueue = Queue.Queue()
def messageReceived(self, msg):
yield
- def protocolMessage(self, msg):
- if msg.type == Message.ACK: # respond to ACK requests
- response = Message()
- response.type, response.data = msg.type, msg.data
- self.writeMessage(response)
- elif msg.type == Message.CHUNK_SIZE:
- self.readChunkSize = struct.unpack('>L', msg.data)[0]
-
def connectionClosed(self):
yield
print traceback.print_exc()
#self.writeQueue.put(message)
- def parseHandshake(self):
- '''Parses the rtmp handshake'''
- data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
- yield self.stream.write(data)
- data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
- yield self.stream.write(data)
-
def parseRequests(self):
'''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
print traceback.print_exc()
print "ending protocol write loop", repr(self)
-class Command(object):
- ''' Class for command / data messages'''
- def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
- '''Create a new command with given type, name, id, cmdData and args list.'''
- self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
-
- def __repr__(self):
- return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
-
- def setArg(self, arg):
- self.args.append(arg)
-
- def getArg(self, index):
- return self.args[index]
-
- @classmethod
- def fromMessage(cls, message):
- ''' initialize from a parsed RTMP message'''
- assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
-
- length = len(message.data)
- if length == 0: raise ValueError('zero length message data')
-
- if message.type == Message.RPC3 or message.type == Message.DATA3:
- assert message.data[0] == '\x00' # must be 0 in AMD3
- data = message.data[1:]
- else:
- data = message.data
-
- amfReader = amf.AMF0(data)
-
- inst = cls()
- inst.name = amfReader.read() # first field is command name
-
- try:
- if message.type == Message.RPC:
- inst.id = amfReader.read() # second field *may* be message id
- inst.cmdData = amfReader.read() # third is command data
- else:
- inst.id = 0
- inst.args = [] # others are optional
- while True:
- inst.args.append(amfReader.read())
- except EOFError:
- pass
- return inst
-
- def toMessage(self):
- msg = Message()
- assert self.type
- msg.type = self.type
- output = amf.BytesIO()
- amfWriter = amf.AMF0(output)
- amfWriter.write(self.name)
- if msg.type == Message.RPC or msg.type == Message.RPC3:
- amfWriter.write(self.id)
- amfWriter.write(self.cmdData)
- for arg in self.args:
- amfWriter.write(arg)
- output.seek(0)
- #hexdump.hexdump(output)
- #output.seek(0)
- if msg.type == Message.RPC3 or msg.type == Message.DATA3:
- data = '\x00' + output.read()
- else:
- data = output.read()
- msg.data = data
- output.close()
- return msg
-
-def getfilename(path, name, root):
- '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
- the the path present in the path variable.'''
- ignore, ignore, scope = path.partition('/')
- if scope: scope = scope + '/'
- result = root + scope + name + '.flv'
- if _debug: print 'filename=', result
- return result
class Stream(object):
'''The stream object that is used for RTMP stream.'''
yield self.server.queue.put((self, msg)) # new connection
- def accept(self):
- '''Method to accept an incoming client.'''
- response = Command()
- response.id, response.name, response.type = 1, '_result', Message.RPC
- if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
- response.setArg(dict(level='status', code='NetConnection.Connect.Success',
- description='Connection succeeded.', details=None,
- objectEncoding=self.objectEncoding))
- self.writeMessage(response.toMessage())
-
def rejectConnection(self, reason=''):
'''Method to reject an incoming client.'''
response = Command()
if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
self.writeMessage(cmd.toMessage())
- def createStream(self):
- ''' Create a stream on the server side'''
- stream = Stream(self)
- stream.id = self._nextStreamId
- self.streams[stream.id] = stream
- self._nextStreamId += 1
- return stream
-
class Server(object):
'''A RTMP server listens for incoming connections and informs the app.'''
'''everytime this property is accessed it returns a new list of clients connected to this instance.'''
return self._clients[1:] if self._clients is not None else []
+
class App(BaseApp, SimpleAppHTTPRequestHandler):
pass
+
class HTTPServer(object):
- '''A RTMP server to record and stream Flash video.'''
+ '''A RTMP server to record and stream HTTP.'''
def __init__(self):
'''Construct a new HttpServer. It initializes the local members.'''
self.sock = self.server = None
multitask.add(self.serverlistener())
def stop(self):
- if _debug: print 'stopping Flash server'
+ if _debug: print 'stopping HTTP server'
if self.server and self.sock:
try: self.sock.close(); self.sock = None
except: pass
(sys and sys.exc_info() or None)
traceback.print_exc()
- def clientlistener(self, client):
- '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
- try:
- while True:
- msg, arg = (yield client.recv()) # receive new message from client
- if not msg: # if the client disconnected,
- if _debug: print 'connection closed from client'
- break # come out of listening loop.
- if msg == 'command': # handle a new command
- multitask.add(self.clienthandler(client, arg))
- elif msg == 'stream': # a new stream is created, handle the stream.
- arg.client = client
- multitask.add(self.streamlistener(arg))
- except StopIteration: raise
- except:
- if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
- traceback.print_exc()
-
- # client is disconnected, clear our state for application instance.
- if _debug: print 'cleaning up client', client.path
- inst = None
- if client.path in self.clients:
- inst = self.clients[client.path][0]
- self.clients[client.path].remove(client)
- for stream in client.streams.values(): # for all streams of this client
- self.closehandler(stream)
- client.streams.clear() # and clear the collection of streams
- if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
- if _debug: print 'removing the application instance'
- inst = self.clients[client.path][0]
- inst._clients = None
- del self.clients[client.path]
- if inst is not None: inst.onDisconnect(client)
-
- def closehandler(self, stream):
- '''A stream is closed explicitly when a closeStream command is received from given client.'''
- if stream.client is not None:
- inst = self.clients[stream.client.path][0]
- if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
- inst.onClose(stream.client, stream)
- del inst.publishers[stream.name]
- if stream.name in inst.players and stream in inst.players[stream.name]:
- inst.onStop(stream.client, stream)
- inst.players[stream.name].remove(stream)
- if len(inst.players[stream.name]) == 0:
- del inst.players[stream.name]
- stream.close()
-
- def clienthandler(self, client, cmd):
- '''A generator to handle a single command on the client.'''
- inst = self.clients[client.path][0]
- if inst:
- if cmd.name == '_error':
- if hasattr(inst, 'onStatus'):
- result = inst.onStatus(client, cmd.args[0])
- elif cmd.name == '_result':
- if hasattr(inst, 'onResult'):
- result = inst.onResult(client, cmd.args[0])
- else:
- res, code, args = Command(), '_result', dict()
- try: result = inst.onCommand(client, cmd.name, *cmd.args)
- except:
- if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
- code, args = '_error', dict()
- res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
- res.args, res.cmdData = args, None
- if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
- client.writeMessage(res.toMessage())
- # TODO return result to caller
- yield
-
- def streamlistener(self, stream):
- '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
- stream.recordfile = None # so that it doesn't complain about missing attribute
- while True:
- msg = (yield stream.recv())
- if not msg:
- if _debug: print 'stream closed'
- self.closehandler(stream)
- break
- # if _debug: msg
- multitask.add(self.streamhandler(stream, msg))
-
- def streamhandler(self, stream, message):
- '''A generator to handle a single message on the stream.'''
- try:
- if message.type == Message.RPC:
- cmd = Command.fromMessage(message)
- if _debug: print 'streamhandler received cmd=', cmd
- if cmd.name == 'publish':
- yield self.publishhandler(stream, cmd)
- elif cmd.name == 'play':
- yield self.playhandler(stream, cmd)
- elif cmd.name == 'closeStream':
- self.closehandler(stream)
- else: # audio or video message
- yield self.mediahandler(stream, message)
- except GeneratorExit: pass
- except StopIteration: pass
- except:
- if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
-
- def publishhandler(self, stream, cmd):
- '''A new stream is published. Store the information in the application instance.'''
- try:
- stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
- stream.name = cmd.args[0]
- if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
- inst = self.clients[stream.client.path][0]
- if (stream.name in inst.publishers):
- raise ValueError, 'Stream name already in use'
- inst.publishers[stream.name] = stream # store the client for publisher
- result = inst.onPublish(stream.client, stream)
-
- if stream.mode == 'record' or stream.mode == 'append':
- stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
- response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
- yield stream.send(response)
- except ValueError, E: # some error occurred. inform the app.
- if _debug: print 'error in publishing stream', str(E)
- response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
- yield stream.send(response)
-
- def playhandler(self, stream, cmd):
- '''A new stream is being played. Just updated the players list with this stream.'''
- inst = self.clients[stream.client.path][0]
- name = stream.name = cmd.args[0] # store the stream's name
- start = cmd.args[1] if len(cmd.args) >= 2 else -2
- if name not in inst.players:
- inst.players[name] = [] # initialize the players for this stream name
- if stream not in inst.players[name]: # store the stream as players of this name
- inst.players[name].append(stream)
- path = getfilename(stream.client.path, stream.name, self.root)
- if os.path.exists(path):
- stream.playfile = FLV().open(path)
- multitask.add(stream.playfile.reader(stream))
- if _debug: print 'playing stream=', name, 'start=', start
- result = inst.onPlay(stream.client, stream)
- response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
- yield stream.send(response)
-
- def mediahandler(self, stream, message):
- '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
- if stream.client is not None:
- inst = self.clients[stream.client.path][0]
- result = inst.onPublishData(stream.client, stream, message)
- if result:
- client = stream.client
- for s in (inst.players.get(stream.name, [])):
- # if _debug: print 'D', stream.name, s.name
- result = inst.onPlayData(s.client, s, message)
- if result:
- yield s.send(message)
- if stream.recordfile is not None:
- stream.recordfile.write(message)
# The main routine to start, run and stop the service
if __name__ == '__main__':
agent = HTTPServer()
agent.root = options.root
agent.start(options.host, options.port)
- if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
+ if _debug: print time.asctime(), 'HTTP Server Starts - %s:%d' % (options.host, options.port)
multitask.run()
except KeyboardInterrupt:
print traceback.print_exc()
except:
print "exception"
print traceback.print_exc()
- if _debug: print time.asctime(), 'Flash Server Stops'
+ if _debug: print time.asctime(), 'HTTP Server Stops'