--- /dev/null
+"""Simple HTTP Server.
+
+This module builds on BaseHTTPServer by implementing the standard GET
+and HEAD requests in a fairly straightforward manner.
+
+"""
+
+
+__version__ = "0.6"
+
+__all__ = ["SimpleHTTPRequestHandler"]
+
+import os
+import posixpath
+import BaseHTTPServer
+import urllib
+import urlparse
+import cgi
+import shutil
+import mimetypes
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+
+class SimpleAppHTTPRequestHandler(object):
+
+ """Simple HTTP request handler with GET and HEAD commands.
+
+ This serves files from the current directory and any of its
+ subdirectories. The MIME type for files is determined by
+ calling the .guess_type() method.
+
+ The GET and HEAD requests are identical except that the HEAD
+ request omits the actual contents of the file.
+
+ """
+
+ server_version = "SimpleHTTP/" + __version__
+
+ def onGET(self, client, *args):
+ """Serve a GET request."""
+ self.client = client
+ hr = args[0]
+ self.path = hr.path
+ ka = hr.request_version == "HTTP/1.1"
+ f = self.send_head(hr, ka)
+ if f:
+ self.copyfile(f, hr.wfile)
+ f.close()
+
+ def onHEAD(self):
+ self.client = client
+ """Serve a HEAD request."""
+ hr = args[0]
+ f = self.send_head(hr, False)
+ if f:
+ f.close()
+
+ def send_head(self, hr, ka):
+ """Common code for GET and HEAD commands.
+
+ This sends the response code and MIME headers.
+
+ Return value is either a file object (which has to be copied
+ to the outputfile by the caller unless the command was HEAD,
+ and must be closed by the caller under all circumstances), or
+ None, in which case the caller has nothing further to do.
+
+ """
+ path = self.translate_path(self.path)
+ f = None
+ if os.path.isdir(path):
+ if not self.path.endswith('/'):
+ # redirect browser - doing basically what apache does
+ hr.send_response(301)
+ hr.send_header("Location", self.path + "/")
+ hr.end_headers()
+ return None
+ for index in "index.html", "index.htm":
+ index = os.path.join(path, index)
+ if os.path.exists(index):
+ path = index
+ break
+ else:
+ return self.list_directory(hr, path, ka)
+ ctype = self.guess_type(path)
+ if ctype.startswith('text/'):
+ mode = 'r'
+ else:
+ mode = 'rb'
+ try:
+ f = open(path, mode)
+ except IOError:
+ hr.send_error(404, "File not found")
+ return None
+ hr.send_response(200)
+ hr.send_header("Content-type", ctype)
+ fs = os.fstat(f.fileno())
+ hr.send_header("Content-Length", str(fs[6]))
+ hr.send_header("Last-Modified", hr.date_time_string(fs.st_mtime))
+ if ka:
+ hr.send_header("Connection", "keep-alive")
+ self.client.cookies.add_cookie_header(hr)
+ hr.end_headers()
+ return f
+
+ def list_directory(self, hr, path, ka):
+ """Helper to produce a directory listing (absent index.html).
+
+ Return value is either a file object, or None (indicating an
+ error). In either case, the headers are sent, making the
+ interface the same as for send_head().
+
+ """
+ try:
+ list = os.listdir(path)
+ except os.error:
+ self.send_error(404, "No permission to list directory")
+ return None
+ list.sort(key=lambda a: a.lower())
+ f = StringIO()
+ displaypath = cgi.escape(urllib.unquote(self.path))
+ f.write("<title>Directory listing for %s</title>\n" % displaypath)
+ f.write("<h2>Directory listing for %s</h2>\n" % displaypath)
+ f.write("<hr>\n<ul>\n")
+ for name in list:
+ fullname = os.path.join(path, name)
+ displayname = linkname = name
+ # Append / for directories or @ for symbolic links
+ if os.path.isdir(fullname):
+ displayname = name + "/"
+ linkname = name + "/"
+ if os.path.islink(fullname):
+ displayname = name + "@"
+ # Note: a link to a directory displays with @ and links with /
+ f.write('<li><a href="%s">%s</a>\n'
+ % (urllib.quote(linkname), cgi.escape(displayname)))
+ f.write("</ul>\n<hr>\n")
+ length = f.tell()
+ f.seek(0)
+ hr.send_response(200)
+ hr.send_header("Content-type", "text/html")
+ hr.send_header("Content-Length", str(length))
+ if ka:
+ hr.send_header("Connection", "keep-alive")
+ self.client.cookies.add_cookie_header(hr)
+ hr.end_headers()
+ return f
+
+ def translate_path(self, path):
+ """Translate a /-separated PATH to the local filename syntax.
+
+ Components that mean special things to the local file system
+ (e.g. drive or directory names) are ignored. (XXX They should
+ probably be diagnosed.)
+
+ """
+ # abandon query parameters
+ path = urlparse.urlparse(path)[2]
+ path = posixpath.normpath(urllib.unquote(path))
+ words = path.split('/')
+ words = filter(None, words)
+ path = os.getcwd()
+ for word in words:
+ drive, word = os.path.splitdrive(word)
+ head, word = os.path.split(word)
+ if word in (os.curdir, os.pardir): continue
+ path = os.path.join(path, word)
+ return path
+
+ def copyfile(self, source, outputfile):
+ """Copy all data between two file objects.
+
+ The SOURCE argument is a file object open for reading
+ (or anything with a read() method) and the DESTINATION
+ argument is a file object open for writing (or
+ anything with a write() method).
+
+ The only reason for overriding this would be to change
+ the block size or perhaps to replace newlines by CRLF
+ -- note however that this the default server uses this
+ to copy binary data as well.
+
+ """
+ shutil.copyfileobj(source, outputfile)
+
+ def guess_type(self, path):
+ """Guess the type of a file.
+
+ Argument is a PATH (a filename).
+
+ Return value is a string of the form type/subtype,
+ usable for a MIME Content-type header.
+
+ The default implementation looks the file's extension
+ up in the table self.extensions_map, using application/octet-stream
+ as a default; however it would be permissible (if
+ slow) to look inside the data to make a better guess.
+
+ """
+
+ base, ext = posixpath.splitext(path)
+ if ext in self.extensions_map:
+ return self.extensions_map[ext]
+ ext = ext.lower()
+ if ext in self.extensions_map:
+ return self.extensions_map[ext]
+ else:
+ return self.extensions_map['']
+
+ if not mimetypes.inited:
+ mimetypes.init() # try to read system mime.types
+ extensions_map = mimetypes.types_map.copy()
+ extensions_map.update({
+ '': 'application/octet-stream', # Default
+ '.py': 'text/plain',
+ '.c': 'text/plain',
+ '.h': 'text/plain',
+ })
+
--- /dev/null
+# Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
+
+'''
+This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
+1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
+ It also uses the App abstract class to implement the applications.
+2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
+ derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
+ various streams from the client, represented using the Stream class.
+3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
+ and write of FLV file format.
+
+
+Typically an application can launch this server as follows:
+$ python rtmp.py
+
+To know the command line options use the -h option:
+$ python rtmp.py -h
+
+To start the server with a different directory for recording and playing FLV files from, use the following command.
+$ python rtmp.py -r some-other-directory/
+Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
+
+A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
+from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
+local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
+same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
+from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
+it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
+see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
+your played video will start appearing after some initial delay.
+
+
+If an application wants to use this module as a library, it can launch the server as follows:
+>>> agent = FlashServer() # a new RTMP server instance
+>>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
+>>> agent.start() # start the server
+>>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
+
+
+If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
+setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
+the client connects to the server.
+
+class MyApp(App): # a new MyApp extends the default App in rtmp module.
+ def __init__(self): # constructor just invokes base class constructor
+ App.__init__(self)
+ def onConnect(self, client, *args):
+ result = App.onConnect(self, client, *args) # invoke base class method first
+ def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
+ client.call('connected', 'some-arg')
+ yield
+ multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
+...
+agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
+
+Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
+If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
+throw an exception and display the error message.
+
+'''
+
+import os, sys, time, struct, socket, traceback, multitask
+import threading, Queue
+import uuid
+
+from BaseHTTPServer import BaseHTTPRequestHandler
+from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
+from cStringIO import StringIO
+from cookielib import parse_ns_headers, CookieJar
+
+_debug = False
+
+class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
+
+ def setup(self):
+ self.connection = self.request
+ self.rfile = StringIO()
+ self.wfile = StringIO()
+
+ def finish(self):
+ pass
+
+ def info(self):
+ return self.headers
+
+ def get_full_url(self):
+ return self.path
+
+ def get_header(self, hdr, default):
+ return self.headers.getheader(hdr, default)
+
+ def has_header(self, hdr):
+ return False
+
+ def is_unverifiable(self):
+ return False
+
+ def add_unredirected_header(self, name, val):
+ if name == 'Cookie':
+ self.send_header("Set-Cookie", val)
+
+class ConnectionClosed:
+ 'raised when the client closed the connection'
+
+class SockStream(object):
+ '''A class that represents a socket as a stream'''
+ def __init__(self, sock):
+ self.sock, self.buffer = sock, ''
+ self.bytesWritten = self.bytesRead = 0
+
+ def close(self):
+ print "sock close"
+ fno = self.sock.fileno()
+ self.sock.close()
+ print "writable?", self.sock, fno
+ yield multitask.writable(fno, timeout=0.1)
+ print "is it?"
+ del self.sock
+ self.sock = None
+
+ def readline(self):
+ try:
+ while True:
+ nl = self.buffer.find("\n")
+ if nl >= 0: # do not have newline in buffer
+ data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
+ raise StopIteration(data)
+ data = (yield multitask.recv(self.sock, 4096)) # read more from socket
+ if not data: raise ConnectionClosed
+ if _debug: print 'socket.read[%d] %r'%(len(data), data)
+ self.bytesRead += len(data)
+ self.buffer += data
+ except StopIteration: raise
+ except: raise ConnectionClosed # anything else is treated as connection closed.
+
+ def read(self, count):
+ try:
+ while True:
+ if len(self.buffer) >= count: # do not have data in buffer
+ data, self.buffer = self.buffer[:count], self.buffer[count:]
+ raise StopIteration(data)
+ data = (yield multitask.recv(self.sock, 4096)) # read more from socket
+ if not data: raise ConnectionClosed
+ # if _debug: print 'socket.read[%d] %r'%(len(data), data)
+ self.bytesRead += len(data)
+ self.buffer += data
+ except StopIteration: raise
+ except: raise ConnectionClosed # anything else is treated as connection closed.
+
+ def unread(self, data):
+ self.buffer = data + self.buffer
+
+ def write(self, data):
+ while len(data) > 0: # write in 4K chunks each time
+ chunk, data = data[:4096], data[4096:]
+ self.bytesWritten += len(chunk)
+ if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
+ try: yield multitask.send(self.sock, chunk)
+ except: raise ConnectionClosed
+ if _debug: print 'socket.written'
+
+
+class Header(object):
+ FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
+
+ def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
+ self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
+ self.extendedtime = 0
+ if channel<64: self.hdrdata = chr(channel)
+ elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
+ else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
+
+ def _appendExtendedTimestamp(self, data):
+ if self.time == 0xFFFFFF:
+ data += struct.pack('>I', self.extendedtime)
+ return data
+
+ def toBytes(self, control):
+ data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
+ if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
+
+ data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
+ if control == Header.TIME: return self._appendExtendedTimestamp(data)
+
+ data += struct.pack('>I', self.size)[1:] # size
+ data += chr(self.type) # type
+ if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
+
+ data += struct.pack('<I', self.streamId) # add streamId
+ return self._appendExtendedTimestamp(data)
+
+ def __repr__(self):
+ return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
+ % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
+
+class Message(object):
+ # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
+ RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
+ 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
+
+ def __init__(self, hdr=None, data=''):
+ if hdr is None: hdr = Header()
+ self.header, self.data = hdr, data
+
+ # define properties type, streamId and time to access self.header.(property)
+ for p in ['type', 'streamId', 'time']:
+ exec 'def _g%s(self): return self.header.%s'%(p, p)
+ exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
+ exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
+ @property
+ def size(self): return len(self.data)
+
+ def __repr__(self):
+ return ("<Message header=%r data=%r>"% (self.header, self.data))
+
+class Protocol(object):
+ # constants
+ PING_SIZE = 1536
+ DEFAULT_CHUNK_SIZE = 128
+ MIN_CHANNEL_ID = 3
+ PROTOCOL_CHANNEL_ID = 2
+
+ def __init__(self, sock):
+ self.stream = SockStream(sock)
+ self.lastReadHeaders = dict() # indexed by channelId
+ self.incompletePackets = dict() #indexed by channelId
+ self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
+ self.lastWriteHeaders = dict() # indexed by streamId
+ self.nextChannelId = Protocol.MIN_CHANNEL_ID
+ self.writeLock = threading.Lock()
+ self.writeQueue = Queue.Queue()
+
+ def messageReceived(self, msg):
+ yield
+
+ def protocolMessage(self, msg):
+ if msg.type == Message.ACK: # respond to ACK requests
+ response = Message()
+ response.type, response.data = msg.type, msg.data
+ self.writeMessage(response)
+ elif msg.type == Message.CHUNK_SIZE:
+ self.readChunkSize = struct.unpack('>L', msg.data)[0]
+
+ def connectionClosed(self):
+ yield
+
+ def parse(self):
+ try:
+ yield self.parseRequests() # parse http requests
+ except ConnectionClosed:
+ yield self.connectionClosed()
+ if _debug: print 'parse connection closed'
+
+ def writeMessage(self, message):
+ self.writeQueue.put(message)
+
+ def parseHandshake(self):
+ '''Parses the rtmp handshake'''
+ data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
+ yield self.stream.write(data)
+ data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
+ yield self.stream.write(data)
+
+ def parseRequests(self):
+ '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
+ self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote)
+ self.hr.close_connection = 1
+ self.cookies = CookieJar()
+
+ while True:
+
+ # prepare reading the request so that when it's handed
+ # over to "standard" HTTPRequestHandler, the data's already
+ # there.
+ print "parseRequests"
+ raw_requestline = (yield self.stream.readline())
+ if _debug: print "parseRequests, line", raw_requestline
+ if not raw_requestline:
+ raise ConnectionClosed
+ data = ''
+ try:
+ while 1:
+ line = (yield self.stream.readline())
+ data += line
+ if line in ['\n', '\r\n']:
+ break
+ except StopIteration:
+ if _debug: print "parseRequests, stopiter"
+ raise
+ except:
+ if _debug:
+ print 'parseRequests', \
+ (traceback and traceback.print_exc() or None)
+
+ raise ConnectionClosed
+
+ self.hr.raw_requestline = raw_requestline
+ self.hr.rfile.write(data)
+ print "parseRequests write after"
+ self.hr.rfile.seek(0)
+ print "parseRequests seek after"
+
+ if not self.hr.parse_request():
+ raise ConnectionClosed
+ print "parseRequests parse_req after"
+ try:
+ yield self.messageReceived(self.hr)
+ except:
+ if _debug:
+ print 'messageReceived', \
+ (traceback and traceback.print_exc() or None)
+
+ def write(self):
+ '''Writes messages to stream'''
+ while True:
+ while self.writeQueue.empty(): (yield multitask.sleep(0.01))
+ data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
+ if _debug: print "write to stream", repr(data)
+ if data is None:
+ # just in case TCP socket is not closed, close it.
+ try:
+ print "stream closing"
+ print self.stream
+ yield self.stream.close()
+ print "stream closed"
+ except: pass
+ break
+
+ try:
+ yield self.stream.write(data)
+ except ConnectionClosed:
+ yield self.connectionClosed()
+ except:
+ print traceback.print_exc()
+
+class Command(object):
+ ''' Class for command / data messages'''
+ def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
+ '''Create a new command with given type, name, id, cmdData and args list.'''
+ self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
+
+ def __repr__(self):
+ return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
+
+ def setArg(self, arg):
+ self.args.append(arg)
+
+ def getArg(self, index):
+ return self.args[index]
+
+ @classmethod
+ def fromMessage(cls, message):
+ ''' initialize from a parsed RTMP message'''
+ assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
+
+ length = len(message.data)
+ if length == 0: raise ValueError('zero length message data')
+
+ if message.type == Message.RPC3 or message.type == Message.DATA3:
+ assert message.data[0] == '\x00' # must be 0 in AMD3
+ data = message.data[1:]
+ else:
+ data = message.data
+
+ amfReader = amf.AMF0(data)
+
+ inst = cls()
+ inst.name = amfReader.read() # first field is command name
+
+ try:
+ if message.type == Message.RPC:
+ inst.id = amfReader.read() # second field *may* be message id
+ inst.cmdData = amfReader.read() # third is command data
+ else:
+ inst.id = 0
+ inst.args = [] # others are optional
+ while True:
+ inst.args.append(amfReader.read())
+ except EOFError:
+ pass
+ return inst
+
+ def toMessage(self):
+ msg = Message()
+ assert self.type
+ msg.type = self.type
+ output = amf.BytesIO()
+ amfWriter = amf.AMF0(output)
+ amfWriter.write(self.name)
+ if msg.type == Message.RPC or msg.type == Message.RPC3:
+ amfWriter.write(self.id)
+ amfWriter.write(self.cmdData)
+ for arg in self.args:
+ amfWriter.write(arg)
+ output.seek(0)
+ #hexdump.hexdump(output)
+ #output.seek(0)
+ if msg.type == Message.RPC3 or msg.type == Message.DATA3:
+ data = '\x00' + output.read()
+ else:
+ data = output.read()
+ msg.data = data
+ output.close()
+ return msg
+
+def getfilename(path, name, root):
+ '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
+ the the path present in the path variable.'''
+ ignore, ignore, scope = path.partition('/')
+ if scope: scope = scope + '/'
+ result = root + scope + name + '.flv'
+ if _debug: print 'filename=', result
+ return result
+
+class Stream(object):
+ '''The stream object that is used for RTMP stream.'''
+ count = 0;
+ def __init__(self, client):
+ self.client, self.id, self.name = client, 0, ''
+ self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
+ self.queue = multitask.Queue()
+ self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
+ if _debug: print self, 'created'
+
+ def close(self):
+ if _debug: print self, 'closing'
+ if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
+ if self.playfile is not None: self.playfile.close(); self.playfile = None
+ self.client = None # to clear the reference
+ pass
+
+ def __repr__(self):
+ return self._name;
+
+ def recv(self):
+ '''Generator to receive new Message on this stream, or None if stream is closed.'''
+ return self.queue.get()
+
+ def send(self, msg):
+ '''Method to send a Message or Command on this stream.'''
+ if isinstance(msg, Command):
+ msg = msg.toMessage()
+ msg.streamId = self.id
+ # if _debug: print self,'send'
+ if self.client is not None: self.client.writeMessage(msg)
+
+class Client(Protocol):
+ '''The client object represents a single connected client to the server.'''
+ def __init__(self, sock, server, remote):
+ Protocol.__init__(self, sock)
+ self.server = server
+ self.remote = remote
+ self.agent = {}
+ self.streams = {}
+ self._nextCallId = 2
+ self._nextStreamId = 1
+ self.objectEncoding = 0.0
+ self.queue = multitask.Queue() # receive queue used by application
+ multitask.add(self.parse())
+ multitask.add(self.write())
+
+ def recv(self):
+ '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
+ return self.queue.get()
+
+ def connectionClosed(self):
+ '''Called when the client drops the connection'''
+ if _debug: 'Client.connectionClosed'
+ self.writeMessage(None)
+ yield self.queue.put((None,None))
+
+ def messageReceived(self, msg):
+ if _debug: print 'messageReceived cmd=', msg.command, msg.path
+ ch = msg.headers.getheaders("Cookie")
+ ch = parse_ns_headers(ch)
+ cookies = self.cookies._cookies_from_attrs_set(ch, msg)
+ has_sess = False
+ for c in cookies:
+ self.cookies.set_cookie(c)
+ if c.name == "session":
+ has_sess = True
+ msg.sess_cookie = c
+ if not has_sess:
+ t = ("session", uuid.uuid4().hex, {}, {})
+ msg.sess_cookie = self.cookies._cookie_from_cookie_tuple(t, msg)
+ self.cookies.set_cookie(msg.sess_cookie)
+ yield self.server.queue.put((self, msg)) # new connection
+
+ def accept(self):
+ '''Method to accept an incoming client.'''
+ response = Command()
+ response.id, response.name, response.type = 1, '_result', Message.RPC
+ if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
+ response.setArg(dict(level='status', code='NetConnection.Connect.Success',
+ description='Connection succeeded.', details=None,
+ objectEncoding=self.objectEncoding))
+ self.writeMessage(response.toMessage())
+
+ def rejectConnection(self, reason=''):
+ '''Method to reject an incoming client.'''
+ response = Command()
+ response.id, response.name, response.type = 1, '_error', Message.RPC
+ response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
+ description=reason, details=None))
+ self.writeMessage(response.toMessage())
+
+ def call(self, method, *args):
+ '''Call a (callback) method on the client.'''
+ cmd = Command()
+ cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
+ cmd.args, cmd.cmdData = args, None
+ self._nextCallId += 1
+ if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
+ self.writeMessage(cmd.toMessage())
+
+ def createStream(self):
+ ''' Create a stream on the server side'''
+ stream = Stream(self)
+ stream.id = self._nextStreamId
+ self.streams[stream.id] = stream
+ self._nextStreamId += 1
+ return stream
+
+
+class Server(object):
+ '''A RTMP server listens for incoming connections and informs the app.'''
+ def __init__(self, sock):
+ '''Create an RTMP server on the given bound TCP socket. The server will terminate
+ when the socket is disconnected, or some other error occurs in listening.'''
+ self.sock = sock
+ self.queue = multitask.Queue() # queue to receive incoming client connections
+ multitask.add(self.run())
+
+ def recv(self):
+ '''Generator to wait for incoming client connections on this server and return
+ (client, args) or (None, None) if the socket is closed or some error.'''
+ return self.queue.get()
+
+ def run(self):
+ try:
+ while True:
+ sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
+ if sock == None:
+ if _debug: print 'rtmp.Server accept(sock) returned None.'
+ break
+ if _debug: print 'connection received from', remote
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
+ client = Client(sock, self, remote)
+ except:
+ if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
+ traceback.print_exc()
+
+ if (self.sock):
+ try: self.sock.close(); self.sock = None
+ except: pass
+ if (self.queue):
+ yield self.queue.put((None, None))
+ self.queue = None
+
+class App(SimpleAppHTTPRequestHandler):
+ '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
+ count = 0
+ def __init__(self):
+ self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
+ self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
+ if _debug: print self.name, 'created'
+ def __del__(self):
+ if _debug: print self.name, 'destroyed'
+ @property
+ def clients(self):
+ '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
+ return self._clients[1:] if self._clients is not None else []
+ def onConnect(self, client, *args):
+ if _debug: print self.name, 'onConnect', client.path
+ return True
+ def onDisconnect(self, client):
+ if _debug: print self.name, 'onDisconnect', client.path
+
+class HttpServer(object):
+ '''A RTMP server to record and stream Flash video.'''
+ def __init__(self):
+ '''Construct a new HttpServer. It initializes the local members.'''
+ self.sock = self.server = None
+ self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
+ self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
+ self.root = ''
+
+ def start(self, host='0.0.0.0', port=8080):
+ '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
+ if _debug: print 'start', host, port
+ if not self.server:
+ sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+ if _debug: print 'listening on ', sock.getsockname()
+ sock.listen(5)
+ server = self.server = Server(sock) # start rtmp server on that socket
+ multitask.add(self.serverlistener())
+
+ def stop(self):
+ if _debug: print 'stopping Flash server'
+ if self.server and self.sock:
+ try: self.sock.close(); self.sock = None
+ except: pass
+ self.server = None
+
+ def serverlistener(self):
+ '''Server listener (generator). It accepts all connections and invokes client listener'''
+ try:
+ while True: # main loop to receive new connections on the server
+ client, msg = (yield self.server.recv()) # receive an incoming client connection.
+ # TODO: we should reject non-localhost client connections.
+ if not client: # if the server aborted abnormally,
+ break # hence close the listener.
+ if _debug: print 'client connection received', client, args
+ # if client.objectEncoding != 0 and client.objectEncoding != 3:
+ if client.objectEncoding != 0:
+ yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
+ yield client.connectionClosed()
+ else:
+ print "cookies", str(client.cookies)
+ session = msg.sess_cookie.value
+ name, ignore, scope = msg.path.partition('/')
+ if '*' not in self.apps and name not in self.apps:
+ yield client.rejectConnection(reason='Application not found: ' + name)
+ else: # create application instance as needed and add in our list
+ if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
+ app = self.apps[name] if name in self.apps else self.apps['*'] # application class
+ if session in self.clients: inst = self.clients[session][0]
+ else: inst = app()
+ try:
+ methodname = "on%s" % msg.command
+ method = getattr(inst, methodname, None)
+ result = method(client, msg)
+ close_connection = msg.close_connection
+ print "close connection", close_connection
+ except:
+ if _debug: traceback.print_exc()
+ yield client.rejectConnection(reason='Exception on %s' % methodname)
+ continue
+ if result is True or result is None:
+ if session not in self.clients:
+ self.clients[session] = [inst]; inst._clients=self.clients[session]
+ self.clients[session].append(client)
+ msg.wfile.seek(0)
+ data = msg.wfile.read()
+ msg.wfile.seek(0)
+ msg.wfile.truncate()
+ yield client.writeMessage(data)
+ if close_connection:
+ if _debug:
+ print 'close_connection requested'
+ try:
+ yield client.connectionClosed()
+ except ClientClosed:
+ if _debug:
+ print 'close_connection done'
+ pass
+ else:
+ yield client.rejectConnection(reason='Rejected in onConnect')
+ except StopIteration: raise
+ except:
+ if _debug:
+ print 'serverlistener exception', \
+ (sys and sys.exc_info() or None)
+ traceback.print_exc()
+
+ def clientlistener(self, client):
+ '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
+ try:
+ while True:
+ msg, arg = (yield client.recv()) # receive new message from client
+ if not msg: # if the client disconnected,
+ if _debug: print 'connection closed from client'
+ break # come out of listening loop.
+ if msg == 'command': # handle a new command
+ multitask.add(self.clienthandler(client, arg))
+ elif msg == 'stream': # a new stream is created, handle the stream.
+ arg.client = client
+ multitask.add(self.streamlistener(arg))
+ except StopIteration: raise
+ except:
+ if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
+ traceback.print_exc()
+
+ # client is disconnected, clear our state for application instance.
+ if _debug: print 'cleaning up client', client.path
+ inst = None
+ if client.path in self.clients:
+ inst = self.clients[client.path][0]
+ self.clients[client.path].remove(client)
+ for stream in client.streams.values(): # for all streams of this client
+ self.closehandler(stream)
+ client.streams.clear() # and clear the collection of streams
+ if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
+ if _debug: print 'removing the application instance'
+ inst = self.clients[client.path][0]
+ inst._clients = None
+ del self.clients[client.path]
+ if inst is not None: inst.onDisconnect(client)
+
+ def closehandler(self, stream):
+ '''A stream is closed explicitly when a closeStream command is received from given client.'''
+ if stream.client is not None:
+ inst = self.clients[stream.client.path][0]
+ if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
+ inst.onClose(stream.client, stream)
+ del inst.publishers[stream.name]
+ if stream.name in inst.players and stream in inst.players[stream.name]:
+ inst.onStop(stream.client, stream)
+ inst.players[stream.name].remove(stream)
+ if len(inst.players[stream.name]) == 0:
+ del inst.players[stream.name]
+ stream.close()
+
+ def clienthandler(self, client, cmd):
+ '''A generator to handle a single command on the client.'''
+ inst = self.clients[client.path][0]
+ if inst:
+ if cmd.name == '_error':
+ if hasattr(inst, 'onStatus'):
+ result = inst.onStatus(client, cmd.args[0])
+ elif cmd.name == '_result':
+ if hasattr(inst, 'onResult'):
+ result = inst.onResult(client, cmd.args[0])
+ else:
+ res, code, args = Command(), '_result', dict()
+ try: result = inst.onCommand(client, cmd.name, *cmd.args)
+ except:
+ if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
+ code, args = '_error', dict()
+ res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
+ res.args, res.cmdData = args, None
+ if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
+ client.writeMessage(res.toMessage())
+ # TODO return result to caller
+ yield
+
+ def streamlistener(self, stream):
+ '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
+ stream.recordfile = None # so that it doesn't complain about missing attribute
+ while True:
+ msg = (yield stream.recv())
+ if not msg:
+ if _debug: print 'stream closed'
+ self.closehandler(stream)
+ break
+ # if _debug: msg
+ multitask.add(self.streamhandler(stream, msg))
+
+ def streamhandler(self, stream, message):
+ '''A generator to handle a single message on the stream.'''
+ try:
+ if message.type == Message.RPC:
+ cmd = Command.fromMessage(message)
+ if _debug: print 'streamhandler received cmd=', cmd
+ if cmd.name == 'publish':
+ yield self.publishhandler(stream, cmd)
+ elif cmd.name == 'play':
+ yield self.playhandler(stream, cmd)
+ elif cmd.name == 'closeStream':
+ self.closehandler(stream)
+ else: # audio or video message
+ yield self.mediahandler(stream, message)
+ except GeneratorExit: pass
+ except StopIteration: pass
+ except:
+ if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
+
+ def publishhandler(self, stream, cmd):
+ '''A new stream is published. Store the information in the application instance.'''
+ try:
+ stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
+ stream.name = cmd.args[0]
+ if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
+ inst = self.clients[stream.client.path][0]
+ if (stream.name in inst.publishers):
+ raise ValueError, 'Stream name already in use'
+ inst.publishers[stream.name] = stream # store the client for publisher
+ result = inst.onPublish(stream.client, stream)
+
+ if stream.mode == 'record' or stream.mode == 'append':
+ stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
+ response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
+ yield stream.send(response)
+ except ValueError, E: # some error occurred. inform the app.
+ if _debug: print 'error in publishing stream', str(E)
+ response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
+ yield stream.send(response)
+
+ def playhandler(self, stream, cmd):
+ '''A new stream is being played. Just updated the players list with this stream.'''
+ inst = self.clients[stream.client.path][0]
+ name = stream.name = cmd.args[0] # store the stream's name
+ start = cmd.args[1] if len(cmd.args) >= 2 else -2
+ if name not in inst.players:
+ inst.players[name] = [] # initialize the players for this stream name
+ if stream not in inst.players[name]: # store the stream as players of this name
+ inst.players[name].append(stream)
+ path = getfilename(stream.client.path, stream.name, self.root)
+ if os.path.exists(path):
+ stream.playfile = FLV().open(path)
+ multitask.add(stream.playfile.reader(stream))
+ if _debug: print 'playing stream=', name, 'start=', start
+ result = inst.onPlay(stream.client, stream)
+ response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
+ yield stream.send(response)
+
+ def mediahandler(self, stream, message):
+ '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
+ if stream.client is not None:
+ inst = self.clients[stream.client.path][0]
+ result = inst.onPublishData(stream.client, stream, message)
+ if result:
+ client = stream.client
+ for s in (inst.players.get(stream.name, [])):
+ # if _debug: print 'D', stream.name, s.name
+ result = inst.onPlayData(s.client, s, message)
+ if result:
+ yield s.send(message)
+ if stream.recordfile is not None:
+ stream.recordfile.write(message)
+
+# The main routine to start, run and stop the service
+if __name__ == '__main__':
+ print "optparse"
+ from optparse import OptionParser
+ parser = OptionParser()
+ parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
+ parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
+ parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
+ parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
+ (options, args) = parser.parse_args()
+
+ _debug = options.verbose
+ try:
+ if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
+ agent = HttpServer()
+ agent.root = options.root
+ agent.start(options.host, options.port)
+ if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
+ multitask.run()
+ except KeyboardInterrupt:
+ print traceback.print_exc()
+ pass
+ except:
+ print "exception"
+ print traceback.print_exc()
+ if _debug: print time.asctime(), 'Flash Server Stops'
--- /dev/null
+################################################################################
+#
+# Copyright (c) 2007 Christopher J. Stawarz
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+################################################################################
+
+
+
+"""
+
+Cooperative multitasking and asynchronous I/O using generators
+
+multitask allows Python programs to use generators (a.k.a. coroutines)
+to perform cooperative multitasking and asynchronous I/O.
+Applications written using multitask consist of a set of cooperating
+tasks that yield to a shared task manager whenever they perform a
+(potentially) blocking operation, such as I/O on a socket or getting
+data from a queue. The task manager temporarily suspends the task
+(allowing other tasks to run in the meantime) and then restarts it
+when the blocking operation is complete. Such an approach is suitable
+for applications that would otherwise have to use select() and/or
+multiple threads to achieve concurrency.
+
+The functions and classes in the multitask module allow tasks to yield
+for I/O operations on sockets and file descriptors, adding/removing
+data to/from queues, or sleeping for a specified interval. When
+yielding, a task can also specify a timeout. If the operation for
+which the task yielded has not completed after the given number of
+seconds, the task is restarted, and a Timeout exception is raised at
+the point of yielding.
+
+As a very simple example, here's how one could use multitask to allow
+two unrelated tasks to run concurrently:
+
+ >>> def printer(message):
+ ... while True:
+ ... print message
+ ... yield
+ ...
+ >>> multitask.add(printer('hello'))
+ >>> multitask.add(printer('goodbye'))
+ >>> multitask.run()
+ hello
+ goodbye
+ hello
+ goodbye
+ hello
+ goodbye
+ [and so on ...]
+
+For a more useful example, here's how one could implement a
+multitasking server that can handle multiple concurrent client
+connections:
+
+ def listener(sock):
+ while True:
+ conn, address = (yield multitask.accept(sock))
+ multitask.add(client_handler(conn))
+
+ def client_handler(sock):
+ while True:
+ request = (yield multitask.recv(sock, 1024))
+ if not request:
+ break
+ response = handle_request(request)
+ yield multitask.send(sock, response)
+
+ multitask.add(listener(sock))
+ multitask.run()
+
+Tasks can also yield other tasks, which allows for composition of
+tasks and reuse of existing multitasking code. A child task runs
+until it either completes or raises an exception. To return output to
+its parent, a child task raises StopIteration, passing the output
+value(s) to the StopIteration constructor. An unhandled exception
+raised within a child task is propagated to its parent. For example:
+
+ >>> def parent():
+ ... print (yield return_none())
+ ... print (yield return_one())
+ ... print (yield return_many())
+ ... try:
+ ... yield raise_exception()
+ ... except Exception, e:
+ ... print 'caught exception: %s' % e
+ ...
+ >>> def return_none():
+ ... yield
+ ... # do nothing
+ ... # or return
+ ... # or raise StopIteration
+ ... # or raise StopIteration(None)
+ ...
+ >>> def return_one():
+ ... yield
+ ... raise StopIteration(1)
+ ...
+ >>> def return_many():
+ ... yield
+ ... raise StopIteration(2, 3) # or raise StopIteration((2, 3))
+ ...
+ >>> def raise_exception():
+ ... yield
+ ... raise RuntimeError('foo')
+ ...
+ >>> multitask.add(parent())
+ >>> multitask.run()
+ None
+ 1
+ (2, 3)
+ caught exception: foo
+
+"""
+
+
+import collections
+import errno
+from functools import partial
+import heapq
+import os
+import select
+import sys
+import time
+import types
+
+
+__author__ = 'Christopher Stawarz <cstawarz@csail.mit.edu>'
+__version__ = '0.2.0'
+__revision__ = int('$Revision: 5025 $'.split()[1])
+
+
+
+################################################################################
+#
+# Timeout exception type
+#
+################################################################################
+
+
+
+class Timeout(Exception):
+ 'Raised in a yielding task when an operation times out'
+ pass
+
+
+
+################################################################################
+#
+# _ChildTask class
+#
+################################################################################
+
+
+
+class _ChildTask(object):
+
+ def __init__(self, parent, task):
+ self.parent = parent
+ self.task = task
+
+ def send(self, value):
+ return self.task.send(value)
+
+ def throw(self, type, value=None, traceback=None):
+ return self.task.throw(type, value, traceback)
+
+
+
+################################################################################
+#
+# YieldCondition class
+#
+################################################################################
+
+
+
+class YieldCondition(object):
+
+ """
+
+ Base class for objects that are yielded by a task to the task
+ manager and specify the condition(s) under which the task should
+ be restarted. Only subclasses of this class are useful to
+ application code.
+
+ """
+
+ def __init__(self, timeout=None):
+ """
+
+ If timeout is None, the task will be suspended indefinitely
+ until the condition is met. Otherwise, if the condition is
+ not met within timeout seconds, a Timeout exception will be
+ raised in the yielding task.
+
+ """
+
+ self.task = None
+ self.handle_expiration = None
+
+ if timeout is None:
+ self.expiration = None
+ else:
+ self.expiration = time.time() + float(timeout)
+
+ def _expires(self):
+ return (self.expiration is not None)
+
+
+
+################################################################################
+#
+# _SleepDelay class and related functions
+#
+################################################################################
+
+
+
+class _SleepDelay(YieldCondition):
+
+ def __init__(self, seconds):
+ seconds = float(seconds)
+ if seconds <= 0.0:
+ raise ValueError("'seconds' must be greater than 0")
+ super(_SleepDelay, self).__init__(seconds)
+
+
+def sleep(seconds):
+ """
+
+ A task that yields the result of this function will be resumed
+ after the specified number of seconds have elapsed. For example:
+
+ while too_early():
+ yield sleep(5) # Sleep for five seconds
+ do_something() # Done sleeping; get back to work
+
+ """
+
+ return _SleepDelay(seconds)
+
+
+
+################################################################################
+#
+# FDReady class and related functions
+#
+################################################################################
+
+
+
+class FDReady(YieldCondition):
+
+ """
+
+ A task that yields an instance of this class will be suspended
+ until a specified file descriptor is ready for I/O.
+
+ """
+
+ def __init__(self, fd, read=False, write=False, exc=False, timeout=None):
+ """
+
+ Resume the yielding task when fd is ready for reading,
+ writing, and/or "exceptional" condition handling. fd can be
+ any object accepted by select.select() (meaning an integer or
+ an object with a fileno() method that returns an integer).
+ Any exception raised by select() due to fd will be re-raised
+ in the yielding task.
+
+ If timeout is not None, a Timeout exception will be raised in
+ the yielding task if fd is not ready after timeout seconds
+ have elapsed.
+
+ """
+
+ super(FDReady, self).__init__(timeout)
+
+ self.fd = (fd if _is_file_descriptor(fd) else fd.fileno())
+
+ if not (read or write or exc):
+ raise ValueError("'read', 'write', and 'exc' cannot all be false")
+ self.read = read
+ self.write = write
+ self.exc = exc
+
+ def fileno(self):
+ 'Return the file descriptor on which the yielding task is waiting'
+ return self.fd
+
+ def _add_to_fdsets(self, read_fds, write_fds, exc_fds):
+ for add, fdset in ((self.read, read_fds),
+ (self.write, write_fds),
+ (self.exc, exc_fds)):
+ if add:
+ fdset.add(self)
+
+ def _remove_from_fdsets(self, read_fds, write_fds, exc_fds):
+ for fdset in (read_fds, write_fds, exc_fds):
+ fdset.discard(self)
+
+
+def _is_file_descriptor(fd):
+ return isinstance(fd, (int, long))
+
+
+def readable(fd, timeout=None):
+ """
+
+ A task that yields the result of this function will be resumed
+ when fd is readable. If timeout is not None, a Timeout exception
+ will be raised in the yielding task if fd is not readable after
+ timeout seconds have elapsed. For example:
+
+ try:
+ yield readable(sock, timeout=5)
+ data = sock.recv(1024)
+ except Timeout:
+ # No data after 5 seconds
+
+ """
+
+ return FDReady(fd, read=True, timeout=timeout)
+
+
+def writable(fd, timeout=None):
+ """
+
+ A task that yields the result of this function will be resumed
+ when fd is writable. If timeout is not None, a Timeout exception
+ will be raised in the yielding task if fd is not writable after
+ timeout seconds have elapsed. For example:
+
+ try:
+ yield writable(sock, timeout=5)
+ nsent = sock.send(data)
+ except Timeout:
+ # Can't send after 5 seconds
+
+ """
+
+ return FDReady(fd, write=True, timeout=timeout)
+
+
+
+################################################################################
+#
+# FDAction class and related functions
+#
+################################################################################
+
+
+
+class FDAction(FDReady):
+
+ """
+
+ A task that yields an instance of this class will be suspended
+ until an I/O operation on a specified file descriptor is complete.
+
+ """
+
+ def __init__(self, fd, func, args=(), kwargs={}, read=False, write=False,
+ exc=False):
+ """
+
+ Resume the yielding task when fd is ready for reading,
+ writing, and/or "exceptional" condition handling. fd can be
+ any object accepted by select.select() (meaning an integer or
+ an object with a fileno() method that returns an integer).
+ Any exception raised by select() due to fd will be re-raised
+ in the yielding task.
+
+ The value of the yield expression will be the result of
+ calling func with the specified args and kwargs (which
+ presumably performs a read, write, or other I/O operation on
+ fd). If func raises an exception, it will be re-raised in the
+ yielding task. Thus, FDAction is really just a convenient
+ subclass of FDReady that requests that the task manager
+ perform an I/O operation on the calling task's behalf.
+
+ If kwargs contains a timeout argument that is not None, a
+ Timeout exception will be raised in the yielding task if fd is
+ not ready after timeout seconds have elapsed.
+
+ """
+
+ timeout = kwargs.pop('timeout', None)
+ super(FDAction, self).__init__(fd, read, write, exc, timeout)
+
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+
+ def _eval(self):
+ return self.func(*(self.args), **(self.kwargs))
+
+
+def read(fd, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when fd is readable, and the value of the yield expression will be
+ the result of reading from fd. If a timeout keyword is given and
+ is not None, a Timeout exception will be raised in the yielding
+ task if fd is not readable after timeout seconds have elapsed.
+ Other arguments will be passed to the read function (os.read() if
+ fd is an integer, fd.read() otherwise). For example:
+
+ try:
+ data = (yield read(fd, 1024, timeout=5))
+ except Timeout:
+ # No data after 5 seconds
+
+ """
+
+ func = (partial(os.read, fd) if _is_file_descriptor(fd) else fd.read)
+ return FDAction(fd, func, args, kwargs, read=True)
+
+
+def readline(fd, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when fd is readable, and the value of the yield expression will be
+ the result of reading a line from fd. If a timeout keyword is
+ given and is not None, a Timeout exception will be raised in the
+ yielding task if fd is not readable after timeout seconds have
+ elapsed. Other arguments will be passed to fd.readline(). For
+ example:
+
+ try:
+ data = (yield readline(fd, timeout=5))
+ except Timeout:
+ # No data after 5 seconds
+
+ """
+
+ return FDAction(fd, fd.readline, args, kwargs, read=True)
+
+
+def write(fd, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when fd is writable, and the value of the yield expression will be
+ the result of writing to fd. If a timeout keyword is given and is
+ not None, a Timeout exception will be raised in the yielding task
+ if fd is not writable after timeout seconds have elapsed. Other
+ arguments will be passed to the write function (os.write() if fd
+ is an integer, fd.write() otherwise). For example:
+
+ try:
+ nbytes = (yield write(fd, data, timeout=5))
+ except Timeout:
+ # Can't write after 5 seconds
+
+ """
+
+ func = (partial(os.write, fd) if _is_file_descriptor(fd) else fd.write)
+ return FDAction(fd, func, args, kwargs, write=True)
+
+
+def accept(sock, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when sock is readable, and the value of the yield expression will
+ be the result of accepting a new connection on sock. If a timeout
+ keyword is given and is not None, a Timeout exception will be
+ raised in the yielding task if sock is not readable after timeout
+ seconds have elapsed. Other arguments will be passed to
+ sock.accept(). For example:
+
+ try:
+ conn, address = (yield accept(sock, timeout=5))
+ except Timeout:
+ # No connections after 5 seconds
+
+ """
+
+ return FDAction(sock, sock.accept, args, kwargs, read=True)
+
+
+def recv(sock, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when sock is readable, and the value of the yield expression will
+ be the result of receiving from sock. If a timeout keyword is
+ given and is not None, a Timeout exception will be raised in the
+ yielding task if sock is not readable after timeout seconds have
+ elapsed. Other arguments will be passed to sock.recv(). For
+ example:
+
+ try:
+ data = (yield recv(sock, 1024, timeout=5))
+ except Timeout:
+ # No data after 5 seconds
+
+ """
+
+ return FDAction(sock, sock.recv, args, kwargs, read=True)
+
+
+def recvfrom(sock, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when sock is readable, and the value of the yield expression will
+ be the result of receiving from sock. If a timeout keyword is
+ given and is not None, a Timeout exception will be raised in the
+ yielding task if sock is not readable after timeout seconds have
+ elapsed. Other arguments will be passed to sock.recvfrom(). For
+ example:
+
+ try:
+ data, address = (yield recvfrom(sock, 1024, timeout=5))
+ except Timeout:
+ # No data after 5 seconds
+
+ """
+
+ return FDAction(sock, sock.recvfrom, args, kwargs, read=True)
+
+
+def send(sock, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when sock is writable, and the value of the yield expression will
+ be the result of sending to sock. If a timeout keyword is given
+ and is not None, a Timeout exception will be raised in the
+ yielding task if sock is not writable after timeout seconds have
+ elapsed. Other arguments will be passed to the sock.send(). For
+ example:
+
+ try:
+ nsent = (yield send(sock, data, timeout=5))
+ except Timeout:
+ # Can't send after 5 seconds
+
+ """
+
+ return FDAction(sock, sock.send, args, kwargs, write=True)
+
+
+def sendto(sock, *args, **kwargs):
+ """
+
+ A task that yields the result of this function will be resumed
+ when sock is writable, and the value of the yield expression will
+ be the result of sending to sock. If a timeout keyword is given
+ and is not None, a Timeout exception will be raised in the
+ yielding task if sock is not writable after timeout seconds have
+ elapsed. Other arguments will be passed to the sock.sendto().
+ For example:
+
+ try:
+ nsent = (yield sendto(sock, data, address, timeout=5))
+ except Timeout:
+ # Can't send after 5 seconds
+
+ """
+
+ return FDAction(sock, sock.sendto, args, kwargs, write=True)
+
+
+
+################################################################################
+#
+# Queue and _QueueAction classes
+#
+################################################################################
+
+
+
+class Queue(object):
+
+ """
+
+ A multi-producer, multi-consumer FIFO queue (similar to
+ Queue.Queue) that can be used for exchanging data between tasks
+
+ """
+
+ def __init__(self, contents=(), maxsize=0):
+ """
+
+ Create a new Queue instance. contents is a sequence (empty by
+ default) containing the initial contents of the queue. If
+ maxsize is greater than 0, the queue will hold a maximum of
+ maxsize items, and put() will block until space is available
+ in the queue.
+
+ """
+
+ self.maxsize = int(maxsize)
+ self._queue = collections.deque(contents)
+
+ def __len__(self):
+ 'Return the number of items in the queue'
+ return len(self._queue)
+
+ def _get(self):
+ return self._queue.popleft()
+
+ def _put(self, item):
+ self._queue.append(item)
+
+ def empty(self):
+ 'Return True is the queue is empty, False otherwise'
+ return (len(self) == 0)
+
+ def full(self):
+ 'Return True is the queue is full, False otherwise'
+ return ((len(self) >= self.maxsize) if (self.maxsize > 0) else False)
+
+ def get(self, timeout=None):
+ """
+
+ A task that yields the result of this method will be resumed
+ when an item is available in the queue, and the value of the
+ yield expression will be the item. If timeout is not None, a
+ Timeout exception will be raised in the yielding task if an
+ item is not available after timeout seconds have elapsed. For
+ example:
+
+ try:
+ item = (yield queue.get(timeout=5))
+ except Timeout:
+ # No item available after 5 seconds
+
+ """
+
+ return _QueueAction(self, timeout=timeout)
+
+ def put(self, item, timeout=None):
+ """
+
+ A task that yields the result of this method will be resumed
+ when item has been added to the queue. If timeout is not
+ None, a Timeout exception will be raised in the yielding task
+ if no space is available after timeout seconds have elapsed.
+ For example:
+
+ try:
+ yield queue.put(item, timeout=5)
+ except Timeout:
+ # No space available after 5 seconds
+
+ """
+
+ return _QueueAction(self, item, timeout=timeout)
+
+
+class _QueueAction(YieldCondition):
+
+ NO_ITEM = object()
+
+ def __init__(self, queue, item=NO_ITEM, timeout=None):
+ super(_QueueAction, self).__init__(timeout)
+ if not isinstance(queue, Queue):
+ raise TypeError("'queue' must be a Queue instance")
+ self.queue = queue
+ self.item = item
+
+
+################################################################################
+#
+# SmartQueue and _SmartQueueAction classes
+#
+################################################################################
+
+
+
+class SmartQueue(object):
+
+ """
+
+ A multi-producer, multi-consumer FIFO queue (similar to
+ Queue.Queue) that can be used for exchanging data between tasks.
+ The difference with Queue is that this implements filtering criteria
+ on get and allows multiple get to be signalled for the same put.
+ On the downside, this uses list instead of deque and has lower
+ performance.
+
+ """
+
+ def __init__(self, contents=(), maxsize=0):
+ """
+
+ Create a new Queue instance. contents is a sequence (empty by
+ default) containing the initial contents of the queue. If
+ maxsize is greater than 0, the queue will hold a maximum of
+ maxsize items, and put() will block until space is available
+ in the queue.
+
+ """
+
+ self.maxsize = int(maxsize)
+ self._pending = list(contents)
+
+ def __len__(self):
+ 'Return the number of items in the queue'
+ return len(self._pending)
+
+ def _get(self, criteria=None):
+ #self._pending = filter(lambda x: x[1]<=now, self._pending) # remove expired ones
+ if criteria:
+ found = filter(lambda x: criteria(x), self._pending) # check any matching criteria
+ if found:
+ self._pending.remove(found[0])
+ return found[0]
+ else:
+ return None
+ else:
+ return self._pending.pop(0) if self._pending else None
+
+ def _put(self, item):
+ self._pending.append(item)
+
+ def empty(self):
+ 'Return True is the queue is empty, False otherwise'
+ return (len(self) == 0)
+
+ def full(self):
+ 'Return True is the queue is full, False otherwise'
+ return ((len(self) >= self.maxsize) if (self.maxsize > 0) else False)
+
+ def get(self, timeout=None, criteria=None):
+ """
+
+ A task that yields the result of this method will be resumed
+ when an item is available in the queue and the item matches the
+ given criteria (a function, usually lambda), and the value of the
+ yield expression will be the item. If timeout is not None, a
+ Timeout exception will be raised in the yielding task if an
+ item is not available after timeout seconds have elapsed. For
+ example:
+
+ try:
+ item = (yield queue.get(timeout=5, criteria=lambda x: x.name='kundan'))
+ except Timeout:
+ # No item available after 5 seconds
+
+ """
+
+ return _SmartQueueAction(self, timeout=timeout, criteria=criteria)
+
+ def put(self, item, timeout=None):
+ """
+
+ A task that yields the result of this method will be resumed
+ when item has been added to the queue. If timeout is not
+ None, a Timeout exception will be raised in the yielding task
+ if no space is available after timeout seconds have elapsed.
+ TODO: Otherwise if space is available, the timeout specifies how
+ long to keep the item in the queue before discarding it if it
+ is not fetched in a get. In this case it doesnot throw exception.
+ For example:
+
+ try:
+ yield queue.put(item, timeout=5)
+ except Timeout:
+ # No space available after 5 seconds
+
+ """
+
+ return _SmartQueueAction(self, item, timeout=timeout)
+
+
+class _SmartQueueAction(YieldCondition):
+
+ NO_ITEM = object()
+
+ def __init__(self, queue, item=NO_ITEM, timeout=None, criteria=None):
+ super(_SmartQueueAction, self).__init__(timeout)
+ if not isinstance(queue, SmartQueue):
+ raise TypeError("'queue' must be a SmartQueue instance")
+ self.queue = queue
+ self.item = item
+ self.criteria = criteria
+ self.expires = (timeout is not None) and (time.time() + timeout) or 0
+
+
+################################################################################
+#
+# TaskManager class
+#
+################################################################################
+
+
+
+class TaskManager(object):
+
+ """
+
+ Engine for running a set of cooperatively-multitasking tasks
+ within a single Python thread
+
+ """
+
+ def __init__(self):
+ """
+
+ Create a new TaskManager instance. Generally, there will only
+ be one of these per Python process. If you want to run two
+ existing instances simultaneously, merge them first, then run
+ one or the other.
+
+ """
+
+ self._queue = collections.deque()
+ self._read_waits = set()
+ self._write_waits = set()
+ self._exc_waits = set()
+ self._queue_waits = collections.defaultdict(self._double_deque)
+ self._timeouts = []
+
+ @staticmethod
+ def _double_deque():
+ return (collections.deque(), collections.deque())
+
+ def merge(self, other):
+ """
+
+ Merge this TaskManager with another. After the merge, the two
+ objects share the same (merged) internal data structures, so
+ either can be used to manage the combined task set.
+
+ """
+
+ if not isinstance(other, TaskManager):
+ raise TypeError("'other' must be a TaskManager instance")
+
+ # Merge the data structures
+ self._queue.extend(other._queue)
+ self._read_waits |= other._read_waits
+ self._write_waits |= other._write_waits
+ self._exc_waits |= other._exc_waits
+ self._queue_waits.update(other._queue_waits)
+ self._timeouts.extend(other._timeouts)
+ heapq.heapify(self._timeouts)
+
+ # Make other reference the merged data structures. This is
+ # necessary because other's tasks may reference and use other
+ # (e.g. to add a new task in response to an event).
+ other._queue = self._queue
+ other._read_waits = self._read_waits
+ other._write_waits = self._write_waits
+ other._exc_waits = self._exc_waits
+ other._queue_waits = self._queue_waits
+ other._timeouts = self._timeouts
+
+ def add(self, task):
+ 'Add a new task (i.e. a generator instance) to the run queue'
+
+ if not isinstance(task, types.GeneratorType):
+ raise TypeError("'task' must be a generator")
+ self._enqueue(task)
+
+ def _enqueue(self, task, input=None, exc_info=()):
+ self._queue.append((task, input, exc_info))
+
+ def run(self):
+ """
+
+ Call run_next() repeatedly until there are no tasks that are
+ currently runnable, waiting for I/O, or waiting to time out.
+ Note that this method can block indefinitely (e.g. if there
+ are only I/O waits and no timeouts). If this is unacceptable,
+ use run_next() instead.
+
+ """
+ while self.has_runnable() or self.has_io_waits() or self.has_timeouts():
+ self.run_next()
+
+ def has_runnable(self):
+ """
+
+ Return True is there are runnable tasks in the queue, False
+ otherwise
+
+ """
+ return bool(self._queue)
+
+ def has_io_waits(self):
+ """
+
+ Return True is there are tasks waiting for I/O, False
+ otherwise
+
+ """
+ return bool(self._read_waits or self._write_waits or self._exc_waits)
+
+ def has_timeouts(self):
+ """
+
+ Return True is there are tasks with pending timeouts, False
+ otherwise
+
+ """
+ return bool(self._timeouts)
+
+ def run_next(self, timeout=None):
+ """
+
+ Perform one iteration of the run cycle: check whether any
+ pending I/O operations can be performed, check whether any
+ timeouts have expired, then run all currently runnable tasks.
+
+ The timeout argument specifies the maximum time to wait for
+ some task to become runnable. If timeout is None and there
+ are no currently runnable tasks, but there are tasks waiting
+ to perform I/O or time out, then this method will block until
+ at least one of the waiting tasks becomes runnable. To
+ prevent this method from blocking indefinitely, use timeout to
+ specify the maximum number of seconds to wait.
+
+ If there are runnable tasks in the queue when run_next() is
+ called, then it will check for I/O readiness using a
+ non-blocking call to select() (i.e. a poll), and only
+ already-expired timeouts will be handled. This ensures both
+ that the task manager is never idle when tasks can be run and
+ that tasks waiting for I/O never starve.
+
+ """
+
+ if self.has_io_waits():
+ self._handle_io_waits(self._fix_run_timeout(timeout))
+
+ if self.has_timeouts():
+ self._handle_timeouts(self._fix_run_timeout(timeout))
+
+ # Run all tasks currently in the queue
+ for dummy in xrange(len(self._queue)):
+ task, input, exc_info = self._queue.popleft()
+ try:
+ if exc_info:
+ output = task.throw(*exc_info)
+ else:
+ output = task.send(input)
+ except StopIteration, e:
+ if isinstance(task, _ChildTask):
+ if not e.args:
+ output = None
+ elif len(e.args) == 1:
+ output = e.args[0]
+ else:
+ output = e.args
+ self._enqueue(task.parent, input=output)
+ except:
+ if isinstance(task, _ChildTask):
+ # Propagate exception to parent
+ self._enqueue(task.parent, exc_info=sys.exc_info())
+ else:
+ # No parent task, so just die
+ raise
+ else:
+ self._handle_task_output(task, output)
+
+ def _fix_run_timeout(self, timeout):
+ if self.has_runnable():
+ # Don't block if there are tasks in the queue
+ timeout = 0.0
+ elif self.has_timeouts():
+ # If there are timeouts, block only until the first expiration
+ expiration_timeout = max(0.0, self._timeouts[0][0] - time.time())
+ if (timeout is None) or (timeout > expiration_timeout):
+ timeout = expiration_timeout
+ return timeout
+
+ def _handle_io_waits(self, timeout):
+ # The error handling here is (mostly) borrowed from Twisted
+ try:
+ read_ready, write_ready, exc_ready = \
+ select.select(self._read_waits,
+ self._write_waits,
+ self._exc_waits,
+ timeout)
+ except (TypeError, ValueError):
+ self._remove_bad_file_descriptors()
+ except (select.error, IOError), err:
+ if err[0] == errno.EINTR:
+ pass
+ elif ((err[0] == errno.EBADF) or
+ ((sys.platform == 'win32') and
+ (err[0] == 10038))): # WSAENOTSOCK
+ self._remove_bad_file_descriptors()
+ else:
+ # Not an error we can handle, so die
+ raise
+ else:
+ for fd in set(read_ready + write_ready + exc_ready):
+ try:
+ input = (fd._eval() if isinstance(fd, FDAction) else None)
+ self._enqueue(fd.task, input=input)
+ except:
+ self._enqueue(fd.task, exc_info=sys.exc_info())
+ fd._remove_from_fdsets(self._read_waits,
+ self._write_waits,
+ self._exc_waits)
+ if fd._expires():
+ self._remove_timeout(fd)
+
+ def _remove_bad_file_descriptors(self):
+ for fd in (self._read_waits | self._write_waits | self._exc_waits):
+ try:
+ select.select([fd], [fd], [fd], 0.0)
+ except:
+ self._enqueue(fd.task, exc_info=sys.exc_info())
+ fd._remove_from_fdsets(self._read_waits,
+ self._write_waits,
+ self._exc_waits)
+ if fd._expires():
+ self._remove_timeout(fd)
+
+ def _add_timeout(self, item, handler):
+ item.handle_expiration = handler
+ heapq.heappush(self._timeouts, (item.expiration, item))
+
+ def _remove_timeout(self, item):
+ self._timeouts.remove((item.expiration, item))
+ heapq.heapify(self._timeouts)
+
+ def _handle_timeouts(self, timeout):
+ if (not self.has_runnable()) and (timeout > 0.0):
+ time.sleep(timeout)
+
+ current_time = time.time()
+
+ while self._timeouts and (self._timeouts[0][0] <= current_time):
+ item = heapq.heappop(self._timeouts)[1]
+ if isinstance(item, _SleepDelay):
+ self._enqueue(item.task)
+ else:
+ self._enqueue(item.task, exc_info=(Timeout,))
+ item.handle_expiration()
+
+ def _handle_task_output(self, task, output):
+ if isinstance(output, types.GeneratorType):
+ self._enqueue(_ChildTask(task, output))
+ elif isinstance(output, YieldCondition):
+ output.task = task
+ if isinstance(output, _SleepDelay):
+ self._add_timeout(output, None)
+ elif isinstance(output, FDReady):
+ self._handle_fdready(task, output)
+ elif isinstance(output, _QueueAction):
+ self._handle_queue_action(task, output)
+ elif isinstance(output, _SmartQueueAction):
+ self._handle_smart_queue_action(task, output)
+ else:
+ # Return any other output as input and send task to
+ # end of queue
+ self._enqueue(task, input=output)
+
+ def _handle_fdready(self, task, output):
+ output._add_to_fdsets(self._read_waits,
+ self._write_waits,
+ self._exc_waits)
+ if output._expires():
+ self._add_timeout(output,
+ (lambda:
+ output._remove_from_fdsets(self._read_waits,
+ self._write_waits,
+ self._exc_waits)))
+
+ def _handle_queue_action(self, task, output):
+ get_waits, put_waits = self._queue_waits[output.queue]
+
+ if output.item is output.NO_ITEM:
+ # Action is a get
+ if output.queue.empty():
+ get_waits.append(output)
+ if output._expires():
+ self._add_timeout(output,
+ (lambda: get_waits.remove(output)))
+ else:
+ item = output.queue._get()
+ self._enqueue(task, input=item)
+ if put_waits:
+ action = put_waits.popleft()
+ output.queue._put(action.item)
+ self._enqueue(action.task)
+ if action._expires():
+ self._remove_timeout(action)
+ else:
+ # Action is a put
+ if output.queue.full():
+ put_waits.append(output)
+ if output._expires():
+ self._add_timeout(output,
+ (lambda: put_waits.remove(output)))
+ else:
+ output.queue._put(output.item)
+ self._enqueue(task)
+ if get_waits:
+ action = get_waits.popleft()
+ item = output.queue._get()
+ self._enqueue(action.task, input=item)
+ if action._expires():
+ self._remove_timeout(action)
+
+
+ def _handle_smart_queue_action(self, task, output):
+ get_waits, put_waits = self._queue_waits[output.queue]
+
+ if output.item is output.NO_ITEM:
+ # Action is a get
+ item = output.queue._get(criteria=output.criteria)
+ if item is None:
+ get_waits.append(output)
+ if output._expires():
+ self._add_timeout(output,
+ (lambda: get_waits.remove(output)))
+ else:
+ self._enqueue(task, input=item)
+ if put_waits:
+ action = put_waits.popleft()
+ output.queue._put(action.item)
+ self._enqueue(action.task)
+ if action._expires():
+ self._remove_timeout(action)
+ else:
+ # Action is a put
+ if output.queue.full():
+ put_waits.append(output)
+ if output._expires():
+ self._add_timeout(output,
+ (lambda: put_waits.remove(output)))
+ else:
+ output.queue._put(output.item)
+ self._enqueue(task)
+ if get_waits:
+ actions = []
+ for action in get_waits:
+ item = output.queue._get(criteria=action.criteria)
+ if item is not None:
+ actions.append((action, item))
+ for action,item in actions:
+ get_waits.remove(action)
+ self._enqueue(action.task, input=item)
+ if action._expires():
+ self._remove_timeout(action)
+
+
+
+################################################################################
+#
+# Default TaskManager instance
+#
+################################################################################
+
+
+
+_default_task_manager = None
+
+
+def get_default_task_manager():
+ 'Return the default TaskManager instance'
+ global _default_task_manager
+ if _default_task_manager is None:
+ _default_task_manager = TaskManager()
+ return _default_task_manager
+
+
+def add(task):
+ 'Add a task to the default TaskManager instance'
+ get_default_task_manager().add(task)
+
+
+def run():
+ 'Run the default TaskManager instance'
+ get_default_task_manager().run()
+
+
+
+################################################################################
+#
+# Test routine
+#
+################################################################################
+
+
+
+if __name__ == '__main__':
+ if sys.platform == 'win32':
+ # Make sure WSAStartup() is called
+ import socket
+
+ def printer(name):
+ for i in xrange(1, 4):
+ print '%s:\t%d' % (name, i)
+ yield
+
+ t = TaskManager()
+ t.add(printer('first'))
+ t.add(printer('second'))
+ t.add(printer('third'))
+
+ queue = Queue()
+
+ def receiver():
+ print 'receiver started'
+ print 'receiver received: %s' % (yield queue.get())
+ print 'receiver finished'
+
+ def sender():
+ print 'sender started'
+ yield queue.put('from sender')
+ print 'sender finished'
+
+ def bad_descriptor():
+ print 'bad_descriptor running'
+ try:
+ yield readable(12)
+ except:
+ print 'exception in bad_descriptor:', sys.exc_info()[1]
+
+ def sleeper():
+ print 'sleeper started'
+ yield sleep(1)
+ print 'sleeper finished'
+
+ def timeout_immediately():
+ print 'timeout_immediately running'
+ try:
+ yield Queue().get(timeout=0)
+ except Timeout:
+ print 'timeout_immediately timed out'
+
+ t2 = TaskManager()
+ t2.add(receiver())
+ t2.add(bad_descriptor())
+ t2.add(sender())
+ t2.add(sleeper())
+ t2.add(timeout_immediately())
+
+ def parent():
+ print 'child returned: %s' % ((yield child()),)
+ try:
+ yield child(raise_exc=True)
+ except:
+ print 'exception in child:', sys.exc_info()[1]
+
+ def child(raise_exc=False):
+ yield
+ if raise_exc:
+ raise RuntimeError('foo')
+ raise StopIteration(1, 2, 3)
+
+ t3 = TaskManager()
+ t3.add(parent())
+
+ t.merge(t2)
+ t.merge(t3)
+ t.run()
+
+ assert not(t.has_runnable() or t.has_io_waits() or t.has_timeouts())