Cutting down the thread spawning which should help avoid some bugs and cut down some...
[gc-dialer] / src / gtk_toolbox.py
index 1c9dd88..d6db835 100644 (file)
@@ -7,6 +7,8 @@ import traceback
 import functools
 import contextlib
 import warnings
+import threading
+import Queue
 
 import gobject
 import gtk
@@ -80,6 +82,135 @@ def synchronous_gtk_message(original_func):
        return immediate_func
 
 
+def autostart(func):
+       """
+       >>> @autostart
+       ... def grep_sink(pattern):
+       ...     print "Looking for %s" % pattern
+       ...     while True:
+       ...             line = yield
+       ...             if pattern in line:
+       ...                     print line,
+       >>> g = grep_sink("python")
+       Looking for python
+       >>> g.send("Yeah but no but yeah but no")
+       >>> g.send("A series of tubes")
+       >>> g.send("python generators rock!")
+       python generators rock!
+       >>> g.close()
+       """
+
+       @functools.wraps(func)
+       def start(*args, **kwargs):
+               cr = func(*args, **kwargs)
+               cr.next()
+               return cr
+
+       return start
+
+
+@autostart
+def null_sink():
+       """
+       Good for uses like with cochain to pick up any slack
+       """
+       while True:
+               item = yield
+
+
+@autostart
+def comap(function, target):
+       """
+       >>> p = printer_sink()
+       >>> cm = comap(lambda x: x+1, p)
+       >>> cm.send(0)
+       1
+       >>> cm.send(1.0)
+       2.0
+       >>> cm.send(-2)
+       -1
+       >>> # cm.throw(RuntimeError, "Goodbye")
+       >>> # cm.send(0)
+       >>> # cm.send(1.0)
+       >>> # cm.close()
+       """
+       while True:
+               try:
+                       item = yield
+                       mappedItem = function(*item)
+                       target.send(mappedItem)
+               except StandardError, e:
+                       target.throw(e.__class__, e.message)
+
+
+@autostart
+def queue_sink(queue):
+       """
+       >>> q = Queue.Queue()
+       >>> qs = queue_sink(q)
+       >>> qs.send("Hello")
+       >>> qs.send("World")
+       >>> qs.throw(RuntimeError, "Goodbye")
+       >>> qs.send("Meh")
+       >>> qs.close()
+       >>> print [i for i in _flush_queue(q)]
+       [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
+       """
+       while True:
+               try:
+                       item = yield
+                       queue.put((None, item))
+               except StandardError, e:
+                       queue.put((e.__class__, e.message))
+               except GeneratorExit:
+                       queue.put((GeneratorExit, None))
+                       raise
+
+
+def decode_item(item, target):
+       if item[0] is None:
+               target.send(item[1])
+               return False
+       elif item[0] is GeneratorExit:
+               target.close()
+               return True
+       else:
+               target.throw(item[0], item[1])
+               return False
+
+
+def nonqueue_source(queue, target):
+       """
+       >>> q = Queue.Queue()
+       >>> for i in [
+       ...     (None, 'Hello'),
+       ...     (None, 'World'),
+       ...     (GeneratorExit, None),
+       ...     ]:
+       ...     q.put(i)
+       >>> qs = queue_source(q, printer_sink())
+       Hello
+       """
+       isDone = False
+       while not isDone:
+               item = queue.get()
+               isDone = decode_item(item, target)
+               while not queue.empty():
+                       queue.get_nowait()
+
+
+def threaded_stage(target, thread_factory = threading.Thread):
+       messages = Queue.Queue()
+
+       run_source = functools.partial(nonqueue_source, messages, target)
+       thread = thread_factory(target=run_source)
+       thread.setDaemon(True)
+       thread.start()
+
+       # Sink running in current thread
+       return queue_sink(messages)
+
+
 class LoginWindow(object):
 
        def __init__(self, widgetTree):