Adding an async pool to try and fix things up
authorEd Page <eopage@byu.net>
Wed, 17 Mar 2010 01:42:28 +0000 (20:42 -0500)
committerEd Page <eopage@byu.net>
Wed, 17 Mar 2010 01:42:28 +0000 (20:42 -0500)
src/util/go_utils.py

index 33ddfb5..ef6cb72 100644 (file)
@@ -4,14 +4,17 @@ from __future__ import with_statement
 
 import time
 import functools
+import threading
+import Queue
 import logging
 
 import gobject
 
+import algorithms
 import misc
 
 
-_moduleLogger = logging.getLogger("go_utils")
+_moduleLogger = logging.getLogger(__name__)
 
 
 def make_idler(func):
@@ -85,8 +88,9 @@ class Async(object):
                self.cancel()
                try:
                        self.__func()
-               finally:
-                       return False
+               except Exception:
+                       pass
+               return False
 
 
 class Timeout(object):
@@ -122,8 +126,66 @@ class Timeout(object):
                self.cancel()
                try:
                        self.__func()
-               finally:
-                       return False
+               except Exception:
+                       pass
+               return False
+
+
+__QUEUE_EMPTY = object()
+
+
+class AsyncPool(object):
+
+       def __init__(self):
+               self.__workQueue = Queue.Queue()
+               self.__thread = threading.Thread(
+                       name = type(self).__name__,
+                       target = self.__consume_queue,
+               )
+               self.__isRunning = True
+
+       def start(self):
+               self.__thread.start()
+
+       def stop(self):
+               self.__isRunning = False
+               for _ in algorithms.itr_available(self.__workQueue):
+                       pass # eat up queue to cut down dumb work
+               self.__workQueue.put(__QUEUE_EMPTY)
+
+       def add_task(self, func, args, kwds, on_success, on_error):
+               task = func, args, kwds, on_success, on_error
+               self.__workQueue.put(task)
+
+       @misc.log_exception(_moduleLogger)
+       def __trampoline_callback(self, callback, result):
+               if self.__isRunning:
+                       try:
+                               callback(result)
+                       except Exception:
+                               pass
+               else:
+                       _moduleLogger.info("Blocked call to %r" % (callback, ))
+               return False
+
+       @misc.log_exception(_moduleLogger)
+       def __consume_queue(self):
+               while True:
+                       task = self.__workQueue.get()
+                       if task is __QUEUE_EMPTY:
+                               break
+                       func, args, kwds, on_success, on_error = task
+
+                       try:
+                               result = func(*args, **kwds)
+                               isError = False
+                       except Exception, e:
+                               result = e
+                               isError = True
+                       self.__workQueue.task_done()
+
+                       callback = on_success if not isError else on_error
+                       gobject.idle_add(self.__trampoline_callback, callback, result)
 
 
 def throttled(minDelay, queue):