5 * Pull pipelining (iterators)
6 * Push pipelining (coroutines)
7 * State machines (coroutines)
8 * "Cooperative multitasking" (coroutines)
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
12 * When should a stage pass on exceptions or have it thrown within it?
13 * When should a stage pass on GeneratorExits?
14 * Is there a way to either turn a push generator into an iterator or to use
15 comprehension syntax for push generators? (I doubt it)
16 * When should the stage try to send data in both directions?
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to avoid confusion (like signals/slots?) and then refer to two-way generators as coroutines
18 ** If so, make s* and co* implementation of functions
27 import xml.parsers.expat
33 ... def grep_sink(pattern):
34 ... print "Looking for %s" % pattern
37 ... if pattern in line:
39 >>> g = grep_sink("python")
41 >>> g.send("Yeah but no but yeah but no")
42 >>> g.send("A series of tubes")
43 >>> g.send("python generators rock!")
44 python generators rock!
48 @functools.wraps(func)
49 def start(*args, **kwargs):
50 cr = func(*args, **kwargs)
58 def printer_sink(format = "%s"):
60 >>> pr = printer_sink("%r")
67 >>> p = printer_sink()
72 >>> # p.throw(RuntimeError, "Goodbye")
78 print format % (item, )
84 Good for use with stages like cochain, to pick up any slack
90 def itr_source(itr, target):
92 >>> itr_source(xrange(2), printer_sink())
101 def cofilter(predicate, target):
103 >>> p = printer_sink()
104 >>> cf = cofilter(None, p)
117 >>> # cf.throw(RuntimeError, "Goodbye")
122 if predicate is None:
130 except StandardError, e:
131 target.throw(e.__class__, e.message)
135 def comap(function, target):
137 >>> p = printer_sink()
138 >>> cm = comap(lambda x: x+1, p)
145 >>> # cm.throw(RuntimeError, "Goodbye")
153 mappedItem = function(item)
154 target.send(mappedItem)
155 except StandardError, e:
156 target.throw(e.__class__, e.message)
def func_sink(function):
    """
    Build a terminal stage that applies `function` to every item sent to it.

    This is comap() wired to the stage returned by null_sink(), so the
    mapped values are handed to that sink instead of a caller-supplied
    target.
    """
    terminal = null_sink()
    return comap(function, terminal)
167 >>> apps = append_sink(l)
180 def last_n_sink(l, n = 1):
183 >>> lns = last_n_sink(l)
193 extraCount = len(l) - n + 1
200 def coreduce(target, function, initializer = None):
202 >>> reduceResult = []
203 >>> lns = last_n_sink(reduceResult)
204 >>> cr = coreduce(lns, lambda x, y: x + y, 0)
208 >>> print reduceResult
210 >>> cr = coreduce(lns, lambda x, y: x + y)
214 >>> print reduceResult
218 cumulativeRef = initializer
221 if isFirst and initializer is None:
224 cumulativeRef = function(cumulativeRef, item)
225 target.send(cumulativeRef)
232 Takes a sequence of coroutines and sends the received items to all of them
234 >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
241 >>> # ct.throw(RuntimeError, "Goodbye")
248 for target in targets:
250 except StandardError, e:
251 for target in targets:
252 target.throw(e.__class__, e.message)
258 >>> ct.register_sink(printer_sink("1 %s"))
259 >>> ct.register_sink(printer_sink("2 %s"))
260 >>> ct.stage.send("Hello")
263 >>> ct.stage.send("World")
266 >>> ct.register_sink(printer_sink("3 %s"))
267 >>> ct.stage.send("Foo")
271 >>> # ct.stage.throw(RuntimeError, "Goodbye")
272 >>> # ct.stage.send("Meh")
273 >>> # ct.stage.close()
277 self.stage = self._stage()
def register_sink(self, sink):
    """Add `sink` to the end of the targets this object iterates over."""
    registered = self._targets
    registered.append(sink)
def unregister_sink(self, sink):
    """
    Drop the first occurrence of `sink` from this object's targets.

    Raises whatever the underlying container's remove() raises when
    `sink` was never registered.
    """
    registered = self._targets
    registered.remove(sink)
287 self.stage = self._stage()
294 for target in self._targets:
296 except StandardError, e:
297 for target in self._targets:
298 target.throw(e.__class__, e.message)
301 def _flush_queue(queue):
302 while not queue.empty():
307 def cocount(target, start = 0):
309 >>> cc = cocount(printer_sink("%s"))
319 for i in itertools.count(start):
325 def coenumerate(target, start = 0):
327 >>> ce = coenumerate(printer_sink("%r"))
337 for i in itertools.count(start):
339 decoratedItem = i, item
340 target.send(decoratedItem)
344 def corepeat(target, elem):
346 >>> cr = corepeat(printer_sink("%s"), "Hello World")
362 def cointercept(target, elems):
364 >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
374 Traceback (most recent call last):
375 File "/usr/lib/python2.5/doctest.py", line 1228, in __run
376 compileflags, 1) in test.globs
377 File "<doctest __main__.cointercept[5]>", line 1, in <module>
388 def codropwhile(target, pred):
390 >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
391 >>> cdw.send([0, 1, 2])
395 >>> cdw.send([0, 1, 2])
413 def cotakewhile(target, pred):
415 >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
416 >>> ctw.send([0, 1, 2])
423 >>> ctw.send([0, 1, 2])
438 def coslice(target, lower, upper):
440 >>> cs = coslice(printer_sink("%r"), 3, 5)
451 for i in xrange(lower):
453 for i in xrange(upper - lower):
461 def cochain(targets):
463 >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
464 >>> cc = cochain([cr, printer_sink("end %s")])
477 for target in targets:
485 except StopIteration:
490 def queue_sink(queue):
492 >>> q = Queue.Queue()
493 >>> qs = queue_sink(q)
496 >>> qs.throw(RuntimeError, "Goodbye")
499 >>> print [i for i in _flush_queue(q)]
500 [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
505 queue.put((None, item))
506 except StandardError, e:
507 queue.put((e.__class__, e.message))
508 except GeneratorExit:
509 queue.put((GeneratorExit, None))
513 def decode_item(item, target):
517 elif item[0] is GeneratorExit:
521 target.throw(item[0], item[1])
525 def queue_source(queue, target):
527 >>> q = Queue.Queue()
531 ... (GeneratorExit, None),
534 >>> qs = queue_source(q, printer_sink())
541 isDone = decode_item(item, target)
def threaded_stage(target, thread_factory = threading.Thread):
    """
    Decouple `target` from its producer by placing a queue and a thread
    between them.

    A thread built by `thread_factory` is started immediately; it runs
    queue_source() against a fresh queue and `target`.

    Returns a zero-argument callable (a functools.partial), not a sink:
    calling it invokes queue_sink() on the same queue, and the result of
    that call is what the producer should send items to.
    """
    channel = Queue.Queue()

    # Consuming side: runs on the thread created by thread_factory
    consumer = functools.partial(queue_source, channel, target)
    worker = thread_factory(target=consumer)
    worker.start()

    # Producing side: created lazily by the caller, in the caller's thread
    make_sink = functools.partial(queue_sink, channel)
    return make_sink
559 pickle.dump((None, item), f)
560 except StandardError, e:
561 pickle.dump((e.__class__, e.message), f)
562 except GeneratorExit:
563 pickle.dump((GeneratorExit, ), f)
565 except StopIteration:
570 def pickle_source(f, target):
574 item = pickle.load(f)
575 isDone = decode_item(item, target)
580 class EventHandler(object, xml.sax.ContentHandler):
def __init__(self, target):
    """Create a handler whose SAX callbacks send their events to `target`."""
    # Each base is initialized explicitly by name rather than via super().
    object.__init__(self)
    xml.sax.ContentHandler.__init__(self)
    # Object whose send() is called by startElement/characters/endElement.
    self._target = target
def startElement(self, name, attrs):
    """
    SAX callback for an opening tag.

    Sends `(self.START, (name, attributes))` to the target, where
    `attributes` is a plain dict of the element's attributes.

    The dict is built through the public `items()` method of the SAX
    attributes object rather than by reaching into its private `_attrs`
    member, so any object implementing the Attributes interface works.
    """
    attributes = dict(attrs.items())
    self._target.send((self.START, (name, attributes)))
def characters(self, text):
    """SAX callback for character data: sends `(self.TEXT, text)` to the target."""
    event = (self.TEXT, text)
    self._target.send(event)
def endElement(self, name):
    """SAX callback for a closing tag: sends `(self.END, name)` to the target."""
    event = (self.END, name)
    self._target.send(event)
601 def expat_parse(f, target):
602 parser = xml.parsers.expat.ParserCreate()
603 parser.buffer_size = 65536
604 parser.buffer_text = True
605 parser.returns_unicode = False
606 parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
607 parser.EndElementHandler = lambda name: target.send(('end', name))
608 parser.CharacterDataHandler = lambda data: target.send(('text', data))
612 if __name__ == "__main__":