3 from __future__ import with_statement
15 import util.coroutines as coroutines
16 import util.misc as misc_utils
19 _moduleLogger = logging.getLogger(__name__)
22 class Conversations(object):
24 def __init__(self, getter):
25 self._get_raw_conversations = getter
26 self._conversations = {}
28 self.updateSignalHandler = coroutines.CoTee()
32 return repr(self._get_raw_conversations.__name__)
35 assert not self._conversations
37 with open(path, "rb") as f:
38 fileVersion, fileBuild, convs = pickle.load(f)
39 except (pickle.PickleError, IOError, EOFError, ValueError):
40 _moduleLogger.exception("While loading for %s" % self._name)
43 if fileVersion == constants.__version__ and fileBuild == constants.__build__:
44 self._conversations = convs
47 "%s Skipping cache due to version mismatch (%s-%s)" % (self._name, fileVersion, fileBuild)
52 dataToDump = (constants.__version__, constants.__build__, self._conversations)
53 with open(path, "wb") as f:
54 pickle.dump(dataToDump, f, pickle.HIGHEST_PROTOCOL)
55 except (pickle.PickleError, IOError):
56 _moduleLogger.exception("While saving for %s" % self._name)
58 def update(self, force=False):
59 if not force and self._conversations:
62 oldConversationIds = set(self._conversations.iterkeys())
64 updateConversationIds = set()
65 conversations = list(self._get_raw_conversations())
67 for conversation in conversations:
68 key = misc_utils.normalize_number(conversation.number)
70 mergedConversations = self._conversations[key]
72 mergedConversations = MergedConversations()
73 self._conversations[key] = mergedConversations
76 mergedConversations.append_conversation(conversation)
77 isConversationUpdated = True
78 except RuntimeError, e:
80 _moduleLogger.debug("%s Skipping conversation for %r because '%s'" % (self._name, key, e))
81 isConversationUpdated = False
83 if isConversationUpdated:
84 updateConversationIds.add(key)
86 if updateConversationIds:
87 message = (self, updateConversationIds, )
88 self.updateSignalHandler.stage.send(message)
90 def get_conversations(self):
91 return self._conversations.iterkeys()
93 def get_conversation(self, key):
94 return self._conversations[key]
96 def clear_conversation(self, key):
98 del self._conversations[key]
100 _moduleLogger.info("%s Conversation never existed for %r" % (self._name, key, ))
103 self._conversations.clear()
106 class MergedConversations(object):
109 self._conversations = []
111 def append_conversation(self, newConversation):
112 self._validate(newConversation)
114 for similarConversation in self._find_related_conversation(newConversation.id):
115 self._update_previous_related_conversation(similarConversation, newConversation)
116 self._remove_repeats(similarConversation, newConversation)
119 # Hack to reduce a race window with GV marking messages as read
120 # because it thinks we replied when really we replied to the
121 # previous message. Clients of this code are expected to handle
122 # this gracefully. Other race conditions may exist but clients are
123 # responsible for them
124 if newConversation.messages:
125 newConversation.isRead = False
127 newConversation.isRead = True
128 self._conversations.append(newConversation)
132 selfDict["conversations"] = [conv.to_dict() for conv in self._conversations]
136 def conversations(self):
137 return self._conversations
139 def _validate(self, newConversation):
140 if not self._conversations:
143 for constantField in ("number", ):
144 assert getattr(self._conversations[0], constantField) == getattr(newConversation, constantField), "Constant field changed, soemthing is seriously messed up: %r v %r" % (
145 getattr(self._conversations[0], constantField),
146 getattr(newConversation, constantField),
149 if newConversation.time <= self._conversations[-1].time:
150 raise RuntimeError("Conversations got out of order")
152 def _find_related_conversation(self, convId):
153 similarConversations = (
155 for conversation in self._conversations
156 if conversation.id == convId
158 return similarConversations
160 def _update_previous_related_conversation(self, relatedConversation, newConversation):
161 for commonField in ("isSpam", "isTrash", "isArchived"):
162 newValue = getattr(newConversation, commonField)
163 setattr(relatedConversation, commonField, newValue)
165 def _remove_repeats(self, relatedConversation, newConversation):
166 newConversationMessages = newConversation.messages
167 newConversation.messages = [
169 for newMessage in newConversationMessages
170 if newMessage not in relatedConversation.messages
172 _moduleLogger.debug("Found %d new messages in conversation %s (%d/%d)" % (
173 len(newConversationMessages) - len(newConversation.messages),
175 len(newConversation.messages),
176 len(newConversationMessages),
178 assert 0 < len(newConversation.messages), "Everything shouldn't have been removed"
181 def filter_out_read(conversations):
184 for conversation in conversations
185 if not conversation.isRead and not conversation.isArchived
189 def is_message_from_self(message):
190 return message.whoFrom == "Me:"
193 def filter_out_self(conversations):
196 for newConversation in conversations
197 if len(newConversation.messages) and any(
198 not is_message_from_self(message)
199 for message in newConversation.messages
204 class FilterOutReported(object):
206 NULL_TIMESTAMP = datetime.datetime(1, 1, 1)
209 self._lastMessageTimestamp = self.NULL_TIMESTAMP
211 def get_last_timestamp(self):
212 return self._lastMessageTimestamp
214 def __call__(self, conversations):
215 filteredConversations = [
217 for conversation in conversations
218 if self._lastMessageTimestamp < conversation.time
220 if filteredConversations and self._lastMessageTimestamp < filteredConversations[0].time:
221 self._lastMessageTimestamp = filteredConversations[0].time
222 return filteredConversations