for data in b.get_recent():
pprint.pprint(data)
- if True:
+ if False:
print "Contacts: ",
for contact in b.get_contacts():
pprint.pprint(contact)
- if False:
+ if True:
print "Texts: ",
for message in b.get_texts():
pprint.pprint(message.to_dict())
>>> make_pretty("1234567")
'123-4567'
>>> make_pretty("2345678901")
- '(234) 567-8901'
+ '+1 (234) 567-8901'
>>> make_pretty("12345678901")
- '1 (234) 567-8901'
+ '+1 (234) 567-8901'
>>> make_pretty("01234567890")
'+012 (345) 678-90'
>>> make_pretty("+01234567890")
if phonenumber is None or phonenumber is "":
return ""
- phonenumber = util_misc.strip_number(phonenumber)
+ phonenumber = util_misc.normalize_number(phonenumber)
if phonenumber[0] == "+":
prettynumber = _make_pretty_international(phonenumber[1:])
return
# Update callback
- uglyNumber = util_misc.strip_number(userHandleAndAlias[1])
+ uglyNumber = util_misc.normalize_number(userHandleAndAlias[1])
self.session.backend.set_callback_number(uglyNumber)
# Inform of change
import tp
import gtk_toolbox
-import handle
_moduleLogger = logging.getLogger("channel.call")
"""
contact = self._conn.get_handle_by_id(telepathy.constants.HANDLE_TYPE_CONTACT, contactId)
assert self.__contactHandle == contact, "%r != %r" % (self.__contactHandle, contact)
- contactId, contactNumber = handle.ContactHandle.from_handle_name(contact.name)
+ contactNumber = contact.phoneNumber
self.CallStateChanged(self.__contactHandle, telepathy.constants.CHANNEL_CALL_STATE_RINGING)
self.__cancelId = gobject.idle_add(self._on_cancel)
def _get_text_channel(self, props):
_, surpress_handler, h = self._get_type_requested_handle(props)
- accountNumber = util_misc.strip_number(self._conn.session.backend.get_account_number())
+ accountNumber = util_misc.normalize_number(self._conn.session.backend.get_account_number())
if h.phoneNumber == accountNumber:
_moduleLogger.debug('New Debug channel')
chan = channel.debug_prompt.DebugPromptChannel(self._conn, self, props, h)
requestedContactId, requestedContactNumber = handle.ContactHandle.from_handle_name(
requestedHandleName
)
+ if not requestedContactId:
+ # Sometimes GV doesn't give us a contactid for contacts, so
+ # let's slow things down just a tad for better consistency for
+ # the user
+ ids = list(self.session.addressbook.find_contacts_with_number(requestedContactNumber))
+ if ids:
+ requestedContactId = ids[0]
h = handle.create_handle(self, 'contact', requestedContactId, requestedContactNumber)
elif handleType == telepathy.HANDLE_TYPE_LIST:
# Support only server side (immutable) lists
def _on_conversations_updated(self, conv, conversationIds):
_moduleLogger.debug("Incoming messages from: %r" % (conversationIds, ))
for contactId, phoneNumber in conversationIds:
- h = handle.create_handle(self, 'contact', contactId, phoneNumber)
+ handleName = handle.ContactHandle.to_handle_name(contactId, phoneNumber)
+ h = self.get_handle_by_name(telepathy.HANDLE_TYPE_CONTACT, handleName)
# Just let the TextChannel decide whether it should be reported to the user or not
props = self._generate_props(telepathy.CHANNEL_TYPE_TEXT, h, False)
channel = self.__channelManager.channel_for_props(props, signal=True)
return iter(self._contacts[contactId][1])
def find_contacts_with_number(self, queryNumber):
- strippedQueryNumber = util_misc.strip_number(queryNumber)
- for contactId, (contactName, contactDetails) in self.get_contact_ids():
+ strippedQueryNumber = util_misc.normalize_number(queryNumber)
+ for contactId, (contactName, contactDetails) in self._contacts.iteritems():
for phoneType, number in contactDetails:
if number == strippedQueryNumber:
yield contactId
contactNumbers = [
(
numberDetails.get("phoneType", "Mobile"),
- util_misc.strip_number(numberDetails["phoneNumber"]),
+ util_misc.normalize_number(numberDetails["phoneNumber"]),
)
for numberDetails in contactDetails["numbers"]
]
#!/usr/bin/python
-# @bug Its inconsistent as to whether messages from contacts come as from the
-# contact or just a raw number. It seems GV is inconsistent about populating the contact id, so we might have to pull from the addressbook
# @bug False positives on startup. Luckily the object path for the channel is
# unique, so can use that to cache some of the data out to file
+from __future__ import with_statement
+
import logging
import util.coroutines as coroutines
conversations = list(self._get_raw_conversations())
conversations.sort()
for conversation in conversations:
- key = conversation.contactId, util_misc.strip_number(conversation.number)
+ key = conversation.contactId, util_misc.normalize_number(conversation.number)
try:
mergedConversations = self._conversations[key]
except KeyError:
type(self).__name__, self.id, self.name
)
- def is_same(self, handleType, handleName):
- return self.get_name() == handleName and self.get_type() == handleType
-
id = property(tp.Handle.get_id)
type = property(tp.Handle.get_type)
name = property(tp.Handle.get_name)
class ContactHandle(TheOneRingHandle):
+ _DELIMETER = "|"
+
def __init__(self, connection, id, contactId, phoneNumber):
handleType = telepathy.HANDLE_TYPE_CONTACT
handleName = self.to_handle_name(contactId, phoneNumber)
TheOneRingHandle.__init__(self, connection, id, handleType, handleName)
self._contactId = contactId
- self._phoneNumber = util_misc.strip_number(phoneNumber)
+ self._phoneNumber = util_misc.normalize_number(phoneNumber)
- @staticmethod
- def from_handle_name(handleName):
- parts = handleName.split("#", 1)
+ @classmethod
+ def from_handle_name(cls, handleName):
+ """
+ >>> ContactHandle.from_handle_name("+1 555 123-1234")
+ ('', '+15551231234')
+ >>> ContactHandle.from_handle_name("+15551231234")
+ ('', '+15551231234')
+ >>> ContactHandle.from_handle_name("123456|+15551231234")
+ ('123456', '+15551231234')
+ """
+ parts = handleName.split(cls._DELIMETER, 1)
if len(parts) == 2:
contactId, contactNumber = parts[0:2]
elif len(parts) == 1:
else:
raise RuntimeError("Invalid handle: %s" % handleName)
- contactNumber = util_misc.strip_number(contactNumber)
+ contactNumber = util_misc.normalize_number(contactNumber)
return contactId, contactNumber
- @staticmethod
- def to_handle_name(contactId, contactNumber):
- handleName = "#".join((contactId, util_misc.strip_number(contactNumber)))
- return handleName
-
@classmethod
- def normalize_handle_name(cls, name):
- if "#" in name:
- # Already a properly formatted name, run through the ringer just in case
- return cls.to_handle_name(*cls.from_handle_name(name))
- return name
+ def to_handle_name(cls, contactId, contactNumber):
+ """
+ >>> ContactHandle.to_handle_name('', "+1 555 123-1234")
+ '+15551231234'
+ >>> ContactHandle.to_handle_name('', "+15551231234")
+ '+15551231234'
+ >>> ContactHandle.to_handle_name('123456', "+15551231234")
+ '123456|+15551231234'
+ """
+ contactNumber = util_misc.normalize_number(contactNumber)
+ if contactId:
+ handleName = cls._DELIMETER.join((contactId, contactNumber))
else:
- return cls.to_handle_name("", name)
-
- def is_same(self, handleType, handleName):
- handleName = self.normalize_handle_name(handleName)
- _moduleLogger.info("%r == %r %r?" % (self, handleType, handleName))
- return self.get_name() == handleName and self.get_type() == handleType
+ handleName = contactNumber
+ return handleName
@property
def contactID(self):
del frame
-def strip_number(prettynumber):
+def normalize_number(prettynumber):
"""
function to take a phone number and strip out all non-numeric
characters
- >>> strip_number("+012-(345)-678-90")
- '01234567890'
+ >>> normalize_number("+012-(345)-678-90")
+ '+01234567890'
+ >>> normalize_number("1-(345)-678-9000")
+ '+13456789000'
+ >>> normalize_number("+1-(345)-678-9000")
+ '+13456789000'
"""
uglynumber = re.sub('[^0-9+]', '', prettynumber)
+ if uglynumber.startswith("1") and len(uglynumber) == 11:
+ uglynumber = "+"+uglynumber
+ elif len(uglynumber) == 10:
+ uglynumber = "+1"+uglynumber
+
+ #validateRe = re.compile("^\+?[0-9]{10,}$")
+ #assert validateRe.match(uglynumber) is not None
+
return uglynumber
+++ /dev/null
-from __future__ import with_statement
-
-import logging
-
-import sys
-sys.path.append("../src")
-
-import util.coroutines as coroutines
-
-import gvoice
-
-
-logging.basicConfig(level=logging.DEBUG)
-
-
-class MockBackend(object):
-
- def __init__(self, contactsData):
- self.contactsData = contactsData
-
- def get_contacts(self):
- return (
- (i, contactData)
- for (i, contactData) in enumerate(self.contactsData)
- )
-
-
-def generate_update_callback(callbackData):
-
- @coroutines.func_sink
- @coroutines.expand_positional
- def callback(book, addedContacts, removedContacts, changedContacts):
- callbackData.append((book, addedContacts, removedContacts, changedContacts))
-
- return callback
-
-
-def test_no_contacts():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([])
- book = gvoice.addressbook.Addressbook(backend)
- book.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- contacts = list(book.get_contacts())
- assert len(contacts) == 0
-
-
-def test_one_contact_no_details():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([
- {
- "name": "One",
- "numbers": [],
- },
- ])
- book = gvoice.addressbook.Addressbook(backend)
- book.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- contacts = list(book.get_contacts())
- assert len(contacts) == 0, "%r" % contacts
-
- book.update()
- assert len(callbackData) == 1, "%r" % callbackData
- del callbackData[:]
- contacts = list(book.get_contacts())
- assert len(contacts) == 1, "%r" % contacts
- id = contacts[0]
- name = book.get_contact_name(id)
- assert name == backend.contactsData[id]["name"]
-
- book.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- contacts = list(book.get_contacts())
- assert len(contacts) == 1
- id = contacts[0]
- name = book.get_contact_name(id)
- assert name == backend.contactsData[id]["name"]
-
-
-def test_one_contact_with_details():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([
- {
- "name": "One",
- "numbers": [
- {"phoneType": "Type A", "phoneNumber": "123"},
- {"phoneType": "Type B", "phoneNumber": "456"},
- {"phoneType": "Type C", "phoneNumber": "789"}],
- },
- ])
- book = gvoice.addressbook.Addressbook(backend)
- book.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- contacts = list(book.get_contacts())
- assert len(contacts) == 0, "%r" % contacts
-
- book.update()
- assert len(callbackData) == 1, "%r" % callbackData
- del callbackData[:]
- contacts = list(book.get_contacts())
- assert len(contacts) == 1, "%r" % contacts
- id = contacts[0]
- name = book.get_contact_name(id)
- assert name == backend.contactsData[id]["name"]
-
- book.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- contacts = list(book.get_contacts())
- assert len(contacts) == 1
- id = contacts[0]
- name = book.get_contact_name(id)
- assert name == backend.contactsData[id]["name"]
-
-
-def test_adding_a_contact():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([
- {
- "name": "One",
- "numbers": [],
- },
- ])
- book = gvoice.addressbook.Addressbook(backend)
- book.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update()
- assert len(callbackData) == 1, "%r" % callbackData
- del callbackData[:]
-
- book.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- backend.contactsData.append({
- "name": "Two",
- "numbers": [],
- })
-
- book.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update(force=True)
- assert len(callbackData) == 1, "%r" % callbackData
-
- callbackBook, addedContacts, removedContacts, changedContacts = callbackData[0]
- assert callbackBook is book
- assert len(addedContacts) == 1
- assert 1 in addedContacts
- assert len(removedContacts) == 0
- assert len(changedContacts) == 0
-
-
-def test_removing_a_contact():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([
- {
- "name": "One",
- "numbers": [],
- },
- ])
- book = gvoice.addressbook.Addressbook(backend)
- book.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update()
- assert len(callbackData) == 1, "%r" % callbackData
- del callbackData[:]
-
- book.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- del backend.contactsData[:]
-
- book.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- book.update(force=True)
- assert len(callbackData) == 1, "%r" % callbackData
-
- callbackBook, addedContacts, removedContacts, changedContacts = callbackData[0]
- assert callbackBook is book
- assert len(addedContacts) == 0
- assert len(removedContacts) == 1
- assert 0 in removedContacts
- assert len(changedContacts) == 0
+++ /dev/null
-from __future__ import with_statement
-
-import datetime
-import logging
-
-import sys
-sys.path.append("../src")
-
-import util.coroutines as coroutines
-
-import gvoice
-
-
-logging.basicConfig(level=logging.DEBUG)
-
-
-class MockBackend(object):
-
- def __init__(self, conversationsData):
- self.conversationsData = conversationsData
-
- def get_messages(self):
- return self.conversationsData
-
-
-def generate_update_callback(callbackData):
-
- @coroutines.func_sink
- @coroutines.expand_positional
- def callback(conversations, updatedIds):
- callbackData.append((conversations, updatedIds))
-
- return callback
-
-
-def test_no_conversations():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([])
- conversings = gvoice.conversations.Conversations(backend)
- conversings.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- contacts = list(conversings.get_conversations())
- assert len(contacts) == 0
-
-
-def test_a_conversation():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([
- {
- "id": "conv1",
- "contactId": "con1",
- "name": "Con Man",
- "time": datetime.datetime(2000, 1, 1),
- "relTime": "Sometime back",
- "prettyNumber": "(555) 555-1224",
- "number": "5555551224",
- "location": "",
- "messageParts": [
- ("Innocent Man", "Body of Message", "Forever ago")
- ],
- },
- ])
- conversings = gvoice.conversations.Conversations(backend)
- conversings.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update()
- assert len(callbackData) == 1, "%r" % callbackData
- del callbackData[:]
-
- cons = list(conversings.get_conversations())
- assert len(cons) == 1
- assert cons[0] == ("con1", "5555551224"), cons
-
- conversings.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
-
-def test_adding_a_conversation():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([
- {
- "id": "conv1",
- "contactId": "con1",
- "name": "Con Man",
- "time": datetime.datetime(2000, 1, 1),
- "relTime": "Sometime back",
- "prettyNumber": "(555) 555-1224",
- "number": "5555551224",
- "location": "",
- "messageParts": [
- ("Innocent Man", "Body of Message", "Forever ago")
- ],
- },
- ])
- conversings = gvoice.conversations.Conversations(backend)
- conversings.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update()
- assert len(callbackData) == 1, "%r" % callbackData
- del callbackData[:]
-
- cons = list(conversings.get_conversations())
- assert len(cons) == 1
- assert cons[0] == ("con1", "5555551224"), cons
-
- conversings.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- backend.conversationsData.append(
- {
- "id": "conv2",
- "contactId": "con2",
- "name": "Pretty Man",
- "time": datetime.datetime(2003, 1, 1),
- "relTime": "Somewhere over the rainbow",
- "prettyNumber": "(555) 555-2244",
- "number": "5555552244",
- "location": "",
- "messageParts": [
- ("Con Man", "Body of Message somewhere", "Maybe")
- ],
- },
- )
-
- conversings.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update(force=True)
- assert len(callbackData) == 1, "%r" % callbackData
- idsOnly = callbackData[0][1]
- assert ("con2", "5555552244") in idsOnly, idsOnly
-
- cons = list(conversings.get_conversations())
- assert len(cons) == 2
- assert ("con1", "5555551224") in cons, cons
- assert ("con2", "5555552244") in cons, cons
-
-
-def test_merging_a_conversation():
- callbackData = []
- callback = generate_update_callback(callbackData)
-
- backend = MockBackend([
- {
- "id": "conv1",
- "contactId": "con1",
- "name": "Con Man",
- "time": datetime.datetime(2000, 1, 1),
- "relTime": "Sometime back",
- "prettyNumber": "(555) 555-1224",
- "number": "5555551224",
- "location": "",
- "messageParts": [
- ("Innocent Man", "Body of Message", "Forever ago")
- ],
- },
- ])
- conversings = gvoice.conversations.Conversations(backend)
- conversings.updateSignalHandler.register_sink(callback)
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update()
- assert len(callbackData) == 1, "%r" % callbackData
- del callbackData[:]
-
- cons = list(conversings.get_conversations())
- assert len(cons) == 1
- assert cons[0] == ("con1", "5555551224"), cons
-
- conversings.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update(force=True)
- assert len(callbackData) == 0, "%r" % callbackData
-
- backend.conversationsData.append(
- {
- "id": "conv1",
- "contactId": "con1",
- "name": "Con Man",
- "time": datetime.datetime(2003, 1, 1),
- "relTime": "Sometime back",
- "prettyNumber": "(555) 555-1224",
- "number": "5555551224",
- "location": "",
- "messageParts": [
- ("Innocent Man", "Mwahahaah", "somewhat closer")
- ],
- },
- )
-
- conversings.update()
- assert len(callbackData) == 0, "%r" % callbackData
-
- conversings.update(force=True)
- assert len(callbackData) == 1, "%r" % callbackData
- idsOnly = callbackData[0][1]
- assert ("con1", "5555551224") in idsOnly, idsOnly
- convseration = conversings.get_conversation(idsOnly.pop())
- assert len(convseration["messageParts"]) == 2, convseration["messageParts"]
+++ /dev/null
-from __future__ import with_statement
-
-import cookielib
-import logging
-
-import test_utils
-
-import sys
-sys.path.append("../src")
-
-import gvoice
-
-
-logging.basicConfig(level=logging.DEBUG)
-
-
-def generate_mock(cookiesSucceed, username, password):
-
- class MockModule(object):
-
- class MozillaEmulator(object):
-
- def __init__(self, trycount = 1):
- self.cookies = cookielib.LWPCookieJar()
- self.trycount = trycount
-
- def download(self, url,
- postdata = None, extraheaders = None, forbid_redirect = False,
- trycount = None, only_head = False,
- ):
- return ""
-
- def load_cookies(self, *args):
- pass
-
- def save_cookies(self, *args):
- pass
-
- def clear_cookies(self, *args):
- pass
-
- return MockModule
-
-
-def test_not_logged_in():
- correctUsername, correctPassword = "", ""
- MockBrowserModule = generate_mock(False, correctUsername, correctPassword)
- gvoice.backend.browser_emu, RealBrowser = MockBrowserModule, gvoice.backend.browser_emu
- try:
- backend = gvoice.backend.GVoiceBackend()
- assert not backend.is_authed()
- assert not backend.login("bad_name", "bad_password")
- backend.logout()
- with test_utils.expected(RuntimeError):
- backend.call("5551234567")
- with test_utils.expected(RuntimeError):
- backend.send_sms("5551234567", "Hello World")
- assert backend.get_account_number() == "", "%s" % backend.get_account_number()
- gvoice.backend.set_sane_callback(backend)
- assert backend.get_callback_number() == ""
- with test_utils.expected(Exception):
- recent = list(backend.get_recent())
- with test_utils.expected(Exception):
- messages = list(backend.get_messages())
- finally:
- gvoice.backend.browser_emu = RealBrowser