Improving support for a worker thread and moving more code over to it
[theonering] / src / util / go_utils.py
index e8fde8b..52ccf92 100644 (file)
@@ -4,14 +4,17 @@ from __future__ import with_statement
 
 import time
 import functools
+import threading
+import Queue
 import logging
 
 import gobject
 
+import algorithms
 import misc
 
 
-_moduleLogger = logging.getLogger("go_utils")
+_moduleLogger = logging.getLogger(__name__)
 
 
 def make_idler(func):
@@ -85,8 +88,9 @@ class Async(object):
                self.cancel()
                try:
                        self.__func()
-               finally:
-                       return False
+               except Exception:
+                       pass
+               return False
 
 
 class Timeout(object):
@@ -104,7 +108,7 @@ class Timeout(object):
                if timeoutInSeconds == 0:
                        self.__timeoutId = gobject.idle_add(self._on_once)
                else:
-                       timeout_add_seconds(timeoutInSeconds, self._on_once)
+                       self.__timeoutId = timeout_add_seconds(timeoutInSeconds, self._on_once)
 
        def is_running(self):
                return self.__timeoutId is not None
@@ -122,8 +126,122 @@ class Timeout(object):
                self.cancel()
                try:
                        self.__func()
-               finally:
-                       return False
+               except Exception:
+                       pass
+               return False
+
+
+_QUEUE_EMPTY = object()
+
+
+class AsyncPool(object):
+
+       def __init__(self):
+               self.__workQueue = Queue.Queue()
+               self.__thread = threading.Thread(
+                       name = type(self).__name__,
+                       target = self.__consume_queue,
+               )
+               self.__isRunning = True
+
+       def start(self):
+               self.__thread.start()
+
+       def stop(self):
+               self.__isRunning = False
+               for _ in algorithms.itr_available(self.__workQueue):
+                       pass # eat up queue to cut down dumb work
+               self.__workQueue.put(_QUEUE_EMPTY)
+
+       def add_task(self, func, args, kwds, on_success, on_error):
+               task = func, args, kwds, on_success, on_error
+               self.__workQueue.put(task)
+
+       @misc.log_exception(_moduleLogger)
+       def __trampoline_callback(self, on_success, on_error, isError, result):
+               if not self.__isRunning:
+                       if isError:
+                               _moduleLogger.error("Masking: %s" % (result, ))
+                       isError = True
+                       result = StopIteration("Cancelling all callbacks")
+               callback = on_success if not isError else on_error
+               try:
+                       callback(result)
+               except Exception:
+                       _moduleLogger.exception("Callback errored")
+                       pass
+               return False
+
+       @misc.log_exception(_moduleLogger)
+       def __consume_queue(self):
+               while True:
+                       task = self.__workQueue.get()
+                       if task is _QUEUE_EMPTY:
+                               break
+                       func, args, kwds, on_success, on_error = task
+
+                       try:
+                               result = func(*args, **kwds)
+                               isError = False
+                       except Exception, e:
+                               _moduleLogger.error("Error, passing it back to the main thread")
+                               result = e
+                               isError = True
+                       self.__workQueue.task_done()
+
+                       gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
+
+
+class AsyncLinearExecution(object):
+
+       def __init__(self, pool, func):
+               self._pool = pool
+               self._func = func
+               self._run = None
+
+       def start(self, *args, **kwds):
+               assert self._run is None
+               self._run = self._func(*args, **kwds)
+               trampoline, args, kwds = self._run.send(None) # priming the function
+               self._pool.add_task(
+                       trampoline,
+                       args,
+                       kwds,
+                       self.on_success,
+                       self.on_error,
+               )
+
+       @misc.log_exception(_moduleLogger)
+       def on_success(self, result):
+               _moduleLogger.debug("Processing success for: %r", self._func)
+               try:
+                       trampoline, args, kwds = self._run.send(result)
+               except StopIteration, e:
+                       pass
+               else:
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
+
+       @misc.log_exception(_moduleLogger)
+       def on_error(self, error):
+               _moduleLogger.debug("Processing error for: %r", self._func)
+               try:
+                       trampoline, args, kwds = self._run.throw(error)
+               except StopIteration, e:
+                       pass
+               else:
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
 
 
 def throttled(minDelay, queue):