X-Git-Url: http://git.maemo.org/git/?a=blobdiff_plain;f=src%2Futil%2Fgo_utils.py;h=52ccf922c0bb899b1ff7e1ca3764c1468de22e15;hb=228a823e3e7bf2cbf419fe83cad32d135d6896bb;hp=47fb16b21a7bcfc72d424b47fa3f1da00894eefa;hpb=5bfb4228f1fff939aa06dfbe446cf17b5ce3ed76;p=theonering diff --git a/src/util/go_utils.py b/src/util/go_utils.py index 47fb16b..52ccf92 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -4,14 +4,17 @@ from __future__ import with_statement import time import functools +import threading +import Queue import logging import gobject +import algorithms import misc -_moduleLogger = logging.getLogger("go_utils") +_moduleLogger = logging.getLogger(__name__) def make_idler(func): @@ -69,6 +72,9 @@ class Async(object): else: self.__idleId = gobject.idle_add(self.__func) + def is_running(self): + return self.__idleId is not None + def cancel(self): if self.__idleId is not None: gobject.source_remove(self.__idleId) @@ -82,8 +88,9 @@ class Async(object): self.cancel() try: self.__func() - finally: - return False + except Exception: + pass + return False class Timeout(object): @@ -101,7 +108,10 @@ class Timeout(object): if timeoutInSeconds == 0: self.__timeoutId = gobject.idle_add(self._on_once) else: - timeout_add_seconds(timeoutInSeconds, self._on_once) + self.__timeoutId = timeout_add_seconds(timeoutInSeconds, self._on_once) + + def is_running(self): + return self.__timeoutId is not None def cancel(self): if self.__timeoutId is not None: @@ -116,8 +126,122 @@ class Timeout(object): self.cancel() try: self.__func() - finally: - return False + except Exception: + pass + return False + + +_QUEUE_EMPTY = object() + + +class AsyncPool(object): + + def __init__(self): + self.__workQueue = Queue.Queue() + self.__thread = threading.Thread( + name = type(self).__name__, + target = self.__consume_queue, + ) + self.__isRunning = True + + def start(self): + self.__thread.start() + + def stop(self): + self.__isRunning = False + for _ in algorithms.itr_available(self.__workQueue): + pass # eat up queue to cut down dumb work + self.__workQueue.put(_QUEUE_EMPTY) + + def add_task(self, func, args, kwds, on_success, on_error): + task = func, args, kwds, on_success, on_error + self.__workQueue.put(task) + + @misc.log_exception(_moduleLogger) + def __trampoline_callback(self, on_success, on_error, isError, result): + if not self.__isRunning: + if isError: + _moduleLogger.error("Masking: %s" % (result, )) + isError = True + result = StopIteration("Cancelling all callbacks") + callback = on_success if not isError else on_error + try: + callback(result) + except Exception: + _moduleLogger.exception("Callback errored") + pass + return False + + @misc.log_exception(_moduleLogger) + def __consume_queue(self): + while True: + task = self.__workQueue.get() + if task is _QUEUE_EMPTY: + break + func, args, kwds, on_success, on_error = task + + try: + result = func(*args, **kwds) + isError = False + except Exception, e: + _moduleLogger.error("Error, passing it back to the main thread") + result = e + isError = True + self.__workQueue.task_done() + + gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result) + + +class AsyncLinearExecution(object): + + def __init__(self, pool, func): + self._pool = pool + self._func = func + self._run = None + + def start(self, *args, **kwds): + assert self._run is None + self._run = self._func(*args, **kwds) + trampoline, args, kwds = self._run.send(None) # priming the function + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_success(self, result): + _moduleLogger.debug("Processing success for: %r", self._func) + try: + trampoline, args, kwds = self._run.send(result) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_error(self, error): + _moduleLogger.debug("Processing error for: %r", self._func) + try: + trampoline, args, kwds = self._run.throw(error) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) def throttled(minDelay, queue):