Fixing line endings and adding a function sink
authorEd Page <epage@Dulcinea.(none)>
Thu, 16 Apr 2009 03:41:05 +0000 (22:41 -0500)
committerEd Page <epage@Dulcinea.(none)>
Thu, 16 Apr 2009 03:41:05 +0000 (22:41 -0500)
src/coroutines.py

index 6d802e8..320eb10 100755 (executable)
-#!/usr/bin/env python\r
-\r
-"""\r
-Uses for generators\r
-* Pull pipelining (iterators)\r
-* Push pipelining (coroutines)\r
-* State machines (coroutines)\r
-* "Cooperative multitasking" (coroutines)\r
-* Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)\r
-\r
-Design considerations\r
-* When should a stage pass on exceptions or have it thrown within it?\r
-* When should a stage pass on GeneratorExits?\r
-* Is there a way to either turn a push generator into a iterator or to use\r
-       comprehensions syntax for push generators (I doubt it)\r
-* When should the stage try and send data in both directions\r
-* Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines\r
-** If so, make s* and co* implementation of functions\r
-"""\r
-\r
-import threading\r
-import Queue\r
-import pickle\r
-import functools\r
-import itertools\r
-import xml.sax\r
-import xml.parsers.expat\r
-\r
-\r
-def autostart(func):\r
-       """\r
-       >>> @autostart\r
-       ... def grep_sink(pattern):\r
-       ...     print "Looking for %s" % pattern\r
-       ...     while True:\r
-       ...             line = yield\r
-       ...             if pattern in line:\r
-       ...                     print line,\r
-       >>> g = grep_sink("python")\r
-       Looking for python\r
-       >>> g.send("Yeah but no but yeah but no")\r
-       >>> g.send("A series of tubes")\r
-       >>> g.send("python generators rock!")\r
-       python generators rock!\r
-       >>> g.close()\r
-       """\r
-\r
-       @functools.wraps(func)\r
-       def start(*args, **kwargs):\r
-               cr = func(*args, **kwargs)\r
-               cr.next()\r
-               return cr\r
-\r
-       return start\r
-\r
-\r
-@autostart\r
-def printer_sink(format = "%s"):\r
-       """\r
-       >>> pr = printer_sink("%r")\r
-       >>> pr.send("Hello")\r
-       'Hello'\r
-       >>> pr.send("5")\r
-       '5'\r
-       >>> pr.send(5)\r
-       5\r
-       >>> p = printer_sink()\r
-       >>> p.send("Hello")\r
-       Hello\r
-       >>> p.send("World")\r
-       World\r
-       >>> # p.throw(RuntimeError, "Goodbye")\r
-       >>> # p.send("Meh")\r
-       >>> # p.close()\r
-       """\r
-       while True:\r
-               item = yield\r
-               print format % (item, )\r
-\r
-\r
-@autostart\r
-def null_sink():\r
-       """\r
-       Good for uses like with cochain to pick up any slack\r
-       """\r
-       while True:\r
-               item = yield\r
-\r
-\r
-def itr_source(itr, target):\r
-       """\r
-       >>> itr_source(xrange(2), printer_sink())\r
-       0\r
-       1\r
-       """\r
-       for item in itr:\r
-               target.send(item)\r
-\r
-\r
-@autostart\r
-def cofilter(predicate, target):\r
-       """\r
-       >>> p = printer_sink()\r
-       >>> cf = cofilter(None, p)\r
-       >>> cf.send("")\r
-       >>> cf.send("Hello")\r
-       Hello\r
-       >>> cf.send([])\r
-       >>> cf.send([1, 2])\r
-       [1, 2]\r
-       >>> cf.send(False)\r
-       >>> cf.send(True)\r
-       True\r
-       >>> cf.send(0)\r
-       >>> cf.send(1)\r
-       1\r
-       >>> # cf.throw(RuntimeError, "Goodbye")\r
-       >>> # cf.send(False)\r
-       >>> # cf.send(True)\r
-       >>> # cf.close()\r
-       """\r
-       if predicate is None:\r
-               predicate = bool\r
-\r
-       while True:\r
-               try:\r
-                       item = yield\r
-                       if predicate(item):\r
-                               target.send(item)\r
-               except StandardError, e:\r
-                       target.throw(e.__class__, e.message)\r
-\r
-\r
-@autostart\r
-def comap(function, target):\r
-       """\r
-       >>> p = printer_sink()\r
-       >>> cm = comap(lambda x: x+1, p)\r
-       >>> cm.send(0)\r
-       1\r
-       >>> cm.send(1.0)\r
-       2.0\r
-       >>> cm.send(-2)\r
-       -1\r
-       >>> # cm.throw(RuntimeError, "Goodbye")\r
-       >>> # cm.send(0)\r
-       >>> # cm.send(1.0)\r
-       >>> # cm.close()\r
-       """\r
-       while True:\r
-               try:\r
-                       item = yield\r
-                       mappedItem = function(item)\r
-                       target.send(mappedItem)\r
-               except StandardError, e:\r
-                       target.throw(e.__class__, e.message)\r
-\r
-\r
-@autostart\r
-def append_sink(l):\r
-       """\r
-       >>> l = []\r
-       >>> apps = append_sink(l)\r
-       >>> apps.send(1)\r
-       >>> apps.send(2)\r
-       >>> apps.send(3)\r
-       >>> print l\r
-       [1, 2, 3]\r
-       """\r
-       while True:\r
-               item = yield\r
-               l.append(item)\r
-\r
-\r
-@autostart\r
-def last_n_sink(l, n = 1):\r
-       """\r
-       >>> l = []\r
-       >>> lns = last_n_sink(l)\r
-       >>> lns.send(1)\r
-       >>> lns.send(2)\r
-       >>> lns.send(3)\r
-       >>> print l\r
-       [3]\r
-       """\r
-       del l[:]\r
-       while True:\r
-               item = yield\r
-               extraCount = len(l) - n + 1\r
-               if 0 < extraCount:\r
-                       del l[0:extraCount]\r
-               l.append(item)\r
-\r
-\r
-@autostart\r
-def coreduce(target, function, initializer = None):\r
-       """\r
-       >>> reduceResult = []\r
-       >>> lns = last_n_sink(reduceResult)\r
-       >>> cr = coreduce(lns, lambda x, y: x + y, 0)\r
-       >>> cr.send(1)\r
-       >>> cr.send(2)\r
-       >>> cr.send(3)\r
-       >>> print reduceResult\r
-       [6]\r
-       >>> cr = coreduce(lns, lambda x, y: x + y)\r
-       >>> cr.send(1)\r
-       >>> cr.send(2)\r
-       >>> cr.send(3)\r
-       >>> print reduceResult\r
-       [6]\r
-       """\r
-       isFirst = True\r
-       cumulativeRef = initializer\r
-       while True:\r
-               item = yield\r
-               if isFirst and initializer is None:\r
-                       cumulativeRef = item\r
-               else:\r
-                       cumulativeRef = function(cumulativeRef, item)\r
-               target.send(cumulativeRef)\r
-               isFirst = False\r
-\r
-\r
-@autostart\r
-def cotee(targets):\r
-       """\r
-       Takes a sequence of coroutines and sends the received items to all of them\r
-\r
-       >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))\r
-       >>> ct.send("Hello")\r
-       1 Hello\r
-       2 Hello\r
-       >>> ct.send("World")\r
-       1 World\r
-       2 World\r
-       >>> # ct.throw(RuntimeError, "Goodbye")\r
-       >>> # ct.send("Meh")\r
-       >>> # ct.close()\r
-       """\r
-       while True:\r
-               try:\r
-                       item = yield\r
-                       for target in targets:\r
-                               target.send(item)\r
-               except StandardError, e:\r
-                       for target in targets:\r
-                               target.throw(e.__class__, e.message)\r
-\r
-\r
-class CoTee(object):\r
-       """\r
-       >>> ct = CoTee()\r
-       >>> ct.register_sink(printer_sink("1 %s"))\r
-       >>> ct.register_sink(printer_sink("2 %s"))\r
-       >>> ct.stage.send("Hello")\r
-       1 Hello\r
-       2 Hello\r
-       >>> ct.stage.send("World")\r
-       1 World\r
-       2 World\r
-       >>> ct.register_sink(printer_sink("3 %s"))\r
-       >>> ct.stage.send("Foo")\r
-       1 Foo\r
-       2 Foo\r
-       3 Foo\r
-       >>> # ct.stage.throw(RuntimeError, "Goodbye")\r
-       >>> # ct.stage.send("Meh")\r
-       >>> # ct.stage.close()\r
-       """\r
-\r
-       def __init__(self):\r
-               self.stage = self._stage()\r
-               self._targets = []\r
-\r
-       def register_sink(self, sink):\r
-               self._targets.append(sink)\r
-\r
-       def unregister_sink(self, sink):\r
-               self._targets.remove(sink)\r
-\r
-       def restart(self):\r
-               self.stage = self._stage()\r
-\r
-       @autostart\r
-       def _stage(self):\r
-               while True:\r
-                       try:\r
-                               item = yield\r
-                               for target in self._targets:\r
-                                       target.send(item)\r
-                       except StandardError, e:\r
-                               for target in self._targets:\r
-                                       target.throw(e.__class__, e.message)\r
-\r
-\r
-def _flush_queue(queue):\r
-       while not queue.empty():\r
-               yield queue.get()\r
-\r
-\r
-@autostart\r
-def cocount(target, start = 0):\r
-       """\r
-       >>> cc = cocount(printer_sink("%s"))\r
-       >>> cc.send("a")\r
-       0\r
-       >>> cc.send(None)\r
-       1\r
-       >>> cc.send([])\r
-       2\r
-       >>> cc.send(0)\r
-       3\r
-       """\r
-       for i in itertools.count(start):\r
-               item = yield\r
-               target.send(i)\r
-\r
-\r
-@autostart\r
-def coenumerate(target, start = 0):\r
-       """\r
-       >>> ce = coenumerate(printer_sink("%r"))\r
-       >>> ce.send("a")\r
-       (0, 'a')\r
-       >>> ce.send(None)\r
-       (1, None)\r
-       >>> ce.send([])\r
-       (2, [])\r
-       >>> ce.send(0)\r
-       (3, 0)\r
-       """\r
-       for i in itertools.count(start):\r
-               item = yield\r
-               decoratedItem = i, item\r
-               target.send(decoratedItem)\r
-\r
-\r
-@autostart\r
-def corepeat(target, elem):\r
-       """\r
-       >>> cr = corepeat(printer_sink("%s"), "Hello World")\r
-       >>> cr.send("a")\r
-       Hello World\r
-       >>> cr.send(None)\r
-       Hello World\r
-       >>> cr.send([])\r
-       Hello World\r
-       >>> cr.send(0)\r
-       Hello World\r
-       """\r
-       while True:\r
-               item = yield\r
-               target.send(elem)\r
-\r
-\r
-@autostart\r
-def cointercept(target, elems):\r
-       """\r
-       >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])\r
-       >>> cr.send("a")\r
-       1\r
-       >>> cr.send(None)\r
-       2\r
-       >>> cr.send([])\r
-       3\r
-       >>> cr.send(0)\r
-       4\r
-       >>> cr.send("Bye")\r
-       Traceback (most recent call last):\r
-         File "/usr/lib/python2.5/doctest.py", line 1228, in __run\r
-           compileflags, 1) in test.globs\r
-         File "<doctest __main__.cointercept[5]>", line 1, in <module>\r
-           cr.send("Bye")\r
-       StopIteration\r
-       """\r
-       item = yield\r
-       for elem in elems:\r
-               target.send(elem)\r
-               item = yield\r
-\r
-\r
-@autostart\r
-def codropwhile(target, pred):\r
-       """\r
-       >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)\r
-       >>> cdw.send([0, 1, 2])\r
-       >>> cdw.send(1)\r
-       >>> cdw.send(True)\r
-       >>> cdw.send(False)\r
-       >>> cdw.send([0, 1, 2])\r
-       [0, 1, 2]\r
-       >>> cdw.send(1)\r
-       1\r
-       >>> cdw.send(True)\r
-       True\r
-       """\r
-       while True:\r
-               item = yield\r
-               if not pred(item):\r
-                       break\r
-\r
-       while True:\r
-               item = yield\r
-               target.send(item)\r
-\r
-\r
-@autostart\r
-def cotakewhile(target, pred):\r
-       """\r
-       >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)\r
-       >>> ctw.send([0, 1, 2])\r
-       [0, 1, 2]\r
-       >>> ctw.send(1)\r
-       1\r
-       >>> ctw.send(True)\r
-       True\r
-       >>> ctw.send(False)\r
-       >>> ctw.send([0, 1, 2])\r
-       >>> ctw.send(1)\r
-       >>> ctw.send(True)\r
-       """\r
-       while True:\r
-               item = yield\r
-               if not pred(item):\r
-                       break\r
-               target.send(item)\r
-\r
-       while True:\r
-               item = yield\r
-\r
-\r
-@autostart\r
-def coslice(target, lower, upper):\r
-       """\r
-       >>> cs = coslice(printer_sink("%r"), 3, 5)\r
-       >>> cs.send("0")\r
-       >>> cs.send("1")\r
-       >>> cs.send("2")\r
-       >>> cs.send("3")\r
-       '3'\r
-       >>> cs.send("4")\r
-       '4'\r
-       >>> cs.send("5")\r
-       >>> cs.send("6")\r
-       """\r
-       for i in xrange(lower):\r
-               item = yield\r
-       for i in xrange(upper - lower):\r
-               item = yield\r
-               target.send(item)\r
-       while True:\r
-               item = yield\r
-\r
-\r
-@autostart\r
-def cochain(targets):\r
-       """\r
-       >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])\r
-       >>> cc = cochain([cr, printer_sink("end %s")])\r
-       >>> cc.send("a")\r
-       good 1\r
-       >>> cc.send(None)\r
-       good 2\r
-       >>> cc.send([])\r
-       good 3\r
-       >>> cc.send(0)\r
-       good 4\r
-       >>> cc.send("Bye")\r
-       end Bye\r
-       """\r
-       behind = []\r
-       for target in targets:\r
-               try:\r
-                       while behind:\r
-                               item = behind.pop()\r
-                               target.send(item)\r
-                       while True:\r
-                               item = yield\r
-                               target.send(item)\r
-               except StopIteration:\r
-                       behind.append(item)\r
-\r
-\r
-@autostart\r
-def queue_sink(queue):\r
-       """\r
-       >>> q = Queue.Queue()\r
-       >>> qs = queue_sink(q)\r
-       >>> qs.send("Hello")\r
-       >>> qs.send("World")\r
-       >>> qs.throw(RuntimeError, "Goodbye")\r
-       >>> qs.send("Meh")\r
-       >>> qs.close()\r
-       >>> print [i for i in _flush_queue(q)]\r
-       [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]\r
-       """\r
-       while True:\r
-               try:\r
-                       item = yield\r
-                       queue.put((None, item))\r
-               except StandardError, e:\r
-                       queue.put((e.__class__, e.message))\r
-               except GeneratorExit:\r
-                       queue.put((GeneratorExit, None))\r
-                       raise\r
-\r
-\r
-def decode_item(item, target):\r
-       if item[0] is None:\r
-               target.send(item[1])\r
-               return False\r
-       elif item[0] is GeneratorExit:\r
-               target.close()\r
-               return True\r
-       else:\r
-               target.throw(item[0], item[1])\r
-               return False\r
-\r
-\r
-def queue_source(queue, target):\r
-       """\r
-       >>> q = Queue.Queue()\r
-       >>> for i in [\r
-       ...     (None, 'Hello'),\r
-       ...     (None, 'World'),\r
-       ...     (GeneratorExit, None),\r
-       ...     ]:\r
-       ...     q.put(i)\r
-       >>> qs = queue_source(q, printer_sink())\r
-       Hello\r
-       World\r
-       """\r
-       isDone = False\r
-       while not isDone:\r
-               item = queue.get()\r
-               isDone = decode_item(item, target)\r
-\r
-\r
-def threaded_stage(target, thread_factory = threading.Thread):\r
-       messages = Queue.Queue()\r
-\r
-       run_source = functools.partial(queue_source, messages, target)\r
-       thread_factory(target=run_source).start()\r
-\r
-       # Sink running in current thread\r
-       return functools.partial(queue_sink, messages)\r
-\r
-\r
-@autostart\r
-def pickle_sink(f):\r
-       while True:\r
-               try:\r
-                       item = yield\r
-                       pickle.dump((None, item), f)\r
-               except StandardError, e:\r
-                       pickle.dump((e.__class__, e.message), f)\r
-               except GeneratorExit:\r
-                       pickle.dump((GeneratorExit, ), f)\r
-                       raise\r
-               except StopIteration:\r
-                       f.close()\r
-                       return\r
-\r
-\r
-def pickle_source(f, target):\r
-       try:\r
-               isDone = False\r
-               while not isDone:\r
-                       item = pickle.load(f)\r
-                       isDone = decode_item(item, target)\r
-       except EOFError:\r
-               target.close()\r
-\r
-\r
-class EventHandler(object, xml.sax.ContentHandler):\r
-\r
-       START = "start"\r
-       TEXT = "text"\r
-       END = "end"\r
-\r
-       def __init__(self, target):\r
-               object.__init__(self)\r
-               xml.sax.ContentHandler.__init__(self)\r
-               self._target = target\r
-\r
-       def startElement(self, name, attrs):\r
-               self._target.send((self.START, (name, attrs._attrs)))\r
-\r
-       def characters(self, text):\r
-               self._target.send((self.TEXT, text))\r
-\r
-       def endElement(self, name):\r
-               self._target.send((self.END, name))\r
-\r
-\r
-def expat_parse(f, target):\r
-       parser = xml.parsers.expat.ParserCreate()\r
-       parser.buffer_size = 65536\r
-       parser.buffer_text = True\r
-       parser.returns_unicode = False\r
-       parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))\r
-       parser.EndElementHandler = lambda name: target.send(('end', name))\r
-       parser.CharacterDataHandler = lambda data: target.send(('text', data))\r
-       parser.ParseFile(f)\r
-\r
-\r
-if __name__ == "__main__":\r
-       import doctest\r
-       doctest.testmod()\r
+#!/usr/bin/env python
+
+"""
+Uses for generators
+* Pull pipelining (iterators)
+* Push pipelining (coroutines)
+* State machines (coroutines)
+* "Cooperative multitasking" (coroutines)
+* Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
+
+Design considerations
+* When should a stage pass on exceptions or have it thrown within it?
+* When should a stage pass on GeneratorExits?
+* Is there a way to either turn a push generator into a iterator or to use
+       comprehensions syntax for push generators (I doubt it)
+* When should the stage try and send data in both directions
+* Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines
+** If so, make s* and co* implementation of functions
+"""
+
+import threading
+import Queue
+import pickle
+import functools
+import itertools
+import xml.sax
+import xml.parsers.expat
+
+
+def autostart(func):
+       """
+       >>> @autostart
+       ... def grep_sink(pattern):
+       ...     print "Looking for %s" % pattern
+       ...     while True:
+       ...             line = yield
+       ...             if pattern in line:
+       ...                     print line,
+       >>> g = grep_sink("python")
+       Looking for python
+       >>> g.send("Yeah but no but yeah but no")
+       >>> g.send("A series of tubes")
+       >>> g.send("python generators rock!")
+       python generators rock!
+       >>> g.close()
+       """
+
+       @functools.wraps(func)
+       def start(*args, **kwargs):
+               cr = func(*args, **kwargs)
+               cr.next()
+               return cr
+
+       return start
+
+
+@autostart
+def printer_sink(format = "%s"):
+       """
+       >>> pr = printer_sink("%r")
+       >>> pr.send("Hello")
+       'Hello'
+       >>> pr.send("5")
+       '5'
+       >>> pr.send(5)
+       5
+       >>> p = printer_sink()
+       >>> p.send("Hello")
+       Hello
+       >>> p.send("World")
+       World
+       >>> # p.throw(RuntimeError, "Goodbye")
+       >>> # p.send("Meh")
+       >>> # p.close()
+       """
+       while True:
+               item = yield
+               print format % (item, )
+
+
+@autostart
+def null_sink():
+       """
+       Good for uses like with cochain to pick up any slack
+       """
+       while True:
+               item = yield
+
+
+def itr_source(itr, target):
+       """
+       >>> itr_source(xrange(2), printer_sink())
+       0
+       1
+       """
+       for item in itr:
+               target.send(item)
+
+
+@autostart
+def cofilter(predicate, target):
+       """
+       >>> p = printer_sink()
+       >>> cf = cofilter(None, p)
+       >>> cf.send("")
+       >>> cf.send("Hello")
+       Hello
+       >>> cf.send([])
+       >>> cf.send([1, 2])
+       [1, 2]
+       >>> cf.send(False)
+       >>> cf.send(True)
+       True
+       >>> cf.send(0)
+       >>> cf.send(1)
+       1
+       >>> # cf.throw(RuntimeError, "Goodbye")
+       >>> # cf.send(False)
+       >>> # cf.send(True)
+       >>> # cf.close()
+       """
+       if predicate is None:
+               predicate = bool
+
+       while True:
+               try:
+                       item = yield
+                       if predicate(item):
+                               target.send(item)
+               except StandardError, e:
+                       target.throw(e.__class__, e.message)
+
+
+@autostart
+def comap(function, target):
+       """
+       >>> p = printer_sink()
+       >>> cm = comap(lambda x: x+1, p)
+       >>> cm.send(0)
+       1
+       >>> cm.send(1.0)
+       2.0
+       >>> cm.send(-2)
+       -1
+       >>> # cm.throw(RuntimeError, "Goodbye")
+       >>> # cm.send(0)
+       >>> # cm.send(1.0)
+       >>> # cm.close()
+       """
+       while True:
+               try:
+                       item = yield
+                       mappedItem = function(item)
+                       target.send(mappedItem)
+               except StandardError, e:
+                       target.throw(e.__class__, e.message)
+
+
+def func_sink(function):
+       return comap(function, null_sink())
+
+
+@autostart
+def append_sink(l):
+       """
+       >>> l = []
+       >>> apps = append_sink(l)
+       >>> apps.send(1)
+       >>> apps.send(2)
+       >>> apps.send(3)
+       >>> print l
+       [1, 2, 3]
+       """
+       while True:
+               item = yield
+               l.append(item)
+
+
+@autostart
+def last_n_sink(l, n = 1):
+       """
+       >>> l = []
+       >>> lns = last_n_sink(l)
+       >>> lns.send(1)
+       >>> lns.send(2)
+       >>> lns.send(3)
+       >>> print l
+       [3]
+       """
+       del l[:]
+       while True:
+               item = yield
+               extraCount = len(l) - n + 1
+               if 0 < extraCount:
+                       del l[0:extraCount]
+               l.append(item)
+
+
+@autostart
+def coreduce(target, function, initializer = None):
+       """
+       >>> reduceResult = []
+       >>> lns = last_n_sink(reduceResult)
+       >>> cr = coreduce(lns, lambda x, y: x + y, 0)
+       >>> cr.send(1)
+       >>> cr.send(2)
+       >>> cr.send(3)
+       >>> print reduceResult
+       [6]
+       >>> cr = coreduce(lns, lambda x, y: x + y)
+       >>> cr.send(1)
+       >>> cr.send(2)
+       >>> cr.send(3)
+       >>> print reduceResult
+       [6]
+       """
+       isFirst = True
+       cumulativeRef = initializer
+       while True:
+               item = yield
+               if isFirst and initializer is None:
+                       cumulativeRef = item
+               else:
+                       cumulativeRef = function(cumulativeRef, item)
+               target.send(cumulativeRef)
+               isFirst = False
+
+
+@autostart
+def cotee(targets):
+       """
+       Takes a sequence of coroutines and sends the received items to all of them
+
+       >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
+       >>> ct.send("Hello")
+       1 Hello
+       2 Hello
+       >>> ct.send("World")
+       1 World
+       2 World
+       >>> # ct.throw(RuntimeError, "Goodbye")
+       >>> # ct.send("Meh")
+       >>> # ct.close()
+       """
+       while True:
+               try:
+                       item = yield
+                       for target in targets:
+                               target.send(item)
+               except StandardError, e:
+                       for target in targets:
+                               target.throw(e.__class__, e.message)
+
+
+class CoTee(object):
+       """
+       >>> ct = CoTee()
+       >>> ct.register_sink(printer_sink("1 %s"))
+       >>> ct.register_sink(printer_sink("2 %s"))
+       >>> ct.stage.send("Hello")
+       1 Hello
+       2 Hello
+       >>> ct.stage.send("World")
+       1 World
+       2 World
+       >>> ct.register_sink(printer_sink("3 %s"))
+       >>> ct.stage.send("Foo")
+       1 Foo
+       2 Foo
+       3 Foo
+       >>> # ct.stage.throw(RuntimeError, "Goodbye")
+       >>> # ct.stage.send("Meh")
+       >>> # ct.stage.close()
+       """
+
+       def __init__(self):
+               self.stage = self._stage()
+               self._targets = []
+
+       def register_sink(self, sink):
+               self._targets.append(sink)
+
+       def unregister_sink(self, sink):
+               self._targets.remove(sink)
+
+       def restart(self):
+               self.stage = self._stage()
+
+       @autostart
+       def _stage(self):
+               while True:
+                       try:
+                               item = yield
+                               for target in self._targets:
+                                       target.send(item)
+                       except StandardError, e:
+                               for target in self._targets:
+                                       target.throw(e.__class__, e.message)
+
+
+def _flush_queue(queue):
+       while not queue.empty():
+               yield queue.get()
+
+
+@autostart
+def cocount(target, start = 0):
+       """
+       >>> cc = cocount(printer_sink("%s"))
+       >>> cc.send("a")
+       0
+       >>> cc.send(None)
+       1
+       >>> cc.send([])
+       2
+       >>> cc.send(0)
+       3
+       """
+       for i in itertools.count(start):
+               item = yield
+               target.send(i)
+
+
+@autostart
+def coenumerate(target, start = 0):
+       """
+       >>> ce = coenumerate(printer_sink("%r"))
+       >>> ce.send("a")
+       (0, 'a')
+       >>> ce.send(None)
+       (1, None)
+       >>> ce.send([])
+       (2, [])
+       >>> ce.send(0)
+       (3, 0)
+       """
+       for i in itertools.count(start):
+               item = yield
+               decoratedItem = i, item
+               target.send(decoratedItem)
+
+
+@autostart
+def corepeat(target, elem):
+       """
+       >>> cr = corepeat(printer_sink("%s"), "Hello World")
+       >>> cr.send("a")
+       Hello World
+       >>> cr.send(None)
+       Hello World
+       >>> cr.send([])
+       Hello World
+       >>> cr.send(0)
+       Hello World
+       """
+       while True:
+               item = yield
+               target.send(elem)
+
+
+@autostart
+def cointercept(target, elems):
+       """
+       >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
+       >>> cr.send("a")
+       1
+       >>> cr.send(None)
+       2
+       >>> cr.send([])
+       3
+       >>> cr.send(0)
+       4
+       >>> cr.send("Bye")
+       Traceback (most recent call last):
+         File "/usr/lib/python2.5/doctest.py", line 1228, in __run
+           compileflags, 1) in test.globs
+         File "<doctest __main__.cointercept[5]>", line 1, in <module>
+           cr.send("Bye")
+       StopIteration
+       """
+       item = yield
+       for elem in elems:
+               target.send(elem)
+               item = yield
+
+
+@autostart
+def codropwhile(target, pred):
+       """
+       >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
+       >>> cdw.send([0, 1, 2])
+       >>> cdw.send(1)
+       >>> cdw.send(True)
+       >>> cdw.send(False)
+       >>> cdw.send([0, 1, 2])
+       [0, 1, 2]
+       >>> cdw.send(1)
+       1
+       >>> cdw.send(True)
+       True
+       """
+       while True:
+               item = yield
+               if not pred(item):
+                       break
+
+       while True:
+               item = yield
+               target.send(item)
+
+
+@autostart
+def cotakewhile(target, pred):
+       """
+       >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
+       >>> ctw.send([0, 1, 2])
+       [0, 1, 2]
+       >>> ctw.send(1)
+       1
+       >>> ctw.send(True)
+       True
+       >>> ctw.send(False)
+       >>> ctw.send([0, 1, 2])
+       >>> ctw.send(1)
+       >>> ctw.send(True)
+       """
+       while True:
+               item = yield
+               if not pred(item):
+                       break
+               target.send(item)
+
+       while True:
+               item = yield
+
+
+@autostart
+def coslice(target, lower, upper):
+       """
+       >>> cs = coslice(printer_sink("%r"), 3, 5)
+       >>> cs.send("0")
+       >>> cs.send("1")
+       >>> cs.send("2")
+       >>> cs.send("3")
+       '3'
+       >>> cs.send("4")
+       '4'
+       >>> cs.send("5")
+       >>> cs.send("6")
+       """
+       for i in xrange(lower):
+               item = yield
+       for i in xrange(upper - lower):
+               item = yield
+               target.send(item)
+       while True:
+               item = yield
+
+
+@autostart
+def cochain(targets):
+       """
+       >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
+       >>> cc = cochain([cr, printer_sink("end %s")])
+       >>> cc.send("a")
+       good 1
+       >>> cc.send(None)
+       good 2
+       >>> cc.send([])
+       good 3
+       >>> cc.send(0)
+       good 4
+       >>> cc.send("Bye")
+       end Bye
+       """
+       behind = []
+       for target in targets:
+               try:
+                       while behind:
+                               item = behind.pop()
+                               target.send(item)
+                       while True:
+                               item = yield
+                               target.send(item)
+               except StopIteration:
+                       behind.append(item)
+
+
+@autostart
+def queue_sink(queue):
+       """
+       >>> q = Queue.Queue()
+       >>> qs = queue_sink(q)
+       >>> qs.send("Hello")
+       >>> qs.send("World")
+       >>> qs.throw(RuntimeError, "Goodbye")
+       >>> qs.send("Meh")
+       >>> qs.close()
+       >>> print [i for i in _flush_queue(q)]
+       [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
+       """
+       while True:
+               try:
+                       item = yield
+                       queue.put((None, item))
+               except StandardError, e:
+                       queue.put((e.__class__, e.message))
+               except GeneratorExit:
+                       queue.put((GeneratorExit, None))
+                       raise
+
+
+def decode_item(item, target):
+       if item[0] is None:
+               target.send(item[1])
+               return False
+       elif item[0] is GeneratorExit:
+               target.close()
+               return True
+       else:
+               target.throw(item[0], item[1])
+               return False
+
+
+def queue_source(queue, target):
+       """
+       >>> q = Queue.Queue()
+       >>> for i in [
+       ...     (None, 'Hello'),
+       ...     (None, 'World'),
+       ...     (GeneratorExit, None),
+       ...     ]:
+       ...     q.put(i)
+       >>> qs = queue_source(q, printer_sink())
+       Hello
+       World
+       """
+       isDone = False
+       while not isDone:
+               item = queue.get()
+               isDone = decode_item(item, target)
+
+
+def threaded_stage(target, thread_factory = threading.Thread):
+       messages = Queue.Queue()
+
+       run_source = functools.partial(queue_source, messages, target)
+       thread_factory(target=run_source).start()
+
+       # Sink running in current thread
+       return functools.partial(queue_sink, messages)
+
+
+@autostart
+def pickle_sink(f):
+       while True:
+               try:
+                       item = yield
+                       pickle.dump((None, item), f)
+               except StandardError, e:
+                       pickle.dump((e.__class__, e.message), f)
+               except GeneratorExit:
+                       pickle.dump((GeneratorExit, ), f)
+                       raise
+               except StopIteration:
+                       f.close()
+                       return
+
+
+def pickle_source(f, target):
+       try:
+               isDone = False
+               while not isDone:
+                       item = pickle.load(f)
+                       isDone = decode_item(item, target)
+       except EOFError:
+               target.close()
+
+
+class EventHandler(object, xml.sax.ContentHandler):
+
+       START = "start"
+       TEXT = "text"
+       END = "end"
+
+       def __init__(self, target):
+               object.__init__(self)
+               xml.sax.ContentHandler.__init__(self)
+               self._target = target
+
+       def startElement(self, name, attrs):
+               self._target.send((self.START, (name, attrs._attrs)))
+
+       def characters(self, text):
+               self._target.send((self.TEXT, text))
+
+       def endElement(self, name):
+               self._target.send((self.END, name))
+
+
+def expat_parse(f, target):
+       parser = xml.parsers.expat.ParserCreate()
+       parser.buffer_size = 65536
+       parser.buffer_text = True
+       parser.returns_unicode = False
+       parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
+       parser.EndElementHandler = lambda name: target.send(('end', name))
+       parser.CharacterDataHandler = lambda data: target.send(('text', data))
+       parser.ParseFile(f)
+
+
+if __name__ == "__main__":
+       import doctest
+       doctest.testmod()