Fixing line endings and adding a function sink
[doneit] / src / coroutines.py
1 #!/usr/bin/env python
2
3 """
4 Uses for generators
5 * Pull pipelining (iterators)
6 * Push pipelining (coroutines)
7 * State machines (coroutines)
8 * "Cooperative multitasking" (coroutines)
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
10
11 Design considerations
12 * When should a stage pass on exceptions or have it thrown within it?
13 * When should a stage pass on GeneratorExits?
14 * Is there a way to either turn a push generator into an iterator or to use
15         comprehension syntax for push generators (I doubt it)
16 * When should the stage try and send data in both directions
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines
18 ** If so, make s* and co* implementation of functions
19 """
20
21 import threading
22 import Queue
23 import pickle
24 import functools
25 import itertools
26 import xml.sax
27 import xml.parsers.expat
28
29
def autostart(func):
    """
    Decorator that primes a generator-based coroutine.

    The wrapped factory creates the generator and advances it to its first
    ``yield``, so the caller can ``send()`` to the result straight away.

    >>> @autostart
    ... def grep_sink(pattern):
    ...     print("Looking for %s" % pattern)
    ...     while True:
    ...         line = yield
    ...         if pattern in line:
    ...             print(line)
    >>> g = grep_sink("python")
    Looking for python
    >>> g.send("Yeah but no but yeah but no")
    >>> g.send("A series of tubes")
    >>> g.send("python generators rock!")
    python generators rock!
    >>> g.close()
    """

    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # The builtin next() works on Python 2.6+ and 3.x; the generator
        # method .next() only exists on Python 2.
        next(cr)
        return cr

    return start
55
56
@autostart
def printer_sink(format="%s"):
    """
    Sink that prints every received item, rendered through ``format``.

    >>> pr = printer_sink("%r")
    >>> pr.send("Hello")
    'Hello'
    >>> pr.send("5")
    '5'
    >>> pr.send(5)
    5
    >>> p = printer_sink()
    >>> p.send("Hello")
    Hello
    >>> p.send("World")
    World
    >>> # p.throw(RuntimeError, "Goodbye")
    >>> # p.send("Meh")
    >>> # p.close()
    """
    while True:
        received = yield
        # Wrap in a 1-tuple so tuple items are formatted as a single value.
        rendered = format % (received, )
        print(rendered)
79
80
@autostart
def null_sink():
    """
    Sink that silently discards everything sent to it.

    Good for uses like with cochain to pick up any slack.
    """
    while True:
        yield
88
89
def itr_source(itr, target):
    """
    Push every element of the iterable ``itr`` into ``target``, in order.

    >>> itr_source(range(2), printer_sink())
    0
    1
    """
    for element in itr:
        target.send(element)
98
99
@autostart
def cofilter(predicate, target):
    """
    Stage that forwards to ``target`` only the items ``predicate`` accepts.

    A ``predicate`` of None means "forward truthy items" (like ``filter``).
    Exceptions raised inside this stage, or thrown into it, are forwarded to
    ``target`` with ``throw()``.  StopIteration ends the stage instead.

    >>> p = printer_sink()
    >>> cf = cofilter(None, p)
    >>> cf.send("")
    >>> cf.send("Hello")
    Hello
    >>> cf.send([])
    >>> cf.send([1, 2])
    [1, 2]
    >>> cf.send(False)
    >>> cf.send(True)
    True
    >>> cf.send(0)
    >>> cf.send(1)
    1
    >>> # cf.throw(RuntimeError, "Goodbye")
    >>> # cf.send(False)
    >>> # cf.send(True)
    >>> # cf.close()
    """
    if predicate is None:
        predicate = bool

    while True:
        try:
            item = yield
            if predicate(item):
                target.send(item)
        except StopIteration:
            # An exhausted target (or an explicit StopIteration) ends this
            # stage; the caller still sees StopIteration from send()/throw().
            return
        except Exception as e:
            # Forward the exception object itself.  Rebuilding it from
            # e.__class__ and the deprecated e.message dropped extra args and
            # broke for classes whose constructor needs several arguments.
            target.throw(e)
132
133
@autostart
def comap(function, target):
    """
    Stage that sends ``function(item)`` to ``target`` for every item received.

    Exceptions raised by ``function`` or thrown into this stage are forwarded
    to ``target`` with ``throw()``.  StopIteration ends the stage instead.

    >>> p = printer_sink()
    >>> cm = comap(lambda x: x+1, p)
    >>> cm.send(0)
    1
    >>> cm.send(1.0)
    2.0
    >>> cm.send(-2)
    -1
    >>> # cm.throw(RuntimeError, "Goodbye")
    >>> # cm.send(0)
    >>> # cm.send(1.0)
    >>> # cm.close()
    """
    while True:
        try:
            item = yield
            mappedItem = function(item)
            target.send(mappedItem)
        except StopIteration:
            # An exhausted target (or an explicit StopIteration) ends this
            # stage; the caller still sees StopIteration from send()/throw().
            return
        except Exception as e:
            # Forward the exception object itself rather than rebuilding it
            # from the deprecated e.message, which lost the original args.
            target.throw(e)
157
158
def func_sink(function):
    """
    Build a sink that calls ``function`` on each item and discards the result.

    Implemented as a comap stage feeding a null_sink.
    """
    discard = null_sink()
    return comap(function, discard)
161
162
@autostart
def append_sink(l):
    """
    Sink that appends every received item to the list ``l``.

    >>> l = []
    >>> apps = append_sink(l)
    >>> apps.send(1)
    >>> apps.send(2)
    >>> apps.send(3)
    >>> print(l)
    [1, 2, 3]
    """
    while True:
        # The yield expression evaluates to whatever was sent in.
        l.append((yield))
177
178
@autostart
def last_n_sink(l, n=1):
    """
    Sink that keeps only the ``n`` most recently received items in ``l``.

    ``l`` is emptied when the sink is created.  With ``n <= 0`` nothing is
    retained.

    >>> l = []
    >>> lns = last_n_sink(l)
    >>> lns.send(1)
    >>> lns.send(2)
    >>> lns.send(3)
    >>> print(l)
    [3]
    """
    del l[:]
    while True:
        item = yield
        l.append(item)
        # Trim after appending so the bound also holds for n <= 0; trimming
        # before the append always left at least one item behind.
        excess = len(l) - n
        if excess > 0:
            del l[:excess]
197
198
@autostart
def coreduce(target, function, initializer=None):
    """
    Running reduce: after each item, send the accumulated value to ``target``.

    When ``initializer`` is None the first item seeds the accumulator and is
    sent on unchanged; otherwise accumulation starts from ``initializer``.

    >>> reduceResult = []
    >>> lns = last_n_sink(reduceResult)
    >>> cr = coreduce(lns, lambda x, y: x + y, 0)
    >>> cr.send(1)
    >>> cr.send(2)
    >>> cr.send(3)
    >>> print(reduceResult)
    [6]
    >>> cr = coreduce(lns, lambda x, y: x + y)
    >>> cr.send(1)
    >>> cr.send(2)
    >>> cr.send(3)
    >>> print(reduceResult)
    [6]
    """
    accumulated = initializer
    seeded = initializer is not None
    while True:
        item = yield
        if seeded:
            accumulated = function(accumulated, item)
        else:
            # No initializer was supplied: the first item is the seed.
            accumulated = item
            seeded = True
        target.send(accumulated)
227
228
@autostart
def cotee(targets):
    """
    Takes a sequence of coroutines and sends the received items to all of them

    Exceptions raised in this stage or thrown into it are forwarded to every
    target with ``throw()``.  StopIteration ends the stage instead.

    >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
    >>> ct.send("Hello")
    1 Hello
    2 Hello
    >>> ct.send("World")
    1 World
    2 World
    >>> # ct.throw(RuntimeError, "Goodbye")
    >>> # ct.send("Meh")
    >>> # ct.close()
    """
    while True:
        try:
            item = yield
            for target in targets:
                target.send(item)
        except StopIteration:
            # An exhausted target (or an explicit StopIteration) ends this
            # stage; the caller still sees StopIteration from send()/throw().
            return
        except Exception as e:
            # Forward the exception object itself rather than rebuilding it
            # from the deprecated e.message, which lost the original args.
            for target in targets:
                target.throw(e)
253
254
class CoTee(object):
    """
    Tee stage whose set of sinks can change while it is running.

    Items sent to ``stage`` are forwarded to every registered sink, in
    registration order.

    >>> ct = CoTee()
    >>> ct.register_sink(printer_sink("1 %s"))
    >>> ct.register_sink(printer_sink("2 %s"))
    >>> ct.stage.send("Hello")
    1 Hello
    2 Hello
    >>> ct.stage.send("World")
    1 World
    2 World
    >>> ct.register_sink(printer_sink("3 %s"))
    >>> ct.stage.send("Foo")
    1 Foo
    2 Foo
    3 Foo
    >>> # ct.stage.throw(RuntimeError, "Goodbye")
    >>> # ct.stage.send("Meh")
    >>> # ct.stage.close()
    """

    def __init__(self):
        # Create the sink list before priming the stage so the running
        # generator never sees an instance without _targets.
        self._targets = []
        self.stage = self._stage()

    def register_sink(self, sink):
        """Add ``sink`` to the list of coroutines that receive items."""
        self._targets.append(sink)

    def unregister_sink(self, sink):
        """Remove ``sink``; raises ValueError if it was not registered."""
        self._targets.remove(sink)

    def restart(self):
        """Replace ``stage`` with a freshly started generator."""
        self.stage = self._stage()

    @autostart
    def _stage(self):
        while True:
            try:
                item = yield
                for target in self._targets:
                    target.send(item)
            except StopIteration:
                # An exhausted sink (or an explicit StopIteration) ends the
                # stage; callers still see StopIteration from send()/throw().
                return
            except Exception as e:
                # Forward the exception object itself rather than rebuilding
                # it from the deprecated e.message, which lost the args.
                for target in self._targets:
                    target.throw(e)
299
300
301 def _flush_queue(queue):
302         while not queue.empty():
303                 yield queue.get()
304
305
@autostart
def cocount(target, start=0):
    """
    Replace each received item with a running count beginning at ``start``.

    >>> cc = cocount(printer_sink("%s"))
    >>> cc.send("a")
    0
    >>> cc.send(None)
    1
    >>> cc.send([])
    2
    >>> cc.send(0)
    3
    """
    counter = itertools.count(start)
    while True:
        # The received item itself is ignored; only its arrival matters.
        yield
        target.send(next(counter))
322
323
@autostart
def coenumerate(target, start=0):
    """
    Send ``(index, item)`` pairs to ``target``, counting up from ``start``.

    >>> ce = coenumerate(printer_sink("%r"))
    >>> ce.send("a")
    (0, 'a')
    >>> ce.send(None)
    (1, None)
    >>> ce.send([])
    (2, [])
    >>> ce.send(0)
    (3, 0)
    """
    counter = itertools.count(start)
    while True:
        received = yield
        target.send((next(counter), received))
341
342
@autostart
def corepeat(target, elem):
    """
    Send ``elem`` to ``target`` once for every item received.

    >>> cr = corepeat(printer_sink("%s"), "Hello World")
    >>> cr.send("a")
    Hello World
    >>> cr.send(None)
    Hello World
    >>> cr.send([])
    Hello World
    >>> cr.send(0)
    Hello World
    """
    while True:
        # The incoming item is only a trigger; its value is discarded.
        yield
        target.send(elem)
359
360
@autostart
def cointercept(target, elems):
    """
    Replace each received item with the next element of ``elems``.

    Once ``elems`` is used up the stage finishes, so the following ``send``
    raises StopIteration.

    >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
    >>> cr.send("a")
    1
    >>> cr.send(None)
    2
    >>> cr.send([])
    3
    >>> cr.send(0)
    4
    >>> cr.send("Bye")
    Traceback (most recent call last):
        ...
    StopIteration
    """
    # Wait for the first trigger before touching elems.
    yield
    for replacement in elems:
        target.send(replacement)
        yield
385
386
@autostart
def codropwhile(target, pred):
    """
    Drop items while ``pred`` holds, then forward everything that follows.

    Unlike ``itertools.dropwhile``, the first item that fails ``pred`` is
    dropped as well; forwarding starts with the item after it.

    >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
    >>> cdw.send([0, 1, 2])
    >>> cdw.send(1)
    >>> cdw.send(True)
    >>> cdw.send(False)
    >>> cdw.send([0, 1, 2])
    [0, 1, 2]
    >>> cdw.send(1)
    1
    >>> cdw.send(True)
    True
    """
    # Dropping phase: consume items up to and including the first failure.
    while pred((yield)):
        pass

    # Forwarding phase: pass every later item straight through.
    while True:
        target.send((yield))
410
411
@autostart
def cotakewhile(target, pred):
    """
    Forward items while ``pred`` holds, then swallow everything afterwards.

    >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
    >>> ctw.send([0, 1, 2])
    [0, 1, 2]
    >>> ctw.send(1)
    1
    >>> ctw.send(True)
    True
    >>> ctw.send(False)
    >>> ctw.send([0, 1, 2])
    >>> ctw.send(1)
    >>> ctw.send(True)
    """
    # Forwarding phase: ends with the first item that fails pred.
    received = yield
    while pred(received):
        target.send(received)
        received = yield

    # Stay alive so upstream stages can keep sending; discard the rest.
    while True:
        yield
435
436
@autostart
def coslice(target, lower, upper):
    """
    Forward only the items at positions ``lower`` up to, but not including,
    ``upper`` (zero-based); everything else is discarded.

    >>> cs = coslice(printer_sink("%r"), 3, 5)
    >>> cs.send("0")
    >>> cs.send("1")
    >>> cs.send("2")
    >>> cs.send("3")
    '3'
    >>> cs.send("4")
    '4'
    >>> cs.send("5")
    >>> cs.send("6")
    """
    # Skip the first `lower` items.
    skipped = 0
    while skipped < lower:
        yield
        skipped += 1

    # Forward the next `upper - lower` items.
    remaining = upper - lower
    while remaining > 0:
        received = yield
        target.send(received)
        remaining -= 1

    # Swallow whatever arrives after the slice.
    while True:
        yield
458
459
@autostart
def cochain(targets):
    """
    Feed items to the first target until it stops, then to the next, and so on.

    The item that a target refused by raising StopIteration is replayed to
    the following target.  When the last target stops, this stage ends too.

    >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
    >>> cc = cochain([cr, printer_sink("end %s")])
    >>> cc.send("a")
    good 1
    >>> cc.send(None)
    good 2
    >>> cc.send([])
    good 3
    >>> cc.send(0)
    good 4
    >>> cc.send("Bye")
    end Bye
    """
    pending = []
    for current in targets:
        try:
            # First replay the item the previous target could not accept.
            while pending:
                item = pending.pop()
                current.send(item)
            while True:
                item = yield
                current.send(item)
        except StopIteration:
            # `current` is exhausted; keep the undelivered item for the
            # next target in the chain.
            pending.append(item)
487
488
@autostart
def queue_sink(queue):
    """
    Sink that records everything that happens to it on ``queue``.

    Each event is put on the queue as a 2-tuple:

    * ``(None, item)`` for an item received through ``send()``
    * ``(exception_class, message)`` for an exception thrown in, where
      ``message`` is the single constructor argument or ``""`` otherwise
    * ``(GeneratorExit, None)`` when the sink is closed

    >>> q = Queue.Queue()
    >>> qs = queue_sink(q)
    >>> qs.send("Hello")
    >>> qs.send("World")
    >>> qs.throw(RuntimeError("Goodbye"))
    >>> qs.send("Meh")
    >>> qs.close()
    >>> for kind, payload in _flush_queue(q):
    ...     print("%s %r" % (getattr(kind, "__name__", None), payload))
    None 'Hello'
    None 'World'
    RuntimeError 'Goodbye'
    None 'Meh'
    GeneratorExit None
    """
    while True:
        try:
            item = yield
            queue.put((None, item))
        except GeneratorExit:
            queue.put((GeneratorExit, None))
            raise
        except StopIteration:
            # StopIteration is not recorded; it simply ends the sink, and the
            # caller still sees StopIteration from throw().
            return
        except Exception as e:
            # Same value the deprecated BaseException.message used to hold,
            # computed from the supported .args attribute.
            message = e.args[0] if len(e.args) == 1 else ""
            queue.put((e.__class__, message))
511
512
def decode_item(item, target):
    """
    Replay one recorded event on ``target``.

    ``item[0]`` selects the action: None sends ``item[1]``, GeneratorExit
    closes the target, and any other value is thrown with ``item[1]`` as its
    value.  Returns True only for the close event.
    """
    kind = item[0]
    # item[1] is read only where it is needed: close records may be 1-tuples.
    if kind is GeneratorExit:
        target.close()
        return True
    if kind is None:
        target.send(item[1])
    else:
        target.throw(kind, item[1])
    return False
523
524
def queue_source(queue, target):
    """
    Replay events from ``queue`` on ``target`` until a close event is seen.

    Blocks in ``queue.get()`` while waiting for the next event.

    >>> q = Queue.Queue()
    >>> for i in [
    ...     (None, 'Hello'),
    ...     (None, 'World'),
    ...     (GeneratorExit, None),
    ...     ]:
    ...     q.put(i)
    >>> qs = queue_source(q, printer_sink())
    Hello
    World
    """
    while True:
        event = queue.get()
        if decode_item(event, target):
            break
542
543
def threaded_stage(target, thread_factory=threading.Thread):
    """
    Run ``target`` in another thread, fed through a message queue.

    A thread created by ``thread_factory`` is started immediately and runs
    queue_source against ``target``.  The return value is a zero-argument
    callable that creates a queue_sink on the same queue; call it to obtain
    the sink to send to from the current thread.
    """
    messages = Queue.Queue()

    # Source side: drains the queue into `target` on the worker thread.
    worker = thread_factory(
        target=functools.partial(queue_source, messages, target)
    )
    worker.start()

    # Sink side: factory for the sink used in the current thread.
    return functools.partial(queue_sink, messages)
552
553
@autostart
def pickle_sink(f):
    """
    Sink that pickles every event to the file object ``f``.

    Records written:

    * ``(None, item)`` for an item received through ``send()``
    * ``(exception_class, message)`` for an exception thrown in, where
      ``message`` is the single constructor argument or ``""`` otherwise
    * ``(GeneratorExit, )`` when the sink is closed

    Throwing StopIteration into the sink closes ``f`` and ends the sink.
    """
    while True:
        try:
            item = yield
            pickle.dump((None, item), f)
        except GeneratorExit:
            # The 1-tuple record format is kept so written files do not change.
            pickle.dump((GeneratorExit, ), f)
            raise
        except StopIteration:
            f.close()
            return
        except Exception as e:
            # Same value the deprecated BaseException.message used to hold,
            # computed from the supported .args attribute.
            message = e.args[0] if len(e.args) == 1 else ""
            pickle.dump((e.__class__, message), f)
568
569
def pickle_source(f, target):
    """
    Replay pickled event records from the file object ``f`` on ``target``.

    Stops after a close record.  If the file ends first, ``target`` is closed.

    NOTE(review): pickle.load can execute arbitrary code, so ``f`` must only
    contain data from a trusted source.
    """
    try:
        while not decode_item(pickle.load(f), target):
            pass
    except EOFError:
        target.close()
578
579
# ContentHandler is listed first: with `object` first Python 3 cannot build a
# consistent MRO.  `object` is kept so the class is new-style on Python 2.
class EventHandler(xml.sax.ContentHandler, object):
    """
    SAX content handler that turns parse events into coroutine messages.

    Each event is sent to ``target`` as a ``(kind, payload)`` tuple:

    * ``(START, (name, attributes_dict))`` for an opening tag
    * ``(TEXT, text)`` for character data
    * ``(END, name)`` for a closing tag
    """

    START = "start"
    TEXT = "text"
    END = "end"

    def __init__(self, target):
        object.__init__(self)
        xml.sax.ContentHandler.__init__(self)
        self._target = target

    def startElement(self, name, attrs):
        # Build the dict through the public Attributes interface instead of
        # reaching into the private attrs._attrs.
        attributes = dict(attrs.items())
        self._target.send((self.START, (name, attributes)))

    def characters(self, text):
        self._target.send((self.TEXT, text))

    def endElement(self, name):
        self._target.send((self.END, name))
599
600
def expat_parse(f, target):
    """
    Parse the XML file object ``f`` with expat and push events to ``target``.

    Events are sent as ``('start', (name, attrs))``, ``('text', data)`` and
    ``('end', name)`` tuples.  Text is buffered, so adjacent character data
    is delivered in one piece.
    """
    parser = xml.parsers.expat.ParserCreate()
    parser.buffer_size = 65536
    parser.buffer_text = True
    try:
        # Python 2 only: ask for byte strings instead of unicode.  The
        # attribute was removed in Python 3.3.
        parser.returns_unicode = False
    except AttributeError:
        pass

    def on_start(name, attrs):
        target.send(('start', (name, attrs)))

    def on_end(name):
        target.send(('end', name))

    def on_text(data):
        target.send(('text', data))

    parser.StartElementHandler = on_start
    parser.EndElementHandler = on_end
    parser.CharacterDataHandler = on_text
    parser.ParseFile(f)
610
611
# When run as a script, execute the doctests embedded in the docstrings above.
if __name__ == "__main__":
    import doctest
    doctest.testmod()