json, icon, and misc fixes and improvements made to ejpi/gonvert
[gc-dialer] / dialcentral / util / go_utils.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import time
6 import functools
7 import threading
8 import Queue
9 import logging
10
11 import gobject
12
13 import algorithms
14 import misc
15
16
# Module-wide logger; passed to misc.log_exception on the callbacks below and
# used directly for error/debug messages.
_moduleLogger = logging.getLogger(__name__)
18
19
def make_idler(func):
        """
        Turn a generator-function into a step-at-a-time callable.

        The first call creates the generator from the supplied arguments;
        that call and every later one advance it by one step and return True.
        The call on which the generator is exhausted returns False and
        forgets the generator, so the following call starts a fresh one.

        Note that the pending generator is shared by all callers of the
        decorated function; arguments are only used when a new generator is
        created.
        """
        pending = []

        @functools.wraps(func)
        def decorated_func(*args, **kwds):
                if len(pending) == 0:
                        pending.append(func(*args, **kwds))
                generator = pending[0]
                try:
                        generator.next()
                except StopIteration:
                        # Exhausted: drop it so the next call restarts from scratch
                        pending[:] = []
                        return False
                else:
                        return True

        return decorated_func
38
39
40 def async(func):
41         """
42         Make a function mainloop friendly. the function will be called at the
43         next mainloop idle state.
44
45         >>> import misc
46         >>> misc.validate_decorator(async)
47         """
48
49         @functools.wraps(func)
50         def new_function(*args, **kwargs):
51
52                 def async_function():
53                         func(*args, **kwargs)
54                         return False
55
56                 gobject.idle_add(async_function)
57
58         return new_function
59
60
class Async(object):
        """
        Handle for scheduling a callable on the gobject mainloop idle queue.

        func: callable invoked with no arguments from the mainloop.
        once: when true (the default) func is run a single time per start()
                and the handle can then be started again.  When false, func
                itself is handed to gobject.idle_add, so its own return value
                controls rescheduling and the stored source id is only cleared
                by cancel().
        """

        def __init__(self, func, once = True):
                self.__func = func
                self.__idleId = None
                self.__once = once

        def start(self):
                """Queue the callback; asserts that it is not already scheduled."""
                assert self.__idleId is None
                if self.__once:
                        self.__idleId = gobject.idle_add(self._on_once)
                else:
                        self.__idleId = gobject.idle_add(self.__func)

        def is_running(self):
                """Return True while a source id from start() is still stored."""
                return self.__idleId is not None

        def cancel(self):
                """Remove the scheduled source, if any; safe to call repeatedly."""
                if self.__idleId is not None:
                        gobject.source_remove(self.__idleId)
                        self.__idleId = None

        def __call__(self):
                """Shorthand for start()."""
                return self.start()

        @misc.log_exception(_moduleLogger)
        def _on_once(self):
                # Clear the stored id first so func may call start() again
                self.cancel()
                try:
                        self.__func()
                except Exception:
                        # Keep the failure contained but leave a trace in the log
                        # instead of discarding it silently.
                        _moduleLogger.exception("Callback errored")
                return False
94
95
class Timeout(object):
        """
        Handle for scheduling a callable on the gobject mainloop after a delay.

        func: callable invoked with no arguments from the mainloop.
        once: when true (the default) func is run a single time per start()
                and the handle can then be started again.  When false, func
                itself is registered as the callback, so its own return value
                controls rescheduling and the stored source id is only cleared
                by cancel().
        """

        def __init__(self, func, once = True):
                self.__func = func
                self.__timeoutId = None
                self.__once = once

        def start(self, **kwds):
                """
                Schedule the callback.

                Takes exactly one keyword argument, seconds, which must be >= 0.
                A value of 0 uses gobject.idle_add; anything else goes through
                the module-level timeout_add_seconds.  Asserts that the handle is
                not already scheduled.
                """
                assert self.__timeoutId is None

                callback = self._on_once if self.__once else self.__func

                assert len(kwds) == 1
                timeoutInSeconds = kwds["seconds"]
                assert 0 <= timeoutInSeconds

                if timeoutInSeconds == 0:
                        self.__timeoutId = gobject.idle_add(callback)
                else:
                        self.__timeoutId = timeout_add_seconds(timeoutInSeconds, callback)

        def is_running(self):
                """Return True while a source id from start() is still stored."""
                return self.__timeoutId is not None

        def cancel(self):
                """Remove the scheduled source, if any; safe to call repeatedly."""
                if self.__timeoutId is not None:
                        gobject.source_remove(self.__timeoutId)
                        self.__timeoutId = None

        def __call__(self, **kwds):
                """Shorthand for start(**kwds)."""
                return self.start(**kwds)

        @misc.log_exception(_moduleLogger)
        def _on_once(self):
                # Clear the stored id first so func may call start() again
                self.cancel()
                try:
                        self.__func()
                except Exception:
                        # Keep the failure contained but leave a trace in the log
                        # instead of discarding it silently.
                        _moduleLogger.exception("Callback errored")
                return False
136
137
# Unique sentinel: FutureThread.stop() puts it on the work queue and the worker
# loop exits when it dequeues it (compared by identity).
_QUEUE_EMPTY = object()
139
140
141 class FutureThread(object):
142
143         def __init__(self):
144                 self.__workQueue = Queue.Queue()
145                 self.__thread = threading.Thread(
146                         name = type(self).__name__,
147                         target = self.__consume_queue,
148                 )
149                 self.__isRunning = True
150
151         def start(self):
152                 self.__thread.start()
153
154         def stop(self):
155                 self.__isRunning = False
156                 for _ in algorithms.itr_available(self.__workQueue):
157                         pass # eat up queue to cut down dumb work
158                 self.__workQueue.put(_QUEUE_EMPTY)
159
160         def clear_tasks(self):
161                 for _ in algorithms.itr_available(self.__workQueue):
162                         pass # eat up queue to cut down dumb work
163
164         def add_task(self, func, args, kwds, on_success, on_error):
165                 task = func, args, kwds, on_success, on_error
166                 self.__workQueue.put(task)
167
168         @misc.log_exception(_moduleLogger)
169         def __trampoline_callback(self, on_success, on_error, isError, result):
170                 if not self.__isRunning:
171                         if isError:
172                                 _moduleLogger.error("Masking: %s" % (result, ))
173                         isError = True
174                         result = StopIteration("Cancelling all callbacks")
175                 callback = on_success if not isError else on_error
176                 try:
177                         callback(result)
178                 except Exception:
179                         _moduleLogger.exception("Callback errored")
180                 return False
181
182         @misc.log_exception(_moduleLogger)
183         def __consume_queue(self):
184                 while True:
185                         task = self.__workQueue.get()
186                         if task is _QUEUE_EMPTY:
187                                 break
188                         func, args, kwds, on_success, on_error = task
189
190                         try:
191                                 result = func(*args, **kwds)
192                                 isError = False
193                         except Exception, e:
194                                 _moduleLogger.error("Error, passing it back to the main thread")
195                                 result = e
196                                 isError = True
197                         self.__workQueue.task_done()
198
199                         gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
200                 _moduleLogger.debug("Shutting down worker thread")
201
202
class AutoSignal(object):
        """
        Tracks signal connections and disconnects all of them when a
        toplevel widget emits "destroy".

        toplevel: object whose "destroy" signal triggers the cleanup.
        """

        def __init__(self, toplevel):
                self.__disconnectPool = []
                toplevel.connect("destroy", self.__on_destroy)

        def connect_auto(self, widget, *args):
                """Call widget.connect(*args) and remember the handler for cleanup."""
                handlerId = widget.connect(*args)
                self.__disconnectPool.append((widget, handlerId))

        @misc.log_exception(_moduleLogger)
        def __on_destroy(self, widget):
                pool = self.__disconnectPool
                _moduleLogger.info("Destroy: %r (%s to clean up)" % (self, len(pool)))
                for connectedWidget, handlerId in pool:
                        connectedWidget.disconnect(handlerId)
                # Empty the list in place so handlers are never disconnected twice
                del pool[:]
219
220
def throttled(minDelay, queue):
        """
        Throttle the calls to a function by queueing all the calls that happen
        before the minimum delay

        minDelay: minimum spacing between direct calls, in milliseconds (it is
                compared against time.time() * 1000 and used to compute a
                gobject.timeout_add interval).
        queue: holds the deferred calls; it is used through len(),
                append() and pop(0), so it needs to behave like a list.

        A call that arrives at least minDelay after the previous one (or is the
        very first call) runs immediately.  Any other call is stored and a
        timeout is scheduled to run one stored call later.  The decorated
        function always returns None.

        >>> import misc
        >>> import Queue
        >>> misc.validate_decorator(throttled(0, Queue.Queue()))
        """

        def actual_decorator(func):

                # One-element list so the nested functions can rebind the value
                lastCallTime = [None]

                def process_queue():
                        if 0 < len(queue):
                                queuedFunc, args, kwargs = queue.pop(0)
                                lastCallTime[0] = time.time() * 1000
                                queuedFunc(*args, **kwargs)
                        return False

                @functools.wraps(func)
                def new_function(*args, **kwargs):
                        now = time.time() * 1000
                        # Compare against the stored timestamp, not the list holding it;
                        # subtracting the list itself raised TypeError on every call
                        # after the first.
                        if (
                                lastCallTime[0] is None or
                                (now - lastCallTime[0] >= minDelay)
                        ):
                                lastCallTime[0] = now
                                func(*args, **kwargs)
                        else:
                                queue.append((func, args, kwargs))
                                lastCallDelta = now - lastCallTime[0]
                                # Space queued calls minDelay apart, counted from the last call
                                processQueueTimeout = int(minDelay * len(queue) - lastCallDelta)
                                gobject.timeout_add(processQueueTimeout, process_queue)

                return new_function

        return actual_decorator
260
261
def _old_timeout_add_seconds(timeout, callback):
        """
        Fallback for gobject versions without timeout_add_seconds: convert the
        delay from seconds to milliseconds and use gobject.timeout_add.

        Returns whatever gobject.timeout_add returns.
        """
        milliseconds = timeout * 1000
        return gobject.timeout_add(milliseconds, callback)
264
265
def _timeout_add_seconds(timeout, callback):
        """
        Schedule callback after timeout seconds via gobject.timeout_add_seconds
        and return its result.
        """
        sourceId = gobject.timeout_add_seconds(timeout, callback)
        return sourceId
268
269
# Pick the timeout_add_seconds implementation once, at import time: use the
# native gobject call when the attribute exists, otherwise the
# millisecond-based fallback.
try:
        gobject.timeout_add_seconds
except AttributeError:
        timeout_add_seconds = _old_timeout_add_seconds
else:
        timeout_add_seconds = _timeout_add_seconds