From e2b913d6c65eca22a75ead429543db4f2b2b1f4a Mon Sep 17 00:00:00 2001 From: Ed Page Date: Tue, 16 Mar 2010 21:51:04 -0500 Subject: [PATCH] Switching login, conversations, and addressbook to run things in a worker thread --- src/connection.py | 36 +++++++++++++++++++++--------------- src/gvoice/addressbook.py | 28 ++++++++++++++++++++-------- src/gvoice/conversations.py | 18 ++++++++++++++++-- src/gvoice/session.py | 37 +++++++++++++++++++++++++++---------- src/theonering.py | 2 ++ src/util/go_utils.py | 6 +++--- 6 files changed, 89 insertions(+), 38 deletions(-) diff --git a/src/connection.py b/src/connection.py index 236dc9a..0f43867 100644 --- a/src/connection.py +++ b/src/connection.py @@ -140,7 +140,6 @@ class TheOneRingConnection( autogv.AutoDisconnect(weakref.ref(self)), autogv.DelayEnableContactIntegration(constants._telepathy_implementation_name_), ] - self._delayedConnect = gobject_utils.Async(self._delayed_connect) _moduleLogger.info("Connection to the account %s created" % account) self._timedDisconnect = autogv.TimedDisconnect(weakref.ref(self)) @@ -189,23 +188,27 @@ class TheOneRingConnection( if self._status != telepathy.CONNECTION_STATUS_DISCONNECTED: _moduleLogger.info("Attempting connect when not disconnected") return - _moduleLogger.info("Kicking off connect") - self._delayedConnect.start() - self._timedDisconnect.stop() - - @misc_utils.log_exception(_moduleLogger) - def _delayed_connect(self): _moduleLogger.info("Connecting...") self.StatusChanged( telepathy.CONNECTION_STATUS_CONNECTING, telepathy.CONNECTION_STATUS_REASON_REQUESTED ) + self._timedDisconnect.stop() + self.session.login( + self.__credentials[0], + self.__credentials[1], + self._on_login, + self._on_login_error, + ) + + @misc_utils.log_exception(_moduleLogger) + def _on_login(self): + _moduleLogger.info("Connected, setting up...") try: self.__session.load(self.__cachePath) for plumber in self._plumbing: plumber.start() - self.session.login(*self.__credentials) if not self.__callbackNumberParameter: callback = gvoice.backend.get_sane_callback( self.session.backend @@ -219,22 +222,26 @@ class TheOneRingConnection( publishHandle = self.get_handle_by_name(telepathy.HANDLE_TYPE_LIST, "publish") publishProps = self.generate_props(telepathy.CHANNEL_TYPE_CONTACT_LIST, publishHandle, False) self.__channelManager.channel_for_props(publishProps, signal=True) - except gvoice.backend.NetworkError: - _moduleLogger.exception("Connection Failed") - self.disconnect(telepathy.CONNECTION_STATUS_REASON_NETWORK_ERROR) - return except Exception: - _moduleLogger.exception("Connection Failed") + _moduleLogger.exception("Setup failed") self.disconnect(telepathy.CONNECTION_STATUS_REASON_AUTHENTICATION_FAILED) return - _moduleLogger.info("Connected") + _moduleLogger.info("Connected and set up") self.StatusChanged( telepathy.CONNECTION_STATUS_CONNECTED, telepathy.CONNECTION_STATUS_REASON_REQUESTED ) @misc_utils.log_exception(_moduleLogger) + def _on_login_error(self, error): + _moduleLogger.error(error) + if isinstance(error, gvoice.backend.NetworkError): + self.disconnect(telepathy.CONNECTION_STATUS_REASON_NETWORK_ERROR) + else: + self.disconnect(telepathy.CONNECTION_STATUS_REASON_AUTHENTICATION_FAILED) + + @misc_utils.log_exception(_moduleLogger) def Disconnect(self): """ For org.freedesktop.telepathy.Connection @@ -282,7 +289,6 @@ class TheOneRingConnection( def disconnect(self, reason): _moduleLogger.info("Disconnecting") - self._delayedConnect.cancel() self._timedDisconnect.stop() # Not having the disconnect first can cause weird behavior with clients diff --git a/src/gvoice/addressbook.py b/src/gvoice/addressbook.py index 2f6c7e2..fea9ef6 100644 --- a/src/gvoice/addressbook.py +++ b/src/gvoice/addressbook.py @@ -15,20 +15,29 @@ class Addressbook(object): _RESPONSE_GOOD = 0 _RESPONSE_BLOCKED = 3 - def __init__(self, backend): + def __init__(self, backend, asyncPool): self._backend = backend self._numbers = {} + self._asyncPool = asyncPool self.updateSignalHandler = coroutines.CoTee() def update(self, force=False): if not force and self._numbers: return + self._asyncPool.add_task( + self._backend.get_contacts, + (), + {}, + self._on_get_contacts, + self._on_get_contacts_failed, + ) + + def _on_get_contacts(self, contacts): oldContacts = self._numbers oldContactNumbers = set(self.get_numbers()) - self._numbers = {} - self._populate_contacts() + self._numbers = self._populate_contacts(contacts) newContactNumbers = set(self.get_numbers()) addedContacts = newContactNumbers - oldContactNumbers @@ -43,6 +52,10 @@ class Addressbook(object): message = self, addedContacts, removedContacts, changedContacts self.updateSignalHandler.stage.send(message) + @misc_utils.log_exception(_moduleLogger) + def _on_get_contacts_failed(self, error): + _moduleLogger.error(error) + def get_numbers(self): return self._numbers.iterkeys() @@ -64,10 +77,8 @@ class Addressbook(object): except KeyError: return False - def _populate_contacts(self): - if self._numbers: - return - contacts = self._backend.get_contacts() + def _populate_contacts(self, contacts): + numbers = {} for contactId, contactDetails in contacts: contactName = contactDetails["name"] contactNumbers = ( @@ -77,7 +88,8 @@ class Addressbook(object): ) for numberDetails in contactDetails["numbers"] ) - self._numbers.update( + numbers.update( (number, (contactName, phoneType, contactDetails)) for (number, phoneType) in contactNumbers ) + return numbers diff --git a/src/gvoice/conversations.py b/src/gvoice/conversations.py index 8d61a58..2aff337 100644 --- a/src/gvoice/conversations.py +++ b/src/gvoice/conversations.py @@ -24,8 +24,9 @@ class Conversations(object): OLDEST_COMPATIBLE_FORMAT_VERSION = misc_utils.parse_version("0.8.0") OLDEST_MESSAGE_WINDOW = datetime.timedelta(days=60) - def __init__(self, getter): + def __init__(self, getter, asyncPool): self._get_raw_conversations = getter + self._asyncPool = asyncPool self._conversations = {} self.updateSignalHandler = coroutines.CoTee() @@ -70,11 +71,20 @@ class Conversations(object): def update(self, force=False): if not force and self._conversations: return + self._asyncPool.add_task( + self._get_raw_conversations, + (), + {}, + self._on_get_conversations, + self._on_get_conversations_failed, + ) + @misc_utils.log_exception(_moduleLogger) + def _on_get_conversations(self, conversationResult): oldConversationIds = set(self._conversations.iterkeys()) updateConversationIds = set() - conversations = list(self._get_raw_conversations()) + conversations = list(conversationResult) conversations.sort() for conversation in conversations: key = misc_utils.normalize_number(conversation.number) @@ -99,6 +109,10 @@ class Conversations(object): message = (self, updateConversationIds, ) self.updateSignalHandler.stage.send(message) + @misc_utils.log_exception(_moduleLogger) + def _on_get_conversations_failed(self, error): + _moduleLogger.error(error) + def get_conversations(self): return self._conversations.iterkeys() diff --git a/src/gvoice/session.py b/src/gvoice/session.py index 3a7de00..597c7e3 100644 --- a/src/gvoice/session.py +++ b/src/gvoice/session.py @@ -9,6 +9,8 @@ import addressbook import conversations import state_machine +import util.go_utils as gobject_utils + _moduleLogger = logging.getLogger(__name__) @@ -35,6 +37,7 @@ class Session(object): self._username = None self._password = None + self._asyncPool = gobject_utils.AsyncPool() self._backend = backend.GVoiceBackend(cookiePath) if defaults["contacts"][0] == state_machine.UpdateStateMachine.INFINITE_PERIOD: @@ -43,7 +46,7 @@ class Session(object): contactsPeriodInSeconds = state_machine.to_seconds( **{defaults["contacts"][1]: defaults["contacts"][0],} ) - self._addressbook = addressbook.Addressbook(self._backend) + self._addressbook = addressbook.Addressbook(self._backend, self._asyncPool) self._addressbookStateMachine = state_machine.UpdateStateMachine([self.addressbook], "Addressbook") self._addressbookStateMachine.set_state_strategy( state_machine.StateMachine.STATE_DND, @@ -66,7 +69,7 @@ class Session(object): **{defaults["voicemail"][1]: defaults["voicemail"][0],} ) idleVoicemailPeriodInSeconds = max(voicemailPeriodInSeconds * 4, self._MINIMUM_MESSAGE_PERIOD) - self._voicemails = conversations.Conversations(self._backend.get_voicemails) + self._voicemails = conversations.Conversations(self._backend.get_voicemails, self._asyncPool) self._voicemailsStateMachine = state_machine.UpdateStateMachine([self.voicemails], "Voicemail") self._voicemailsStateMachine.set_state_strategy( state_machine.StateMachine.STATE_DND, @@ -98,7 +101,7 @@ class Session(object): **{defaults["texts"][1]: defaults["texts"][0],} ) idleTextsPeriodInSeconds = max(textsPeriodInSeconds * 4, self._MINIMUM_MESSAGE_PERIOD) - self._texts = conversations.Conversations(self._backend.get_texts) + self._texts = conversations.Conversations(self._backend.get_texts, self._asyncPool) self._textsStateMachine = state_machine.UpdateStateMachine([self.texts], "Texting") self._textsStateMachine.set_state_strategy( state_machine.StateMachine.STATE_DND, @@ -145,14 +148,28 @@ class Session(object): ) self._masterStateMachine.close() - def login(self, username, password): + def login(self, username, password, on_success, on_error): self._username = username self._password = password - self._backend.login(self._username, self._password) + self._asyncPool.start() + self._asyncPool.add_task( + self._backend.login, + (self._username, self._password), + {}, + self.__on_login_success(on_success), + on_error + ) + + def __on_login_success(self, user_success): - self._masterStateMachine.start() + def _actual_success(self, *args, **kwds): + self._masterStateMachine.start() + user_success(*args, **kwds) + + return _actual_success def logout(self): + self._asyncPool.stop() self._masterStateMachine.stop() self._backend.logout() @@ -192,13 +209,13 @@ class Session(object): @property def backend(self): - """ - Login enforcing backend - """ - assert self.is_logged_in(), "User not logged in" return self._backend @property + def pool(self): + return self._asyncPool + + @property def addressbook(self): return self._addressbook diff --git a/src/theonering.py b/src/theonering.py index 62fd686..da5d8a2 100755 --- a/src/theonering.py +++ b/src/theonering.py @@ -73,6 +73,8 @@ def run_theonering(persist): mainloop = gobject.MainLoop(is_running=True) + gobject.threads_init() + dbus.glib.init_threads() while mainloop.is_running(): try: mainloop.run() diff --git a/src/util/go_utils.py b/src/util/go_utils.py index ef6cb72..b3682ba 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -131,7 +131,7 @@ class Timeout(object): return False -__QUEUE_EMPTY = object() +_QUEUE_EMPTY = object() class AsyncPool(object): @@ -151,7 +151,7 @@ class AsyncPool(object): self.__isRunning = False for _ in algorithms.itr_available(self.__workQueue): pass # eat up queue to cut down dumb work - self.__workQueue.put(__QUEUE_EMPTY) + self.__workQueue.put(_QUEUE_EMPTY) def add_task(self, func, args, kwds, on_success, on_error): task = func, args, kwds, on_success, on_error @@ -172,7 +172,7 @@ class AsyncPool(object): def __consume_queue(self): while True: task = self.__workQueue.get() - if task is __QUEUE_EMPTY: + if task is _QUEUE_EMPTY: break func, args, kwds, on_success, on_error = task -- 1.7.9.5