5 import dbus.mainloop.glib
9 DBUS_PROPERTIES = 'org.freedesktop.DBus.Properties'
12 class AutoAcceptAttempt(object):
14 def __init__(self, bus, chan):
15 self._sessionBus = bus
18 self._selfHandle = None
19 self._initiatorHandle = None
20 self._initiatorID = None
21 self._targetHandle = None
23 self._pendingHandles = None
26 # @bug Unsure why this isn't working
27 self._chan[DBUS_PROPERTIES].Get(
28 telepathy.interfaces.CONNECTION_INTERFACE,
30 reply_handler = self._on_got_self_handle,
31 error_handler = self._on_nothing,
34 self._chan[telepathy.interfaces.CHANNEL_INTERFACE_GROUP].GetSelfHandle(
35 reply_handler = self._on_got_self_handle,
36 error_handler = self._on_nothing,
39 self._chan[DBUS_PROPERTIES].GetAll(
40 telepathy.interfaces.CHANNEL_INTERFACE,
41 reply_handler = self._on_got_all,
42 error_handler = self._on_nothing,
46 # @bug Unsure why this isn't working
47 self._chan[DBUS_PROPERTIES].Get(
48 telepathy.interfaces.CHANNEL_INTERFACE_GROUP,
49 'LocalPendingMembers',
50 reply_handler = self._on_got_pending_members,
51 error_handler = self._on_nothing,
54 self._chan[telepathy.interfaces.CHANNEL_INTERFACE_GROUP].GetLocalPendingMembersWithInfo(
55 reply_handler = self._on_got_pending_members,
56 error_handler = self._on_nothing,
59 def _pickup_if_ready(self):
62 self._initiatorHandle,
68 # Note ready yet, still some outstanding requests
71 if self._selfHandle != self._targetHandle:
72 # Turns out it was an inbound call
75 # @bug does not distinguish between who the call is from for use for TOR auto-pickup
76 self._chan[telepathy.interfaces.CHANNEL_INTERFACE_GROUP].AddMembers(
77 reply_handler = self._on_members_added,
78 error_handler = self._on_nothing,
81 def _on_got_self_handle(self, selfHandle):
82 self._pickup_if_ready()
84 def _on_got_all(self, properties):
85 self._initiatorID = properties["InitiatorID"]
86 self._initiatorHandle = properties["InitiatorHandle"]
87 self._targetID = properties["InitiatorID"]
88 self._targetHandle = properties["InitiatorHandle"]
90 self._pickup_if_ready()
92 def _on_got_pending_members(self, pendings):
93 for pendingHandle, instigatorHandle, reason, message in pendings:
94 print pendingHandle, instigatorHandle, reason, message
96 self._pendingHandles = [pendingWithInfo[0] for pendingWithInfo in pendings]
97 self._pickup_if_ready()
99 def _on_members_added(self):
100 print "Should be picked up now"
102 def _on_nothing(*args):
106 class AutoAcceptCall(object):
109 self._sessionBus = dbus.SessionBus()
110 self._activeAttempts = []
113 self._sessionBus.add_signal_receiver(
114 self._on_new_channel,
116 "org.freedesktop.Telepathy.Connection",
121 def _on_new_channel(self, channelObjectPath, channelType, handleType, handle, supressHandler):
122 if channelType != telepathy.interfaces.CHANNEL_TYPE_STREAMED_MEDIA:
125 serviceName = channelObjectPath.rsplit("/", 1)[0][1:].replace("/", ".")
126 chan = telepathy.client.Channel(serviceName, channelObjectPath)
127 # @bug does not distinguish between preferred CMs
128 # @todo Need a way to be notified on error, ignored, or if picked up
129 attemptPickup = AutoAcceptAttempt(self._sessionBus, chan)
130 self._activeAttempts.append(attemptPickup)
132 def _on_nothing(*args):
136 if __name__ == "__main__":
137 l = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
138 autoaccept = AutoAcceptCall()
140 gobject.threads_init()
141 gobject.idle_add(autoaccept.start)
143 mainloop = gobject.MainLoop(is_running=True)