Updating CHANGES
[gonvert] / gonvert / util / coroutines.py
1 #!/usr/bin/env python\r
2 \r
3 """\r
4 Uses for generators\r
5 * Pull pipelining (iterators)\r
6 * Push pipelining (coroutines)\r
7 * State machines (coroutines)\r
8 * "Cooperative multitasking" (coroutines)\r
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)\r
10 \r
11 Design considerations\r
12 * When should a stage pass on exceptions or have it thrown within it?\r
13 * When should a stage pass on GeneratorExits?\r
14 * Is there a way to either turn a push generator into a iterator or to use\r
15         comprehensions syntax for push generators (I doubt it)\r
16 * When should the stage try and send data in both directions\r
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines\r
18 ** If so, make s* and co* implementation of functions\r
19 """\r
20 \r
21 import threading\r
22 import Queue\r
23 import pickle\r
24 import functools\r
25 import itertools\r
26 import xml.sax\r
27 import xml.parsers.expat\r
28 \r
29 \r
30 def autostart(func):\r
31         """\r
32         >>> @autostart\r
33         ... def grep_sink(pattern):\r
34         ...     print "Looking for %s" % pattern\r
35         ...     while True:\r
36         ...             line = yield\r
37         ...             if pattern in line:\r
38         ...                     print line,\r
39         >>> g = grep_sink("python")\r
40         Looking for python\r
41         >>> g.send("Yeah but no but yeah but no")\r
42         >>> g.send("A series of tubes")\r
43         >>> g.send("python generators rock!")\r
44         python generators rock!\r
45         >>> g.close()\r
46         """\r
47 \r
48         @functools.wraps(func)\r
49         def start(*args, **kwargs):\r
50                 cr = func(*args, **kwargs)\r
51                 cr.next()\r
52                 return cr\r
53 \r
54         return start\r
55 \r
56 \r
57 @autostart\r
58 def printer_sink(format = "%s"):\r
59         """\r
60         >>> pr = printer_sink("%r")\r
61         >>> pr.send("Hello")\r
62         'Hello'\r
63         >>> pr.send("5")\r
64         '5'\r
65         >>> pr.send(5)\r
66         5\r
67         >>> p = printer_sink()\r
68         >>> p.send("Hello")\r
69         Hello\r
70         >>> p.send("World")\r
71         World\r
72         >>> # p.throw(RuntimeError, "Goodbye")\r
73         >>> # p.send("Meh")\r
74         >>> # p.close()\r
75         """\r
76         while True:\r
77                 item = yield\r
78                 print format % (item, )\r
79 \r
80 \r
81 @autostart\r
82 def null_sink():\r
83         """\r
84         Good for uses like with cochain to pick up any slack\r
85         """\r
86         while True:\r
87                 item = yield\r
88 \r
89 \r
90 def itr_source(itr, target):\r
91         """\r
92         >>> itr_source(xrange(2), printer_sink())\r
93         0\r
94         1\r
95         """\r
96         for item in itr:\r
97                 target.send(item)\r
98 \r
99 \r
100 @autostart\r
101 def cofilter(predicate, target):\r
102         """\r
103         >>> p = printer_sink()\r
104         >>> cf = cofilter(None, p)\r
105         >>> cf.send("")\r
106         >>> cf.send("Hello")\r
107         Hello\r
108         >>> cf.send([])\r
109         >>> cf.send([1, 2])\r
110         [1, 2]\r
111         >>> cf.send(False)\r
112         >>> cf.send(True)\r
113         True\r
114         >>> cf.send(0)\r
115         >>> cf.send(1)\r
116         1\r
117         >>> # cf.throw(RuntimeError, "Goodbye")\r
118         >>> # cf.send(False)\r
119         >>> # cf.send(True)\r
120         >>> # cf.close()\r
121         """\r
122         if predicate is None:\r
123                 predicate = bool\r
124 \r
125         while True:\r
126                 try:\r
127                         item = yield\r
128                         if predicate(item):\r
129                                 target.send(item)\r
130                 except StandardError, e:\r
131                         target.throw(e.__class__, e.message)\r
132 \r
133 \r
134 @autostart\r
135 def comap(function, target):\r
136         """\r
137         >>> p = printer_sink()\r
138         >>> cm = comap(lambda x: x+1, p)\r
139         >>> cm.send(0)\r
140         1\r
141         >>> cm.send(1.0)\r
142         2.0\r
143         >>> cm.send(-2)\r
144         -1\r
145         >>> # cm.throw(RuntimeError, "Goodbye")\r
146         >>> # cm.send(0)\r
147         >>> # cm.send(1.0)\r
148         >>> # cm.close()\r
149         """\r
150         while True:\r
151                 try:\r
152                         item = yield\r
153                         mappedItem = function(item)\r
154                         target.send(mappedItem)\r
155                 except StandardError, e:\r
156                         target.throw(e.__class__, e.message)\r
157 \r
158 \r
159 def func_sink(function):\r
160         return comap(function, null_sink())\r
161 \r
162 \r
163 def expand_positional(function):\r
164 \r
165         @functools.wraps(function)\r
166         def expander(item):\r
167                 return function(*item)\r
168 \r
169         return expander\r
170 \r
171 \r
172 @autostart\r
173 def append_sink(l):\r
174         """\r
175         >>> l = []\r
176         >>> apps = append_sink(l)\r
177         >>> apps.send(1)\r
178         >>> apps.send(2)\r
179         >>> apps.send(3)\r
180         >>> print l\r
181         [1, 2, 3]\r
182         """\r
183         while True:\r
184                 item = yield\r
185                 l.append(item)\r
186 \r
187 \r
188 @autostart\r
189 def last_n_sink(l, n = 1):\r
190         """\r
191         >>> l = []\r
192         >>> lns = last_n_sink(l)\r
193         >>> lns.send(1)\r
194         >>> lns.send(2)\r
195         >>> lns.send(3)\r
196         >>> print l\r
197         [3]\r
198         """\r
199         del l[:]\r
200         while True:\r
201                 item = yield\r
202                 extraCount = len(l) - n + 1\r
203                 if 0 < extraCount:\r
204                         del l[0:extraCount]\r
205                 l.append(item)\r
206 \r
207 \r
208 @autostart\r
209 def coreduce(target, function, initializer = None):\r
210         """\r
211         >>> reduceResult = []\r
212         >>> lns = last_n_sink(reduceResult)\r
213         >>> cr = coreduce(lns, lambda x, y: x + y, 0)\r
214         >>> cr.send(1)\r
215         >>> cr.send(2)\r
216         >>> cr.send(3)\r
217         >>> print reduceResult\r
218         [6]\r
219         >>> cr = coreduce(lns, lambda x, y: x + y)\r
220         >>> cr.send(1)\r
221         >>> cr.send(2)\r
222         >>> cr.send(3)\r
223         >>> print reduceResult\r
224         [6]\r
225         """\r
226         isFirst = True\r
227         cumulativeRef = initializer\r
228         while True:\r
229                 item = yield\r
230                 if isFirst and initializer is None:\r
231                         cumulativeRef = item\r
232                 else:\r
233                         cumulativeRef = function(cumulativeRef, item)\r
234                 target.send(cumulativeRef)\r
235                 isFirst = False\r
236 \r
237 \r
238 @autostart\r
239 def cotee(targets):\r
240         """\r
241         Takes a sequence of coroutines and sends the received items to all of them\r
242 \r
243         >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))\r
244         >>> ct.send("Hello")\r
245         1 Hello\r
246         2 Hello\r
247         >>> ct.send("World")\r
248         1 World\r
249         2 World\r
250         >>> # ct.throw(RuntimeError, "Goodbye")\r
251         >>> # ct.send("Meh")\r
252         >>> # ct.close()\r
253         """\r
254         while True:\r
255                 try:\r
256                         item = yield\r
257                         for target in targets:\r
258                                 target.send(item)\r
259                 except StandardError, e:\r
260                         for target in targets:\r
261                                 target.throw(e.__class__, e.message)\r
262 \r
263 \r
264 class CoTee(object):\r
265         """\r
266         >>> ct = CoTee()\r
267         >>> ct.register_sink(printer_sink("1 %s"))\r
268         >>> ct.register_sink(printer_sink("2 %s"))\r
269         >>> ct.stage.send("Hello")\r
270         1 Hello\r
271         2 Hello\r
272         >>> ct.stage.send("World")\r
273         1 World\r
274         2 World\r
275         >>> ct.register_sink(printer_sink("3 %s"))\r
276         >>> ct.stage.send("Foo")\r
277         1 Foo\r
278         2 Foo\r
279         3 Foo\r
280         >>> # ct.stage.throw(RuntimeError, "Goodbye")\r
281         >>> # ct.stage.send("Meh")\r
282         >>> # ct.stage.close()\r
283         """\r
284 \r
285         def __init__(self):\r
286                 self.stage = self._stage()\r
287                 self._targets = []\r
288 \r
289         def register_sink(self, sink):\r
290                 self._targets.append(sink)\r
291 \r
292         def unregister_sink(self, sink):\r
293                 self._targets.remove(sink)\r
294 \r
295         def restart(self):\r
296                 self.stage = self._stage()\r
297 \r
298         @autostart\r
299         def _stage(self):\r
300                 while True:\r
301                         try:\r
302                                 item = yield\r
303                                 for target in self._targets:\r
304                                         target.send(item)\r
305                         except StandardError, e:\r
306                                 for target in self._targets:\r
307                                         target.throw(e.__class__, e.message)\r
308 \r
309 \r
310 def _flush_queue(queue):\r
311         while not queue.empty():\r
312                 yield queue.get()\r
313 \r
314 \r
315 @autostart\r
316 def cocount(target, start = 0):\r
317         """\r
318         >>> cc = cocount(printer_sink("%s"))\r
319         >>> cc.send("a")\r
320         0\r
321         >>> cc.send(None)\r
322         1\r
323         >>> cc.send([])\r
324         2\r
325         >>> cc.send(0)\r
326         3\r
327         """\r
328         for i in itertools.count(start):\r
329                 item = yield\r
330                 target.send(i)\r
331 \r
332 \r
333 @autostart\r
334 def coenumerate(target, start = 0):\r
335         """\r
336         >>> ce = coenumerate(printer_sink("%r"))\r
337         >>> ce.send("a")\r
338         (0, 'a')\r
339         >>> ce.send(None)\r
340         (1, None)\r
341         >>> ce.send([])\r
342         (2, [])\r
343         >>> ce.send(0)\r
344         (3, 0)\r
345         """\r
346         for i in itertools.count(start):\r
347                 item = yield\r
348                 decoratedItem = i, item\r
349                 target.send(decoratedItem)\r
350 \r
351 \r
352 @autostart\r
353 def corepeat(target, elem):\r
354         """\r
355         >>> cr = corepeat(printer_sink("%s"), "Hello World")\r
356         >>> cr.send("a")\r
357         Hello World\r
358         >>> cr.send(None)\r
359         Hello World\r
360         >>> cr.send([])\r
361         Hello World\r
362         >>> cr.send(0)\r
363         Hello World\r
364         """\r
365         while True:\r
366                 item = yield\r
367                 target.send(elem)\r
368 \r
369 \r
370 @autostart\r
371 def cointercept(target, elems):\r
372         """\r
373         >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])\r
374         >>> cr.send("a")\r
375         1\r
376         >>> cr.send(None)\r
377         2\r
378         >>> cr.send([])\r
379         3\r
380         >>> cr.send(0)\r
381         4\r
382         >>> cr.send("Bye")\r
383         Traceback (most recent call last):\r
384           File "/usr/lib/python2.5/doctest.py", line 1228, in __run\r
385             compileflags, 1) in test.globs\r
386           File "<doctest __main__.cointercept[5]>", line 1, in <module>\r
387             cr.send("Bye")\r
388         StopIteration\r
389         """\r
390         item = yield\r
391         for elem in elems:\r
392                 target.send(elem)\r
393                 item = yield\r
394 \r
395 \r
396 @autostart\r
397 def codropwhile(target, pred):\r
398         """\r
399         >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)\r
400         >>> cdw.send([0, 1, 2])\r
401         >>> cdw.send(1)\r
402         >>> cdw.send(True)\r
403         >>> cdw.send(False)\r
404         >>> cdw.send([0, 1, 2])\r
405         [0, 1, 2]\r
406         >>> cdw.send(1)\r
407         1\r
408         >>> cdw.send(True)\r
409         True\r
410         """\r
411         while True:\r
412                 item = yield\r
413                 if not pred(item):\r
414                         break\r
415 \r
416         while True:\r
417                 item = yield\r
418                 target.send(item)\r
419 \r
420 \r
421 @autostart\r
422 def cotakewhile(target, pred):\r
423         """\r
424         >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)\r
425         >>> ctw.send([0, 1, 2])\r
426         [0, 1, 2]\r
427         >>> ctw.send(1)\r
428         1\r
429         >>> ctw.send(True)\r
430         True\r
431         >>> ctw.send(False)\r
432         >>> ctw.send([0, 1, 2])\r
433         >>> ctw.send(1)\r
434         >>> ctw.send(True)\r
435         """\r
436         while True:\r
437                 item = yield\r
438                 if not pred(item):\r
439                         break\r
440                 target.send(item)\r
441 \r
442         while True:\r
443                 item = yield\r
444 \r
445 \r
446 @autostart\r
447 def coslice(target, lower, upper):\r
448         """\r
449         >>> cs = coslice(printer_sink("%r"), 3, 5)\r
450         >>> cs.send("0")\r
451         >>> cs.send("1")\r
452         >>> cs.send("2")\r
453         >>> cs.send("3")\r
454         '3'\r
455         >>> cs.send("4")\r
456         '4'\r
457         >>> cs.send("5")\r
458         >>> cs.send("6")\r
459         """\r
460         for i in xrange(lower):\r
461                 item = yield\r
462         for i in xrange(upper - lower):\r
463                 item = yield\r
464                 target.send(item)\r
465         while True:\r
466                 item = yield\r
467 \r
468 \r
469 @autostart\r
470 def cochain(targets):\r
471         """\r
472         >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])\r
473         >>> cc = cochain([cr, printer_sink("end %s")])\r
474         >>> cc.send("a")\r
475         good 1\r
476         >>> cc.send(None)\r
477         good 2\r
478         >>> cc.send([])\r
479         good 3\r
480         >>> cc.send(0)\r
481         good 4\r
482         >>> cc.send("Bye")\r
483         end Bye\r
484         """\r
485         behind = []\r
486         for target in targets:\r
487                 try:\r
488                         while behind:\r
489                                 item = behind.pop()\r
490                                 target.send(item)\r
491                         while True:\r
492                                 item = yield\r
493                                 target.send(item)\r
494                 except StopIteration:\r
495                         behind.append(item)\r
496 \r
497 \r
498 @autostart\r
499 def queue_sink(queue):\r
500         """\r
501         >>> q = Queue.Queue()\r
502         >>> qs = queue_sink(q)\r
503         >>> qs.send("Hello")\r
504         >>> qs.send("World")\r
505         >>> qs.throw(RuntimeError, "Goodbye")\r
506         >>> qs.send("Meh")\r
507         >>> qs.close()\r
508         >>> print [i for i in _flush_queue(q)]\r
509         [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]\r
510         """\r
511         while True:\r
512                 try:\r
513                         item = yield\r
514                         queue.put((None, item))\r
515                 except StandardError, e:\r
516                         queue.put((e.__class__, e.message))\r
517                 except GeneratorExit:\r
518                         queue.put((GeneratorExit, None))\r
519                         raise\r
520 \r
521 \r
522 def decode_item(item, target):\r
523         if item[0] is None:\r
524                 target.send(item[1])\r
525                 return False\r
526         elif item[0] is GeneratorExit:\r
527                 target.close()\r
528                 return True\r
529         else:\r
530                 target.throw(item[0], item[1])\r
531                 return False\r
532 \r
533 \r
534 def queue_source(queue, target):\r
535         """\r
536         >>> q = Queue.Queue()\r
537         >>> for i in [\r
538         ...     (None, 'Hello'),\r
539         ...     (None, 'World'),\r
540         ...     (GeneratorExit, None),\r
541         ...     ]:\r
542         ...     q.put(i)\r
543         >>> qs = queue_source(q, printer_sink())\r
544         Hello\r
545         World\r
546         """\r
547         isDone = False\r
548         while not isDone:\r
549                 item = queue.get()\r
550                 isDone = decode_item(item, target)\r
551 \r
552 \r
553 def threaded_stage(target, thread_factory = threading.Thread):\r
554         messages = Queue.Queue()\r
555 \r
556         run_source = functools.partial(queue_source, messages, target)\r
557         thread_factory(target=run_source).start()\r
558 \r
559         # Sink running in current thread\r
560         return functools.partial(queue_sink, messages)\r
561 \r
562 \r
563 @autostart\r
564 def pickle_sink(f):\r
565         while True:\r
566                 try:\r
567                         item = yield\r
568                         pickle.dump((None, item), f)\r
569                 except StandardError, e:\r
570                         pickle.dump((e.__class__, e.message), f)\r
571                 except GeneratorExit:\r
572                         pickle.dump((GeneratorExit, ), f)\r
573                         raise\r
574                 except StopIteration:\r
575                         f.close()\r
576                         return\r
577 \r
578 \r
579 def pickle_source(f, target):\r
580         try:\r
581                 isDone = False\r
582                 while not isDone:\r
583                         item = pickle.load(f)\r
584                         isDone = decode_item(item, target)\r
585         except EOFError:\r
586                 target.close()\r
587 \r
588 \r
589 class EventHandler(object, xml.sax.ContentHandler):\r
590 \r
591         START = "start"\r
592         TEXT = "text"\r
593         END = "end"\r
594 \r
595         def __init__(self, target):\r
596                 object.__init__(self)\r
597                 xml.sax.ContentHandler.__init__(self)\r
598                 self._target = target\r
599 \r
600         def startElement(self, name, attrs):\r
601                 self._target.send((self.START, (name, attrs._attrs)))\r
602 \r
603         def characters(self, text):\r
604                 self._target.send((self.TEXT, text))\r
605 \r
606         def endElement(self, name):\r
607                 self._target.send((self.END, name))\r
608 \r
609 \r
610 def expat_parse(f, target):\r
611         parser = xml.parsers.expat.ParserCreate()\r
612         parser.buffer_size = 65536\r
613         parser.buffer_text = True\r
614         parser.returns_unicode = False\r
615         parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))\r
616         parser.EndElementHandler = lambda name: target.send(('end', name))\r
617         parser.CharacterDataHandler = lambda data: target.send(('text', data))\r
618         parser.ParseFile(f)\r
619 \r
620 \r
621 if __name__ == "__main__":\r
622         import doctest\r
623         doctest.testmod()\r