What I said was 0.8.15 was really 0.8.16
[theonering] / src / util / go_utils.py
index 38e20c2..46805ab 100644 (file)
@@ -168,7 +168,7 @@ class AsyncPool(object):
                try:
                        callback(result)
                except Exception:
-                       pass
+                       _moduleLogger.exception("Callback errored")
                return False
 
        @misc.log_exception(_moduleLogger)
@@ -183,45 +183,65 @@ class AsyncPool(object):
                                result = func(*args, **kwds)
                                isError = False
                        except Exception, e:
+                               _moduleLogger.error("Error, passing it back to the main thread")
                                result = e
                                isError = True
                        self.__workQueue.task_done()
 
                        gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
+               _moduleLogger.debug("Shutting down worker thread")
 
 
-class LinearExecution(object):
+class AsyncLinearExecution(object):
 
-       def __init__(self, func):
+       def __init__(self, pool, func):
+               self._pool = pool
                self._func = func
                self._run = None
 
        def start(self, *args, **kwds):
                assert self._run is None
-               kwds["on_success"] = self.on_success
-               kwds["on_error"] = self.on_error
                self._run = self._func(*args, **kwds)
                trampoline, args, kwds = self._run.send(None) # priming the function
-               trampoline(*args, **kwds)
+               self._pool.add_task(
+                       trampoline,
+                       args,
+                       kwds,
+                       self.on_success,
+                       self.on_error,
+               )
 
        @misc.log_exception(_moduleLogger)
-       def on_success(self, *args, **kwds):
-               assert not kwds
+       def on_success(self, result):
+               #_moduleLogger.debug("Processing success for: %r", self._func)
                try:
-                       trampoline, args, kwds = self._run.send(args)
+                       trampoline, args, kwds = self._run.send(result)
                except StopIteration, e:
                        pass
                else:
-                       trampoline(*args, **kwds)
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
 
        @misc.log_exception(_moduleLogger)
        def on_error(self, error):
+               #_moduleLogger.debug("Processing error for: %r", self._func)
                try:
                        trampoline, args, kwds = self._run.throw(error)
                except StopIteration, e:
                        pass
                else:
-                       trampoline(*args, **kwds)
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
 
 
 def throttled(minDelay, queue):