Doing a better job of managing the lifetime of my objects
authorEd Page <eopage@byu.net>
Fri, 25 Mar 2011 01:30:00 +0000 (20:30 -0500)
committerEd Page <eopage@byu.net>
Tue, 19 Apr 2011 23:49:32 +0000 (18:49 -0500)
src/session.py
src/util/concurrent.py
src/util/go_utils.py
src/util/qore_utils.py

index 4b8a4bf..b1a2fc3 100644 (file)
@@ -48,11 +48,11 @@ class Draft(QtCore.QObject):
 
        recipientsChanged = qt_compat.Signal()
 
 
        recipientsChanged = qt_compat.Signal()
 
-       def __init__(self, pool, backend, errorLog):
+       def __init__(self, asyncQueue, backend, errorLog):
                QtCore.QObject.__init__(self)
                self._errorLog = errorLog
                self._contacts = {}
                QtCore.QObject.__init__(self)
                self._errorLog = errorLog
                self._contacts = {}
-               self._pool = pool
+               self._asyncQueue = asyncQueue
                self._backend = backend
                self._busyReason = None
                self._message = ""
                self._backend = backend
                self._busyReason = None
                self._message = ""
@@ -61,7 +61,7 @@ class Draft(QtCore.QObject):
                assert 0 < len(self._contacts), "No contacts selected"
                assert 0 < len(self._message), "No message to send"
                numbers = [misc_utils.make_ugly(contact.selectedNumber) for contact in self._contacts.itervalues()]
                assert 0 < len(self._contacts), "No contacts selected"
                assert 0 < len(self._message), "No message to send"
                numbers = [misc_utils.make_ugly(contact.selectedNumber) for contact in self._contacts.itervalues()]
-               le = concurrent.AsyncLinearExecution(self._pool, self._send)
+               le = self._asyncQueue.add_async(self._send)
                le.start(numbers, self._message)
 
        def call(self):
                le.start(numbers, self._message)
 
        def call(self):
@@ -69,11 +69,11 @@ class Draft(QtCore.QObject):
                assert len(self._message) == 0, "Cannot send message with call"
                (contact, ) = self._contacts.itervalues()
                number = misc_utils.make_ugly(contact.selectedNumber)
                assert len(self._message) == 0, "Cannot send message with call"
                (contact, ) = self._contacts.itervalues()
                number = misc_utils.make_ugly(contact.selectedNumber)
-               le = concurrent.AsyncLinearExecution(self._pool, self._call)
+               le = self._asyncQueue.add_async(self._call)
                le.start(number)
 
        def cancel(self):
                le.start(number)
 
        def cancel(self):
-               le = concurrent.AsyncLinearExecution(self._pool, self._cancel)
+               le = self._asyncQueue.add_async(self._cancel)
                le.start()
 
        def _get_message(self):
                le.start()
 
        def _get_message(self):
@@ -234,7 +234,8 @@ class Session(QtCore.QObject):
        def __init__(self, errorLog, cachePath = None):
                QtCore.QObject.__init__(self)
                self._errorLog = errorLog
        def __init__(self, errorLog, cachePath = None):
                QtCore.QObject.__init__(self)
                self._errorLog = errorLog
-               self._pool = qore_utils.AsyncPool()
+               self._pool = qore_utils.FutureThread()
+               self._asyncQueue = concurrent.AsyncTaskQueue(self._pool)
                self._backend = []
                self._loggedInTime = self._LOGGEDOUT_TIME
                self._loginOps = []
                self._backend = []
                self._loggedInTime = self._LOGGEDOUT_TIME
                self._loginOps = []
@@ -242,7 +243,7 @@ class Session(QtCore.QObject):
                self._voicemailCachePath = None
                self._username = None
                self._password = None
                self._voicemailCachePath = None
                self._username = None
                self._password = None
-               self._draft = Draft(self._pool, self._backend, self._errorLog)
+               self._draft = Draft(self._asyncQueue, self._backend, self._errorLog)
                self._delayedRelogin = QtCore.QTimer()
                self._delayedRelogin.setInterval(0)
                self._delayedRelogin.setSingleShot(True)
                self._delayedRelogin = QtCore.QTimer()
                self._delayedRelogin.setInterval(0)
                self._delayedRelogin.setSingleShot(True)
@@ -283,7 +284,7 @@ class Session(QtCore.QObject):
                        self._backend[0:0] = [gv_backend.GVDialer(cookiePath)]
 
                self._pool.start()
                        self._backend[0:0] = [gv_backend.GVDialer(cookiePath)]
 
                self._pool.start()
-               le = concurrent.AsyncLinearExecution(self._pool, self._login)
+               le = self._asyncQueue.add_async(self._login)
                le.start(username, password)
 
        def logout(self):
                le.start(username, password)
 
        def logout(self):
@@ -316,11 +317,11 @@ class Session(QtCore.QObject):
        def update_account(self, force = True):
                if not force and self._contacts:
                        return
        def update_account(self, force = True):
                if not force and self._contacts:
                        return
-               le = concurrent.AsyncLinearExecution(self._pool, self._update_account), (), {}
+               le = self._asyncQueue.add_async(self._update_account), (), {}
                self._perform_op_while_loggedin(le)
 
        def refresh_connection(self):
                self._perform_op_while_loggedin(le)
 
        def refresh_connection(self):
-               le = concurrent.AsyncLinearExecution(self._pool, self._refresh_authentication)
+               le = self._asyncQueue.add_async(self._refresh_authentication)
                le.start()
 
        def get_contacts(self):
                le.start()
 
        def get_contacts(self):
@@ -332,7 +333,7 @@ class Session(QtCore.QObject):
        def update_messages(self, messageType, force = True):
                if not force and self._messages:
                        return
        def update_messages(self, messageType, force = True):
                if not force and self._messages:
                        return
-               le = concurrent.AsyncLinearExecution(self._pool, self._update_messages), (messageType, ), {}
+               le = self._asyncQueue.add_async(self._update_messages), (messageType, ), {}
                self._perform_op_while_loggedin(le)
 
        def get_messages(self):
                self._perform_op_while_loggedin(le)
 
        def get_messages(self):
@@ -344,7 +345,7 @@ class Session(QtCore.QObject):
        def update_history(self, historyType, force = True):
                if not force and self._history:
                        return
        def update_history(self, historyType, force = True):
                if not force and self._history:
                        return
-               le = concurrent.AsyncLinearExecution(self._pool, self._update_history), (historyType, ), {}
+               le = self._asyncQueue.add_async(self._update_history), (historyType, ), {}
                self._perform_op_while_loggedin(le)
 
        def get_history(self):
                self._perform_op_while_loggedin(le)
 
        def get_history(self):
@@ -354,11 +355,11 @@ class Session(QtCore.QObject):
                return self._historyUpdateTime
 
        def update_dnd(self):
                return self._historyUpdateTime
 
        def update_dnd(self):
-               le = concurrent.AsyncLinearExecution(self._pool, self._update_dnd), (), {}
+               le = self._asyncQueue.add_async(self._update_dnd), (), {}
                self._perform_op_while_loggedin(le)
 
        def set_dnd(self, dnd):
                self._perform_op_while_loggedin(le)
 
        def set_dnd(self, dnd):
-               le = concurrent.AsyncLinearExecution(self._pool, self._set_dnd)
+               le = self._asyncQueue.add_async(self._set_dnd)
                le.start(dnd)
 
        def is_available(self, messageId):
                le.start(dnd)
 
        def is_available(self, messageId):
@@ -372,7 +373,7 @@ class Session(QtCore.QObject):
                return actualPath
 
        def download_voicemail(self, messageId):
                return actualPath
 
        def download_voicemail(self, messageId):
-               le = concurrent.AsyncLinearExecution(self._pool, self._download_voicemail)
+               le = self._asyncQueue.add_async(self._download_voicemail)
                le.start(messageId)
 
        def _set_dnd(self, dnd):
                le.start(messageId)
 
        def _set_dnd(self, dnd):
@@ -410,7 +411,7 @@ class Session(QtCore.QObject):
                return self._callback
 
        def set_callback_number(self, callback):
                return self._callback
 
        def set_callback_number(self, callback):
-               le = concurrent.AsyncLinearExecution(self._pool, self._set_callback_number)
+               le = self._asyncQueue.add_async(self._set_callback_number)
                le.start(callback)
 
        def _set_callback_number(self, callback):
                le.start(callback)
 
        def _set_callback_number(self, callback):
index a6499fe..4d31468 100644 (file)
@@ -15,12 +15,33 @@ import misc
 _moduleLogger = logging.getLogger(__name__)
 
 
 _moduleLogger = logging.getLogger(__name__)
 
 
-class AsyncLinearExecution(object):
+class AsyncTaskQueue(object):
+
+       def __init__(self, taskPool):
+               self._asyncs = []
+               self._taskPool = taskPool
+
+       def add_async(self, func):
+               self.flush()
+               a = _AsyncGeneratorTask(self._taskPool, func)
+               self._asyncs.append(a)
+               return a
+
+       def flush(self):
+               self._asyncs = [a for a in self._asyncs if not a.isDone]
+
+
+class _AsyncGeneratorTask(object):
 
        def __init__(self, pool, func):
                self._pool = pool
                self._func = func
                self._run = None
 
        def __init__(self, pool, func):
                self._pool = pool
                self._func = func
                self._run = None
+               self._isDone = False
+
+       @property
+       def isDone(self):
+               return self._isDone
 
        def start(self, *args, **kwds):
                assert self._run is None, "Task already started"
 
        def start(self, *args, **kwds):
                assert self._run is None, "Task already started"
@@ -40,7 +61,7 @@ class AsyncLinearExecution(object):
                try:
                        trampoline, args, kwds = self._run.send(result)
                except StopIteration, e:
                try:
                        trampoline, args, kwds = self._run.send(result)
                except StopIteration, e:
-                       pass
+                       self._isDone = True
                else:
                        self._pool.add_task(
                                trampoline,
                else:
                        self._pool.add_task(
                                trampoline,
@@ -56,7 +77,7 @@ class AsyncLinearExecution(object):
                try:
                        trampoline, args, kwds = self._run.throw(error)
                except StopIteration, e:
                try:
                        trampoline, args, kwds = self._run.throw(error)
                except StopIteration, e:
-                       pass
+                       self._isDone = True
                else:
                        self._pool.add_task(
                                trampoline,
                else:
                        self._pool.add_task(
                                trampoline,
index eaa2fe1..61e731d 100644 (file)
@@ -138,7 +138,7 @@ class Timeout(object):
 _QUEUE_EMPTY = object()
 
 
 _QUEUE_EMPTY = object()
 
 
-class AsyncPool(object):
+class FutureThread(object):
 
        def __init__(self):
                self.__workQueue = Queue.Queue()
 
        def __init__(self):
                self.__workQueue = Queue.Queue()
index e9b8c49..654a87a 100644 (file)
@@ -25,15 +25,15 @@ class QThread44(QtCore.QThread):
 
 class _ParentThread(QtCore.QObject):
 
 
 class _ParentThread(QtCore.QObject):
 
-       def __init__(self, pool):
+       def __init__(self, futureThread):
                QtCore.QObject.__init__(self)
                QtCore.QObject.__init__(self)
-               self._pool = pool
+               self._futureThread = futureThread
 
        @qt_compat.Slot(object)
        @misc.log_exception(_moduleLogger)
        def _on_task_complete(self, taskResult):
                on_success, on_error, isError, result = taskResult
 
        @qt_compat.Slot(object)
        @misc.log_exception(_moduleLogger)
        def _on_task_complete(self, taskResult):
                on_success, on_error, isError, result = taskResult
-               if not self._pool._isRunning:
+               if not self._futureThread._isRunning:
                        if isError:
                                _moduleLogger.error("Masking: %s" % (result, ))
                        isError = True
                        if isError:
                                _moduleLogger.error("Masking: %s" % (result, ))
                        isError = True
@@ -49,14 +49,14 @@ class _WorkerThread(QtCore.QObject):
 
        taskComplete  = qt_compat.Signal(object)
 
 
        taskComplete  = qt_compat.Signal(object)
 
-       def __init__(self, pool):
+       def __init__(self, futureThread):
                QtCore.QObject.__init__(self)
                QtCore.QObject.__init__(self)
-               self._pool = pool
+               self._futureThread = futureThread
 
        @qt_compat.Slot(object)
        @misc.log_exception(_moduleLogger)
        def _on_task_added(self, task):
 
        @qt_compat.Slot(object)
        @misc.log_exception(_moduleLogger)
        def _on_task_added(self, task):
-               if not self._pool._isRunning:
+               if not self._futureThread._isRunning:
                        _moduleLogger.error("Dropping task")
 
                func, args, kwds, on_success, on_error = task
                        _moduleLogger.error("Dropping task")
 
                func, args, kwds, on_success, on_error = task
@@ -75,13 +75,13 @@ class _WorkerThread(QtCore.QObject):
        @qt_compat.Slot()
        @misc.log_exception(_moduleLogger)
        def _on_stop_requested(self):
        @qt_compat.Slot()
        @misc.log_exception(_moduleLogger)
        def _on_stop_requested(self):
-               self._pool._thread.quit()
+               self._futureThread._thread.quit()
 
 
 
 
-class AsyncPool(QtCore.QObject):
+class FutureThread(QtCore.QObject):
 
        _addTask = qt_compat.Signal(object)
 
        _addTask = qt_compat.Signal(object)
-       _stopPool = qt_compat.Signal()
+       _stopFutureThread = qt_compat.Signal()
 
        def __init__(self):
                QtCore.QObject.__init__(self)
 
        def __init__(self):
                QtCore.QObject.__init__(self)
@@ -93,7 +93,7 @@ class AsyncPool(QtCore.QObject):
 
                self._addTask.connect(self._worker._on_task_added)
                self._worker.taskComplete.connect(self._parent._on_task_complete)
 
                self._addTask.connect(self._worker._on_task_added)
                self._worker.taskComplete.connect(self._parent._on_task_complete)
-               self._stopPool.connect(self._worker._on_stop_requested)
+               self._stopFutureThread.connect(self._worker._on_stop_requested)
 
        def start(self):
                self._thread.start()
 
        def start(self):
                self._thread.start()
@@ -101,7 +101,7 @@ class AsyncPool(QtCore.QObject):
 
        def stop(self):
                self._isRunning = False
 
        def stop(self):
                self._isRunning = False
-               self._stopPool.emit()
+               self._stopFutureThread.emit()
 
        def add_task(self, func, args, kwds, on_success, on_error):
                assert self._isRunning, "Task queue not started"
 
        def add_task(self, func, args, kwds, on_success, on_error):
                assert self._isRunning, "Task queue not started"