1 ################################################################################
3 # Copyright (c) 2007 Christopher J. Stawarz
5 # Permission is hereby granted, free of charge, to any person
6 # obtaining a copy of this software and associated documentation files
7 # (the "Software"), to deal in the Software without restriction,
8 # including without limitation the rights to use, copy, modify, merge,
9 # publish, distribute, sublicense, and/or sell copies of the Software,
10 # and to permit persons to whom the Software is furnished to do so,
11 # subject to the following conditions:
13 # The above copyright notice and this permission notice shall be
14 # included in all copies or substantial portions of the Software.
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 ################################################################################
31 Cooperative multitasking and asynchronous I/O using generators
33 multitask allows Python programs to use generators (a.k.a. coroutines)
34 to perform cooperative multitasking and asynchronous I/O.
35 Applications written using multitask consist of a set of cooperating
36 tasks that yield to a shared task manager whenever they perform a
37 (potentially) blocking operation, such as I/O on a socket or getting
38 data from a queue. The task manager temporarily suspends the task
39 (allowing other tasks to run in the meantime) and then restarts it
40 when the blocking operation is complete. Such an approach is suitable
41 for applications that would otherwise have to use select() and/or
42 multiple threads to achieve concurrency.
44 The functions and classes in the multitask module allow tasks to yield
45 for I/O operations on sockets and file descriptors, adding/removing
46 data to/from queues, or sleeping for a specified interval. When
47 yielding, a task can also specify a timeout. If the operation for
48 which the task yielded has not completed after the given number of
49 seconds, the task is restarted, and a Timeout exception is raised at
50 the point of yielding.
52 As a very simple example, here's how one could use multitask to allow
53 two unrelated tasks to run concurrently:
55 >>> def printer(message):
60 >>> multitask.add(printer('hello'))
61 >>> multitask.add(printer('goodbye'))
71 For a more useful example, here's how one could implement a
72 multitasking server that can handle multiple concurrent client
77 conn, address = (yield multitask.accept(sock))
78 multitask.add(client_handler(conn))
80 def client_handler(sock):
82 request = (yield multitask.recv(sock, 1024))
85 response = handle_request(request)
86 yield multitask.send(sock, response)
88 multitask.add(listener(sock))
91 Tasks can also yield other tasks, which allows for composition of
92 tasks and reuse of existing multitasking code. A child task runs
93 until it either completes or raises an exception. To return output to
94 its parent, a child task raises StopIteration, passing the output
95 value(s) to the StopIteration constructor. An unhandled exception
96 raised within a child task is propagated to its parent. For example:
99 ... print (yield return_none())
100 ... print (yield return_one())
101 ... print (yield return_many())
103 ... yield raise_exception()
104 ... except Exception, e:
105 ... print 'caught exception: %s' % e
107 >>> def return_none():
111 ... # or raise StopIteration
112 ... # or raise StopIteration(None)
114 >>> def return_one():
116 ... raise StopIteration(1)
118 >>> def return_many():
120 ... raise StopIteration(2, 3) # or raise StopIteration((2, 3))
122 >>> def raise_exception():
124 ... raise RuntimeError('foo')
126 >>> multitask.add(parent())
131 caught exception: foo
138 from functools
import partial
# Module metadata.
__author__ = 'Christopher Stawarz <cstawarz@csail.mit.edu>'
__version__ = '0.2.0'
# The revision number is the second whitespace-separated field of the
# expanded version-control keyword string.
__revision__ = int('$Revision: 5025 $'.split()[1])
153 ################################################################################
155 # Timeout exception type
157 ################################################################################
class Timeout(Exception):
    """Raised in a yielding task when an operation times out."""
167 ################################################################################
171 ################################################################################
175 class _ChildTask(object):
177 def __init__(self
, parent
, task
):
181 def send(self
, value
):
182 return self
.task
.send(value
)
184 def throw(self
, type, value
=None, traceback
=None):
185 return self
.task
.throw(type, value
, traceback
)
189 ################################################################################
191 # YieldCondition class
193 ################################################################################
197 class YieldCondition(object):
201 Base class for objects that are yielded by a task to the task
202 manager and specify the condition(s) under which the task should
203 be restarted. Only subclasses of this class are useful to
208 def __init__(self
, timeout
=None):
211 If timeout is None, the task will be suspended indefinitely
212 until the condition is met. Otherwise, if the condition is
213 not met within timeout seconds, a Timeout exception will be
214 raised in the yielding task.
219 self
.handle_expiration
= None
222 self
.expiration
= None
224 self
.expiration
= time
.time() + float(timeout
)
227 return (self
.expiration
is not None)
231 ################################################################################
233 # _SleepDelay class and related functions
235 ################################################################################
239 class _SleepDelay(YieldCondition
):
241 def __init__(self
, seconds
):
242 seconds
= float(seconds
)
244 raise ValueError("'seconds' must be greater than 0")
245 super(_SleepDelay
, self
).__init
__(seconds
)
251 A task that yields the result of this function will be resumed
252 after the specified number of seconds have elapsed. For example:
255 yield sleep(5) # Sleep for five seconds
256 do_something() # Done sleeping; get back to work
260 return _SleepDelay(seconds
)
264 ################################################################################
266 # FDReady class and related functions
268 ################################################################################
272 class FDReady(YieldCondition
):
276 A task that yields an instance of this class will be suspended
277 until a specified file descriptor is ready for I/O.
281 def __init__(self
, fd
, read
=False, write
=False, exc
=False, timeout
=None):
284 Resume the yielding task when fd is ready for reading,
285 writing, and/or "exceptional" condition handling. fd can be
286 any object accepted by select.select() (meaning an integer or
287 an object with a fileno() method that returns an integer).
288 Any exception raised by select() due to fd will be re-raised
289 in the yielding task.
291 If timeout is not None, a Timeout exception will be raised in
292 the yielding task if fd is not ready after timeout seconds
297 super(FDReady
, self
).__init
__(timeout
)
299 self
.fd
= (fd
if _is_file_descriptor(fd
) else fd
.fileno())
301 if not (read
or write
or exc
):
302 raise ValueError("'read', 'write', and 'exc' cannot all be false")
308 'Return the file descriptor on which the yielding task is waiting'
311 def _add_to_fdsets(self
, read_fds
, write_fds
, exc_fds
):
312 for add
, fdset
in ((self
.read
, read_fds
),
313 (self
.write
, write_fds
),
314 (self
.exc
, exc_fds
)):
318 def _remove_from_fdsets(self
, read_fds
, write_fds
, exc_fds
):
319 for fdset
in (read_fds
, write_fds
, exc_fds
):
def _is_file_descriptor(fd):
    """

    Return True if fd is an integer (i.e. a raw file descriptor rather
    than an object that must be asked for its fileno()), False
    otherwise.

    """

    # 'long' only exists on Python 2.  Referencing it unconditionally
    # raises NameError on interpreters where int and long are unified,
    # so fall back to plain int there.
    try:
        integer_types = (int, long)
    except NameError:
        integer_types = (int,)
    return isinstance(fd, integer_types)
def readable(fd, timeout=None):
    """

    Build the condition a task yields in order to be resumed once fd
    is readable.  When timeout is not None and fd is still not
    readable after timeout seconds, a Timeout exception is raised in
    the yielding task instead.  For example:

        try:
            yield readable(sock, timeout=5)
            data = sock.recv(1024)
        except Timeout:
            # No data after 5 seconds

    """

    condition = FDReady(fd, read=True, timeout=timeout)
    return condition
def writable(fd, timeout=None):
    """

    Build the condition a task yields in order to be resumed once fd
    is writable.  When timeout is not None and fd is still not
    writable after timeout seconds, a Timeout exception is raised in
    the yielding task instead.  For example:

        try:
            yield writable(sock, timeout=5)
            nsent = sock.send(data)
        except Timeout:
            # Can't send after 5 seconds

    """

    condition = FDReady(fd, write=True, timeout=timeout)
    return condition
366 ################################################################################
368 # FDAction class and related functions
370 ################################################################################
374 class FDAction(FDReady
):
378 A task that yields an instance of this class will be suspended
379 until an I/O operation on a specified file descriptor is complete.
383 def __init__(self
, fd
, func
, args
=(), kwargs
={}, read
=False, write
=False,
387 Resume the yielding task when fd is ready for reading,
388 writing, and/or "exceptional" condition handling. fd can be
389 any object accepted by select.select() (meaning an integer or
390 an object with a fileno() method that returns an integer).
391 Any exception raised by select() due to fd will be re-raised
392 in the yielding task.
394 The value of the yield expression will be the result of
395 calling func with the specified args and kwargs (which
396 presumably performs a read, write, or other I/O operation on
397 fd). If func raises an exception, it will be re-raised in the
398 yielding task. Thus, FDAction is really just a convenient
399 subclass of FDReady that requests that the task manager
400 perform an I/O operation on the calling task's behalf.
402 If kwargs contains a timeout argument that is not None, a
403 Timeout exception will be raised in the yielding task if fd is
404 not ready after timeout seconds have elapsed.
408 timeout
= kwargs
.pop('timeout', None)
409 super(FDAction
, self
).__init
__(fd
, read
, write
, exc
, timeout
)
416 return self
.func(*(self
.args
), **(self
.kwargs
))
def read(fd, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once fd is
    readable; the value of the yield expression is the result of
    reading from fd.  A timeout keyword that is not None causes a
    Timeout exception to be raised in the yielding task if fd is not
    readable after that many seconds.  All other arguments are handed
    to the read function (os.read() when fd is an integer, fd.read()
    otherwise).  For example:

        try:
            data = (yield read(fd, 1024, timeout=5))
        except Timeout:
            # No data after 5 seconds

    """

    # Raw descriptors are read through os.read(); anything else is
    # expected to supply its own read() method.
    if _is_file_descriptor(fd):
        reader = partial(os.read, fd)
    else:
        reader = fd.read
    return FDAction(fd, reader, args, kwargs, read=True)
def readline(fd, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once fd is
    readable; the value of the yield expression is the result of
    reading a line from fd.  A timeout keyword that is not None
    causes a Timeout exception to be raised in the yielding task if
    fd is not readable after that many seconds.  All other arguments
    are handed to fd.readline().  For example:

        try:
            data = (yield readline(fd, timeout=5))
        except Timeout:
            # No data after 5 seconds

    """

    line_reader = fd.readline
    return FDAction(fd, line_reader, args, kwargs, read=True)
def write(fd, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once fd is
    writable; the value of the yield expression is the result of
    writing to fd.  A timeout keyword that is not None causes a
    Timeout exception to be raised in the yielding task if fd is not
    writable after that many seconds.  All other arguments are handed
    to the write function (os.write() when fd is an integer,
    fd.write() otherwise).  For example:

        try:
            nbytes = (yield write(fd, data, timeout=5))
        except Timeout:
            # Can't write after 5 seconds

    """

    # Raw descriptors are written through os.write(); anything else
    # is expected to supply its own write() method.
    if _is_file_descriptor(fd):
        writer = partial(os.write, fd)
    else:
        writer = fd.write
    return FDAction(fd, writer, args, kwargs, write=True)
def accept(sock, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once sock
    is readable; the value of the yield expression is the result of
    accepting a new connection on sock.  A timeout keyword that is
    not None causes a Timeout exception to be raised in the yielding
    task if sock is not readable after that many seconds.  All other
    arguments are handed to sock.accept().  For example:

        try:
            conn, address = (yield accept(sock, timeout=5))
        except Timeout:
            # No connections after 5 seconds

    """

    acceptor = sock.accept
    return FDAction(sock, acceptor, args, kwargs, read=True)
def recv(sock, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once sock
    is readable; the value of the yield expression is the result of
    receiving from sock.  A timeout keyword that is not None causes a
    Timeout exception to be raised in the yielding task if sock is
    not readable after that many seconds.  All other arguments are
    handed to sock.recv().  For example:

        try:
            data = (yield recv(sock, 1024, timeout=5))
        except Timeout:
            # No data after 5 seconds

    """

    receiver = sock.recv
    return FDAction(sock, receiver, args, kwargs, read=True)
def recvfrom(sock, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once sock
    is readable; the value of the yield expression is the result of
    receiving from sock.  A timeout keyword that is not None causes a
    Timeout exception to be raised in the yielding task if sock is
    not readable after that many seconds.  All other arguments are
    handed to sock.recvfrom().  For example:

        try:
            data, address = (yield recvfrom(sock, 1024, timeout=5))
        except Timeout:
            # No data after 5 seconds

    """

    receiver = sock.recvfrom
    return FDAction(sock, receiver, args, kwargs, read=True)
def send(sock, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once sock
    is writable; the value of the yield expression is the result of
    sending to sock.  A timeout keyword that is not None causes a
    Timeout exception to be raised in the yielding task if sock is
    not writable after that many seconds.  All other arguments are
    handed to sock.send().  For example:

        try:
            nsent = (yield send(sock, data, timeout=5))
        except Timeout:
            # Can't send after 5 seconds

    """

    sender = sock.send
    return FDAction(sock, sender, args, kwargs, write=True)
def sendto(sock, *args, **kwargs):
    """

    Build the action a task yields in order to be resumed once sock
    is writable; the value of the yield expression is the result of
    sending to sock.  A timeout keyword that is not None causes a
    Timeout exception to be raised in the yielding task if sock is
    not writable after that many seconds.  All other arguments are
    handed to sock.sendto().  For example:

        try:
            nsent = (yield sendto(sock, data, address, timeout=5))
        except Timeout:
            # Can't send after 5 seconds

    """

    sender = sock.sendto
    return FDAction(sock, sender, args, kwargs, write=True)
590 ################################################################################
592 # Queue and _QueueAction classes
594 ################################################################################
602 A multi-producer, multi-consumer FIFO queue (similar to
603 Queue.Queue) that can be used for exchanging data between tasks
def __init__(self, contents=(), maxsize=0):
    """

    Set up a new Queue.  contents is a sequence (empty by default)
    holding the initial items of the queue.  When maxsize is greater
    than 0 the queue holds at most maxsize items, and put() blocks
    until space is available in the queue.

    """

    # maxsize is coerced to int; the initial items are copied into a
    # deque owned by this queue.
    self.maxsize = int(maxsize)
    self._queue = collections.deque(contents)
622 'Return the number of items in the queue'
623 return len(self
._queue
)
626 return self
._queue
.popleft()
def _put(self, item):
    """Append item to the tail of the underlying deque."""
    items = self._queue
    items.append(item)
632 'Return True if the queue is empty, False otherwise'
633 return (len(self
) == 0)
636 'Return True if the queue is full, False otherwise'
637 return ((len(self
) >= self
.maxsize
) if (self
.maxsize
> 0) else False)
def get(self, timeout=None):
    """

    Build the action a task yields in order to be resumed once an
    item is available in the queue; the value of the yield
    expression is the item.  When timeout is not None and no item is
    available after timeout seconds, a Timeout exception is raised
    in the yielding task.  For example:

        try:
            item = (yield queue.get(timeout=5))
        except Timeout:
            # No item available after 5 seconds

    """

    action = _QueueAction(self, timeout=timeout)
    return action
def put(self, item, timeout=None):
    """

    Build the action a task yields in order to be resumed once item
    has been added to the queue.  When timeout is not None and no
    space is available after timeout seconds, a Timeout exception is
    raised in the yielding task.  For example:

        try:
            yield queue.put(item, timeout=5)
        except Timeout:
            # No space available after 5 seconds

    """

    action = _QueueAction(self, item, timeout=timeout)
    return action
677 class _QueueAction(YieldCondition
):
681 def __init__(self
, queue
, item
=NO_ITEM
, timeout
=None):
682 super(_QueueAction
, self
).__init
__(timeout
)
683 if not isinstance(queue
, Queue
):
684 raise TypeError("'queue' must be a Queue instance")
689 ################################################################################
691 # SmartQueue and _SmartQueueAction classes
693 ################################################################################
697 class SmartQueue(object):
701 A multi-producer, multi-consumer FIFO queue (similar to
702 Queue.Queue) that can be used for exchanging data between tasks.
703 The difference with Queue is that this implements filtering criteria
704 on get and allows multiple get to be signalled for the same put.
705 On the downside, this uses list instead of deque and has lower
def __init__(self, contents=(), maxsize=0):
    """

    Set up a new SmartQueue.  contents is a sequence (empty by
    default) holding the initial items of the queue.  When maxsize
    is greater than 0 the queue holds at most maxsize items, and
    put() blocks until space is available in the queue.

    """

    # maxsize is coerced to int; the initial items are copied into a
    # list owned by this queue.
    self.maxsize = int(maxsize)
    self._pending = list(contents)
725 'Return the number of items in the queue'
726 return len(self
._pending
)
728 def _get(self
, criteria
=None):
729 #self._pending = filter(lambda x: x[1]<=now, self._pending) # remove expired ones
731 found
= filter(lambda x
: criteria(x
), self
._pending
) # check any matching criteria
733 self
._pending
.remove(found
[0])
738 return self
._pending
.pop(0) if self
._pending
else None
def _put(self, item):
    """Append item to the end of the list of pending items."""
    pending = self._pending
    pending.append(item)
744 'Return True if the queue is empty, False otherwise'
745 return (len(self
) == 0)
748 'Return True if the queue is full, False otherwise'
749 return ((len(self
) >= self
.maxsize
) if (self
.maxsize
> 0) else False)
def get(self, timeout=None, criteria=None):
    """

    Build the action a task yields in order to be resumed once an
    item is available in the queue and that item matches the given
    criteria (a function, usually a lambda); the value of the yield
    expression is the item.  When timeout is not None and no item is
    available after timeout seconds, a Timeout exception is raised
    in the yielding task.  For example:

        try:
            item = (yield queue.get(timeout=5, criteria=lambda x: x.name == 'kundan'))
        except Timeout:
            # No item available after 5 seconds

    """

    action = _SmartQueueAction(self, timeout=timeout, criteria=criteria)
    return action
def put(self, item, timeout=None):
    """

    Build the action a task yields in order to be resumed once item
    has been added to the queue.  When timeout is not None and no
    space is available after timeout seconds, a Timeout exception is
    raised in the yielding task.
    TODO: Otherwise, if space is available, the timeout specifies how
    long to keep the item in the queue before discarding it if it is
    not fetched by a get.  In this case it does not throw an
    exception.  For example:

        try:
            yield queue.put(item, timeout=5)
        except Timeout:
            # No space available after 5 seconds

    """

    action = _SmartQueueAction(self, item, timeout=timeout)
    return action
793 class _SmartQueueAction(YieldCondition
):
797 def __init__(self
, queue
, item
=NO_ITEM
, timeout
=None, criteria
=None):
798 super(_SmartQueueAction
, self
).__init
__(timeout
)
799 if not isinstance(queue
, SmartQueue
):
800 raise TypeError("'queue' must be a SmartQueue instance")
803 self
.criteria
= criteria
804 self
.expires
= (timeout
is not None) and (time
.time() + timeout
) or 0
807 ################################################################################
811 ################################################################################
815 class TaskManager(object):
819 Engine for running a set of cooperatively-multitasking tasks
820 within a single Python thread
827 Create a new TaskManager instance. Generally, there will only
828 be one of these per Python process. If you want to run two
829 existing instances simultaneously, merge them first, then run
834 self
._queue
= collections
.deque()
835 self
._read
_waits
= set()
836 self
._write
_waits
= set()
837 self
._exc
_waits
= set()
838 self
._queue
_waits
= collections
.defaultdict(self
._double
_deque
)
843 return (collections
.deque(), collections
.deque())
def merge(self, other):
    """

    Merge this TaskManager with another.  After the merge, the two
    objects share the same (merged) internal data structures, so
    either can be used to manage the combined task set.

    Merging a manager with itself, or with a manager whose data
    structures it already shares, does nothing.  Raises TypeError if
    other is not a TaskManager instance.

    """

    if not isinstance(other, TaskManager):
        raise TypeError("'other' must be a TaskManager instance")

    if other._queue is self._queue:
        # other is self, or the two were merged earlier.  Extending
        # the shared structures with themselves would schedule every
        # task (and every timeout) twice.
        return

    # Merge the data structures
    self._queue.extend(other._queue)
    self._read_waits |= other._read_waits
    self._write_waits |= other._write_waits
    self._exc_waits |= other._exc_waits
    for queue, other_waits in other._queue_waits.items():
        own_waits = self._queue_waits.get(queue)
        if own_waits is None:
            # Only other has waiters on this queue; adopt its
            # (get_waits, put_waits) pair unchanged
            self._queue_waits[queue] = other_waits
            continue
        # Both managers have tasks waiting on the same queue.
        # dict.update() would replace this manager's pair and its
        # waiters would never be woken, so append other's waiters to
        # the existing deques instead.
        for own_deque, other_deque in zip(own_waits, other_waits):
            for action in other_deque:
                own_deque.append(action)
                if action.handle_expiration is not None:
                    # The expiration handler removes the action from
                    # the deque it waits on; point it at the deque
                    # the action lives in now
                    action.handle_expiration = partial(own_deque.remove,
                                                       action)
    self._timeouts.extend(other._timeouts)
    heapq.heapify(self._timeouts)

    # Make other reference the merged data structures.  This is
    # necessary because other's tasks may reference and use other
    # (e.g. to add a new task in response to an event).
    other._queue = self._queue
    other._read_waits = self._read_waits
    other._write_waits = self._write_waits
    other._exc_waits = self._exc_waits
    other._queue_waits = self._queue_waits
    other._timeouts = self._timeouts
877 'Add a new task (i.e. a generator instance) to the run queue'
879 if not isinstance(task
, types
.GeneratorType
):
880 raise TypeError("'task' must be a generator")
def _enqueue(self, task, input=None, exc_info=()):
    """

    Append task to the run queue, together with the value to send
    into it (input) and the exception info to throw into it
    (exc_info).

    """

    entry = (task, input, exc_info)
    self._queue.append(entry)
889 Call run_next() repeatedly until there are no tasks that are
890 currently runnable, waiting for I/O, or waiting to time out.
891 Note that this method can block indefinitely (e.g. if there
892 are only I/O waits and no timeouts). If this is unacceptable,
893 use run_next() instead.
896 while self
.has_runnable() or self
.has_io_waits() or self
.has_timeouts():
def has_runnable(self):
    """

    Return True if there are runnable tasks in the queue, False
    otherwise.

    """

    if self._queue:
        return True
    return False
def has_io_waits(self):
    """

    Return True if there are tasks waiting for I/O (for reading,
    writing, or an exceptional condition), False otherwise.

    """

    if self._read_waits:
        return True
    if self._write_waits:
        return True
    return bool(self._exc_waits)
def has_timeouts(self):
    """

    Return True if there are tasks with pending timeouts, False
    otherwise.

    """

    if self._timeouts:
        return True
    return False
926 def run_next(self
, timeout
=None):
929 Perform one iteration of the run cycle: check whether any
930 pending I/O operations can be performed, check whether any
931 timeouts have expired, then run all currently runnable tasks.
933 The timeout argument specifies the maximum time to wait for
934 some task to become runnable. If timeout is None and there
935 are no currently runnable tasks, but there are tasks waiting
936 to perform I/O or time out, then this method will block until
937 at least one of the waiting tasks becomes runnable. To
938 prevent this method from blocking indefinitely, use timeout to
939 specify the maximum number of seconds to wait.
941 If there are runnable tasks in the queue when run_next() is
942 called, then it will check for I/O readiness using a
943 non-blocking call to select() (i.e. a poll), and only
944 already-expired timeouts will be handled. This ensures both
945 that the task manager is never idle when tasks can be run and
946 that tasks waiting for I/O never starve.
950 if self
.has_io_waits():
951 self
._handle
_io
_waits
(self
._fix
_run
_timeout
(timeout
))
953 if self
.has_timeouts():
954 self
._handle
_timeouts
(self
._fix
_run
_timeout
(timeout
))
956 # Run all tasks currently in the queue
957 for dummy
in xrange(len(self
._queue
)):
958 task
, input, exc_info
= self
._queue
.popleft()
961 output
= task
.throw(*exc_info
)
963 output
= task
.send(input)
964 except StopIteration, e
:
965 if isinstance(task
, _ChildTask
):
968 elif len(e
.args
) == 1:
972 self
._enqueue
(task
.parent
, input=output
)
974 if isinstance(task
, _ChildTask
):
975 # Propagate exception to parent
976 self
._enqueue
(task
.parent
, exc_info
=sys
.exc_info())
978 # No parent task, so just die
981 self
._handle
_task
_output
(task
, output
)
983 def _fix_run_timeout(self
, timeout
):
984 if self
.has_runnable():
985 # Don't block if there are tasks in the queue
987 elif self
.has_timeouts():
988 # If there are timeouts, block only until the first expiration
989 expiration_timeout
= max(0.0, self
._timeouts
[0][0] - time
.time())
990 if (timeout
is None) or (timeout
> expiration_timeout
):
991 timeout
= expiration_timeout
994 def _handle_io_waits(self
, timeout
):
995 # The error handling here is (mostly) borrowed from Twisted
997 read_ready
, write_ready
, exc_ready
= \
998 select
.select(self
._read
_waits
,
1002 except (TypeError, ValueError):
1003 self
._remove
_bad
_file
_descriptors
()
1004 except (select
.error
, IOError), err
:
1005 if err
[0] == errno
.EINTR
:
1007 elif ((err
[0] == errno
.EBADF
) or
1008 ((sys
.platform
== 'win32') and
1009 (err
[0] == 10038))): # WSAENOTSOCK
1010 self
._remove
_bad
_file
_descriptors
()
1012 # Not an error we can handle, so die
1015 for fd
in set(read_ready
+ write_ready
+ exc_ready
):
1017 input = (fd
._eval
() if isinstance(fd
, FDAction
) else None)
1018 self
._enqueue
(fd
.task
, input=input)
1020 self
._enqueue
(fd
.task
, exc_info
=sys
.exc_info())
1021 fd
._remove
_from
_fdsets
(self
._read
_waits
,
1025 self
._remove
_timeout
(fd
)
1027 def _remove_bad_file_descriptors(self
):
1028 for fd
in (self
._read
_waits | self
._write
_waits | self
._exc
_waits
):
1030 select
.select([fd
], [fd
], [fd
], 0.0)
1032 self
._enqueue
(fd
.task
, exc_info
=sys
.exc_info())
1033 fd
._remove
_from
_fdsets
(self
._read
_waits
,
1037 self
._remove
_timeout
(fd
)
def _add_timeout(self, item, handler):
    """

    Record handler as item's expiration callback and push item onto
    the timeout heap, keyed by its expiration time.

    """

    item.handle_expiration = handler
    entry = (item.expiration, item)
    heapq.heappush(self._timeouts, entry)
def _remove_timeout(self, item):
    """

    Remove item's (expiration, item) entry from the timeout list,
    then restore the heap ordering of what remains.

    """

    entry = (item.expiration, item)
    self._timeouts.remove(entry)
    heapq.heapify(self._timeouts)
1047 def _handle_timeouts(self
, timeout
):
1048 if (not self
.has_runnable()) and (timeout
> 0.0):
1051 current_time
= time
.time()
1053 while self
._timeouts
and (self
._timeouts
[0][0] <= current_time
):
1054 item
= heapq
.heappop(self
._timeouts
)[1]
1055 if isinstance(item
, _SleepDelay
):
1056 self
._enqueue
(item
.task
)
1058 self
._enqueue
(item
.task
, exc_info
=(Timeout
,))
1059 item
.handle_expiration()
1061 def _handle_task_output(self
, task
, output
):
1062 if isinstance(output
, types
.GeneratorType
):
1063 self
._enqueue
(_ChildTask(task
, output
))
1064 elif isinstance(output
, YieldCondition
):
1066 if isinstance(output
, _SleepDelay
):
1067 self
._add
_timeout
(output
, None)
1068 elif isinstance(output
, FDReady
):
1069 self
._handle
_fdready
(task
, output
)
1070 elif isinstance(output
, _QueueAction
):
1071 self
._handle
_queue
_action
(task
, output
)
1072 elif isinstance(output
, _SmartQueueAction
):
1073 self
._handle
_smart
_queue
_action
(task
, output
)
1075 # Return any other output as input and send task to
1077 self
._enqueue
(task
, input=output
)
1079 def _handle_fdready(self
, task
, output
):
1080 output
._add
_to
_fdsets
(self
._read
_waits
,
1083 if output
._expires
():
1084 self
._add
_timeout
(output
,
1086 output
._remove
_from
_fdsets
(self
._read
_waits
,
1090 def _handle_queue_action(self
, task
, output
):
1091 get_waits
, put_waits
= self
._queue
_waits
[output
.queue
]
1093 if output
.item
is output
.NO_ITEM
:
1095 if output
.queue
.empty():
1096 get_waits
.append(output
)
1097 if output
._expires
():
1098 self
._add
_timeout
(output
,
1099 (lambda: get_waits
.remove(output
)))
1101 item
= output
.queue
._get
()
1102 self
._enqueue
(task
, input=item
)
1104 action
= put_waits
.popleft()
1105 output
.queue
._put
(action
.item
)
1106 self
._enqueue
(action
.task
)
1107 if action
._expires
():
1108 self
._remove
_timeout
(action
)
1111 if output
.queue
.full():
1112 put_waits
.append(output
)
1113 if output
._expires
():
1114 self
._add
_timeout
(output
,
1115 (lambda: put_waits
.remove(output
)))
1117 output
.queue
._put
(output
.item
)
1120 action
= get_waits
.popleft()
1121 item
= output
.queue
._get
()
1122 self
._enqueue
(action
.task
, input=item
)
1123 if action
._expires
():
1124 self
._remove
_timeout
(action
)
1127 def _handle_smart_queue_action(self
, task
, output
):
1128 get_waits
, put_waits
= self
._queue
_waits
[output
.queue
]
1130 if output
.item
is output
.NO_ITEM
:
1132 item
= output
.queue
._get
(criteria
=output
.criteria
)
1134 get_waits
.append(output
)
1135 if output
._expires
():
1136 self
._add
_timeout
(output
,
1137 (lambda: get_waits
.remove(output
)))
1139 self
._enqueue
(task
, input=item
)
1141 action
= put_waits
.popleft()
1142 output
.queue
._put
(action
.item
)
1143 self
._enqueue
(action
.task
)
1144 if action
._expires
():
1145 self
._remove
_timeout
(action
)
1148 if output
.queue
.full():
1149 put_waits
.append(output
)
1150 if output
._expires
():
1151 self
._add
_timeout
(output
,
1152 (lambda: put_waits
.remove(output
)))
1154 output
.queue
._put
(output
.item
)
1158 for action
in get_waits
:
1159 item
= output
.queue
._get
(criteria
=action
.criteria
)
1160 if item
is not None:
1161 actions
.append((action
, item
))
1162 for action
,item
in actions
:
1163 get_waits
.remove(action
)
1164 self
._enqueue
(action
.task
, input=item
)
1165 if action
._expires
():
1166 self
._remove
_timeout
(action
)
1170 ################################################################################
1172 # Default TaskManager instance
1174 ################################################################################
1178 _default_task_manager
= None
def get_default_task_manager():
    """Return the default TaskManager instance, creating it on first use."""
    global _default_task_manager
    manager = _default_task_manager
    if manager is None:
        # First call: build the shared instance and remember it
        manager = _default_task_manager = TaskManager()
    return manager
1190 'Add a task to the default TaskManager instance'
1191 get_default_task_manager().add(task
)
1195 'Run the default TaskManager instance'
1196 get_default_task_manager().run()
1200 ################################################################################
1204 ################################################################################
1208 if __name__
== '__main__':
1209 if sys
.platform
== 'win32':
1210 # Make sure WSAStartup() is called
1214 for i
in xrange(1, 4):
1215 print '%s:\t%d' % (name
, i
)
1219 t
.add(printer('first'))
1220 t
.add(printer('second'))
1221 t
.add(printer('third'))
1226 print 'receiver started'
1227 print 'receiver received: %s' % (yield queue
.get())
1228 print 'receiver finished'
1231 print 'sender started'
1232 yield queue
.put('from sender')
1233 print 'sender finished'
1235 def bad_descriptor():
1236 print 'bad_descriptor running'
1240 print 'exception in bad_descriptor:', sys
.exc_info()[1]
1243 print 'sleeper started'
1245 print 'sleeper finished'
1247 def timeout_immediately():
1248 print 'timeout_immediately running'
1250 yield Queue().get(timeout
=0)
1252 print 'timeout_immediately timed out'
1256 t2
.add(bad_descriptor())
1259 t2
.add(timeout_immediately())
1262 print 'child returned: %s' % ((yield child()),)
1264 yield child(raise_exc
=True)
1266 print 'exception in child:', sys
.exc_info()[1]
1268 def child(raise_exc
=False):
1271 raise RuntimeError('foo')
1272 raise StopIteration(1, 2, 3)
1281 assert not(t
.has_runnable() or t
.has_io_waits() or t
.has_timeouts())