X-Git-Url: http://git.maemo.org/git/?p=theonering;a=blobdiff_plain;f=src%2Futil%2Fgo_utils.py;h=97d671c6c639d5ac83eba0032e9bc78e5e6db7a3;hp=38e20c23225697682fdf965fa12a020c02f67e0e;hb=4beea848e2ea4297c8e223637f38bc3e529dec0d;hpb=887113c5927f329599cc7044c7b689d1134f0dac diff --git a/src/util/go_utils.py b/src/util/go_utils.py index 38e20c2..97d671c 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -95,20 +95,24 @@ class Async(object): class Timeout(object): - def __init__(self, func): + def __init__(self, func, once = True): self.__func = func self.__timeoutId = None + self.__once = once def start(self, **kwds): assert self.__timeoutId is None + callback = self._on_once if self.__once else self.__func + assert len(kwds) == 1 timeoutInSeconds = kwds["seconds"] assert 0 <= timeoutInSeconds + if timeoutInSeconds == 0: - self.__timeoutId = gobject.idle_add(self._on_once) + self.__timeoutId = gobject.idle_add(callback) else: - self.__timeoutId = timeout_add_seconds(timeoutInSeconds, self._on_once) + self.__timeoutId = timeout_add_seconds(timeoutInSeconds, callback) def is_running(self): return self.__timeoutId is not None @@ -153,6 +157,10 @@ class AsyncPool(object): pass # eat up queue to cut down dumb work self.__workQueue.put(_QUEUE_EMPTY) + def clear_tasks(self): + for _ in algorithms.itr_available(self.__workQueue): + pass # eat up queue to cut down dumb work + def add_task(self, func, args, kwds, on_success, on_error): task = func, args, kwds, on_success, on_error self.__workQueue.put(task) @@ -168,7 +176,7 @@ class AsyncPool(object): try: callback(result) except Exception: - pass + _moduleLogger.exception("Callback errored") return False @misc.log_exception(_moduleLogger) @@ -183,45 +191,83 @@ class AsyncPool(object): result = func(*args, **kwds) isError = False except Exception, e: + _moduleLogger.exception("Error, passing it back to the main thread") result = e isError = True self.__workQueue.task_done() gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result) + _moduleLogger.debug("Shutting down worker thread") -class LinearExecution(object): +class AsyncLinearExecution(object): - def __init__(self, func): + def __init__(self, pool, func): + self._pool = pool self._func = func self._run = None def start(self, *args, **kwds): assert self._run is None - kwds["on_success"] = self.on_success - kwds["on_error"] = self.on_error self._run = self._func(*args, **kwds) trampoline, args, kwds = self._run.send(None) # priming the function - trampoline(*args, **kwds) + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) @misc.log_exception(_moduleLogger) - def on_success(self, *args, **kwds): - assert not kwds + def on_success(self, result): + #_moduleLogger.debug("Processing success for: %r", self._func) try: - trampoline, args, kwds = self._run.send(args) + trampoline, args, kwds = self._run.send(result) except StopIteration, e: pass else: - trampoline(*args, **kwds) + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) @misc.log_exception(_moduleLogger) def on_error(self, error): + #_moduleLogger.debug("Processing error for: %r", self._func) try: trampoline, args, kwds = self._run.throw(error) except StopIteration, e: pass else: - trampoline(*args, **kwds) + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + +class AutoSignal(object): + + def __init__(self, toplevel): + self.__disconnectPool = [] + toplevel.connect("destroy", self.__on_destroy) + + def connect_auto(self, widget, *args): + id = widget.connect(*args) + self.__disconnectPool.append((widget, id)) + + @misc.log_exception(_moduleLogger) + def __on_destroy(self, widget): + _moduleLogger.info("Destroy: %r (%s to clean up)" % (self, len(self.__disconnectPool))) + for widget, id in self.__disconnectPool: + widget.disconnect(id) + del self.__disconnectPool[:] def throttled(minDelay, queue):