3 from PyQt4 import QtCore
# Module-level logger, named after this module via __name__. The error and
# exception messages emitted further down in this file go through it.
8 _moduleLogger = logging.getLogger(__name__)
# QtCore.QThread subclass. Per its own description (next three lines) it exists
# to imitate the Qt 4.4+ QThread behaviour when running on an older Qt.
11 class QThread44(QtCore.QThread):
13 This is to imitate QThread in Qt 4.4+ for when running on older version
14 See http://labs.trolltech.com/blogs/2010/06/17/youre-doing-it-wrong
15 (On Lucid I have Qt 4.7 and this is still an issue)
# parent: optional parent object (default None), forwarded unchanged to the
# QtCore.QThread constructor.
18 def __init__(self, parent = None):
19 QtCore.QThread.__init__(self, parent)
# NOTE(review): nothing after the constructor is visible in this excerpt.
# Presumably run() is overridden to start the thread's event loop, which is
# what would make this behave like a Qt 4.4+ QThread -- confirm.
# QObject that receives finished-task tuples and chooses which user callback
# (on_success or on_error) applies to each one.
# NOTE(review): the visible code never moves this object to another thread, so
# its slot presumably runs on the thread that created the pool -- confirm.
25 class _ParentThread(QtCore.QObject):
# pool: the owning pool object; _on_task_complete reads its _isRunning flag
# through self._pool.
# NOTE(review): the line storing pool on self._pool is not visible here --
# confirm it exists.
27 def __init__(self, pool):
28 QtCore.QObject.__init__(self)
# Declared as a Qt slot taking one object argument and wrapped with
# misc.log_exception(_moduleLogger).
# NOTE(review): presumably that wrapper logs exceptions escaping the slot --
# verify against misc.
31 @QtCore.pyqtSlot(object)
32 @misc.log_exception(_moduleLogger)
33 def _on_task_complete(self, taskResult):
# taskResult is unpacked as the 4-tuple (on_success, on_error, isError, result).
34 on_success, on_error, isError, result = taskResult
# Branch taken once the pool's _isRunning flag is false.
35 if not self._pool._isRunning:
# Logs the result that is about to be replaced.
# NOTE(review): the condition guarding this log line is not visible here.
37 _moduleLogger.error("Masking: %s" % (result, ))
# The real result is replaced by a StopIteration instance carrying a
# cancellation message.
# NOTE(review): presumably isError is also forced to True nearby so this reaches
# on_error -- that line is not visible, confirm.
39 result = StopIteration("Cancelling all callbacks")
# Success path selects on_success; error path selects on_error.
40 callback = on_success if not isError else on_error
# NOTE(review): the callback invocation and the surrounding try/except are not
# visible here; this line presumably sits in the except handler so a failing
# callback is logged with its traceback -- confirm.
44 _moduleLogger.exception("Callback errored")
# QObject that executes queued tasks and reports each outcome through the
# taskComplete signal.
# NOTE(review): AsyncPool moves its instance of this class onto its QThread44
# via moveToThread, so these slots presumably execute on that thread -- confirm
# against how the signals are connected.
47 class _WorkerThread(QtCore.QObject):
# Signal carrying one object: the (on_success, on_error, isError, result) tuple
# built in _on_task_added.
49 taskComplete = QtCore.pyqtSignal(object)
# pool: the owning pool object; the slots below read its _isRunning flag and
# its _thread through self._pool.
# NOTE(review): the line storing pool on self._pool is not visible here --
# confirm it exists.
51 def __init__(self, pool):
52 QtCore.QObject.__init__(self)
# Declared as a Qt slot taking one object argument (the task tuple) and wrapped
# with misc.log_exception(_moduleLogger).
55 @QtCore.pyqtSlot(object)
56 @misc.log_exception(_moduleLogger)
57 def _on_task_added(self, task):
# Tasks arriving after the pool's _isRunning flag is cleared are logged as
# dropped.
# NOTE(review): the early return that presumably follows the log line is not
# visible here -- confirm.
58 if not self._pool._isRunning:
59 _moduleLogger.error("Dropping task")
# task is unpacked as the 5-tuple (func, args, kwds, on_success, on_error).
61 func, args, kwds, on_success, on_error = task
# Runs the task function with its positional and keyword arguments.
# NOTE(review): presumably wrapped in a try/except that sets isError and stores
# the exception as result; those lines are not visible -- confirm.
64 result = func(*args, **kwds)
67 _moduleLogger.error("Error, passing it back to the main thread")
# Bundle the outcome with both callbacks and hand it to whatever is connected
# to taskComplete.
71 taskResult = on_success, on_error, isError, result
72 self.taskComplete.emit(taskResult)
# NOTE(review): a pyqtSlot decorator for this method is not visible here --
# confirm whether one precedes the log_exception decorator.
75 @misc.log_exception(_moduleLogger)
76 def _on_stop_requested(self):
# Asks the pool's thread object to quit.
77 self._pool._thread.quit()
# Pool object that wires one _WorkerThread (moved onto a QThread44) to one
# _ParentThread through Qt signals, so tasks are submitted with add_task and
# their outcomes are routed to on_success / on_error callbacks.
80 class AsyncPool(QtCore.QObject):
# Private signals: _addTask carries one task tuple to the worker; _stopPool
# carries no payload and requests shutdown.
82 _addTask = QtCore.pyqtSignal(object)
83 _stopPool = QtCore.pyqtSignal()
# NOTE(review): the constructor's def line is not visible in this excerpt; the
# statements up to the last connect() call below appear to be its body --
# confirm.
86 QtCore.QObject.__init__(self)
87 self._thread = QThread44()
# Flag read by both helper objects and by add_task; set True here.
88 self._isRunning = True
89 self._parent = _ParentThread(self)
90 self._worker = _WorkerThread(self)
# Only the worker is moved onto the QThread44; the parent helper is not moved.
91 self._worker.moveToThread(self._thread)
# Signal wiring: submitted tasks go to the worker, completed results go to the
# parent helper, and stop requests go to the worker.
93 self._addTask.connect(self._worker._on_task_added)
94 self._worker.taskComplete.connect(self._parent._on_task_complete)
95 self._stopPool.connect(self._worker._on_stop_requested)
# NOTE(review): no call that starts self._thread is visible in this excerpt --
# confirm where the thread is started.
# NOTE(review): the def line owning the next two statements is not visible;
# presumably a stop method. They clear the running flag, then emit _stopPool.
101 self._isRunning = False
102 self._stopPool.emit()
# Queue a task for the worker.
# func, args, kwds: the callable and the positional / keyword arguments the
# worker calls it with as func(*args, **kwds).
# on_success, on_error: callbacks carried along with the task and chosen
# between in _ParentThread._on_task_complete.
104 def add_task(self, func, args, kwds, on_success, on_error):
# NOTE(review): assert statements are removed when Python runs with -O, so this
# guard disappears in optimized mode.
105 assert self._isRunning
106 task = func, args, kwds, on_success, on_error
107 self._addTask.emit(task)