import time
import functools
+import threading
+import Queue
import logging
import gobject
+import algorithms
import misc
-_moduleLogger = logging.getLogger("go_utils")
+_moduleLogger = logging.getLogger(__name__)
def make_idler(func):
else:
self.__idleId = gobject.idle_add(self.__func)
+ def is_running(self):
+ return self.__idleId is not None
+
def cancel(self):
if self.__idleId is not None:
gobject.source_remove(self.__idleId)
self.__idleId = None
+ def __call__(self):
+ return self.start()
+
@misc.log_exception(_moduleLogger)
def _on_once(self):
self.cancel()
try:
self.__func()
- finally:
- return False
+ except Exception:
+ pass
+ return False
class Timeout(object):
if timeoutInSeconds == 0:
self.__timeoutId = gobject.idle_add(self._on_once)
else:
- timeout_add_seconds(timeoutInSeconds, self._on_once)
+ self.__timeoutId = timeout_add_seconds(timeoutInSeconds, self._on_once)
+
+ def is_running(self):
+ return self.__timeoutId is not None
def cancel(self):
if self.__timeoutId is not None:
gobject.source_remove(self.__timeoutId)
self.__timeoutId = None
+ def __call__(self, **kwds):
+ return self.start(**kwds)
+
@misc.log_exception(_moduleLogger)
def _on_once(self):
self.cancel()
try:
self.__func()
- finally:
- return False
+ except Exception:
+ pass
+ return False
+
+
+_QUEUE_EMPTY = object()
+
+
+class AsyncPool(object):
+
+ def __init__(self):
+ self.__workQueue = Queue.Queue()
+ self.__thread = threading.Thread(
+ name = type(self).__name__,
+ target = self.__consume_queue,
+ )
+ self.__isRunning = True
+
+ def start(self):
+ self.__thread.start()
+
+ def stop(self):
+ self.__isRunning = False
+ for _ in algorithms.itr_available(self.__workQueue):
+ pass # eat up queue to cut down dumb work
+ self.__workQueue.put(_QUEUE_EMPTY)
+
+ def add_task(self, func, args, kwds, on_success, on_error):
+ task = func, args, kwds, on_success, on_error
+ self.__workQueue.put(task)
+
+ @misc.log_exception(_moduleLogger)
+ def __trampoline_callback(self, on_success, on_error, isError, result):
+ if not self.__isRunning:
+ if isError:
+ _moduleLogger.error("Masking: %s" % (result, ))
+ isError = True
+ result = StopIteration("Cancelling all callbacks")
+ callback = on_success if not isError else on_error
+ try:
+ callback(result)
+ except Exception:
+ _moduleLogger.exception("Callback errored")
+ pass
+ return False
+
+ @misc.log_exception(_moduleLogger)
+ def __consume_queue(self):
+ while True:
+ task = self.__workQueue.get()
+ if task is _QUEUE_EMPTY:
+ break
+ func, args, kwds, on_success, on_error = task
+
+ try:
+ result = func(*args, **kwds)
+ isError = False
+ except Exception, e:
+ _moduleLogger.error("Error, passing it back to the main thread")
+ result = e
+ isError = True
+ self.__workQueue.task_done()
+
+ gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
+
+
+class AsyncLinearExecution(object):
+
+ def __init__(self, pool, func):
+ self._pool = pool
+ self._func = func
+ self._run = None
+
+ def start(self, *args, **kwds):
+ assert self._run is None
+ self._run = self._func(*args, **kwds)
+ trampoline, args, kwds = self._run.send(None) # priming the function
+ self._pool.add_task(
+ trampoline,
+ args,
+ kwds,
+ self.on_success,
+ self.on_error,
+ )
+
+ @misc.log_exception(_moduleLogger)
+ def on_success(self, result):
+ _moduleLogger.debug("Processing success for: %r", self._func)
+ try:
+ trampoline, args, kwds = self._run.send(result)
+ except StopIteration, e:
+ pass
+ else:
+ self._pool.add_task(
+ trampoline,
+ args,
+ kwds,
+ self.on_success,
+ self.on_error,
+ )
+
+ @misc.log_exception(_moduleLogger)
+ def on_error(self, error):
+ _moduleLogger.debug("Processing error for: %r", self._func)
+ try:
+ trampoline, args, kwds = self._run.throw(error)
+ except StopIteration, e:
+ pass
+ else:
+ self._pool.add_task(
+ trampoline,
+ args,
+ kwds,
+ self.on_success,
+ self.on_error,
+ )
def throttled(minDelay, queue):