1 #!/usr/bin/env python
\r
5 * Pull pipelining (iterators)
\r
6 * Push pipelining (coroutines)
\r
7 * State machines (coroutines)
\r
8 * "Cooperative multitasking" (coroutines)
\r
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
\r
11 Design considerations
\r
12 * When should a stage pass on exceptions or have it thrown within it?
\r
13 * When should a stage pass on GeneratorExits?
\r
14 * Is there a way to either turn a push generator into a iterator or to use
\r
15 comprehensions syntax for push generators (I doubt it)
\r
16 * When should the stage try and send data in both directions
\r
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines
\r
18 ** If so, make s* and co* implementation of functions
\r
27 import xml.parsers.expat
\r
30 def autostart(func):
\r
33 ... def grep_sink(pattern):
\r
34 ... print "Looking for %s" % pattern
\r
37 ... if pattern in line:
\r
39 >>> g = grep_sink("python")
\r
41 >>> g.send("Yeah but no but yeah but no")
\r
42 >>> g.send("A series of tubes")
\r
43 >>> g.send("python generators rock!")
\r
44 python generators rock!
\r
48 @functools.wraps(func)
\r
49 def start(*args, **kwargs):
\r
50 cr = func(*args, **kwargs)
\r
58 def printer_sink(format = "%s"):
\r
60 >>> pr = printer_sink("%r")
\r
61 >>> pr.send("Hello")
\r
67 >>> p = printer_sink()
\r
72 >>> # p.throw(RuntimeError, "Goodbye")
\r
78 print format % (item, )
\r
84 Good for uses like with cochain to pick up any slack
\r
90 def itr_source(itr, target):
\r
92 >>> itr_source(xrange(2), printer_sink())
\r
101 def cofilter(predicate, target):
\r
103 >>> p = printer_sink()
\r
104 >>> cf = cofilter(None, p)
\r
106 >>> cf.send("Hello")
\r
109 >>> cf.send([1, 2])
\r
117 >>> # cf.throw(RuntimeError, "Goodbye")
\r
118 >>> # cf.send(False)
\r
119 >>> # cf.send(True)
\r
122 if predicate is None:
\r
128 if predicate(item):
\r
130 except StandardError, e:
\r
131 target.throw(e.__class__, e.message)
\r
135 def comap(function, target):
\r
137 >>> p = printer_sink()
\r
138 >>> cm = comap(lambda x: x+1, p)
\r
145 >>> # cm.throw(RuntimeError, "Goodbye")
\r
153 mappedItem = function(item)
\r
154 target.send(mappedItem)
\r
155 except StandardError, e:
\r
156 target.throw(e.__class__, e.message)
\r
159 def func_sink(function):
\r
160 return comap(function, null_sink())
\r
163 def expand_positional(function):
\r
165 @functools.wraps(function)
\r
166 def expander(item):
\r
167 return function(*item)
\r
173 def append_sink(l):
\r
176 >>> apps = append_sink(l)
\r
189 def last_n_sink(l, n = 1):
\r
192 >>> lns = last_n_sink(l)
\r
202 extraCount = len(l) - n + 1
\r
204 del l[0:extraCount]
\r
209 def coreduce(target, function, initializer = None):
\r
211 >>> reduceResult = []
\r
212 >>> lns = last_n_sink(reduceResult)
\r
213 >>> cr = coreduce(lns, lambda x, y: x + y, 0)
\r
217 >>> print reduceResult
\r
219 >>> cr = coreduce(lns, lambda x, y: x + y)
\r
223 >>> print reduceResult
\r
227 cumulativeRef = initializer
\r
230 if isFirst and initializer is None:
\r
231 cumulativeRef = item
\r
233 cumulativeRef = function(cumulativeRef, item)
\r
234 target.send(cumulativeRef)
\r
239 def cotee(targets):
\r
241 Takes a sequence of coroutines and sends the received items to all of them
\r
243 >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
\r
244 >>> ct.send("Hello")
\r
247 >>> ct.send("World")
\r
250 >>> # ct.throw(RuntimeError, "Goodbye")
\r
251 >>> # ct.send("Meh")
\r
257 for target in targets:
\r
259 except StandardError, e:
\r
260 for target in targets:
\r
261 target.throw(e.__class__, e.message)
\r
264 class CoTee(object):
\r
267 >>> ct.register_sink(printer_sink("1 %s"))
\r
268 >>> ct.register_sink(printer_sink("2 %s"))
\r
269 >>> ct.stage.send("Hello")
\r
272 >>> ct.stage.send("World")
\r
275 >>> ct.register_sink(printer_sink("3 %s"))
\r
276 >>> ct.stage.send("Foo")
\r
280 >>> # ct.stage.throw(RuntimeError, "Goodbye")
\r
281 >>> # ct.stage.send("Meh")
\r
282 >>> # ct.stage.close()
\r
285 def __init__(self):
\r
286 self.stage = self._stage()
\r
289 def register_sink(self, sink):
\r
290 self._targets.append(sink)
\r
292 def unregister_sink(self, sink):
\r
293 self._targets.remove(sink)
\r
296 self.stage = self._stage()
\r
303 for target in self._targets:
\r
305 except StandardError, e:
\r
306 for target in self._targets:
\r
307 target.throw(e.__class__, e.message)
\r
310 def _flush_queue(queue):
\r
311 while not queue.empty():
\r
316 def cocount(target, start = 0):
\r
318 >>> cc = cocount(printer_sink("%s"))
\r
328 for i in itertools.count(start):
\r
334 def coenumerate(target, start = 0):
\r
336 >>> ce = coenumerate(printer_sink("%r"))
\r
346 for i in itertools.count(start):
\r
348 decoratedItem = i, item
\r
349 target.send(decoratedItem)
\r
353 def corepeat(target, elem):
\r
355 >>> cr = corepeat(printer_sink("%s"), "Hello World")
\r
371 def cointercept(target, elems):
\r
373 >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
\r
383 Traceback (most recent call last):
\r
384 File "/usr/lib/python2.5/doctest.py", line 1228, in __run
\r
385 compileflags, 1) in test.globs
\r
386 File "<doctest __main__.cointercept[5]>", line 1, in <module>
\r
397 def codropwhile(target, pred):
\r
399 >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
\r
400 >>> cdw.send([0, 1, 2])
\r
403 >>> cdw.send(False)
\r
404 >>> cdw.send([0, 1, 2])
\r
422 def cotakewhile(target, pred):
\r
424 >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
\r
425 >>> ctw.send([0, 1, 2])
\r
431 >>> ctw.send(False)
\r
432 >>> ctw.send([0, 1, 2])
\r
447 def coslice(target, lower, upper):
\r
449 >>> cs = coslice(printer_sink("%r"), 3, 5)
\r
460 for i in xrange(lower):
\r
462 for i in xrange(upper - lower):
\r
470 def cochain(targets):
\r
472 >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
\r
473 >>> cc = cochain([cr, printer_sink("end %s")])
\r
486 for target in targets:
\r
489 item = behind.pop()
\r
494 except StopIteration:
\r
495 behind.append(item)
\r
499 def queue_sink(queue):
\r
501 >>> q = Queue.Queue()
\r
502 >>> qs = queue_sink(q)
\r
503 >>> qs.send("Hello")
\r
504 >>> qs.send("World")
\r
505 >>> qs.throw(RuntimeError, "Goodbye")
\r
508 >>> print [i for i in _flush_queue(q)]
\r
509 [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
\r
514 queue.put((None, item))
\r
515 except StandardError, e:
\r
516 queue.put((e.__class__, e.message))
\r
517 except GeneratorExit:
\r
518 queue.put((GeneratorExit, None))
\r
522 def decode_item(item, target):
\r
523 if item[0] is None:
\r
524 target.send(item[1])
\r
526 elif item[0] is GeneratorExit:
\r
530 target.throw(item[0], item[1])
\r
534 def queue_source(queue, target):
\r
536 >>> q = Queue.Queue()
\r
538 ... (None, 'Hello'),
\r
539 ... (None, 'World'),
\r
540 ... (GeneratorExit, None),
\r
543 >>> qs = queue_source(q, printer_sink())
\r
550 isDone = decode_item(item, target)
\r
553 def threaded_stage(target, thread_factory = threading.Thread):
\r
554 messages = Queue.Queue()
\r
556 run_source = functools.partial(queue_source, messages, target)
\r
557 thread_factory(target=run_source).start()
\r
559 # Sink running in current thread
\r
560 return functools.partial(queue_sink, messages)
\r
564 def pickle_sink(f):
\r
568 pickle.dump((None, item), f)
\r
569 except StandardError, e:
\r
570 pickle.dump((e.__class__, e.message), f)
\r
571 except GeneratorExit:
\r
572 pickle.dump((GeneratorExit, ), f)
\r
574 except StopIteration:
\r
579 def pickle_source(f, target):
\r
583 item = pickle.load(f)
\r
584 isDone = decode_item(item, target)
\r
589 class EventHandler(object, xml.sax.ContentHandler):
\r
595 def __init__(self, target):
\r
596 object.__init__(self)
\r
597 xml.sax.ContentHandler.__init__(self)
\r
598 self._target = target
\r
600 def startElement(self, name, attrs):
\r
601 self._target.send((self.START, (name, attrs._attrs)))
\r
603 def characters(self, text):
\r
604 self._target.send((self.TEXT, text))
\r
606 def endElement(self, name):
\r
607 self._target.send((self.END, name))
\r
610 def expat_parse(f, target):
\r
611 parser = xml.parsers.expat.ParserCreate()
\r
612 parser.buffer_size = 65536
\r
613 parser.buffer_text = True
\r
614 parser.returns_unicode = False
\r
615 parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
\r
616 parser.EndElementHandler = lambda name: target.send(('end', name))
\r
617 parser.CharacterDataHandler = lambda data: target.send(('text', data))
\r
618 parser.ParseFile(f)
\r
621 if __name__ == "__main__":
\r