1 #!/usr/bin/env python
\r
5 * Pull pipelining (iterators)
\r
6 * Push pipelining (coroutines)
\r
7 * State machines (coroutines)
\r
8 * "Cooperative multitasking" (coroutines)
\r
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
\r
11 Design considerations
\r
12 * When should a stage pass on exceptions or have it thrown within it?
\r
13 * When should a stage pass on GeneratorExits?
\r
14 * Is there a way to either turn a push generator into a iterator or to use
\r
15 comprehensions syntax for push generators (I doubt it)
\r
16 * When should the stage try and send data in both directions
\r
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines
\r
18 ** If so, make s* and co* implementation of functions
\r
27 import xml.parsers.expat
\r
30 def autostart(func):
\r
33 ... def grep_sink(pattern):
\r
34 ... print "Looking for %s" % pattern
\r
37 ... if pattern in line:
\r
39 >>> g = grep_sink("python")
\r
41 >>> g.send("Yeah but no but yeah but no")
\r
42 >>> g.send("A series of tubes")
\r
43 >>> g.send("python generators rock!")
\r
44 python generators rock!
\r
48 @functools.wraps(func)
\r
49 def start(*args, **kwargs):
\r
50 cr = func(*args, **kwargs)
\r
58 def printer_sink(format = "%s"):
\r
60 >>> pr = printer_sink("%r")
\r
61 >>> pr.send("Hello")
\r
67 >>> p = printer_sink()
\r
72 >>> # p.throw(RuntimeError, "Goodbye")
\r
78 print format % (item, )
\r
84 Good for uses like with cochain to pick up any slack
\r
90 def itr_source(itr, target):
\r
92 >>> itr_source(xrange(2), printer_sink())
\r
101 def cofilter(predicate, target):
\r
103 >>> p = printer_sink()
\r
104 >>> cf = cofilter(None, p)
\r
106 >>> cf.send("Hello")
\r
109 >>> cf.send([1, 2])
\r
117 >>> # cf.throw(RuntimeError, "Goodbye")
\r
118 >>> # cf.send(False)
\r
119 >>> # cf.send(True)
\r
122 if predicate is None:
\r
128 if predicate(item):
\r
130 except StandardError, e:
\r
131 target.throw(e.__class__, e.message)
\r
135 def comap(function, target):
\r
137 >>> p = printer_sink()
\r
138 >>> cm = comap(lambda x: x+1, p)
\r
145 >>> # cm.throw(RuntimeError, "Goodbye")
\r
153 mappedItem = function(item)
\r
154 target.send(mappedItem)
\r
155 except StandardError, e:
\r
156 target.throw(e.__class__, e.message)
\r
160 def append_sink(l):
\r
163 >>> apps = append_sink(l)
\r
176 def last_n_sink(l, n = 1):
\r
179 >>> lns = last_n_sink(l)
\r
189 extraCount = len(l) - n + 1
\r
191 del l[0:extraCount]
\r
196 def coreduce(target, function, initializer = None):
\r
198 >>> reduceResult = []
\r
199 >>> lns = last_n_sink(reduceResult)
\r
200 >>> cr = coreduce(lns, lambda x, y: x + y, 0)
\r
204 >>> print reduceResult
\r
206 >>> cr = coreduce(lns, lambda x, y: x + y)
\r
210 >>> print reduceResult
\r
214 cumulativeRef = initializer
\r
217 if isFirst and initializer is None:
\r
218 cumulativeRef = item
\r
220 cumulativeRef = function(cumulativeRef, item)
\r
221 target.send(cumulativeRef)
\r
226 def cotee(targets):
\r
228 Takes a sequence of coroutines and sends the received items to all of them
\r
230 >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
\r
231 >>> ct.send("Hello")
\r
234 >>> ct.send("World")
\r
237 >>> # ct.throw(RuntimeError, "Goodbye")
\r
238 >>> # ct.send("Meh")
\r
244 for target in targets:
\r
246 except StandardError, e:
\r
247 for target in targets:
\r
248 target.throw(e.__class__, e.message)
\r
251 class CoTee(object):
\r
254 >>> ct.register_sink(printer_sink("1 %s"))
\r
255 >>> ct.register_sink(printer_sink("2 %s"))
\r
256 >>> ct.stage.send("Hello")
\r
259 >>> ct.stage.send("World")
\r
262 >>> ct.register_sink(printer_sink("3 %s"))
\r
263 >>> ct.stage.send("Foo")
\r
267 >>> # ct.stage.throw(RuntimeError, "Goodbye")
\r
268 >>> # ct.stage.send("Meh")
\r
269 >>> # ct.stage.close()
\r
272 def __init__(self):
\r
273 self.stage = self._stage()
\r
276 def register_sink(self, sink):
\r
277 self._targets.append(sink)
\r
279 def unregister_sink(self, sink):
\r
280 self._targets.remove(sink)
\r
283 self.stage = self._stage()
\r
290 for target in self._targets:
\r
292 except StandardError, e:
\r
293 for target in self._targets:
\r
294 target.throw(e.__class__, e.message)
\r
297 def _flush_queue(queue):
\r
298 while not queue.empty():
\r
303 def cocount(target, start = 0):
\r
305 >>> cc = cocount(printer_sink("%s"))
\r
315 for i in itertools.count(start):
\r
321 def coenumerate(target, start = 0):
\r
323 >>> ce = coenumerate(printer_sink("%r"))
\r
333 for i in itertools.count(start):
\r
335 decoratedItem = i, item
\r
336 target.send(decoratedItem)
\r
340 def corepeat(target, elem):
\r
342 >>> cr = corepeat(printer_sink("%s"), "Hello World")
\r
358 def cointercept(target, elems):
\r
360 >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
\r
370 Traceback (most recent call last):
\r
371 File "/usr/lib/python2.5/doctest.py", line 1228, in __run
\r
372 compileflags, 1) in test.globs
\r
373 File "<doctest __main__.cointercept[5]>", line 1, in <module>
\r
384 def codropwhile(target, pred):
\r
386 >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
\r
387 >>> cdw.send([0, 1, 2])
\r
390 >>> cdw.send(False)
\r
391 >>> cdw.send([0, 1, 2])
\r
409 def cotakewhile(target, pred):
\r
411 >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
\r
412 >>> ctw.send([0, 1, 2])
\r
418 >>> ctw.send(False)
\r
419 >>> ctw.send([0, 1, 2])
\r
434 def coslice(target, lower, upper):
\r
436 >>> cs = coslice(printer_sink("%r"), 3, 5)
\r
447 for i in xrange(lower):
\r
449 for i in xrange(upper - lower):
\r
457 def cochain(targets):
\r
459 >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
\r
460 >>> cc = cochain([cr, printer_sink("end %s")])
\r
473 for target in targets:
\r
476 item = behind.pop()
\r
481 except StopIteration:
\r
482 behind.append(item)
\r
486 def queue_sink(queue):
\r
488 >>> q = Queue.Queue()
\r
489 >>> qs = queue_sink(q)
\r
490 >>> qs.send("Hello")
\r
491 >>> qs.send("World")
\r
492 >>> qs.throw(RuntimeError, "Goodbye")
\r
495 >>> print [i for i in _flush_queue(q)]
\r
496 [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
\r
501 queue.put((None, item))
\r
502 except StandardError, e:
\r
503 queue.put((e.__class__, e.message))
\r
504 except GeneratorExit:
\r
505 queue.put((GeneratorExit, None))
\r
509 def decode_item(item, target):
\r
510 if item[0] is None:
\r
511 target.send(item[1])
\r
513 elif item[0] is GeneratorExit:
\r
517 target.throw(item[0], item[1])
\r
521 def queue_source(queue, target):
\r
523 >>> q = Queue.Queue()
\r
525 ... (None, 'Hello'),
\r
526 ... (None, 'World'),
\r
527 ... (GeneratorExit, None),
\r
530 >>> qs = queue_source(q, printer_sink())
\r
537 isDone = decode_item(item, target)
\r
540 def threaded_stage(target, thread_factory = threading.Thread):
\r
541 messages = Queue.Queue()
\r
543 run_source = functools.partial(queue_source, messages, target)
\r
544 thread_factory(target=run_source).start()
\r
546 # Sink running in current thread
\r
547 return functools.partial(queue_sink, messages)
\r
551 def pickle_sink(f):
\r
555 pickle.dump((None, item), f)
\r
556 except StandardError, e:
\r
557 pickle.dump((e.__class__, e.message), f)
\r
558 except GeneratorExit:
\r
559 pickle.dump((GeneratorExit, ), f)
\r
561 except StopIteration:
\r
566 def pickle_source(f, target):
\r
570 item = pickle.load(f)
\r
571 isDone = decode_item(item, target)
\r
576 class EventHandler(object, xml.sax.ContentHandler):
\r
582 def __init__(self, target):
\r
583 object.__init__(self)
\r
584 xml.sax.ContentHandler.__init__(self)
\r
585 self._target = target
\r
587 def startElement(self, name, attrs):
\r
588 self._target.send((self.START, (name, attrs._attrs)))
\r
590 def characters(self, text):
\r
591 self._target.send((self.TEXT, text))
\r
593 def endElement(self, name):
\r
594 self._target.send((self.END, name))
\r
597 def expat_parse(f, target):
\r
598 parser = xml.parsers.expat.ParserCreate()
\r
599 parser.buffer_size = 65536
\r
600 parser.buffer_text = True
\r
601 parser.returns_unicode = False
\r
602 parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
\r
603 parser.EndElementHandler = lambda name: target.send(('end', name))
\r
604 parser.CharacterDataHandler = lambda data: target.send(('text', data))
\r
605 parser.ParseFile(f)
\r
608 if __name__ == "__main__":
\r