Adding support for coroutines for async ops through trampolines
[theonering] / src / util / go_utils.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import time
6 import functools
7 import threading
8 import Queue
9 import logging
10
11 import gobject
12
13 import algorithms
14 import misc
15
16
17 _moduleLogger = logging.getLogger(__name__)
18
19
20 def make_idler(func):
21         """
22         Decorator that makes a generator-function into a function that will continue execution on next call
23         """
24         a = []
25
26         @functools.wraps(func)
27         def decorated_func(*args, **kwds):
28                 if not a:
29                         a.append(func(*args, **kwds))
30                 try:
31                         a[0].next()
32                         return True
33                 except StopIteration:
34                         del a[:]
35                         return False
36
37         return decorated_func
38
39
40 def async(func):
41         """
42         Make a function mainloop friendly. the function will be called at the
43         next mainloop idle state.
44
45         >>> import misc
46         >>> misc.validate_decorator(async)
47         """
48
49         @functools.wraps(func)
50         def new_function(*args, **kwargs):
51
52                 def async_function():
53                         func(*args, **kwargs)
54                         return False
55
56                 gobject.idle_add(async_function)
57
58         return new_function
59
60
61 class Async(object):
62
63         def __init__(self, func, once = True):
64                 self.__func = func
65                 self.__idleId = None
66                 self.__once = once
67
68         def start(self):
69                 assert self.__idleId is None
70                 if self.__once:
71                         self.__idleId = gobject.idle_add(self._on_once)
72                 else:
73                         self.__idleId = gobject.idle_add(self.__func)
74
75         def is_running(self):
76                 return self.__idleId is not None
77
78         def cancel(self):
79                 if self.__idleId is not None:
80                         gobject.source_remove(self.__idleId)
81                         self.__idleId = None
82
83         def __call__(self):
84                 return self.start()
85
86         @misc.log_exception(_moduleLogger)
87         def _on_once(self):
88                 self.cancel()
89                 try:
90                         self.__func()
91                 except Exception:
92                         pass
93                 return False
94
95
96 class Timeout(object):
97
98         def __init__(self, func):
99                 self.__func = func
100                 self.__timeoutId = None
101
102         def start(self, **kwds):
103                 assert self.__timeoutId is None
104
105                 assert len(kwds) == 1
106                 timeoutInSeconds = kwds["seconds"]
107                 assert 0 <= timeoutInSeconds
108                 if timeoutInSeconds == 0:
109                         self.__timeoutId = gobject.idle_add(self._on_once)
110                 else:
111                         self.__timeoutId = timeout_add_seconds(timeoutInSeconds, self._on_once)
112
113         def is_running(self):
114                 return self.__timeoutId is not None
115
116         def cancel(self):
117                 if self.__timeoutId is not None:
118                         gobject.source_remove(self.__timeoutId)
119                         self.__timeoutId = None
120
121         def __call__(self, **kwds):
122                 return self.start(**kwds)
123
124         @misc.log_exception(_moduleLogger)
125         def _on_once(self):
126                 self.cancel()
127                 try:
128                         self.__func()
129                 except Exception:
130                         pass
131                 return False
132
133
134 _QUEUE_EMPTY = object()
135
136
137 class AsyncPool(object):
138
139         def __init__(self):
140                 self.__workQueue = Queue.Queue()
141                 self.__thread = threading.Thread(
142                         name = type(self).__name__,
143                         target = self.__consume_queue,
144                 )
145                 self.__isRunning = True
146
147         def start(self):
148                 self.__thread.start()
149
150         def stop(self):
151                 self.__isRunning = False
152                 for _ in algorithms.itr_available(self.__workQueue):
153                         pass # eat up queue to cut down dumb work
154                 self.__workQueue.put(_QUEUE_EMPTY)
155
156         def add_task(self, func, args, kwds, on_success, on_error):
157                 task = func, args, kwds, on_success, on_error
158                 self.__workQueue.put(task)
159
160         @misc.log_exception(_moduleLogger)
161         def __trampoline_callback(self, on_success, on_error, isError, result):
162                 if not self.__isRunning:
163                         if isError:
164                                 _moduleLogger.error("Masking: %s" % (result, ))
165                         isError = True
166                         result = StopIteration("Cancelling all callbacks")
167                 callback = on_success if not isError else on_error
168                 try:
169                         callback(result)
170                 except Exception:
171                         pass
172                 return False
173
174         @misc.log_exception(_moduleLogger)
175         def __consume_queue(self):
176                 while True:
177                         task = self.__workQueue.get()
178                         if task is _QUEUE_EMPTY:
179                                 break
180                         func, args, kwds, on_success, on_error = task
181
182                         try:
183                                 result = func(*args, **kwds)
184                                 isError = False
185                         except Exception, e:
186                                 result = e
187                                 isError = True
188                         self.__workQueue.task_done()
189
190                         gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
191
192
193 class LinearExecution(object):
194
195         def __init__(self, func):
196                 self._func = func
197                 self._run = None
198
199         def start(self, *args, **kwds):
200                 assert self._run is None
201                 kwds["on_success"] = self.on_success
202                 kwds["on_error"] = self.on_error
203                 self._run = self._func(*args, **kwds)
204                 trampoline, args, kwds = self._run.send(None) # priming the function
205                 trampoline(*args, **kwds)
206
207         @misc.log_exception(_moduleLogger)
208         def on_success(self, *args, **kwds):
209                 assert not kwds
210                 try:
211                         trampoline, args, kwds = self._run.send(args)
212                 except StopIteration, e:
213                         pass
214                 else:
215                         trampoline(*args, **kwds)
216
217         @misc.log_exception(_moduleLogger)
218         def on_error(self, error):
219                 try:
220                         trampoline, args, kwds = self._run.throw(error)
221                 except StopIteration, e:
222                         pass
223                 else:
224                         trampoline(*args, **kwds)
225
226
227 def throttled(minDelay, queue):
228         """
229         Throttle the calls to a function by queueing all the calls that happen
230         before the minimum delay
231
232         >>> import misc
233         >>> import Queue
234         >>> misc.validate_decorator(throttled(0, Queue.Queue()))
235         """
236
237         def actual_decorator(func):
238
239                 lastCallTime = [None]
240
241                 def process_queue():
242                         if 0 < len(queue):
243                                 func, args, kwargs = queue.pop(0)
244                                 lastCallTime[0] = time.time() * 1000
245                                 func(*args, **kwargs)
246                         return False
247
248                 @functools.wraps(func)
249                 def new_function(*args, **kwargs):
250                         now = time.time() * 1000
251                         if (
252                                 lastCallTime[0] is None or
253                                 (now - lastCallTime >= minDelay)
254                         ):
255                                 lastCallTime[0] = now
256                                 func(*args, **kwargs)
257                         else:
258                                 queue.append((func, args, kwargs))
259                                 lastCallDelta = now - lastCallTime[0]
260                                 processQueueTimeout = int(minDelay * len(queue) - lastCallDelta)
261                                 gobject.timeout_add(processQueueTimeout, process_queue)
262
263                 return new_function
264
265         return actual_decorator
266
267
268 def _old_timeout_add_seconds(timeout, callback):
269         return gobject.timeout_add(timeout * 1000, callback)
270
271
272 def _timeout_add_seconds(timeout, callback):
273         return gobject.timeout_add_seconds(timeout, callback)
274
275
276 try:
277         gobject.timeout_add_seconds
278         timeout_add_seconds = _timeout_add_seconds
279 except AttributeError:
280         timeout_add_seconds = _old_timeout_add_seconds