import time
import functools
+import threading
+import Queue
import logging
import gobject
+import algorithms
import misc
-_moduleLogger = logging.getLogger("go_utils")
+_moduleLogger = logging.getLogger(__name__)
def make_idler(func):
self.cancel()
try:
self.__func()
- finally:
- return False
+ except Exception:
+ pass
+ return False
class Timeout(object):
self.cancel()
try:
self.__func()
- finally:
- return False
+ except Exception:
+ pass
+ return False
+
+
+__QUEUE_EMPTY = object()
+
+
+class AsyncPool(object):
+
+ def __init__(self):
+ self.__workQueue = Queue.Queue()
+ self.__thread = threading.Thread(
+ name = type(self).__name__,
+ target = self.__consume_queue,
+ )
+ self.__isRunning = True
+
+ def start(self):
+ self.__thread.start()
+
+ def stop(self):
+ self.__isRunning = False
+ for _ in algorithms.itr_available(self.__workQueue):
+ pass # eat up queue to cut down dumb work
+ self.__workQueue.put(__QUEUE_EMPTY)
+
+ def add_task(self, func, args, kwds, on_success, on_error):
+ task = func, args, kwds, on_success, on_error
+ self.__workQueue.put(task)
+
+ @misc.log_exception(_moduleLogger)
+ def __trampoline_callback(self, callback, result):
+ if self.__isRunning:
+ try:
+ callback(result)
+ except Exception:
+ pass
+ else:
+ _moduleLogger.info("Blocked call to %r" % (callback, ))
+ return False
+
+ @misc.log_exception(_moduleLogger)
+ def __consume_queue(self):
+ while True:
+ task = self.__workQueue.get()
+ if task is __QUEUE_EMPTY:
+ break
+ func, args, kwds, on_success, on_error = task
+
+ try:
+ result = func(*args, **kwds)
+ isError = False
+ except Exception, e:
+ result = e
+ isError = True
+ self.__workQueue.task_done()
+
+ callback = on_success if not isError else on_error
+ gobject.idle_add(self.__trampoline_callback, callback, result)
def throttled(minDelay, queue):