Upload to builder support plus adding some hand tests
[theonering] / hand_tests / generic.py
1 #!/usr/bin/env python
2
3 import sys
4
5 import gobject
6 import dbus.mainloop.glib
7 dbus.mainloop.glib.DBusGMainLoop(set_as_default = True)
8
9 import telepathy
10
11
12 def get_registry():
13         reg = telepathy.client.ManagerRegistry()
14         reg.LoadManagers()
15         return reg
16
17
18 def get_connection_manager(reg):
19         cm = reg.GetManager('theonering')
20         return cm
21
22
23 class Action(object):
24
25         def __init__(self):
26                 self._action = None
27
28         def queue_action(self):
29                 pass
30
31         def append_action(self, action):
32                 assert self._action is None
33                 self._action = action
34
35         def _on_done(self):
36                 if self._action is None:
37                         return
38                 self._action.queue_action()
39
40         def _on_error(self, error):
41                 print error
42
43         def _on_generic_message(self, *args):
44                 pass
45
46
47 class QuitLoop(Action):
48
49         def __init__(self, loop):
50                 super(QuitLoop, self).__init__()
51                 self._loop = loop
52
53         def queue_action(self):
54                 self._loop.quit()
55
56
57 class DisplayParams(Action):
58
59         def __init__(self, cm):
60                 super(DisplayParams, self).__init__()
61                 self._cm = cm
62
63         def queue_action(self):
64                 self._cm[telepathy.interfaces.CONN_MGR_INTERFACE].GetParameters(
65                         'sip',
66                         reply_handler = self._on_done,
67                         error_handler = self._on_error,
68                 )
69
70         def _on_done(self, params):
71                 super(DisplayParams, self)._on_done()
72                 for name, flags, signature, default in params:
73                         print "%s (%s)" % (name, signature),
74
75                         if flags & telepathy.constants.CONN_MGR_PARAM_FLAG_REQUIRED:
76                                 print "required",
77                         if flags & telepathy.constants.CONN_MGR_PARAM_FLAG_REGISTER:
78                                 print "register",
79                         if flags & telepathy.constants.CONN_MGR_PARAM_FLAG_SECRET:
80                                 print "secret",
81                         if flags & telepathy.constants.CONN_MGR_PARAM_FLAG_DBUS_PROPERTY:
82                                 print "dbus-property",
83                         if flags & telepathy.constants.CONN_MGR_PARAM_FLAG_HAS_DEFAULT:
84                                 print "has-default(%s)" % default,
85
86                         print ""
87
88
89 class Connect(Action):
90
91         def __init__(self, cm, username, password, forward):
92                 super(Connect, self).__init__()
93                 self._cm = cm
94                 self._conn = None
95                 self._username = username
96                 self._password = password
97                 self._forward = forward
98
99         @property
100         def conn(self):
101                 return self._conn
102
103         def queue_action(self):
104                 self._cm[telepathy.server.CONNECTION_MANAGER].RequestConnection(
105                         'sip',
106                         {
107                                 'username':  self._username,
108                                 'password': self._password,
109                                 'forward':  self._forward,
110                         },
111                         reply_handler = self._on_connection_requested,
112                         error_handler = self._on_error,
113                 )
114
115         def _on_connection_requested(self, busName, objectPath):
116                 self._conn = telepathy.client.Connection(busName, objectPath)
117                 self._conn[telepathy.server.CONNECTION].connect_to_signal(
118                         'StatusChanged',
119                         self._on_change,
120                 )
121                 self._conn[telepathy.server.CONNECTION].Connect(
122                         reply_handler = self._on_generic_message,
123                         error_handler = self._on_error,
124                 )
125
126         def _on_done(self):
127                 super(Connect, self)._on_done()
128
129         def _on_change(self, status, reason):
130                 if status == telepathy.constants.CONNECTION_STATUS_DISCONNECTED:
131                         print "Disconnected!"
132                         self._conn = None
133                 elif status == telepathy.constants.CONNECTION_STATUS_CONNECTED:
134                         print "Connected"
135                         self._on_done()
136                 else:
137                         print "Status: %r" % status
138
139
140 class Disconnect(Action):
141
142         def __init__(self, connAction):
143                 super(Disconnect, self).__init__()
144                 self._connAction = connAction
145
146         def queue_action(self):
147                 self._connAction.conn[telepathy.server.CONNECTION].Disconnect(
148                         reply_handler = self._on_done,
149                         error_handler = self._on_error,
150                 )
151
152
153 if __name__ == '__main__':
154         loop = gobject.MainLoop()
155
156         reg = get_registry()
157         cm = get_connection_manager(reg)
158
159         dummy = Action()
160         lastAction = dummy
161
162         dp = DisplayParams(cm)
163         lastAction.append_action(dp)
164         lastAction = dp
165
166         username = sys.argv[1]
167         password = sys.argv[2]
168         forward = sys.argv[3]
169         con = Connect(cm, username, password, forward)
170         lastAction.append_action(con)
171         lastAction = con
172
173         dis = Disconnect(con)
174         lastAction.append_action(dis)
175         lastAction = dis
176
177         quitter = QuitLoop(loop)
178         lastAction.append_action(quitter)
179         lastAction = quitter
180
181         dp.queue_action()
182         loop.run()