Switching login, conversations, and addressbook to run things in a worker thread
authorEd Page <eopage@byu.net>
Wed, 17 Mar 2010 02:51:04 +0000 (21:51 -0500)
committerEd Page <eopage@byu.net>
Wed, 17 Mar 2010 02:51:04 +0000 (21:51 -0500)
src/connection.py
src/gvoice/addressbook.py
src/gvoice/conversations.py
src/gvoice/session.py
src/theonering.py
src/util/go_utils.py

index 236dc9a..0f43867 100644 (file)
@@ -140,7 +140,6 @@ class TheOneRingConnection(
                        autogv.AutoDisconnect(weakref.ref(self)),
                        autogv.DelayEnableContactIntegration(constants._telepathy_implementation_name_),
                ]
-               self._delayedConnect = gobject_utils.Async(self._delayed_connect)
 
                _moduleLogger.info("Connection to the account %s created" % account)
                self._timedDisconnect = autogv.TimedDisconnect(weakref.ref(self))
@@ -189,23 +188,27 @@ class TheOneRingConnection(
                if self._status != telepathy.CONNECTION_STATUS_DISCONNECTED:
                        _moduleLogger.info("Attempting connect when not disconnected")
                        return
-               _moduleLogger.info("Kicking off connect")
-               self._delayedConnect.start()
-               self._timedDisconnect.stop()
-
-       @misc_utils.log_exception(_moduleLogger)
-       def _delayed_connect(self):
                _moduleLogger.info("Connecting...")
                self.StatusChanged(
                        telepathy.CONNECTION_STATUS_CONNECTING,
                        telepathy.CONNECTION_STATUS_REASON_REQUESTED
                )
+               self._timedDisconnect.stop()
+               self.session.login(
+                       self.__credentials[0],
+                       self.__credentials[1],
+                       self._on_login,
+                       self._on_login_error,
+               )
+
+       @misc_utils.log_exception(_moduleLogger)
+       def _on_login(self):
+               _moduleLogger.info("Connected, setting up...")
                try:
                        self.__session.load(self.__cachePath)
 
                        for plumber in self._plumbing:
                                plumber.start()
-                       self.session.login(*self.__credentials)
                        if not self.__callbackNumberParameter:
                                callback = gvoice.backend.get_sane_callback(
                                        self.session.backend
@@ -219,22 +222,26 @@ class TheOneRingConnection(
                        publishHandle = self.get_handle_by_name(telepathy.HANDLE_TYPE_LIST, "publish")
                        publishProps = self.generate_props(telepathy.CHANNEL_TYPE_CONTACT_LIST, publishHandle, False)
                        self.__channelManager.channel_for_props(publishProps, signal=True)
-               except gvoice.backend.NetworkError:
-                       _moduleLogger.exception("Connection Failed")
-                       self.disconnect(telepathy.CONNECTION_STATUS_REASON_NETWORK_ERROR)
-                       return
                except Exception:
-                       _moduleLogger.exception("Connection Failed")
+                       _moduleLogger.exception("Setup failed")
                        self.disconnect(telepathy.CONNECTION_STATUS_REASON_AUTHENTICATION_FAILED)
                        return
 
-               _moduleLogger.info("Connected")
+               _moduleLogger.info("Connected and set up")
                self.StatusChanged(
                        telepathy.CONNECTION_STATUS_CONNECTED,
                        telepathy.CONNECTION_STATUS_REASON_REQUESTED
                )
 
        @misc_utils.log_exception(_moduleLogger)
+       def _on_login_error(self, error):
+               _moduleLogger.error(error)
+               if isinstance(error, gvoice.backend.NetworkError):
+                       self.disconnect(telepathy.CONNECTION_STATUS_REASON_NETWORK_ERROR)
+               else:
+                       self.disconnect(telepathy.CONNECTION_STATUS_REASON_AUTHENTICATION_FAILED)
+
+       @misc_utils.log_exception(_moduleLogger)
        def Disconnect(self):
                """
                For org.freedesktop.telepathy.Connection
@@ -282,7 +289,6 @@ class TheOneRingConnection(
        def disconnect(self, reason):
                _moduleLogger.info("Disconnecting")
 
-               self._delayedConnect.cancel()
                self._timedDisconnect.stop()
 
                # Not having the disconnect first can cause weird behavior with clients
index 2f6c7e2..fea9ef6 100644 (file)
@@ -15,20 +15,29 @@ class Addressbook(object):
        _RESPONSE_GOOD = 0
        _RESPONSE_BLOCKED = 3
 
-       def __init__(self, backend):
+       def __init__(self, backend, asyncPool):
                self._backend = backend
                self._numbers = {}
+               self._asyncPool = asyncPool
 
                self.updateSignalHandler = coroutines.CoTee()
 
        def update(self, force=False):
                if not force and self._numbers:
                        return
+               self._asyncPool.add_task(
+                       self._backend.get_contacts,
+                       (),
+                       {},
+                       self._on_get_contacts,
+                       self._on_get_contacts_failed,
+               )
+
+       def _on_get_contacts(self, contacts):
                oldContacts = self._numbers
                oldContactNumbers = set(self.get_numbers())
 
-               self._numbers = {}
-               self._populate_contacts()
+               self._numbers = self._populate_contacts(contacts)
                newContactNumbers = set(self.get_numbers())
 
                addedContacts = newContactNumbers - oldContactNumbers
@@ -43,6 +52,10 @@ class Addressbook(object):
                        message = self, addedContacts, removedContacts, changedContacts
                        self.updateSignalHandler.stage.send(message)
 
+       @misc_utils.log_exception(_moduleLogger)
+       def _on_get_contacts_failed(self, error):
+               _moduleLogger.error(error)
+
        def get_numbers(self):
                return self._numbers.iterkeys()
 
@@ -64,10 +77,8 @@ class Addressbook(object):
                except KeyError:
                        return False
 
-       def _populate_contacts(self):
-               if self._numbers:
-                       return
-               contacts = self._backend.get_contacts()
+       def _populate_contacts(self, contacts):
+               numbers = {}
                for contactId, contactDetails in contacts:
                        contactName = contactDetails["name"]
                        contactNumbers = (
@@ -77,7 +88,8 @@ class Addressbook(object):
                                )
                                for numberDetails in contactDetails["numbers"]
                        )
-                       self._numbers.update(
+                       numbers.update(
                                (number, (contactName, phoneType, contactDetails))
                                for (number, phoneType) in contactNumbers
                        )
+               return numbers
index 8d61a58..2aff337 100644 (file)
@@ -24,8 +24,9 @@ class Conversations(object):
        OLDEST_COMPATIBLE_FORMAT_VERSION = misc_utils.parse_version("0.8.0")
        OLDEST_MESSAGE_WINDOW = datetime.timedelta(days=60)
 
-       def __init__(self, getter):
+       def __init__(self, getter, asyncPool):
                self._get_raw_conversations = getter
+               self._asyncPool = asyncPool
                self._conversations = {}
 
                self.updateSignalHandler = coroutines.CoTee()
@@ -70,11 +71,20 @@ class Conversations(object):
        def update(self, force=False):
                if not force and self._conversations:
                        return
+               self._asyncPool.add_task(
+                       self._get_raw_conversations,
+                       (),
+                       {},
+                       self._on_get_conversations,
+                       self._on_get_conversations_failed,
+               )
 
+       @misc_utils.log_exception(_moduleLogger)
+       def _on_get_conversations(self, conversationResult):
                oldConversationIds = set(self._conversations.iterkeys())
 
                updateConversationIds = set()
-               conversations = list(self._get_raw_conversations())
+               conversations = list(conversationResult)
                conversations.sort()
                for conversation in conversations:
                        key = misc_utils.normalize_number(conversation.number)
@@ -99,6 +109,10 @@ class Conversations(object):
                        message = (self, updateConversationIds, )
                        self.updateSignalHandler.stage.send(message)
 
+       @misc_utils.log_exception(_moduleLogger)
+       def _on_get_conversations_failed(self, error):
+               _moduleLogger.error(error)
+
        def get_conversations(self):
                return self._conversations.iterkeys()
 
index 3a7de00..597c7e3 100644 (file)
@@ -9,6 +9,8 @@ import addressbook
 import conversations
 import state_machine
 
+import util.go_utils as gobject_utils
+
 
 _moduleLogger = logging.getLogger(__name__)
 
@@ -35,6 +37,7 @@ class Session(object):
                self._username = None
                self._password = None
 
+               self._asyncPool = gobject_utils.AsyncPool()
                self._backend = backend.GVoiceBackend(cookiePath)
 
                if defaults["contacts"][0] == state_machine.UpdateStateMachine.INFINITE_PERIOD:
@@ -43,7 +46,7 @@ class Session(object):
                        contactsPeriodInSeconds = state_machine.to_seconds(
                                **{defaults["contacts"][1]: defaults["contacts"][0],}
                        )
-               self._addressbook = addressbook.Addressbook(self._backend)
+               self._addressbook = addressbook.Addressbook(self._backend, self._asyncPool)
                self._addressbookStateMachine = state_machine.UpdateStateMachine([self.addressbook], "Addressbook")
                self._addressbookStateMachine.set_state_strategy(
                        state_machine.StateMachine.STATE_DND,
@@ -66,7 +69,7 @@ class Session(object):
                                **{defaults["voicemail"][1]: defaults["voicemail"][0],}
                        )
                        idleVoicemailPeriodInSeconds = max(voicemailPeriodInSeconds * 4, self._MINIMUM_MESSAGE_PERIOD)
-               self._voicemails = conversations.Conversations(self._backend.get_voicemails)
+               self._voicemails = conversations.Conversations(self._backend.get_voicemails, self._asyncPool)
                self._voicemailsStateMachine = state_machine.UpdateStateMachine([self.voicemails], "Voicemail")
                self._voicemailsStateMachine.set_state_strategy(
                        state_machine.StateMachine.STATE_DND,
@@ -98,7 +101,7 @@ class Session(object):
                                **{defaults["texts"][1]: defaults["texts"][0],}
                        )
                        idleTextsPeriodInSeconds = max(textsPeriodInSeconds * 4, self._MINIMUM_MESSAGE_PERIOD)
-               self._texts = conversations.Conversations(self._backend.get_texts)
+               self._texts = conversations.Conversations(self._backend.get_texts, self._asyncPool)
                self._textsStateMachine = state_machine.UpdateStateMachine([self.texts], "Texting")
                self._textsStateMachine.set_state_strategy(
                        state_machine.StateMachine.STATE_DND,
@@ -145,14 +148,28 @@ class Session(object):
                )
                self._masterStateMachine.close()
 
-       def login(self, username, password):
+       def login(self, username, password, on_success, on_error):
                self._username = username
                self._password = password
-               self._backend.login(self._username, self._password)
+               self._asyncPool.start()
+               self._asyncPool.add_task(
+                       self._backend.login,
+                       (self._username, self._password),
+                       {},
+                       self.__on_login_success(on_success),
+                       on_error
+               )
+
+       def __on_login_success(self, user_success):
 
-               self._masterStateMachine.start()
+               def _actual_success(self, *args, **kwds):
+                       self._masterStateMachine.start()
+                       user_success(*args, **kwds)
+
+               return _actual_success
 
        def logout(self):
+               self._asyncPool.stop()
                self._masterStateMachine.stop()
                self._backend.logout()
 
@@ -192,13 +209,13 @@ class Session(object):
 
        @property
        def backend(self):
-               """
-               Login enforcing backend
-               """
-               assert self.is_logged_in(), "User not logged in"
                return self._backend
 
        @property
+       def pool(self):
+               return self._asyncPool
+
+       @property
        def addressbook(self):
                return self._addressbook
 
index 62fd686..da5d8a2 100755 (executable)
@@ -73,6 +73,8 @@ def run_theonering(persist):
 
        mainloop = gobject.MainLoop(is_running=True)
 
+       gobject.threads_init()
+       dbus.glib.init_threads()
        while mainloop.is_running():
                try:
                        mainloop.run()
index ef6cb72..b3682ba 100644 (file)
@@ -131,7 +131,7 @@ class Timeout(object):
                return False
 
 
-__QUEUE_EMPTY = object()
+_QUEUE_EMPTY = object()
 
 
 class AsyncPool(object):
@@ -151,7 +151,7 @@ class AsyncPool(object):
                self.__isRunning = False
                for _ in algorithms.itr_available(self.__workQueue):
                        pass # eat up queue to cut down dumb work
-               self.__workQueue.put(__QUEUE_EMPTY)
+               self.__workQueue.put(_QUEUE_EMPTY)
 
        def add_task(self, func, args, kwds, on_success, on_error):
                task = func, args, kwds, on_success, on_error
@@ -172,7 +172,7 @@ class AsyncPool(object):
        def __consume_queue(self):
                while True:
                        task = self.__workQueue.get()
-                       if task is __QUEUE_EMPTY:
+                       if task is _QUEUE_EMPTY:
                                break
                        func, args, kwds, on_success, on_error = task