From: Ed Page Date: Sat, 11 Sep 2010 05:07:36 +0000 (-0500) Subject: Starting on threading work X-Git-Url: http://git.maemo.org/git/?p=gc-dialer;a=commitdiff_plain;h=5fe7c09e1c137c54f93de04be8a6018d7126c18f Starting on threading work --- diff --git a/src/dialcentral_qt.py b/src/dialcentral_qt.py index 4812c01..a9f529c 100755 --- a/src/dialcentral_qt.py +++ b/src/dialcentral_qt.py @@ -1015,7 +1015,8 @@ def run(): if __name__ == "__main__": - logging.basicConfig(level = logging.DEBUG) + logFormat = '(%(asctime)s) %(levelname)-5s %(threadName)s.%(name)s: %(message)s' + logging.basicConfig(level=logging.DEBUG, format=logFormat) try: os.makedirs(constants._data_path_) except OSError, e: diff --git a/src/session.py b/src/session.py index 5c7f5f0..5802f52 100644 --- a/src/session.py +++ b/src/session.py @@ -3,6 +3,8 @@ import logging from PyQt4 import QtCore +from util import qore_utils +from util import concurrent _moduleLogger = logging.getLogger(__name__) @@ -19,8 +21,9 @@ class Draft(QtCore.QObject): recipientsChanged = QtCore.pyqtSignal() - def __init__(self): + def __init__(self, pool): self._contacts = {} + self._pool = pool def send(self, text): assert 0 < len(self._contacts) @@ -79,11 +82,12 @@ class Session(QtCore.QObject): def __init__(self, cachePath = None): QtCore.QObject.__init__(self) + self._pool = qore_utils.AsyncPool() self._loggedInTime = self._LOGGEDOUT_TIME self._loginOps = [] self._cachePath = cachePath self._username = None - self._draft = Draft() + self._draft = Draft(self._pool) self._contacts = [] self._messages = [] @@ -108,6 +112,7 @@ class Session(QtCore.QObject): else: cookiePath = None + self._pool.start() self.error.emit("Not Implemented") # if the username is the same, do nothing @@ -116,6 +121,7 @@ class Session(QtCore.QObject): def logout(self): assert self.state != self.LOGGEDOUT_STATE + self._pool.stop() self.error.emit("Not Implemented") def clear(self): @@ -169,6 +175,8 @@ class Session(QtCore.QObject): self.error.emit("Not Implemented") def _update_contacts(self): + le = concurrent.AsyncLinearExecution(self._asyncPool, self._login) + le.start() self.error.emit("Not Implemented") def _update_messages(self): diff --git a/src/util/concurrent.py b/src/util/concurrent.py index 503a1b4..812de24 100644 --- a/src/util/concurrent.py +++ b/src/util/concurrent.py @@ -7,6 +7,64 @@ import errno import time import functools import contextlib +import logging + +import misc + + +_moduleLogger = logging.getLogger(__name__) + + +class AsyncLinearExecution(object): + + def __init__(self, pool, func): + self._pool = pool + self._func = func + self._run = None + + def start(self, *args, **kwds): + assert self._run is None + self._run = self._func(*args, **kwds) + trampoline, args, kwds = self._run.send(None) # priming the function + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_success(self, result): + _moduleLogger.debug("Processing success for: %r", self._func) + try: + trampoline, args, kwds = self._run.send(result) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_error(self, error): + _moduleLogger.debug("Processing error for: %r", self._func) + try: + trampoline, args, kwds = self._run.throw(error) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) def synchronized(lock): diff --git a/src/util/go_utils.py b/src/util/go_utils.py index d066542..eaa2fe1 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -200,58 +200,6 @@ class AsyncPool(object): _moduleLogger.debug("Shutting down worker thread") -class AsyncLinearExecution(object): - - def __init__(self, pool, func): - self._pool = pool - self._func = func - self._run = None - - def start(self, *args, **kwds): - assert self._run is None - self._run = self._func(*args, **kwds) - trampoline, args, kwds = self._run.send(None) # priming the function - self._pool.add_task( - trampoline, - args, - kwds, - self.on_success, - self.on_error, - ) - - @misc.log_exception(_moduleLogger) - def on_success(self, result): - _moduleLogger.debug("Processing success for: %r", self._func) - try: - trampoline, args, kwds = self._run.send(result) - except StopIteration, e: - pass - else: - self._pool.add_task( - trampoline, - args, - kwds, - self.on_success, - self.on_error, - ) - - @misc.log_exception(_moduleLogger) - def on_error(self, error): - _moduleLogger.debug("Processing error for: %r", self._func) - try: - trampoline, args, kwds = self._run.throw(error) - except StopIteration, e: - pass - else: - self._pool.add_task( - trampoline, - args, - kwds, - self.on_success, - self.on_error, - ) - - class AutoSignal(object): def __init__(self, toplevel): diff --git a/src/util/qore_utils.py b/src/util/qore_utils.py new file mode 100644 index 0000000..f7d9dc5 --- /dev/null +++ b/src/util/qore_utils.py @@ -0,0 +1,80 @@ +import logging + +from PyQt4 import QtCore + +import misc + + +_moduleLogger = logging.getLogger(__name__) + + +class ParentThreadSignals(QtCore.QObject): + + taskComplete = QtCore.pyqtSignal(object) + + +class WorkerThreadSignals(QtCore.QObject): + + addTask = QtCore.pyqtSignal(object) + + +class AsyncPool(QtCore.QObject): + + def __init__(self): + _moduleLogger.info("main?") + self._thread = QtCore.QThread() + self._isRunning = True + self._parent = ParentThreadSignals() + self._parent.taskComplete.connect(self._on_task_complete) + self._worker = WorkerThreadSignals() + self._worker.moveToThread(self._thread) + self._worker.addTask.connect(self._on_task_added) + + def start(self): + _moduleLogger.info("main?") + self._thread.exec_() + + def stop(self): + _moduleLogger.info("main?") + self._isRunning = False + + def add_task(self, func, args, kwds, on_success, on_error): + _moduleLogger.info("main?") + assert self._isRunning + task = func, args, kwds, on_success, on_error + self._worker.addTask.emit(task) + + @misc.log_exception(_moduleLogger) + def _on_task_added(self, task): + _moduleLogger.info("worker?") + if not self._isRunning: + _moduleLogger.error("Dropping task") + + func, args, kwds, on_success, on_error = task + + try: + result = func(*args, **kwds) + isError = False + except Exception, e: + _moduleLogger.error("Error, passing it back to the main thread") + result = e + isError = True + + taskResult = on_success, on_error, isError, result + self._parent.taskComplete.emit(taskResult) + + @misc.log_exception(_moduleLogger) + def _on_task_complete(self, taskResult): + _moduleLogger.info("main?") + on_success, on_error, isError, result = taskResult + if not self._isRunning: + if isError: + _moduleLogger.error("Masking: %s" % (result, )) + isError = True + result = StopIteration("Cancelling all callbacks") + callback = on_success if not isError else on_error + try: + callback(result) + except Exception: + _moduleLogger.exception("Callback errored") + return False diff --git a/src/util/qui_utils.py b/src/util/qui_utils.py new file mode 100644 index 0000000..e69de29