import logging
import inspect
+from xml.sax import saxutils
from xml.etree import ElementTree
try:
import browser_emu
-_moduleLogger = logging.getLogger("gvoice.backend")
+_moduleLogger = logging.getLogger(__name__)
class NetworkError(RuntimeError):
return "%s (%s): %s" % (
self.whoFrom,
self.when,
- "".join(str(part) for part in self.body)
+ "".join(unicode(part) for part in self.body)
)
def to_dict(self):
self._XML_ACCOUNT_URL = SECURE_URL_BASE + "contacts/"
# HACK really this redirects to the main pge and we are grabbing some javascript
self._XML_CONTACTS_URL = "http://www.google.com/voice/inbox/search/contact"
+ self._CSV_CONTACTS_URL = "http://mail.google.com/mail/contacts/data/export"
self._XML_RECENT_URL = SECURE_URL_BASE + "inbox/recent/"
self.XML_FEEDS = (
Attempts to detect a current session
@note Once logged in try not to reauth more than once a minute.
@returns If authenticated
+ @blocks
"""
isRecentledAuthed = (time.time() - self._lastAuthed) < 120
isPreviouslyAuthed = self._token is not None
"""
Attempt to login to GoogleVoice
@returns Whether login was successful or not
+ @blocks
"""
self.logout()
galxToken = self._get_token()
self._lastAuthed = time.time()
return True
+ def persist(self):
+ self._browser.save_cookies()
+
+ def shutdown(self):
+ self._browser.save_cookies()
+ self._token = None
+ self._lastAuthed = 0.0
+
def logout(self):
self._browser.clear_cookies()
self._browser.save_cookies()
self._lastAuthed = 0.0
def is_dnd(self):
+ """
+ @blocks
+ """
isDndPage = self._get_page(self._isDndURL)
dndGroup = self._isDndRe.search(isDndPage)
return isDnd
def set_dnd(self, doNotDisturb):
+ """
+ @blocks
+ """
dndPostData = {
"doNotDisturb": 1 if doNotDisturb else 0,
}
def call(self, outgoingNumber):
"""
This is the main function responsible for initating the callback
+ @blocks
"""
outgoingNumber = self._send_validation(outgoingNumber)
subscriberNumber = None
"""
Cancels a call matching outgoing and forwarding numbers (if given).
Will raise an error if no matching call is being placed
+ @blocks
"""
page = self._get_page_with_token(
self._callCancelURL,
)
self._parse_with_validation(page)
- def send_sms(self, phoneNumber, message):
- phoneNumber = self._send_validation(phoneNumber)
+ def send_sms(self, phoneNumbers, message):
+ """
+ @blocks
+ """
+ validatedPhoneNumbers = [
+ self._send_validation(phoneNumber)
+ for phoneNumber in phoneNumbers
+ ]
+ flattenedPhoneNumbers = ",".join(validatedPhoneNumbers)
page = self._get_page_with_token(
self._sendSmsURL,
{
- 'phoneNumber': phoneNumber,
- 'text': message
+ 'phoneNumber': flattenedPhoneNumbers,
+ 'text': unicode(message).encode("utf-8"),
},
)
self._parse_with_validation(page)
"""
Search your Google Voice Account history for calls, voicemails, and sms
Returns ``Folder`` instance containting matching messages
+ @blocks
"""
page = self._get_page(
self._XML_SEARCH_URL,
return json
def get_feed(self, feed):
+ """
+ @blocks
+ """
actualFeed = "_XML_%s_URL" % feed.upper()
feedUrl = getattr(self, actualFeed)
which can either be a ``Message`` instance, or a SHA1 identifier.
Saves files to ``adir`` (defaults to current directory).
Message hashes can be found in ``self.voicemail().messages`` for example.
- Returns location of saved file.
+ @returns location of saved file.
+ @blocks
"""
page = self._get_page(self._downloadVoicemailURL, {"id": messageId})
fn = os.path.join(adir, '%s.mp3' % messageId)
def get_recent(self):
"""
@returns Iterable of (personsName, phoneNumber, exact date, relative date, action)
+ @blocks
"""
- for action, url in (
- ("Received", self._XML_RECEIVED_URL),
- ("Missed", self._XML_MISSED_URL),
- ("Placed", self._XML_PLACED_URL),
- ):
- flatXml = self._get_page(url)
-
- allRecentHtml = self._grab_html(flatXml)
- allRecentData = self._parse_history(allRecentHtml)
- for recentCallData in allRecentData:
- recentCallData["action"] = action
- yield recentCallData
+ recentPages = [
+ (action, self._get_page(url))
+ for action, url in (
+ ("Received", self._XML_RECEIVED_URL),
+ ("Missed", self._XML_MISSED_URL),
+ ("Placed", self._XML_PLACED_URL),
+ )
+ ]
+ return self._parse_recent(recentPages)
def get_contacts(self):
"""
@returns Iterable of (contact id, contact name)
+ @blocks
"""
page = self._get_page(self._XML_CONTACTS_URL)
- contactsBody = self._contactsBodyRe.search(page)
- if contactsBody is None:
- raise RuntimeError("Could not extract contact information")
- accountData = _fake_parse_json(contactsBody.group(1))
- for contactId, contactDetails in accountData["contacts"].iteritems():
- # A zero contact id is the catch all for unknown contacts
- if contactId != "0":
- yield contactId, contactDetails
+ return self._process_contacts(page)
+
+ def get_csv_contacts(self):
+ data = {
+ "groupToExport": "mine",
+ "exportType": "ALL",
+ "out": "OUTLOOK_CSV",
+ }
+ contacts = self._get_page(self._CSV_CONTACTS_URL, data)
+ return contacts
def get_voicemails(self):
+ """
+ @blocks
+ """
voicemailPage = self._get_page(self._XML_VOICEMAIL_URL)
voicemailHtml = self._grab_html(voicemailPage)
voicemailJson = self._grab_json(voicemailPage)
+ if voicemailJson is None:
+ return ()
parsedVoicemail = self._parse_voicemail(voicemailHtml)
voicemails = self._merge_conversation_sources(parsedVoicemail, voicemailJson)
return voicemails
def get_texts(self):
+ """
+ @blocks
+ """
smsPage = self._get_page(self._XML_SMS_URL)
smsHtml = self._grab_html(smsPage)
smsJson = self._grab_json(smsPage)
+ if smsJson is None:
+ return ()
parsedSms = self._parse_sms(smsHtml)
smss = self._merge_conversation_sources(parsedSms, smsJson)
return smss
def mark_message(self, messageId, asRead):
+ """
+ @blocks
+ """
postData = {
"read": 1 if asRead else 0,
"id": messageId,
markPage = self._get_page(self._markAsReadURL, postData)
def archive_message(self, messageId):
+ """
+ @blocks
+ """
postData = {
"id": messageId,
}
raise RuntimeError("Not Authenticated")
return number
+ def _parse_recent(self, recentPages):
+ for action, flatXml in recentPages:
+ allRecentHtml = self._grab_html(flatXml)
+ allRecentData = self._parse_history(allRecentHtml)
+ for recentCallData in allRecentData:
+ recentCallData["action"] = action
+ yield recentCallData
+
+ def _process_contacts(self, page):
+ contactsBody = self._contactsBodyRe.search(page)
+ if contactsBody is None:
+ raise RuntimeError("Could not extract contact information")
+ accountData = _fake_parse_json(contactsBody.group(1))
+ for contactId, contactDetails in accountData["contacts"].iteritems():
+ # A zero contact id is the catch all for unknown contacts
+ if contactId != "0":
+ yield contactId, contactDetails
+
def _parse_history(self, historyHtml):
splitVoicemail = self._seperateVoicemailsRegex.split(historyHtml)
for messageId, messageHtml in itergroup(splitVoicemail[1:], 2):
exactTimeGroup = self._exactVoicemailTimeRegex.search(messageHtml)
exactTime = exactTimeGroup.group(1).strip() if exactTimeGroup else ""
- exactTime = datetime.datetime.strptime(exactTime, "%m/%d/%y %I:%M %p")
+ exactTime = google_strptime(exactTime)
relativeTimeGroup = self._relativeVoicemailTimeRegex.search(messageHtml)
relativeTime = relativeTimeGroup.group(1).strip() if relativeTimeGroup else ""
locationGroup = self._voicemailLocationRegex.search(messageHtml)
yield {
"id": messageId.strip(),
"contactId": contactId,
- "name": name,
+ "name": unescape(name),
"time": exactTime,
"relTime": relativeTime,
"prettyNumber": prettyNumber,
"number": number,
- "location": location,
+ "location": unescape(location),
}
@staticmethod
exactTimeGroup = self._exactVoicemailTimeRegex.search(messageHtml)
exactTimeText = exactTimeGroup.group(1).strip() if exactTimeGroup else ""
- conv.time = datetime.datetime.strptime(exactTimeText, "%m/%d/%y %I:%M %p")
+ conv.time = google_strptime(exactTimeText)
relativeTimeGroup = self._relativeVoicemailTimeRegex.search(messageHtml)
conv.relTime = relativeTimeGroup.group(1).strip() if relativeTimeGroup else ""
locationGroup = self._voicemailLocationRegex.search(messageHtml)
- conv.location = locationGroup.group(1).strip() if locationGroup else ""
+ conv.location = unescape(locationGroup.group(1).strip() if locationGroup else "")
nameGroup = self._voicemailNameRegex.search(messageHtml)
- conv.name = nameGroup.group(1).strip() if nameGroup else ""
+ conv.name = unescape(nameGroup.group(1).strip() if nameGroup else "")
numberGroup = self._voicemailNumberRegex.search(messageHtml)
conv.number = numberGroup.group(1).strip() if numberGroup else ""
prettyNumberGroup = self._prettyVoicemailNumberRegex.search(messageHtml)
exactTimeGroup = self._exactVoicemailTimeRegex.search(messageHtml)
exactTimeText = exactTimeGroup.group(1).strip() if exactTimeGroup else ""
- conv.time = datetime.datetime.strptime(exactTimeText, "%m/%d/%y %I:%M %p")
+ conv.time = google_strptime(exactTimeText)
relativeTimeGroup = self._relativeVoicemailTimeRegex.search(messageHtml)
conv.relTime = relativeTimeGroup.group(1).strip() if relativeTimeGroup else ""
conv.location = ""
nameGroup = self._voicemailNameRegex.search(messageHtml)
- conv.name = nameGroup.group(1).strip() if nameGroup else ""
+ conv.name = unescape(nameGroup.group(1).strip() if nameGroup else "")
numberGroup = self._voicemailNumberRegex.search(messageHtml)
conv.number = numberGroup.group(1).strip() if numberGroup else ""
prettyNumberGroup = self._prettyVoicemailNumberRegex.search(messageHtml)
return json
+_UNESCAPE_ENTITIES = {
+ """: '"',
+ " ": " ",
+ "'": "'",
+}
+
+
+def unescape(text):
+ plain = saxutils.unescape(text, _UNESCAPE_ENTITIES)
+ return plain
+
+
+def google_strptime(time):
+ """
+ Hack: Google always returns the time in the same locale. Sadly if the
+ local system's locale is different, there isn't a way to perfectly handle
+ the time. So instead we handle implement some time formatting
+ """
+ abbrevTime = time[:-3]
+ parsedTime = datetime.datetime.strptime(abbrevTime, "%m/%d/%y %I:%M")
+ if time.endswith("PM"):
+ parsedTime += datetime.timedelta(hours=12)
+ return parsedTime
+
+
def itergroup(iterator, count, padValue = None):
"""
Iterate in groups of 'count' values. If there
def safe_eval(s):
_TRUE_REGEX = re.compile("true")
_FALSE_REGEX = re.compile("false")
+ _COMMENT_REGEX = re.compile("^\s+//.*$", re.M)
s = _TRUE_REGEX.sub("True", s)
s = _FALSE_REGEX.sub("False", s)
- return eval(s, {}, {})
+ s = _COMMENT_REGEX.sub("#", s)
+ try:
+ results = eval(s, {}, {})
+ except SyntaxError:
+ _moduleLogger.exception("Oops")
+ results = None
+ return results
def _fake_parse_json(flattened):
Validates that the JSON response is A-OK
"""
try:
- assert 'ok' in response and response['ok']
+ assert response is not None
+ assert 'ok' in response
+ assert response['ok']
except AssertionError:
raise RuntimeError('There was a problem with GV: %s' % response)
("isdnd", backend._isDndURL),
("account", backend._XML_ACCOUNT_URL),
("contacts", backend._XML_CONTACTS_URL),
+ ("csv", backend._CSV_CONTACTS_URL),
("voicemail", backend._XML_VOICEMAIL_URL),
("sms", backend._XML_SMS_URL),