From: Ed Page Date: Fri, 26 Mar 2010 12:00:51 +0000 (-0500) Subject: Adding support for coroutines for async ops through trampolines X-Git-Url: http://git.maemo.org/git/?p=theonering;a=commitdiff_plain;h=887113c5927f329599cc7044c7b689d1134f0dac Adding support for coroutines for async ops through trampolines --- diff --git a/src/util/go_utils.py b/src/util/go_utils.py index b3682ba..38e20c2 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -158,14 +158,17 @@ class AsyncPool(object): self.__workQueue.put(task) @misc.log_exception(_moduleLogger) - def __trampoline_callback(self, callback, result): - if self.__isRunning: - try: - callback(result) - except Exception: - pass - else: - _moduleLogger.info("Blocked call to %r" % (callback, )) + def __trampoline_callback(self, on_success, on_error, isError, result): + if not self.__isRunning: + if isError: + _moduleLogger.error("Masking: %s" % (result, )) + isError = True + result = StopIteration("Cancelling all callbacks") + callback = on_success if not isError else on_error + try: + callback(result) + except Exception: + pass return False @misc.log_exception(_moduleLogger) @@ -184,8 +187,41 @@ class AsyncPool(object): isError = True self.__workQueue.task_done() - callback = on_success if not isError else on_error - gobject.idle_add(self.__trampoline_callback, callback, result) + gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result) + + +class LinearExecution(object): + + def __init__(self, func): + self._func = func + self._run = None + + def start(self, *args, **kwds): + assert self._run is None + kwds["on_success"] = self.on_success + kwds["on_error"] = self.on_error + self._run = self._func(*args, **kwds) + trampoline, args, kwds = self._run.send(None) # priming the function + trampoline(*args, **kwds) + + @misc.log_exception(_moduleLogger) + def on_success(self, *args, **kwds): + assert not kwds + try: + trampoline, args, kwds = self._run.send(args) + except StopIteration, e: + pass + else: + trampoline(*args, **kwds) + + @misc.log_exception(_moduleLogger) + def on_error(self, error): + try: + trampoline, args, kwds = self._run.throw(error) + except StopIteration, e: + pass + else: + trampoline(*args, **kwds) def throttled(minDelay, queue):