X-Git-Url: http://git.maemo.org/git/?p=theonering;a=blobdiff_plain;f=src%2Futil%2Fgo_utils.py;h=97d671c6c639d5ac83eba0032e9bc78e5e6db7a3;hp=2753ec87c645d8c84d2979cb8a550b27ea52a8e2;hb=4beea848e2ea4297c8e223637f38bc3e529dec0d;hpb=934531c83d4dc81dbd8a831478ff40f405c633ef diff --git a/src/util/go_utils.py b/src/util/go_utils.py index 2753ec8..97d671c 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -4,9 +4,38 @@ from __future__ import with_statement import time import functools +import threading +import Queue +import logging import gobject +import algorithms +import misc + + +_moduleLogger = logging.getLogger(__name__) + + +def make_idler(func): + """ + Decorator that makes a generator-function into a function that will continue execution on next call + """ + a = [] + + @functools.wraps(func) + def decorated_func(*args, **kwds): + if not a: + a.append(func(*args, **kwds)) + try: + a[0].next() + return True + except StopIteration: + del a[:] + return False + + return decorated_func + def async(func): """ @@ -29,6 +58,218 @@ def async(func): return new_function +class Async(object): + + def __init__(self, func, once = True): + self.__func = func + self.__idleId = None + self.__once = once + + def start(self): + assert self.__idleId is None + if self.__once: + self.__idleId = gobject.idle_add(self._on_once) + else: + self.__idleId = gobject.idle_add(self.__func) + + def is_running(self): + return self.__idleId is not None + + def cancel(self): + if self.__idleId is not None: + gobject.source_remove(self.__idleId) + self.__idleId = None + + def __call__(self): + return self.start() + + @misc.log_exception(_moduleLogger) + def _on_once(self): + self.cancel() + try: + self.__func() + except Exception: + pass + return False + + +class Timeout(object): + + def __init__(self, func, once = True): + self.__func = func + self.__timeoutId = None + self.__once = once + + def start(self, **kwds): + assert self.__timeoutId is None + + callback = self._on_once if self.__once else self.__func + + assert len(kwds) == 1 + timeoutInSeconds = kwds["seconds"] + assert 0 <= timeoutInSeconds + + if timeoutInSeconds == 0: + self.__timeoutId = gobject.idle_add(callback) + else: + self.__timeoutId = timeout_add_seconds(timeoutInSeconds, callback) + + def is_running(self): + return self.__timeoutId is not None + + def cancel(self): + if self.__timeoutId is not None: + gobject.source_remove(self.__timeoutId) + self.__timeoutId = None + + def __call__(self, **kwds): + return self.start(**kwds) + + @misc.log_exception(_moduleLogger) + def _on_once(self): + self.cancel() + try: + self.__func() + except Exception: + pass + return False + + +_QUEUE_EMPTY = object() + + +class AsyncPool(object): + + def __init__(self): + self.__workQueue = Queue.Queue() + self.__thread = threading.Thread( + name = type(self).__name__, + target = self.__consume_queue, + ) + self.__isRunning = True + + def start(self): + self.__thread.start() + + def stop(self): + self.__isRunning = False + for _ in algorithms.itr_available(self.__workQueue): + pass # eat up queue to cut down dumb work + self.__workQueue.put(_QUEUE_EMPTY) + + def clear_tasks(self): + for _ in algorithms.itr_available(self.__workQueue): + pass # eat up queue to cut down dumb work + + def add_task(self, func, args, kwds, on_success, on_error): + task = func, args, kwds, on_success, on_error + self.__workQueue.put(task) + + @misc.log_exception(_moduleLogger) + def __trampoline_callback(self, on_success, on_error, isError, result): + if not self.__isRunning: + if isError: + _moduleLogger.error("Masking: %s" % (result, )) + isError = True + result = StopIteration("Cancelling all callbacks") + callback = on_success if not isError else on_error + try: + callback(result) + except Exception: + _moduleLogger.exception("Callback errored") + return False + + @misc.log_exception(_moduleLogger) + def __consume_queue(self): + while True: + task = self.__workQueue.get() + if task is _QUEUE_EMPTY: + break + func, args, kwds, on_success, on_error = task + + try: + result = func(*args, **kwds) + isError = False + except Exception, e: + _moduleLogger.exception("Error, passing it back to the main thread") + result = e + isError = True + self.__workQueue.task_done() + + gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result) + _moduleLogger.debug("Shutting down worker thread") + + +class AsyncLinearExecution(object): + + def __init__(self, pool, func): + self._pool = pool + self._func = func + self._run = None + + def start(self, *args, **kwds): + assert self._run is None + self._run = self._func(*args, **kwds) + trampoline, args, kwds = self._run.send(None) # priming the function + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_success(self, result): + #_moduleLogger.debug("Processing success for: %r", self._func) + try: + trampoline, args, kwds = self._run.send(result) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_error(self, error): + #_moduleLogger.debug("Processing error for: %r", self._func) + try: + trampoline, args, kwds = self._run.throw(error) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + +class AutoSignal(object): + + def __init__(self, toplevel): + self.__disconnectPool = [] + toplevel.connect("destroy", self.__on_destroy) + + def connect_auto(self, widget, *args): + id = widget.connect(*args) + self.__disconnectPool.append((widget, id)) + + @misc.log_exception(_moduleLogger) + def __on_destroy(self, widget): + _moduleLogger.info("Destroy: %r (%s to clean up)" % (self, len(self.__disconnectPool))) + for widget, id in self.__disconnectPool: + widget.disconnect(id) + del self.__disconnectPool[:] + + def throttled(minDelay, queue): """ Throttle the calls to a function by queueing all the calls that happen