3 from __future__ import with_statement
15 import util.coroutines as coroutines
16 import util.misc as misc_utils
19 _moduleLogger = logging.getLogger(__name__)
22 class Conversations(object):
24 def __init__(self, getter):
25 self._get_raw_conversations = getter
26 self._conversations = {}
28 self.updateSignalHandler = coroutines.CoTee()
32 return repr(self._get_raw_conversations.__name__)
35 assert not self._conversations
37 with open(path, "rb") as f:
38 fileVersion, fileBuild, convs = pickle.load(f)
39 except (pickle.PickleError, IOError, EOFError, ValueError):
40 _moduleLogger.exception("While loading for %s" % self._name)
43 if misc_utils.compare_versions(
44 misc_utils.parse_version("0.8.0"),
45 misc_utils.parse_version(fileVersion),
47 self._conversations = convs
50 "%s Skipping cache due to version mismatch (%s-%s)" % (
51 self._name, fileVersion, fileBuild
57 dataToDump = (constants.__version__, constants.__build__, self._conversations)
58 with open(path, "wb") as f:
59 pickle.dump(dataToDump, f, pickle.HIGHEST_PROTOCOL)
60 except (pickle.PickleError, IOError):
61 _moduleLogger.exception("While saving for %s" % self._name)
63 def update(self, force=False):
64 if not force and self._conversations:
67 oldConversationIds = set(self._conversations.iterkeys())
69 updateConversationIds = set()
70 conversations = list(self._get_raw_conversations())
72 for conversation in conversations:
73 key = misc_utils.normalize_number(conversation.number)
75 mergedConversations = self._conversations[key]
77 mergedConversations = MergedConversations()
78 self._conversations[key] = mergedConversations
81 mergedConversations.append_conversation(conversation)
82 isConversationUpdated = True
83 except RuntimeError, e:
85 _moduleLogger.debug("%s Skipping conversation for %r because '%s'" % (self._name, key, e))
86 isConversationUpdated = False
88 if isConversationUpdated:
89 updateConversationIds.add(key)
91 if updateConversationIds:
92 message = (self, updateConversationIds, )
93 self.updateSignalHandler.stage.send(message)
95 def get_conversations(self):
96 return self._conversations.iterkeys()
98 def get_conversation(self, key):
99 return self._conversations[key]
101 def clear_conversation(self, key):
103 del self._conversations[key]
105 _moduleLogger.info("%s Conversation never existed for %r" % (self._name, key, ))
108 self._conversations.clear()
111 class MergedConversations(object):
114 self._conversations = []
116 def append_conversation(self, newConversation):
117 self._validate(newConversation)
119 for similarConversation in self._find_related_conversation(newConversation.id):
120 self._update_previous_related_conversation(similarConversation, newConversation)
121 self._remove_repeats(similarConversation, newConversation)
124 # Hack to reduce a race window with GV marking messages as read
125 # because it thinks we replied when really we replied to the
126 # previous message. Clients of this code are expected to handle
127 # this gracefully. Other race conditions may exist but clients are
128 # responsible for them
129 if newConversation.messages:
130 newConversation.isRead = False
132 newConversation.isRead = True
133 self._conversations.append(newConversation)
137 selfDict["conversations"] = [conv.to_dict() for conv in self._conversations]
141 def conversations(self):
142 return self._conversations
144 def _validate(self, newConversation):
145 if not self._conversations:
148 for constantField in ("number", ):
149 assert getattr(self._conversations[0], constantField) == getattr(newConversation, constantField), "Constant field changed, soemthing is seriously messed up: %r v %r" % (
150 getattr(self._conversations[0], constantField),
151 getattr(newConversation, constantField),
154 if newConversation.time <= self._conversations[-1].time:
155 raise RuntimeError("Conversations got out of order")
157 def _find_related_conversation(self, convId):
158 similarConversations = (
160 for conversation in self._conversations
161 if conversation.id == convId
163 return similarConversations
165 def _update_previous_related_conversation(self, relatedConversation, newConversation):
166 for commonField in ("isSpam", "isTrash", "isArchived"):
167 newValue = getattr(newConversation, commonField)
168 setattr(relatedConversation, commonField, newValue)
170 def _remove_repeats(self, relatedConversation, newConversation):
171 newConversationMessages = newConversation.messages
172 newConversation.messages = [
174 for newMessage in newConversationMessages
175 if newMessage not in relatedConversation.messages
177 _moduleLogger.debug("Found %d new messages in conversation %s (%d/%d)" % (
178 len(newConversationMessages) - len(newConversation.messages),
180 len(newConversation.messages),
181 len(newConversationMessages),
183 assert 0 < len(newConversation.messages), "Everything shouldn't have been removed"
186 def filter_out_read(conversations):
189 for conversation in conversations
190 if not conversation.isRead and not conversation.isArchived
194 def is_message_from_self(message):
195 return message.whoFrom == "Me:"
198 def filter_out_self(conversations):
201 for newConversation in conversations
202 if len(newConversation.messages) and any(
203 not is_message_from_self(message)
204 for message in newConversation.messages
209 class FilterOutReported(object):
211 NULL_TIMESTAMP = datetime.datetime(1, 1, 1)
214 self._lastMessageTimestamp = self.NULL_TIMESTAMP
216 def get_last_timestamp(self):
217 return self._lastMessageTimestamp
219 def __call__(self, conversations):
220 filteredConversations = [
222 for conversation in conversations
223 if self._lastMessageTimestamp < conversation.time
225 if filteredConversations and self._lastMessageTimestamp < filteredConversations[0].time:
226 self._lastMessageTimestamp = filteredConversations[0].time
227 return filteredConversations