Starting on threading work
authorEd Page <epage@wolverstone.(none)>
Sat, 11 Sep 2010 05:07:36 +0000 (00:07 -0500)
committerEd Page <epage@wolverstone.(none)>
Sat, 11 Sep 2010 05:07:36 +0000 (00:07 -0500)
src/dialcentral_qt.py
src/session.py
src/util/concurrent.py
src/util/go_utils.py
src/util/qore_utils.py [new file with mode: 0644]
src/util/qui_utils.py [new file with mode: 0644]

index 4812c01..a9f529c 100755 (executable)
@@ -1015,7 +1015,8 @@ def run():
 
 
 if __name__ == "__main__":
-       logging.basicConfig(level = logging.DEBUG)
+       logFormat = '(%(asctime)s) %(levelname)-5s %(threadName)s.%(name)s: %(message)s'
+       logging.basicConfig(level=logging.DEBUG, format=logFormat)
        try:
                os.makedirs(constants._data_path_)
        except OSError, e:
index 5c7f5f0..5802f52 100644 (file)
@@ -3,6 +3,8 @@ import logging
 
 from PyQt4 import QtCore
 
+from util import qore_utils
+from util import concurrent
 
 _moduleLogger = logging.getLogger(__name__)
 
@@ -19,8 +21,9 @@ class Draft(QtCore.QObject):
 
        recipientsChanged = QtCore.pyqtSignal()
 
-       def __init__(self):
+       def __init__(self, pool):
                self._contacts = {}
+               self._pool = pool
 
        def send(self, text):
                assert 0 < len(self._contacts)
@@ -79,11 +82,12 @@ class Session(QtCore.QObject):
 
        def __init__(self, cachePath = None):
                QtCore.QObject.__init__(self)
+               self._pool = qore_utils.AsyncPool()
                self._loggedInTime = self._LOGGEDOUT_TIME
                self._loginOps = []
                self._cachePath = cachePath
                self._username = None
-               self._draft = Draft()
+               self._draft = Draft(self._pool)
 
                self._contacts = []
                self._messages = []
@@ -108,6 +112,7 @@ class Session(QtCore.QObject):
                else:
                        cookiePath = None
 
+               self._pool.start()
                self.error.emit("Not Implemented")
 
                # if the username is the same, do nothing
@@ -116,6 +121,7 @@ class Session(QtCore.QObject):
 
        def logout(self):
                assert self.state != self.LOGGEDOUT_STATE
+               self._pool.stop()
                self.error.emit("Not Implemented")
 
        def clear(self):
@@ -169,6 +175,8 @@ class Session(QtCore.QObject):
                self.error.emit("Not Implemented")
 
        def _update_contacts(self):
+               le = concurrent.AsyncLinearExecution(self._asyncPool, self._login)
+               le.start()
                self.error.emit("Not Implemented")
 
        def _update_messages(self):
index 503a1b4..812de24 100644 (file)
@@ -7,6 +7,64 @@ import errno
 import time
 import functools
 import contextlib
+import logging
+
+import misc
+
+
+_moduleLogger = logging.getLogger(__name__)
+
+
+class AsyncLinearExecution(object):
+
+       def __init__(self, pool, func):
+               self._pool = pool
+               self._func = func
+               self._run = None
+
+       def start(self, *args, **kwds):
+               assert self._run is None
+               self._run = self._func(*args, **kwds)
+               trampoline, args, kwds = self._run.send(None) # priming the function
+               self._pool.add_task(
+                       trampoline,
+                       args,
+                       kwds,
+                       self.on_success,
+                       self.on_error,
+               )
+
+       @misc.log_exception(_moduleLogger)
+       def on_success(self, result):
+               _moduleLogger.debug("Processing success for: %r", self._func)
+               try:
+                       trampoline, args, kwds = self._run.send(result)
+               except StopIteration, e:
+                       pass
+               else:
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
+
+       @misc.log_exception(_moduleLogger)
+       def on_error(self, error):
+               _moduleLogger.debug("Processing error for: %r", self._func)
+               try:
+                       trampoline, args, kwds = self._run.throw(error)
+               except StopIteration, e:
+                       pass
+               else:
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
 
 
 def synchronized(lock):
index d066542..eaa2fe1 100644 (file)
@@ -200,58 +200,6 @@ class AsyncPool(object):
                _moduleLogger.debug("Shutting down worker thread")
 
 
-class AsyncLinearExecution(object):
-
-       def __init__(self, pool, func):
-               self._pool = pool
-               self._func = func
-               self._run = None
-
-       def start(self, *args, **kwds):
-               assert self._run is None
-               self._run = self._func(*args, **kwds)
-               trampoline, args, kwds = self._run.send(None) # priming the function
-               self._pool.add_task(
-                       trampoline,
-                       args,
-                       kwds,
-                       self.on_success,
-                       self.on_error,
-               )
-
-       @misc.log_exception(_moduleLogger)
-       def on_success(self, result):
-               _moduleLogger.debug("Processing success for: %r", self._func)
-               try:
-                       trampoline, args, kwds = self._run.send(result)
-               except StopIteration, e:
-                       pass
-               else:
-                       self._pool.add_task(
-                               trampoline,
-                               args,
-                               kwds,
-                               self.on_success,
-                               self.on_error,
-                       )
-
-       @misc.log_exception(_moduleLogger)
-       def on_error(self, error):
-               _moduleLogger.debug("Processing error for: %r", self._func)
-               try:
-                       trampoline, args, kwds = self._run.throw(error)
-               except StopIteration, e:
-                       pass
-               else:
-                       self._pool.add_task(
-                               trampoline,
-                               args,
-                               kwds,
-                               self.on_success,
-                               self.on_error,
-                       )
-
-
 class AutoSignal(object):
 
        def __init__(self, toplevel):
diff --git a/src/util/qore_utils.py b/src/util/qore_utils.py
new file mode 100644 (file)
index 0000000..f7d9dc5
--- /dev/null
@@ -0,0 +1,80 @@
+import logging
+
+from PyQt4 import QtCore
+
+import misc
+
+
+_moduleLogger = logging.getLogger(__name__)
+
+
+class ParentThreadSignals(QtCore.QObject):
+
+       taskComplete  = QtCore.pyqtSignal(object)
+
+
+class WorkerThreadSignals(QtCore.QObject):
+
+       addTask = QtCore.pyqtSignal(object)
+
+
+class AsyncPool(QtCore.QObject):
+
+       def __init__(self):
+               _moduleLogger.info("main?")
+               self._thread = QtCore.QThread()
+               self._isRunning = True
+               self._parent = ParentThreadSignals()
+               self._parent.taskComplete.connect(self._on_task_complete)
+               self._worker = WorkerThreadSignals()
+               self._worker.moveToThread(self._thread)
+               self._worker.addTask.connect(self._on_task_added)
+
+       def start(self):
+               _moduleLogger.info("main?")
+               self._thread.exec_()
+
+       def stop(self):
+               _moduleLogger.info("main?")
+               self._isRunning = False
+
+       def add_task(self, func, args, kwds, on_success, on_error):
+               _moduleLogger.info("main?")
+               assert self._isRunning
+               task = func, args, kwds, on_success, on_error
+               self._worker.addTask.emit(task)
+
+       @misc.log_exception(_moduleLogger)
+       def _on_task_added(self, task):
+               _moduleLogger.info("worker?")
+               if not self._isRunning:
+                       _moduleLogger.error("Dropping task")
+
+               func, args, kwds, on_success, on_error = task
+
+               try:
+                       result = func(*args, **kwds)
+                       isError = False
+               except Exception, e:
+                       _moduleLogger.error("Error, passing it back to the main thread")
+                       result = e
+                       isError = True
+
+               taskResult = on_success, on_error, isError, result
+               self._parent.taskComplete.emit(taskResult)
+
+       @misc.log_exception(_moduleLogger)
+       def _on_task_complete(self, taskResult):
+               _moduleLogger.info("main?")
+               on_success, on_error, isError, result = taskResult
+               if not self._isRunning:
+                       if isError:
+                               _moduleLogger.error("Masking: %s" % (result, ))
+                       isError = True
+                       result = StopIteration("Cancelling all callbacks")
+               callback = on_success if not isError else on_error
+               try:
+                       callback(result)
+               except Exception:
+                       _moduleLogger.exception("Callback errored")
+               return False
diff --git a/src/util/qui_utils.py b/src/util/qui_utils.py
new file mode 100644 (file)
index 0000000..e69de29