X-Git-Url: http://git.maemo.org/git/?p=gonvert;a=blobdiff_plain;f=src%2Futil%2Fconcurrent.py;fp=src%2Futil%2Fconcurrent.py;h=a6499feb86118e2a3c0fd95bf96faa599a290421;hp=503a1b444013b448cd41f2b43dbbc093bc2b3c35;hb=3f92789cfd41f67f89f69801d032b98de93ba923;hpb=7c7cf777ed5a70550f8a7becca6a81515fb1e4b6 diff --git a/src/util/concurrent.py b/src/util/concurrent.py index 503a1b4..a6499fe 100644 --- a/src/util/concurrent.py +++ b/src/util/concurrent.py @@ -7,6 +7,76 @@ import errno import time import functools import contextlib +import logging + +import misc + + +_moduleLogger = logging.getLogger(__name__) + + +class AsyncLinearExecution(object): + + def __init__(self, pool, func): + self._pool = pool + self._func = func + self._run = None + + def start(self, *args, **kwds): + assert self._run is None, "Task already started" + self._run = self._func(*args, **kwds) + trampoline, args, kwds = self._run.send(None) # priming the function + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_success(self, result): + _moduleLogger.debug("Processing success for: %r", self._func) + try: + trampoline, args, kwds = self._run.send(result) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_error(self, error): + _moduleLogger.debug("Processing error for: %r", self._func) + try: + trampoline, args, kwds = self._run.throw(error) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + def __repr__(self): + return "" % (self._func.__name__, id(self)) + + def __hash__(self): + return hash(self._func) + + def __eq__(self, other): + return self._func == other._func + + def __ne__(self, other): + return self._func != other._func def synchronized(lock):