From da9af3284a8c0bf8189266e0878e7211402d69af Mon Sep 17 00:00:00 2001 From: Ed Page Date: Thu, 24 Mar 2011 20:30:00 -0500 Subject: [PATCH] Doing a better job of managing the lifetime of my objects --- src/session.py | 33 +++++++++++++++++---------------- src/util/concurrent.py | 27 ++++++++++++++++++++++++--- src/util/go_utils.py | 2 +- src/util/qore_utils.py | 22 +++++++++++----------- 4 files changed, 53 insertions(+), 31 deletions(-) diff --git a/src/session.py b/src/session.py index 4b8a4bf..b1a2fc3 100644 --- a/src/session.py +++ b/src/session.py @@ -48,11 +48,11 @@ class Draft(QtCore.QObject): recipientsChanged = qt_compat.Signal() - def __init__(self, pool, backend, errorLog): + def __init__(self, asyncQueue, backend, errorLog): QtCore.QObject.__init__(self) self._errorLog = errorLog self._contacts = {} - self._pool = pool + self._asyncQueue = asyncQueue self._backend = backend self._busyReason = None self._message = "" @@ -61,7 +61,7 @@ class Draft(QtCore.QObject): assert 0 < len(self._contacts), "No contacts selected" assert 0 < len(self._message), "No message to send" numbers = [misc_utils.make_ugly(contact.selectedNumber) for contact in self._contacts.itervalues()] - le = concurrent.AsyncLinearExecution(self._pool, self._send) + le = self._asyncQueue.add_async(self._send) le.start(numbers, self._message) def call(self): @@ -69,11 +69,11 @@ class Draft(QtCore.QObject): assert len(self._message) == 0, "Cannot send message with call" (contact, ) = self._contacts.itervalues() number = misc_utils.make_ugly(contact.selectedNumber) - le = concurrent.AsyncLinearExecution(self._pool, self._call) + le = self._asyncQueue.add_async(self._call) le.start(number) def cancel(self): - le = concurrent.AsyncLinearExecution(self._pool, self._cancel) + le = self._asyncQueue.add_async(self._cancel) le.start() def _get_message(self): @@ -234,7 +234,8 @@ class Session(QtCore.QObject): def __init__(self, errorLog, cachePath = None): QtCore.QObject.__init__(self) self._errorLog = errorLog - self._pool = qore_utils.AsyncPool() + self._pool = qore_utils.FutureThread() + self._asyncQueue = concurrent.AsyncTaskQueue(self._pool) self._backend = [] self._loggedInTime = self._LOGGEDOUT_TIME self._loginOps = [] @@ -242,7 +243,7 @@ class Session(QtCore.QObject): self._voicemailCachePath = None self._username = None self._password = None - self._draft = Draft(self._pool, self._backend, self._errorLog) + self._draft = Draft(self._asyncQueue, self._backend, self._errorLog) self._delayedRelogin = QtCore.QTimer() self._delayedRelogin.setInterval(0) self._delayedRelogin.setSingleShot(True) @@ -283,7 +284,7 @@ class Session(QtCore.QObject): self._backend[0:0] = [gv_backend.GVDialer(cookiePath)] self._pool.start() - le = concurrent.AsyncLinearExecution(self._pool, self._login) + le = self._asyncQueue.add_async(self._login) le.start(username, password) def logout(self): @@ -316,11 +317,11 @@ class Session(QtCore.QObject): def update_account(self, force = True): if not force and self._contacts: return - le = concurrent.AsyncLinearExecution(self._pool, self._update_account), (), {} + le = self._asyncQueue.add_async(self._update_account), (), {} self._perform_op_while_loggedin(le) def refresh_connection(self): - le = concurrent.AsyncLinearExecution(self._pool, self._refresh_authentication) + le = self._asyncQueue.add_async(self._refresh_authentication) le.start() def get_contacts(self): @@ -332,7 +333,7 @@ class Session(QtCore.QObject): def update_messages(self, messageType, force = True): if not force and self._messages: return - le = concurrent.AsyncLinearExecution(self._pool, self._update_messages), (messageType, ), {} + le = self._asyncQueue.add_async(self._update_messages), (messageType, ), {} self._perform_op_while_loggedin(le) def get_messages(self): @@ -344,7 +345,7 @@ class Session(QtCore.QObject): def update_history(self, historyType, force = True): if not force and self._history: return - le = concurrent.AsyncLinearExecution(self._pool, self._update_history), (historyType, ), {} + le = self._asyncQueue.add_async(self._update_history), (historyType, ), {} self._perform_op_while_loggedin(le) def get_history(self): @@ -354,11 +355,11 @@ class Session(QtCore.QObject): return self._historyUpdateTime def update_dnd(self): - le = concurrent.AsyncLinearExecution(self._pool, self._update_dnd), (), {} + le = self._asyncQueue.add_async(self._update_dnd), (), {} self._perform_op_while_loggedin(le) def set_dnd(self, dnd): - le = concurrent.AsyncLinearExecution(self._pool, self._set_dnd) + le = self._asyncQueue.add_async(self._set_dnd) le.start(dnd) def is_available(self, messageId): @@ -372,7 +373,7 @@ class Session(QtCore.QObject): return actualPath def download_voicemail(self, messageId): - le = concurrent.AsyncLinearExecution(self._pool, self._download_voicemail) + le = self._asyncQueue.add_async(self._download_voicemail) le.start(messageId) def _set_dnd(self, dnd): @@ -410,7 +411,7 @@ class Session(QtCore.QObject): return self._callback def set_callback_number(self, callback): - le = concurrent.AsyncLinearExecution(self._pool, self._set_callback_number) + le = self._asyncQueue.add_async(self._set_callback_number) le.start(callback) def _set_callback_number(self, callback): diff --git a/src/util/concurrent.py b/src/util/concurrent.py index a6499fe..4d31468 100644 --- a/src/util/concurrent.py +++ b/src/util/concurrent.py @@ -15,12 +15,33 @@ import misc _moduleLogger = logging.getLogger(__name__) -class AsyncLinearExecution(object): +class AsyncTaskQueue(object): + + def __init__(self, taskPool): + self._asyncs = [] + self._taskPool = taskPool + + def add_async(self, func): + self.flush() + a = _AsyncGeneratorTask(self._taskPool, func) + self._asyncs.append(a) + return a + + def flush(self): + self._asyncs = [a for a in self._asyncs if not a.isDone] + + +class _AsyncGeneratorTask(object): def __init__(self, pool, func): self._pool = pool self._func = func self._run = None + self._isDone = False + + @property + def isDone(self): + return self._isDone def start(self, *args, **kwds): assert self._run is None, "Task already started" @@ -40,7 +61,7 @@ class AsyncLinearExecution(object): try: trampoline, args, kwds = self._run.send(result) except StopIteration, e: - pass + self._isDone = True else: self._pool.add_task( trampoline, @@ -56,7 +77,7 @@ class AsyncLinearExecution(object): try: trampoline, args, kwds = self._run.throw(error) except StopIteration, e: - pass + self._isDone = True else: self._pool.add_task( trampoline, diff --git a/src/util/go_utils.py b/src/util/go_utils.py index eaa2fe1..61e731d 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -138,7 +138,7 @@ class Timeout(object): _QUEUE_EMPTY = object() -class AsyncPool(object): +class FutureThread(object): def __init__(self): self.__workQueue = Queue.Queue() diff --git a/src/util/qore_utils.py b/src/util/qore_utils.py index e9b8c49..654a87a 100644 --- a/src/util/qore_utils.py +++ b/src/util/qore_utils.py @@ -25,15 +25,15 @@ class QThread44(QtCore.QThread): class _ParentThread(QtCore.QObject): - def __init__(self, pool): + def __init__(self, futureThread): QtCore.QObject.__init__(self) - self._pool = pool + self._futureThread = futureThread @qt_compat.Slot(object) @misc.log_exception(_moduleLogger) def _on_task_complete(self, taskResult): on_success, on_error, isError, result = taskResult - if not self._pool._isRunning: + if not self._futureThread._isRunning: if isError: _moduleLogger.error("Masking: %s" % (result, )) isError = True @@ -49,14 +49,14 @@ class _WorkerThread(QtCore.QObject): taskComplete = qt_compat.Signal(object) - def __init__(self, pool): + def __init__(self, futureThread): QtCore.QObject.__init__(self) - self._pool = pool + self._futureThread = futureThread @qt_compat.Slot(object) @misc.log_exception(_moduleLogger) def _on_task_added(self, task): - if not self._pool._isRunning: + if not self._futureThread._isRunning: _moduleLogger.error("Dropping task") func, args, kwds, on_success, on_error = task @@ -75,13 +75,13 @@ class _WorkerThread(QtCore.QObject): @qt_compat.Slot() @misc.log_exception(_moduleLogger) def _on_stop_requested(self): - self._pool._thread.quit() + self._futureThread._thread.quit() -class AsyncPool(QtCore.QObject): +class FutureThread(QtCore.QObject): _addTask = qt_compat.Signal(object) - _stopPool = qt_compat.Signal() + _stopFutureThread = qt_compat.Signal() def __init__(self): QtCore.QObject.__init__(self) @@ -93,7 +93,7 @@ class AsyncPool(QtCore.QObject): self._addTask.connect(self._worker._on_task_added) self._worker.taskComplete.connect(self._parent._on_task_complete) - self._stopPool.connect(self._worker._on_stop_requested) + self._stopFutureThread.connect(self._worker._on_stop_requested) def start(self): self._thread.start() @@ -101,7 +101,7 @@ class AsyncPool(QtCore.QObject): def stop(self): self._isRunning = False - self._stopPool.emit() + self._stopFutureThread.emit() def add_task(self, func, args, kwds, on_success, on_error): assert self._isRunning, "Task queue not started" -- 1.7.9.5