--- /dev/null
+#!/usr/bin/env python\r
+\r
+"""\r
+Uses for generators\r
+* Pull pipelining (iterators)\r
+* Push pipelining (coroutines)\r
+* State machines (coroutines)\r
+* "Cooperative multitasking" (coroutines)\r
+* Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)\r
+\r
+Design considerations\r
+* When should a stage pass on exceptions or have it thrown within it?\r
+* When should a stage pass on GeneratorExits?\r
+* Is there a way to either turn a push generator into a iterator or to use\r
+ comprehensions syntax for push generators (I doubt it)\r
+* When should the stage try and send data in both directions\r
+* Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines\r
+** If so, make s* and co* implementation of functions\r
+"""\r
+\r
+import threading\r
+import Queue\r
+import pickle\r
+import functools\r
+import itertools\r
+import xml.sax\r
+import xml.parsers.expat\r
+\r
+\r
+def autostart(func):\r
+ """\r
+ >>> @autostart\r
+ ... def grep_sink(pattern):\r
+ ... print "Looking for %s" % pattern\r
+ ... while True:\r
+ ... line = yield\r
+ ... if pattern in line:\r
+ ... print line,\r
+ >>> g = grep_sink("python")\r
+ Looking for python\r
+ >>> g.send("Yeah but no but yeah but no")\r
+ >>> g.send("A series of tubes")\r
+ >>> g.send("python generators rock!")\r
+ python generators rock!\r
+ >>> g.close()\r
+ """\r
+\r
+ @functools.wraps(func)\r
+ def start(*args, **kwargs):\r
+ cr = func(*args, **kwargs)\r
+ cr.next()\r
+ return cr\r
+\r
+ return start\r
+\r
+\r
+@autostart\r
+def printer_sink(format = "%s"):\r
+ """\r
+ >>> pr = printer_sink("%r")\r
+ >>> pr.send("Hello")\r
+ 'Hello'\r
+ >>> pr.send("5")\r
+ '5'\r
+ >>> pr.send(5)\r
+ 5\r
+ >>> p = printer_sink()\r
+ >>> p.send("Hello")\r
+ Hello\r
+ >>> p.send("World")\r
+ World\r
+ >>> # p.throw(RuntimeError, "Goodbye")\r
+ >>> # p.send("Meh")\r
+ >>> # p.close()\r
+ """\r
+ while True:\r
+ item = yield\r
+ print format % (item, )\r
+\r
+\r
+@autostart\r
+def null_sink():\r
+ """\r
+ Good for uses like with cochain to pick up any slack\r
+ """\r
+ while True:\r
+ item = yield\r
+\r
+\r
+def itr_source(itr, target):\r
+ """\r
+ >>> itr_source(xrange(2), printer_sink())\r
+ 0\r
+ 1\r
+ """\r
+ for item in itr:\r
+ target.send(item)\r
+\r
+\r
+@autostart\r
+def cofilter(predicate, target):\r
+ """\r
+ >>> p = printer_sink()\r
+ >>> cf = cofilter(None, p)\r
+ >>> cf.send("")\r
+ >>> cf.send("Hello")\r
+ Hello\r
+ >>> cf.send([])\r
+ >>> cf.send([1, 2])\r
+ [1, 2]\r
+ >>> cf.send(False)\r
+ >>> cf.send(True)\r
+ True\r
+ >>> cf.send(0)\r
+ >>> cf.send(1)\r
+ 1\r
+ >>> # cf.throw(RuntimeError, "Goodbye")\r
+ >>> # cf.send(False)\r
+ >>> # cf.send(True)\r
+ >>> # cf.close()\r
+ """\r
+ if predicate is None:\r
+ predicate = bool\r
+\r
+ while True:\r
+ try:\r
+ item = yield\r
+ if predicate(item):\r
+ target.send(item)\r
+ except StandardError, e:\r
+ target.throw(e.__class__, e.message)\r
+\r
+\r
+@autostart\r
+def comap(function, target):\r
+ """\r
+ >>> p = printer_sink()\r
+ >>> cm = comap(lambda x: x+1, p)\r
+ >>> cm.send(0)\r
+ 1\r
+ >>> cm.send(1.0)\r
+ 2.0\r
+ >>> cm.send(-2)\r
+ -1\r
+ >>> # cm.throw(RuntimeError, "Goodbye")\r
+ >>> # cm.send(0)\r
+ >>> # cm.send(1.0)\r
+ >>> # cm.close()\r
+ """\r
+ while True:\r
+ try:\r
+ item = yield\r
+ mappedItem = function(item)\r
+ target.send(mappedItem)\r
+ except StandardError, e:\r
+ target.throw(e.__class__, e.message)\r
+\r
+\r
+def func_sink(function):\r
+ return comap(function, null_sink())\r
+\r
+\r
+def expand_positional(function):\r
+\r
+ @functools.wraps(function)\r
+ def expander(item):\r
+ return function(*item)\r
+\r
+ return expander\r
+\r
+\r
+@autostart\r
+def append_sink(l):\r
+ """\r
+ >>> l = []\r
+ >>> apps = append_sink(l)\r
+ >>> apps.send(1)\r
+ >>> apps.send(2)\r
+ >>> apps.send(3)\r
+ >>> print l\r
+ [1, 2, 3]\r
+ """\r
+ while True:\r
+ item = yield\r
+ l.append(item)\r
+\r
+\r
+@autostart\r
+def last_n_sink(l, n = 1):\r
+ """\r
+ >>> l = []\r
+ >>> lns = last_n_sink(l)\r
+ >>> lns.send(1)\r
+ >>> lns.send(2)\r
+ >>> lns.send(3)\r
+ >>> print l\r
+ [3]\r
+ """\r
+ del l[:]\r
+ while True:\r
+ item = yield\r
+ extraCount = len(l) - n + 1\r
+ if 0 < extraCount:\r
+ del l[0:extraCount]\r
+ l.append(item)\r
+\r
+\r
+@autostart\r
+def coreduce(target, function, initializer = None):\r
+ """\r
+ >>> reduceResult = []\r
+ >>> lns = last_n_sink(reduceResult)\r
+ >>> cr = coreduce(lns, lambda x, y: x + y, 0)\r
+ >>> cr.send(1)\r
+ >>> cr.send(2)\r
+ >>> cr.send(3)\r
+ >>> print reduceResult\r
+ [6]\r
+ >>> cr = coreduce(lns, lambda x, y: x + y)\r
+ >>> cr.send(1)\r
+ >>> cr.send(2)\r
+ >>> cr.send(3)\r
+ >>> print reduceResult\r
+ [6]\r
+ """\r
+ isFirst = True\r
+ cumulativeRef = initializer\r
+ while True:\r
+ item = yield\r
+ if isFirst and initializer is None:\r
+ cumulativeRef = item\r
+ else:\r
+ cumulativeRef = function(cumulativeRef, item)\r
+ target.send(cumulativeRef)\r
+ isFirst = False\r
+\r
+\r
+@autostart\r
+def cotee(targets):\r
+ """\r
+ Takes a sequence of coroutines and sends the received items to all of them\r
+\r
+ >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))\r
+ >>> ct.send("Hello")\r
+ 1 Hello\r
+ 2 Hello\r
+ >>> ct.send("World")\r
+ 1 World\r
+ 2 World\r
+ >>> # ct.throw(RuntimeError, "Goodbye")\r
+ >>> # ct.send("Meh")\r
+ >>> # ct.close()\r
+ """\r
+ while True:\r
+ try:\r
+ item = yield\r
+ for target in targets:\r
+ target.send(item)\r
+ except StandardError, e:\r
+ for target in targets:\r
+ target.throw(e.__class__, e.message)\r
+\r
+\r
+class CoTee(object):\r
+ """\r
+ >>> ct = CoTee()\r
+ >>> ct.register_sink(printer_sink("1 %s"))\r
+ >>> ct.register_sink(printer_sink("2 %s"))\r
+ >>> ct.stage.send("Hello")\r
+ 1 Hello\r
+ 2 Hello\r
+ >>> ct.stage.send("World")\r
+ 1 World\r
+ 2 World\r
+ >>> ct.register_sink(printer_sink("3 %s"))\r
+ >>> ct.stage.send("Foo")\r
+ 1 Foo\r
+ 2 Foo\r
+ 3 Foo\r
+ >>> # ct.stage.throw(RuntimeError, "Goodbye")\r
+ >>> # ct.stage.send("Meh")\r
+ >>> # ct.stage.close()\r
+ """\r
+\r
+ def __init__(self):\r
+ self.stage = self._stage()\r
+ self._targets = []\r
+\r
+ def register_sink(self, sink):\r
+ self._targets.append(sink)\r
+\r
+ def unregister_sink(self, sink):\r
+ self._targets.remove(sink)\r
+\r
+ def restart(self):\r
+ self.stage = self._stage()\r
+\r
+ @autostart\r
+ def _stage(self):\r
+ while True:\r
+ try:\r
+ item = yield\r
+ for target in self._targets:\r
+ target.send(item)\r
+ except StandardError, e:\r
+ for target in self._targets:\r
+ target.throw(e.__class__, e.message)\r
+\r
+\r
+def _flush_queue(queue):\r
+ while not queue.empty():\r
+ yield queue.get()\r
+\r
+\r
+@autostart\r
+def cocount(target, start = 0):\r
+ """\r
+ >>> cc = cocount(printer_sink("%s"))\r
+ >>> cc.send("a")\r
+ 0\r
+ >>> cc.send(None)\r
+ 1\r
+ >>> cc.send([])\r
+ 2\r
+ >>> cc.send(0)\r
+ 3\r
+ """\r
+ for i in itertools.count(start):\r
+ item = yield\r
+ target.send(i)\r
+\r
+\r
+@autostart\r
+def coenumerate(target, start = 0):\r
+ """\r
+ >>> ce = coenumerate(printer_sink("%r"))\r
+ >>> ce.send("a")\r
+ (0, 'a')\r
+ >>> ce.send(None)\r
+ (1, None)\r
+ >>> ce.send([])\r
+ (2, [])\r
+ >>> ce.send(0)\r
+ (3, 0)\r
+ """\r
+ for i in itertools.count(start):\r
+ item = yield\r
+ decoratedItem = i, item\r
+ target.send(decoratedItem)\r
+\r
+\r
+@autostart\r
+def corepeat(target, elem):\r
+ """\r
+ >>> cr = corepeat(printer_sink("%s"), "Hello World")\r
+ >>> cr.send("a")\r
+ Hello World\r
+ >>> cr.send(None)\r
+ Hello World\r
+ >>> cr.send([])\r
+ Hello World\r
+ >>> cr.send(0)\r
+ Hello World\r
+ """\r
+ while True:\r
+ item = yield\r
+ target.send(elem)\r
+\r
+\r
+@autostart\r
+def cointercept(target, elems):\r
+ """\r
+ >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])\r
+ >>> cr.send("a")\r
+ 1\r
+ >>> cr.send(None)\r
+ 2\r
+ >>> cr.send([])\r
+ 3\r
+ >>> cr.send(0)\r
+ 4\r
+ >>> cr.send("Bye")\r
+ Traceback (most recent call last):\r
+ File "/usr/lib/python2.5/doctest.py", line 1228, in __run\r
+ compileflags, 1) in test.globs\r
+ File "<doctest __main__.cointercept[5]>", line 1, in <module>\r
+ cr.send("Bye")\r
+ StopIteration\r
+ """\r
+ item = yield\r
+ for elem in elems:\r
+ target.send(elem)\r
+ item = yield\r
+\r
+\r
+@autostart\r
+def codropwhile(target, pred):\r
+ """\r
+ >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)\r
+ >>> cdw.send([0, 1, 2])\r
+ >>> cdw.send(1)\r
+ >>> cdw.send(True)\r
+ >>> cdw.send(False)\r
+ >>> cdw.send([0, 1, 2])\r
+ [0, 1, 2]\r
+ >>> cdw.send(1)\r
+ 1\r
+ >>> cdw.send(True)\r
+ True\r
+ """\r
+ while True:\r
+ item = yield\r
+ if not pred(item):\r
+ break\r
+\r
+ while True:\r
+ item = yield\r
+ target.send(item)\r
+\r
+\r
+@autostart\r
+def cotakewhile(target, pred):\r
+ """\r
+ >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)\r
+ >>> ctw.send([0, 1, 2])\r
+ [0, 1, 2]\r
+ >>> ctw.send(1)\r
+ 1\r
+ >>> ctw.send(True)\r
+ True\r
+ >>> ctw.send(False)\r
+ >>> ctw.send([0, 1, 2])\r
+ >>> ctw.send(1)\r
+ >>> ctw.send(True)\r
+ """\r
+ while True:\r
+ item = yield\r
+ if not pred(item):\r
+ break\r
+ target.send(item)\r
+\r
+ while True:\r
+ item = yield\r
+\r
+\r
+@autostart\r
+def coslice(target, lower, upper):\r
+ """\r
+ >>> cs = coslice(printer_sink("%r"), 3, 5)\r
+ >>> cs.send("0")\r
+ >>> cs.send("1")\r
+ >>> cs.send("2")\r
+ >>> cs.send("3")\r
+ '3'\r
+ >>> cs.send("4")\r
+ '4'\r
+ >>> cs.send("5")\r
+ >>> cs.send("6")\r
+ """\r
+ for i in xrange(lower):\r
+ item = yield\r
+ for i in xrange(upper - lower):\r
+ item = yield\r
+ target.send(item)\r
+ while True:\r
+ item = yield\r
+\r
+\r
+@autostart\r
+def cochain(targets):\r
+ """\r
+ >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])\r
+ >>> cc = cochain([cr, printer_sink("end %s")])\r
+ >>> cc.send("a")\r
+ good 1\r
+ >>> cc.send(None)\r
+ good 2\r
+ >>> cc.send([])\r
+ good 3\r
+ >>> cc.send(0)\r
+ good 4\r
+ >>> cc.send("Bye")\r
+ end Bye\r
+ """\r
+ behind = []\r
+ for target in targets:\r
+ try:\r
+ while behind:\r
+ item = behind.pop()\r
+ target.send(item)\r
+ while True:\r
+ item = yield\r
+ target.send(item)\r
+ except StopIteration:\r
+ behind.append(item)\r
+\r
+\r
+@autostart\r
+def queue_sink(queue):\r
+ """\r
+ >>> q = Queue.Queue()\r
+ >>> qs = queue_sink(q)\r
+ >>> qs.send("Hello")\r
+ >>> qs.send("World")\r
+ >>> qs.throw(RuntimeError, "Goodbye")\r
+ >>> qs.send("Meh")\r
+ >>> qs.close()\r
+ >>> print [i for i in _flush_queue(q)]\r
+ [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]\r
+ """\r
+ while True:\r
+ try:\r
+ item = yield\r
+ queue.put((None, item))\r
+ except StandardError, e:\r
+ queue.put((e.__class__, e.message))\r
+ except GeneratorExit:\r
+ queue.put((GeneratorExit, None))\r
+ raise\r
+\r
+\r
+def decode_item(item, target):\r
+ if item[0] is None:\r
+ target.send(item[1])\r
+ return False\r
+ elif item[0] is GeneratorExit:\r
+ target.close()\r
+ return True\r
+ else:\r
+ target.throw(item[0], item[1])\r
+ return False\r
+\r
+\r
+def queue_source(queue, target):\r
+ """\r
+ >>> q = Queue.Queue()\r
+ >>> for i in [\r
+ ... (None, 'Hello'),\r
+ ... (None, 'World'),\r
+ ... (GeneratorExit, None),\r
+ ... ]:\r
+ ... q.put(i)\r
+ >>> qs = queue_source(q, printer_sink())\r
+ Hello\r
+ World\r
+ """\r
+ isDone = False\r
+ while not isDone:\r
+ item = queue.get()\r
+ isDone = decode_item(item, target)\r
+\r
+\r
+def threaded_stage(target, thread_factory = threading.Thread):\r
+ messages = Queue.Queue()\r
+\r
+ run_source = functools.partial(queue_source, messages, target)\r
+ thread_factory(target=run_source).start()\r
+\r
+ # Sink running in current thread\r
+ return functools.partial(queue_sink, messages)\r
+\r
+\r
+@autostart\r
+def pickle_sink(f):\r
+ while True:\r
+ try:\r
+ item = yield\r
+ pickle.dump((None, item), f)\r
+ except StandardError, e:\r
+ pickle.dump((e.__class__, e.message), f)\r
+ except GeneratorExit:\r
+ pickle.dump((GeneratorExit, ), f)\r
+ raise\r
+ except StopIteration:\r
+ f.close()\r
+ return\r
+\r
+\r
+def pickle_source(f, target):\r
+ try:\r
+ isDone = False\r
+ while not isDone:\r
+ item = pickle.load(f)\r
+ isDone = decode_item(item, target)\r
+ except EOFError:\r
+ target.close()\r
+\r
+\r
+class EventHandler(object, xml.sax.ContentHandler):\r
+\r
+ START = "start"\r
+ TEXT = "text"\r
+ END = "end"\r
+\r
+ def __init__(self, target):\r
+ object.__init__(self)\r
+ xml.sax.ContentHandler.__init__(self)\r
+ self._target = target\r
+\r
+ def startElement(self, name, attrs):\r
+ self._target.send((self.START, (name, attrs._attrs)))\r
+\r
+ def characters(self, text):\r
+ self._target.send((self.TEXT, text))\r
+\r
+ def endElement(self, name):\r
+ self._target.send((self.END, name))\r
+\r
+\r
+def expat_parse(f, target):\r
+ parser = xml.parsers.expat.ParserCreate()\r
+ parser.buffer_size = 65536\r
+ parser.buffer_text = True\r
+ parser.returns_unicode = False\r
+ parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))\r
+ parser.EndElementHandler = lambda name: target.send(('end', name))\r
+ parser.CharacterDataHandler = lambda data: target.send(('text', data))\r
+ parser.ParseFile(f)\r
+\r
+\r
+if __name__ == "__main__":\r
+ import doctest\r
+ doctest.testmod()\r