Updating from skeleton
[gonvert] / src / util / concurrent.py
index a6499fe..f5f6e1d 100644 (file)
@@ -15,12 +15,33 @@ import misc
 _moduleLogger = logging.getLogger(__name__)
 
 
-class AsyncLinearExecution(object):
+class AsyncTaskQueue(object):
+
+       def __init__(self, taskPool):
+               self._asyncs = []
+               self._taskPool = taskPool
+
+       def add_async(self, func):
+               self.flush()
+               a = AsyncGeneratorTask(self._taskPool, func)
+               self._asyncs.append(a)
+               return a
+
+       def flush(self):
+               self._asyncs = [a for a in self._asyncs if not a.isDone]
+
+
+class AsyncGeneratorTask(object):
 
        def __init__(self, pool, func):
                self._pool = pool
                self._func = func
                self._run = None
+               self._isDone = False
+
+       @property
+       def isDone(self):
+               return self._isDone
 
        def start(self, *args, **kwds):
                assert self._run is None, "Task already started"
@@ -40,7 +61,7 @@ class AsyncLinearExecution(object):
                try:
                        trampoline, args, kwds = self._run.send(result)
                except StopIteration, e:
-                       pass
+                       self._isDone = True
                else:
                        self._pool.add_task(
                                trampoline,
@@ -56,7 +77,7 @@ class AsyncLinearExecution(object):
                try:
                        trampoline, args, kwds = self._run.throw(error)
                except StopIteration, e:
-                       pass
+                       self._isDone = True
                else:
                        self._pool.add_task(
                                trampoline,