@autostart
+def printer_sink(format = "%s"):
+ """
+ >>> pr = printer_sink("%r")
+ >>> pr.send("Hello")
+ 'Hello'
+ >>> pr.send("5")
+ '5'
+ >>> pr.send(5)
+ 5
+ >>> p = printer_sink()
+ >>> p.send("Hello")
+ Hello
+ >>> p.send("World")
+ World
+ >>> # p.throw(RuntimeError, "Goodbye")
+ >>> # p.send("Meh")
+ >>> # p.close()
+ """
+ while True:
+ item = yield
+ print format % (item, )
+
+
+@autostart
def null_sink():
"""
Good for uses like with cochain to pick up any slack
"""
>>> p = printer_sink()
>>> cm = comap(lambda x: x+1, p)
- >>> cm.send(0)
+ >>> cm.send((0, ))
1
- >>> cm.send(1.0)
+ >>> cm.send((1.0, ))
2.0
- >>> cm.send(-2)
+ >>> cm.send((-2, ))
-1
- >>> # cm.throw(RuntimeError, "Goodbye")
- >>> # cm.send(0)
- >>> # cm.send(1.0)
- >>> # cm.close()
"""
while True:
try:
target.throw(e.__class__, e.message)
+def _flush_queue(queue):
+ while not queue.empty():
+ yield queue.get()
+
+
@autostart
def queue_sink(queue):
"""
item = yield
queue.put((None, item))
except StandardError, e:
- queue.put((e.__class__, e.message))
+ queue.put((e.__class__, str(e)))
except GeneratorExit:
queue.put((GeneratorExit, None))
raise
def nonqueue_source(queue, target):
- """
- >>> q = Queue.Queue()
- >>> for i in [
- ... (None, 'Hello'),
- ... (None, 'World'),
- ... (GeneratorExit, None),
- ... ]:
- ... q.put(i)
- >>> qs = queue_source(q, printer_sink())
- Hello
- """
isDone = False
while not isDone:
item = queue.get()