import constants
import util.coroutines as coroutines
import util.misc as misc_utils
+import util.go_utils as gobject_utils
_moduleLogger = logging.getLogger(__name__)
class Conversations(object):
OLDEST_COMPATIBLE_FORMAT_VERSION = misc_utils.parse_version("0.8.0")
- OLDEST_MESSAGE_WINDOW = datetime.timedelta(days=60)
def __init__(self, getter, asyncPool):
self._get_raw_conversations = getter
return repr(self._get_raw_conversations.__name__)
def load(self, path):
+ _moduleLogger.debug("%s Loading cache" % (self._name, ))
assert not self._conversations
try:
with open(path, "rb") as f:
_moduleLogger.info("%s Loaded cache" % (self._name, ))
self._conversations = convs
self._loadedFromCache = True
+ for key, mergedConv in self._conversations.iteritems():
+ _moduleLogger.debug("%s \tLoaded %s" % (self._name, key))
+ for conv in mergedConv.conversations:
+ message = "%s \t\tLoaded %s (%r) %r %r %r" % (
+ self._name, conv.id, conv.time, conv.isRead, conv.isArchived, len(conv.messages)
+ )
+ _moduleLogger.debug(message)
else:
_moduleLogger.debug(
"%s Skipping cache due to version mismatch (%s-%s)" % (
)
def save(self, path):
+ _moduleLogger.info("%s Saving cache" % (self._name, ))
try:
- _moduleLogger.info("%s Saving cache" % (self._name, ))
- for conv in self._conversations.itervalues():
- conv.compress(self.OLDEST_MESSAGE_WINDOW)
dataToDump = (constants.__version__, constants.__build__, self._conversations)
with open(path, "wb") as f:
pickle.dump(dataToDump, f, pickle.HIGHEST_PROTOCOL)
except (pickle.PickleError, IOError):
_moduleLogger.exception("While saving for %s" % self._name)
+ for key, mergedConv in self._conversations.iteritems():
+ _moduleLogger.debug("%s \tSaving %s" % (self._name, key))
+ for conv in mergedConv.conversations:
+ message = "%s \t\tSaving %s (%r) %r %r %r" % (
+ self._name, conv.id, conv.time, conv.isRead, conv.isArchived, len(conv.messages)
+ )
+ _moduleLogger.debug(message)
+
+ _moduleLogger.info("%s Cache saved" % (self._name, ))
+
def update(self, force=False):
if not force and self._conversations:
return
- self._asyncPool.add_task(
+
+ le = gobject_utils.AsyncLinearExecution(self._asyncPool, self._update)
+ le.start()
+
+ @misc_utils.log_exception(_moduleLogger)
+ def _update(self):
+ conversationResult = yield (
self._get_raw_conversations,
(),
{},
- self._on_get_conversations,
- self._on_get_conversations_failed,
)
- @misc_utils.log_exception(_moduleLogger)
- def _on_get_conversations(self, conversationResult):
oldConversationIds = set(self._conversations.iterkeys())
updateConversationIds = set()
if isConversationUpdated:
updateConversationIds.add(key)
+ for key in updateConversationIds:
+ mergedConv = self._conversations[key]
+ _moduleLogger.debug("%s \tUpdated %s" % (self._name, key))
+ for conv in mergedConv.conversations:
+ message = "%s \t\tUpdated %s (%r) %r %r %r" % (
+ self._name, conv.id, conv.time, conv.isRead, conv.isArchived, len(conv.messages)
+ )
+ _moduleLogger.debug(message)
+
if updateConversationIds:
message = (self, updateConversationIds, )
self.updateSignalHandler.stage.send(message)
self._hasDoneUpdate = True
- @misc_utils.log_exception(_moduleLogger)
- def _on_get_conversations_failed(self, error):
- _moduleLogger.error(error)
-
def get_conversations(self):
return self._conversations.iterkeys()
def conversations(self):
return self._conversations
- def compress(self, timedelta):
- now = datetime.datetime.now()
- oldNumConvs = len(self._conversations)
- oldConvs = self._conversations
- self._conversations = [
- conv
- for conv in self._conversations
- if (now - conv.time) < timedelta
- ]
- newNumConvs = len(self._conversations)
- if oldNumConvs != newNumConvs:
- _moduleLogger.debug("Compressed conversations from %s to %s" % (oldNumConvs, newNumConvs))
- else:
- _moduleLogger.debug("Did not compress, %s" % (newNumConvs))
-
def _validate(self, newConversation):
if not self._conversations:
return