3 from __future__ import with_statement
15 import util.coroutines as coroutines
16 import util.misc as misc_utils
# Module-level logger, named after this module; every log call in this file goes through it.
19 _moduleLogger = logging.getLogger(__name__)
# Store of merged conversations keyed by normalized number (see
# _on_get_conversations), refreshed through an async pool and persisted with pickle.
22 class Conversations(object):
# Version bound that a cache file's stored version is compared against before
# its conversations are accepted.
24 OLDEST_COMPATIBLE_FORMAT_VERSION = misc_utils.parse_version("0.8.0")
# Maximum age handed to each merged conversation's compress() before the cache is written.
25 OLDEST_MESSAGE_WINDOW = datetime.timedelta(days=60)
def __init__(self, getter, asyncPool):
    """
    Start with an empty conversation store.

    getter -- callable kept for later; update() hands it to the async pool
    asyncPool -- pool object kept for later; update() calls its add_task()
    """
    self._conversations = {}
    self._asyncPool = asyncPool
    self._get_raw_conversations = getter

    # Listeners receive (self, set-of-updated-keys) messages through this tee.
    self.updateSignalHandler = coroutines.CoTee()
# NOTE(review): the enclosing `def` line is not visible in this listing. The rest of
# the class reads this value as the attribute self._name, so it is presumably a
# property -- confirm.
# Produces the repr() of the getter callable's __name__, used as a log prefix.
36 return repr(self._get_raw_conversations.__name__)
# NOTE(review): body of the cache loader. Its `def` line and several short lines
# (try/else headers, an early exit after the except, closing brackets) are not
# visible in this listing.
# Loading is only expected while no conversations are held yet.
39 assert not self._conversations
41 with open(path, "rb") as f:
# The file holds a (version, build, conversations) triple.
# SECURITY: pickle.load can execute arbitrary code; only point this at trusted cache files.
42 fileVersion, fileBuild, convs = pickle.load(f)
# A missing, truncated or unreadable cache is logged with its traceback.
43 except (pickle.PickleError, IOError, EOFError, ValueError):
44 _moduleLogger.exception("While loading for %s" % self._name)
# The cached conversations are adopted only when the file's version passes the
# comparison against OLDEST_COMPATIBLE_FORMAT_VERSION (the comparison operator
# itself sits on a line not shown here).
47 if misc_utils.compare_versions(
48 self.OLDEST_COMPATIBLE_FORMAT_VERSION,
49 misc_utils.parse_version(fileVersion),
51 _moduleLogger.info("%s Loaded cache" % (self._name, ))
52 self._conversations = convs
# Message used when the cache is rejected; it reports the file's version and build.
55 "%s Skipping cache due to version mismatch (%s-%s)" % (
56 self._name, fileVersion, fileBuild
# NOTE(review): body of the cache writer. Its `def` line and the `try:` header
# are not visible in this listing.
62 _moduleLogger.info("%s Saving cache" % (self._name, ))
# Let every merged conversation drop entries older than OLDEST_MESSAGE_WINDOW first.
63 for conv in self._conversations.itervalues():
64 conv.compress(self.OLDEST_MESSAGE_WINDOW)
# Same (version, build, conversations) layout that the loader unpacks.
65 dataToDump = (constants.__version__, constants.__build__, self._conversations)
66 with open(path, "wb") as f:
67 pickle.dump(dataToDump, f, pickle.HIGHEST_PROTOCOL)
# Pickling and I/O failures are logged with their traceback.
68 except (pickle.PickleError, IOError):
69 _moduleLogger.exception("While saving for %s" % self._name)
# Schedule a fetch of the raw conversations on the async pool.
# force: when false and conversations are already held, the guard below fires.
# NOTE(review): the guarded statement is not visible in this listing (presumably
# an early return) -- confirm.
71 def update(self, force=False):
72 if not force and self._conversations:
# The pool receives the getter plus the success and failure callbacks; the
# arguments between the getter and the callbacks are on lines not shown here.
74 self._asyncPool.add_task(
75 self._get_raw_conversations,
78 self._on_get_conversations,
79 self._on_get_conversations_failed,
# Success callback registered by update(): merges the fetched conversations into
# the per-number MergedConversations and announces which keys were updated.
# NOTE(review): several short lines (try/except headers and possibly other
# statements) are not visible in this listing.
82 @misc_utils.log_exception(_moduleLogger)
83 def _on_get_conversations(self, conversationResult):
# NOTE(review): oldConversationIds is not read again in the visible lines.
84 oldConversationIds = set(self._conversations.iterkeys())
86 updateConversationIds = set()
# Materialize the result into a list before walking it.
87 conversations = list(conversationResult)
89 for conversation in conversations:
# Conversations are grouped under their normalized number.
90 key = misc_utils.normalize_number(conversation.number)
92 mergedConversations = self._conversations[key]
# Presumably the fallback when the lookup above fails: start a fresh merged
# container for this key (the except header is not visible here).
94 mergedConversations = MergedConversations()
95 self._conversations[key] = mergedConversations
98 mergedConversations.append_conversation(conversation)
99 isConversationUpdated = True
# append_conversation validates first and raises RuntimeError for out-of-order
# conversations; those do not count as updates.
100 except RuntimeError, e:
102 _moduleLogger.debug("%s Skipping conversation for %r because '%s'" % (self._name, key, e))
103 isConversationUpdated = False
105 if isConversationUpdated:
106 updateConversationIds.add(key)
# Listeners are notified only when at least one key was updated; the payload is
# (this object, set of updated keys).
108 if updateConversationIds:
109 message = (self, updateConversationIds, )
110 self.updateSignalHandler.stage.send(message)
@misc_utils.log_exception(_moduleLogger)
def _on_get_conversations_failed(self, error):
    """
    Failure callback that update() registers with the async pool.

    error -- whatever the pool reports for the failed fetch; logged unchanged
    at error level.
    """
    _moduleLogger.error(error)
def get_conversations(self):
    """
    Return an iterator over the keys of the stored conversations.

    The iterator walks the live mapping, so the mapping must not change size
    while it is being consumed.
    """
    storedConversations = self._conversations
    return iter(storedConversations)
def get_conversation(self, key):
    """
    Return the merged conversation stored under `key`.

    An unknown key is not handled here; the mapping's lookup error (KeyError
    for the plain dict created in __init__) propagates to the caller.
    """
    mergedConversation = self._conversations[key]
    return mergedConversation
# Remove the merged conversation stored under `key`.
# NOTE(review): the try/except headers around the delete are not visible in this
# listing; the log line below presumably runs when the key is absent -- confirm.
122 def clear_conversation(self, key):
124 del self._conversations[key]
126 _moduleLogger.info("%s Conversation never existed for %r" % (self._name, key, ))
# NOTE(review): the enclosing `def` line is not visible in this listing.
# Empties the conversation mapping in place (the same dict object is kept).
129 self._conversations.clear()
# Ordered collection of conversations that share one number; _validate enforces
# that each appended conversation is newer than the last stored one.
132 class MergedConversations(object):
# NOTE(review): the enclosing `def` line (presumably __init__) is not visible here.
# Conversations are kept in a plain list in append order.
135 self._conversations = []
# Add newConversation to this collection after validating it and reconciling it
# with stored conversations that carry the same id.
# Raises RuntimeError (from _validate) when newConversation is not newer than
# the last stored conversation.
# NOTE(review): a few short lines (including the branch header(s) around the
# isRead assignments) are not visible in this listing.
137 def append_conversation(self, newConversation):
138 self._validate(newConversation)
# For every stored conversation with the same id: copy the new flags onto it,
# then strip its already-known messages from newConversation.
140 for similarConversation in self._find_related_conversation(newConversation.id):
141 self._update_previous_related_conversation(similarConversation, newConversation)
142 self._remove_repeats(similarConversation, newConversation)
145 # Hack to reduce a race window with GV marking messages as read
146 # because it thinks we replied when really we replied to the
147 # previous message. Clients of this code are expected to handle
148 # this gracefully. Other race conditions may exist but clients are
149 # responsible for them
150 if newConversation.messages:
151 newConversation.isRead = False
153 newConversation.isRead = True
154 self._conversations.append(newConversation)
# NOTE(review): fragment of a serializer; its `def` line and the creation and
# return of selfDict are not visible in this listing.
# Each stored conversation is serialized through its own to_dict().
158 selfDict["conversations"] = [conv.to_dict() for conv in self._conversations]
# Accessor for the stored conversations; hands back the internal list itself,
# not a copy, so caller mutations affect this object.
# NOTE(review): a decorator line may precede this def outside the visible listing -- confirm.
162 def conversations(self):
163 return self._conversations
# Keep only the stored conversations whose age (now - conv.time) is below `timedelta`.
# timedelta: maximum age; the only visible caller passes a datetime.timedelta.
# NOTE(review): ages are measured against naive datetime.now(), so conv.time is
# assumed to be a naive local datetime -- confirm.
# NOTE(review): the comprehension's element line, its closing bracket and the
# `else:` header are not visible in this listing.
165 def compress(self, timedelta):
166 now = datetime.datetime.now()
167 oldNumConvs = len(self._conversations)
# NOTE(review): oldConvs is not read again in the visible lines.
168 oldConvs = self._conversations
169 self._conversations = [
171 for conv in self._conversations
172 if (now - conv.time) < timedelta
174 newNumConvs = len(self._conversations)
# Report whether the filter actually removed anything.
175 if oldNumConvs != newNumConvs:
176 _moduleLogger.debug("Compressed conversations from %s to %s" % (oldNumConvs, newNumConvs))
178 _moduleLogger.debug("Did not compress, %s" % (newNumConvs))
# Check newConversation against what is already stored.
# Asserts that its "number" equals that of the first stored conversation, and
# raises RuntimeError when its time is not strictly later than the last stored one.
# NOTE(review): the statement guarded by the emptiness check is not visible in
# this listing (presumably an early return, since the checks below index into
# the list) -- confirm.
# NOTE(review): the field check uses `assert`, which is skipped under `python -O`.
180 def _validate(self, newConversation):
181 if not self._conversations:
# Fields that must be identical for every conversation merged here.
184 for constantField in ("number", ):
185 assert getattr(self._conversations[0], constantField) == getattr(newConversation, constantField), "Constant field changed, soemthing is seriously messed up: %r v %r" % (
186 getattr(self._conversations[0], constantField),
187 getattr(newConversation, constantField),
# Conversations must arrive strictly oldest-to-newest; equal timestamps are rejected too.
190 if newConversation.time <= self._conversations[-1].time:
191 raise RuntimeError("Conversations got out of order")
# Select the stored conversations whose id equals convId.
# The result is a generator expression, evaluated lazily as the caller iterates it.
# NOTE(review): the yielded expression and the closing parenthesis are on lines
# not visible in this listing.
193 def _find_related_conversation(self, convId):
194 similarConversations = (
196 for conversation in self._conversations
197 if conversation.id == convId
199 return similarConversations
201 def _update_previous_related_conversation(self, relatedConversation, newConversation):
202 for commonField in ("isSpam", "isTrash", "isArchived"):
203 newValue = getattr(newConversation, commonField)
204 setattr(relatedConversation, commonField, newValue)
# Strip from newConversation.messages every message already present in
# relatedConversation.messages; newConversation is modified in place.
# NOTE(review): the comprehension's element line, one argument of the log call
# and the closing brackets are not visible in this listing.
206 def _remove_repeats(self, relatedConversation, newConversation):
207 newConversationMessages = newConversation.messages
# Each membership test scans relatedConversation.messages linearly.
208 newConversation.messages = [
210 for newMessage in newConversationMessages
211 if newMessage not in relatedConversation.messages
# NOTE(review): the first logged count is (original - remaining), i.e. the number
# of messages dropped as repeats, although the text labels them "new".
213 _moduleLogger.debug("Found %d new messages in conversation %s (%d/%d)" % (
214 len(newConversationMessages) - len(newConversation.messages),
216 len(newConversation.messages),
217 len(newConversationMessages),
# At least one message is expected to survive; as an `assert`, this check is skipped under `python -O`.
219 assert 0 < len(newConversation.messages), "Everything shouldn't have been removed"
# Keep only the conversations that are neither read nor archived.
# NOTE(review): the opening and closing lines of the returned expression are not
# visible in this listing, so whether a list or a lazy generator comes back
# cannot be told from here.
222 def filter_out_read(conversations):
225 for conversation in conversations
226 if not conversation.isRead and not conversation.isArchived
def is_message_from_self(message):
    """
    Tell whether `message` carries the sender label "Me:".

    message -- any object with a whoFrom attribute; the comparison is exact.
    """
    sender = message.whoFrom
    return sender == "Me:"
# Keep only the conversations that have messages and where at least one message
# is not from self (see is_message_from_self).
# NOTE(review): the opening and closing lines of the returned expression and its
# element line are not visible in this listing, so the concrete return type
# (list or generator) cannot be told from here.
234 def filter_out_self(conversations):
237 for newConversation in conversations
238 if len(newConversation.messages) and any(
239 not is_message_from_self(message)
240 for message in newConversation.messages
# Stateful callable filter: remembers a timestamp and, when called, passes only
# conversations newer than it.
245 class FilterOutReported(object):
# Starting value for the remembered timestamp: year 1, January 1st, the
# earliest datetime the datetime module can represent.
247 NULL_TIMESTAMP = datetime.datetime(1, 1, 1)
# NOTE(review): the enclosing `def` line (presumably __init__) is not visible here.
# Nothing has been reported yet, so start from the sentinel.
250 self._lastMessageTimestamp = self.NULL_TIMESTAMP
def get_last_timestamp(self):
    """
    Return the timestamp currently remembered by this filter.

    The value is whatever was last stored in _lastMessageTimestamp; calling
    the filter may advance it.
    """
    lastTimestamp = self._lastMessageTimestamp
    return lastTimestamp
# Return, as a list, the conversations whose time is later than the remembered
# timestamp, then advance the remembered timestamp to the time of the first
# conversation in that list.
# NOTE(review): only element [0] is consulted when advancing, so this assumes the
# input arrives newest-first -- confirm with callers.
# NOTE(review): the comprehension's element line and closing bracket are not
# visible in this listing.
255 def __call__(self, conversations):
256 filteredConversations = [
258 for conversation in conversations
259 if self._lastMessageTimestamp < conversation.time
# Every kept conversation already passed the same comparison, so the second
# half of this condition holds whenever the list is non-empty.
261 if filteredConversations and self._lastMessageTimestamp < filteredConversations[0].time:
262 self._lastMessageTimestamp = filteredConversations[0].time
263 return filteredConversations