X-Git-Url: http://git.maemo.org/git/?p=theonering;a=blobdiff_plain;f=src%2Futil%2Fgo_utils.py;h=97d671c6c639d5ac83eba0032e9bc78e5e6db7a3;hp=b3682baec10e8dfb38851ba3643fbdb6d5b8f796;hb=4beea848e2ea4297c8e223637f38bc3e529dec0d;hpb=e2b913d6c65eca22a75ead429543db4f2b2b1f4a;ds=sidebyside diff --git a/src/util/go_utils.py b/src/util/go_utils.py index b3682ba..97d671c 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -95,20 +95,24 @@ class Async(object): class Timeout(object): - def __init__(self, func): + def __init__(self, func, once = True): self.__func = func self.__timeoutId = None + self.__once = once def start(self, **kwds): assert self.__timeoutId is None + callback = self._on_once if self.__once else self.__func + assert len(kwds) == 1 timeoutInSeconds = kwds["seconds"] assert 0 <= timeoutInSeconds + if timeoutInSeconds == 0: - self.__timeoutId = gobject.idle_add(self._on_once) + self.__timeoutId = gobject.idle_add(callback) else: - self.__timeoutId = timeout_add_seconds(timeoutInSeconds, self._on_once) + self.__timeoutId = timeout_add_seconds(timeoutInSeconds, callback) def is_running(self): return self.__timeoutId is not None @@ -153,19 +157,26 @@ class AsyncPool(object): pass # eat up queue to cut down dumb work self.__workQueue.put(_QUEUE_EMPTY) + def clear_tasks(self): + for _ in algorithms.itr_available(self.__workQueue): + pass # eat up queue to cut down dumb work + def add_task(self, func, args, kwds, on_success, on_error): task = func, args, kwds, on_success, on_error self.__workQueue.put(task) @misc.log_exception(_moduleLogger) - def __trampoline_callback(self, callback, result): - if self.__isRunning: - try: - callback(result) - except Exception: - pass - else: - _moduleLogger.info("Blocked call to %r" % (callback, )) + def __trampoline_callback(self, on_success, on_error, isError, result): + if not self.__isRunning: + if isError: + _moduleLogger.error("Masking: %s" % (result, )) + isError = True + result = StopIteration("Cancelling all callbacks") + callback = on_success if not isError else on_error + try: + callback(result) + except Exception: + _moduleLogger.exception("Callback errored") return False @misc.log_exception(_moduleLogger) @@ -180,12 +191,83 @@ class AsyncPool(object): result = func(*args, **kwds) isError = False except Exception, e: + _moduleLogger.exception("Error, passing it back to the main thread") result = e isError = True self.__workQueue.task_done() - callback = on_success if not isError else on_error - gobject.idle_add(self.__trampoline_callback, callback, result) + gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result) + _moduleLogger.debug("Shutting down worker thread") + + +class AsyncLinearExecution(object): + + def __init__(self, pool, func): + self._pool = pool + self._func = func + self._run = None + + def start(self, *args, **kwds): + assert self._run is None + self._run = self._func(*args, **kwds) + trampoline, args, kwds = self._run.send(None) # priming the function + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_success(self, result): + #_moduleLogger.debug("Processing success for: %r", self._func) + try: + trampoline, args, kwds = self._run.send(result) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_error(self, error): + #_moduleLogger.debug("Processing error for: %r", self._func) + try: + trampoline, args, kwds = self._run.throw(error) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + +class AutoSignal(object): + + def __init__(self, toplevel): + self.__disconnectPool = [] + toplevel.connect("destroy", self.__on_destroy) + + def connect_auto(self, widget, *args): + id = widget.connect(*args) + self.__disconnectPool.append((widget, id)) + + @misc.log_exception(_moduleLogger) + def __on_destroy(self, widget): + _moduleLogger.info("Destroy: %r (%s to clean up)" % (self, len(self.__disconnectPool))) + for widget, id in self.__disconnectPool: + widget.disconnect(id) + del self.__disconnectPool[:] def throttled(minDelay, queue):