):
def __init__(self, connection):
- h = None
- telepathy.server.ChannelTypeStreamedMedia.__init__(self, connection, h)
+ telepathy.server.ChannelTypeStreamedMedia.__init__(self, connection, None)
telepathy.server.ChannelInterfaceGroup.__init__(self)
telepathy.server.ChannelInterfaceChatState.__init__(self)
"""
For org.freedesktop.Telepathy.Channel.Type.StreamedMedia
"""
- pass
+ return ()
def RemoveStreams(self, streams):
"""
For org.freedesktop.Telepathy.Channel.Type.StreamedMedia
"""
- pass
+ raise telepathy.NotImplemented("Cannot remove a stream")
def RequestStreamDirection(self, stream, streamDirection):
"""
@note Since streams are short lived, not bothering to implement this
"""
_moduleLogger.info("A request was made to change the stream direction")
+ raise telepathy.NotImplemented("Cannot change directions")
- def RequestStreams(self, contact, streamType):
+ def RequestStreams(self, contact, streamTypes):
"""
For org.freedesktop.Telepathy.Channel.Type.StreamedMedia
@returns [(Stream ID, contact, stream type, stream state, stream direction, pending send flags)]
"""
- pass
+ for streamType in streamTypes:
+ if streamType != telepathy.constants.MEDIA_STREAM_TYPE_AUDIO:
+ raise telepathy.NotImplemented("Audio is the only stream type supported")
+
+ contactId = contact.name
+
+ addressbook = self._conn.session.addressbook
+ phones = addressbook.get_contact_details(contactId)
+ firstNumber = phones.next()
+ self._conn.session.backend.dial(firstNumber)
+
+ streamId = 0
+ streamState = telepathy.constants.MEDIA_STREAM_STATE_DISCONNECTED
+ streamDirection = telepathy.constants.MEDIA_STREAM_DIRECTION_BIDIRECTIONAL
+ pendingSendFlags = telepathy.constants.MEDIA_STREAM_PENDING_REMOTE_SEND
+ return [(streamId, contact, streamTypes[0], streamState, streamDirection, pendingSendFlags)]
def GetCallStates(self):
"""
Get the current call states for all contacts involved in this call.
@returns {Contact: telepathy.constants.CHANNEL_CALL_STATE_*}
"""
- pass
+ return {}
def _process_refresh(self, addressbook, added, removed):
connection = self._conn
handlesAdded = [
- handle.create_handle(connection, "contact", contactId)
+ handle.create_handle(connection, "contact", contactId, phoneNumber)
for contactId in added
if contactId
+ for (phoneType, phoneNumber) in addressbook.get_contact_details(contactId)
]
handlesRemoved = [
- handle.create_handle(connection, "contact", contactId)
+ handle.create_handle(connection, "contact", contactId, phoneNumber)
for contactId in removed
if contactId
+ for (phoneType, phoneNumber) in addressbook.get_contact_details(contactId)
]
message = ""
actor = 0
Look into implementing ChannelInterfaceMessages for rich text formatting
"""
- def __init__(self, connection):
- h = None
+ def __init__(self, connection, h):
telepathy.server.ChannelTypeText.__init__(self, connection, h)
self._nextRecievedId = 0
telepathy.server.ChannelTypeText.Close(self)
self.remove_from_connection()
- def _on_message_received(self, sender, message):
+ def _on_message_received(self, contactId, contactNumber, message):
"""
@todo Attatch this to receiving a message
"""
currentReceivedId = self._nextRecievedId
timestamp = int(time.time())
- h = handle.create_handle(self._conn, "contact", sender.account)
+ h = handle.create_handle(self._conn, "contact", contactId, contactNumber)
type = telepathy.CHANNEL_TEXT_MESSAGE_TYPE_NORMAL
message = message.content
chan = self._listChannels[handle]
except KeyError, e:
if handle.get_type() != telepathy.HANDLE_TYPE_LIST:
- raise RuntimeError("Unsupported channel type %r" % handle.get_type())
+ raise telepathy.NotImplemented("Only server lists are allowed")
_moduleLogger.debug("Requesting new contact list channel")
chan = channel.contact_list.create_contact_list_channel(self._connRef(), handle)
raise telepathy.NotImplemented("Only Contacts are allowed")
_moduleLogger.debug("Requesting new text channel")
- contact = handle.contact
- chan = channel.text.TextChannel(self._connRef())
+ chan = channel.text.TextChannel(self._connRef(), None)
self._textChannels[handle] = chan
self._connRef().add_channel(chan, handle, suppress_handler)
return chan
try:
chan = self._callChannels[handle]
except KeyError, e:
- if handle.get_type() != telepathy.HANDLE_TYPE_CONTACT:
- raise telepathy.NotImplemented("Only Contacts are allowed")
+ if handle.get_type() != telepathy.HANDLE_TYPE_NONE:
+ raise telepathy.NotImplemented("Using deprecated means to create a call")
_moduleLogger.debug("Requesting new call channel")
- contact = handle.contact
chan = channel.call.CallChannel(self._connRef())
self._callChannels[handle] = chan
self._connRef().add_channel(chan, handle, suppress_handler)
self.add_client_handle(h, sender)
return handles
- def _create_contact_handle(self, name):
- requestedContactId = name
-
- contacts = self.session.addressbook.get_contacts()
- contactsFound = [
- contactId for contactId in contacts
- if contactId == requestedContactId
- ]
-
- if 0 < len(contactsFound):
- contactId = contactsFound[0]
- if len(contactsFound) != 1:
- _moduleLogger.error("Contact ID was not unique: %s for %s" % (contactId, ))
- else:
- contactId = requestedContactId
- h = handle.create_handle(self, 'contact', contactId)
+ def _create_contact_handle(self, requestedHandleName):
+ """
+ @todo Determine if nay of this is really needed
+ """
+ requestedContactId, requestedContactName = handle.ContactHandle.from_handle_name(
+ requestedHandleName
+ )
+ h = handle.create_handle(self, 'contact', requestedContactId, requestedHandleName)
+ return h
+
+ def _on_invite_text(self, contactId):
+ """
+ @todo Make this work
+ """
+ h = self._create_contact_handle(contactId)
+
+ channelManager = self._channelManager
+ channel = channelManager.channel_for_text(handle)
self._changedContacts = set()
self.updateSignalHandler = coroutines.CoTee()
+ self.update()
def update(self, force=False):
if not force and self._contacts:
if self._has_contact_changed(contactId, oldContacts)
)
- message = self, self._addedContacts, self._removedContacts, self._changedContacts
- self.updateSignalHandler.stage.send(message)
+ if self._addedContacts or self._removedContacts or self._changedContacts:
+ message = self, self._addedContacts, self._removedContacts, self._changedContacts
+ self.updateSignalHandler.stage.send(message)
def get_contacts(self):
return self._contacts.iterkeys()
return
contacts = self._backend.get_contacts()
for contactId, contactName in contacts:
- self._contacts[contactId] = (contactName, {})
+ self._contacts[contactId] = (contactName, [])
def _populate_contact_details(self, contactId):
if self._get_contact_details(contactId):
return
- self._get_contact_details(contactId).update(
+ self._get_contact_details(contactId).extend(
self._backend.get_contact_details(contactId)
)
oldContactDetails = oldContact[1]
if oldContactName != self.get_contact_name(contactId):
return True
- if not oldContactDetails[1]:
+ if not oldContactDetails:
return False
# if its already in the old cache, purposefully add it into the new cache
return oldContactDetails != self.get_contact_details(contactId)
_moduleLogger.exception("Translating error: %s" % str(e))
raise NetworkError("%s is not accesible" % self._loginURL)
galxTokens = self._galxRe.search(tokenPage)
- galxToken = galxTokens.group(1)
+ if galxTokens is not None:
+ galxToken = galxTokens.group(1)
+ else:
+ galxToken = ""
+ _moduleLogger.debug("Could not grab GALX token")
loginPostData = urllib.urlencode({
'Email' : username,
callbackNumber = match.group(2)
callbackName = match.group(1)
self._callbackNumbers[callbackNumber] = callbackName
+ if len(self._callbackNumbers) == 0:
+ _moduleLogger.debug("Could not extract callback numbers from GoogleVoice (the troublesome page follows):\n%s" % page)
def _send_validation(self, number):
if not self.is_valid_syntax(number):
import backend
import addressbook
+import conversations
_moduleLogger = logging.getLogger("gvoice.session")
self._password = None
self._backend = None
self._addressbook = None
+ self._conversations = None
def login(self, username, password):
self._username = username
self._password = None
self._backend = None
self._addressbook = None
+ self._conversations = None
def is_logged_in(self):
if self._backend is None:
if self._addressbook is None:
_moduleLogger.info("Initializing addressbook")
self._addressbook = addressbook.Addressbook(self.backend)
- self._addressbook.update()
return self._addressbook
+
+ @property
+ def conversations(self):
+ """
+ Delay initialized addressbook
+ """
+ if self._conversations is None:
+ _moduleLogger.info("Initializing conversations")
+ self._conversations = conversations.Conversationst(self.backend)
+ return self._conversations
self.profile = connection.username
+def strip_number(prettynumber):
+ """
+ function to take a phone number and strip out all non-numeric
+ characters
+
+ >>> strip_number("+012-(345)-678-90")
+ '01234567890'
+ """
+ import re
+ uglynumber = re.sub('\D', '', prettynumber)
+ return uglynumber
+
+
class ContactHandle(TheOneRingHandle):
- def __init__(self, connection, id, contactId):
+ def __init__(self, connection, id, contactId, phoneNumber):
handleType = telepathy.HANDLE_TYPE_CONTACT
- handleName = contactId
+ handleName = self.to_handle_name(contactId, phoneNumber)
TheOneRingHandle.__init__(self, connection, id, handleType, handleName)
self._contactId = contactId
+ self._phoneNumber = phoneNumber
+
+ @staticmethod
+ def from_handle_name(handleName):
+ parts = handleName.split("#")
+ assert len(parts) == 2
+ contactId, contactNumber = parts[0:2]
+ return contactId, contactNumber
+
+ @staticmethod
+ def to_handle_name(contactId, contactNumber):
+ handleName = "#".join((contactId, strip_number(contactNumber)))
+ return handleName
@property
def contactID(self):
import telepathy
-class LocationMixin(telepathy.server.ConnectionInterfaceLocation):
+#class LocationMixin(telepathy.server.ConnectionInterfaceLocation):
+class LocationMixin(object):
def __init__(self):
- telepathy.server.ConnectionInterfaceLocation.__init__(self)
+ #telepathy.server.ConnectionInterfaceLocation.__init__(self)
+ pass
@property
def session(self):
bool_results = function_map(preds, item)
return combiner(pass_bool, bool_results)
+
+
+def pushback_itr(itr):
+ """
+ >>> list(pushback_itr(xrange(5)))
+ [0, 1, 2, 3, 4]
+ >>>
+ >>> first = True
+ >>> itr = pushback_itr(xrange(5))
+ >>> for i in itr:
+ ... print i
+ ... if first and i == 2:
+ ... first = False
+ ... print itr.send(i)
+ 0
+ 1
+ 2
+ None
+ 2
+ 3
+ 4
+ >>>
+ >>> first = True
+ >>> itr = pushback_itr(xrange(5))
+ >>> for i in itr:
+ ... print i
+ ... if first and i == 2:
+ ... first = False
+ ... print itr.send(i)
+ ... print itr.send(i)
+ 0
+ 1
+ 2
+ None
+ None
+ 2
+ 2
+ 3
+ 4
+ >>>
+ >>> itr = pushback_itr(xrange(5))
+ >>> print itr.next()
+ 0
+ >>> print itr.next()
+ 1
+ >>> print itr.send(10)
+ None
+ >>> print itr.next()
+ 10
+ >>> print itr.next()
+ 2
+ >>> print itr.send(20)
+ None
+ >>> print itr.send(30)
+ None
+ >>> print itr.send(40)
+ None
+ >>> print itr.next()
+ 40
+ >>> print itr.next()
+ 30
+ >>> print itr.send(50)
+ None
+ >>> print itr.next()
+ 50
+ >>> print itr.next()
+ 20
+ >>> print itr.next()
+ 3
+ >>> print itr.next()
+ 4
+ """
+ for item in itr:
+ maybePushedBack = yield item
+ queue = []
+ while queue or maybePushedBack is not None:
+ if maybePushedBack is not None:
+ queue.append(maybePushedBack)
+ maybePushedBack = yield None
+ else:
+ item = queue.pop()
+ maybePushedBack = yield item
+
+
+if __name__ == "__main__":
+ import doctest
+ print doctest.testmod()
--- /dev/null
+from __future__ import with_statement
+
+import logging
+
+import sys
+sys.path.append("../src")
+
+import util.coroutines as coroutines
+
+import gvoice
+
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class MockBackend(object):
+
+ def __init__(self, contactsData):
+ self.contactsData = contactsData
+
+ def get_contacts(self):
+ return (
+ (i, contactData["name"])
+ for (i, contactData) in enumerate(self.contactsData)
+ )
+
+ def get_contact_details(self, contactId):
+ return self.contactsData[contactId]["details"]
+
+
+def generate_update_callback(callbackData):
+
+ @coroutines.func_sink
+ @coroutines.expand_positional
+ def callback(book, addedContacts, removedContacts, changedContacts):
+ callbackData.append((book, addedContacts, removedContacts, changedContacts))
+
+ return callback
+
+
+def test_no_contacts():
+ callbackData = []
+ callback = generate_update_callback(callbackData)
+
+ backend = MockBackend([])
+ book = gvoice.addressbook.Addressbook(backend)
+ book.updateSignalHandler.register_sink(callback)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update()
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update(force=True)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ contacts = list(book.get_contacts())
+ assert len(contacts) == 0
+
+
+def test_one_contact_no_details():
+ callbackData = []
+ callback = generate_update_callback(callbackData)
+
+ backend = MockBackend([
+ {
+ "name": "One",
+ "details": [],
+ },
+ ])
+ book = gvoice.addressbook.Addressbook(backend)
+ book.updateSignalHandler.register_sink(callback)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update()
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update(force=True)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ contacts = list(book.get_contacts())
+ assert len(contacts) == 1
+ id = contacts[0]
+ name = book.get_contact_name(id)
+ assert name == backend.contactsData[id]["name"]
+
+ contactDetails = list(book.get_contact_details(id))
+ assert len(contactDetails) == 0
+
+
+def test_one_contact_with_details():
+ callbackData = []
+ callback = generate_update_callback(callbackData)
+
+ backend = MockBackend([
+ {
+ "name": "One",
+ "details": [("Type A", "123"), ("Type B", "456"), ("Type C", "789")],
+ },
+ ])
+ book = gvoice.addressbook.Addressbook(backend)
+ book.updateSignalHandler.register_sink(callback)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update()
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update(force=True)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ contacts = list(book.get_contacts())
+ assert len(contacts) == 1
+ id = contacts[0]
+ name = book.get_contact_name(id)
+ assert name == backend.contactsData[id]["name"]
+
+ contactDetails = list(book.get_contact_details(id))
+ print "%r" % contactDetails
+ assert len(contactDetails) == 3
+ assert contactDetails[0][0] == "Type A"
+ assert contactDetails[0][1] == "123"
+ assert contactDetails[1][0] == "Type B"
+ assert contactDetails[1][1] == "456"
+ assert contactDetails[2][0] == "Type C"
+ assert contactDetails[2][1] == "789"
+
+
+def test_adding_a_contact():
+ callbackData = []
+ callback = generate_update_callback(callbackData)
+
+ backend = MockBackend([
+ {
+ "name": "One",
+ "details": [],
+ },
+ ])
+ book = gvoice.addressbook.Addressbook(backend)
+ book.updateSignalHandler.register_sink(callback)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update()
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update(force=True)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ backend.contactsData.append({
+ "name": "Two",
+ "details": [],
+ })
+
+ book.update()
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update(force=True)
+ assert len(callbackData) == 1, "%r" % callbackData
+
+ callbackBook, addedContacts, removedContacts, changedContacts = callbackData[0]
+ assert callbackBook is book
+ assert len(addedContacts) == 1
+ assert 1 in addedContacts
+ assert len(removedContacts) == 0
+ assert len(changedContacts) == 0
+
+
+def test_removing_a_contact():
+ callbackData = []
+ callback = generate_update_callback(callbackData)
+
+ backend = MockBackend([
+ {
+ "name": "One",
+ "details": [],
+ },
+ ])
+ book = gvoice.addressbook.Addressbook(backend)
+ book.updateSignalHandler.register_sink(callback)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update()
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update(force=True)
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ del backend.contactsData[:]
+
+ book.update()
+ assert len(callbackData) == 0, "%r" % callbackData
+
+ book.update(force=True)
+ assert len(callbackData) == 1, "%r" % callbackData
+
+ callbackBook, addedContacts, removedContacts, changedContacts = callbackData[0]
+ assert callbackBook is book
+ assert len(addedContacts) == 0
+ assert len(removedContacts) == 1
+ assert 0 in removedContacts
+ assert len(changedContacts) == 0
from __future__ import with_statement
-import os
-import warnings
import cookielib
+import logging
import test_utils
import sys
sys.path.append("../src")
-import gv_backend
+import gvoice
+
+
+logging.basicConfig(level=logging.DEBUG)
def generate_mock(cookiesSucceed, username, password):
def test_not_logged_in():
correctUsername, correctPassword = "", ""
MockBrowserModule = generate_mock(False, correctUsername, correctPassword)
- gv_backend.browser_emu, RealBrowser = MockBrowserModule, gv_backend.browser_emu
+ gvoice.backend.browser_emu, RealBrowser = MockBrowserModule, gvoice.backend.browser_emu
try:
- backend = gv_backend.GVDialer()
+ backend = gvoice.backend.GVoiceBackend()
assert not backend.is_authed()
assert not backend.login("bad_name", "bad_password")
backend.logout()
with test_utils.expected(RuntimeError):
backend.send_sms("5551234567", "Hello World")
assert backend.get_account_number() == "", "%s" % backend.get_account_number()
- backend.set_sane_callback()
+ gvoice.backend.set_sane_callback(backend)
assert backend.get_callback_number() == ""
with test_utils.expected(Exception):
recent = list(backend.get_recent())
with test_utils.expected(Exception):
messages = list(backend.get_messages())
finally:
- gv_backend.browser_emu = RealBrowser
+ gvoice.backend.browser_emu = RealBrowser