3 from __future__ import with_statement
17 _moduleLogger = logging.getLogger(__name__)
22 Decorator that makes a generator-function into a function that will continue execution on next call
26 @functools.wraps(func)
27 def decorated_func(*args, **kwds):
29 a.append(func(*args, **kwds))
42 Make a function mainloop friendly. The function will be called at the
43 next mainloop idle state.
46 >>> misc.validate_decorator(async)
49 @functools.wraps(func)
50 def new_function(*args, **kwargs):
56 gobject.idle_add(async_function)
63 def __init__(self, func, once = True):
69 assert self.__idleId is None
71 self.__idleId = gobject.idle_add(self._on_once)
73 self.__idleId = gobject.idle_add(self.__func)
76 return self.__idleId is not None
79 if self.__idleId is not None:
80 gobject.source_remove(self.__idleId)
86 @misc.log_exception(_moduleLogger)
96 class Timeout(object):
98 def __init__(self, func):
100 self.__timeoutId = None
102 def start(self, **kwds):
103 assert self.__timeoutId is None
105 assert len(kwds) == 1
106 timeoutInSeconds = kwds["seconds"]
107 assert 0 <= timeoutInSeconds
108 if timeoutInSeconds == 0:
109 self.__timeoutId = gobject.idle_add(self._on_once)
111 self.__timeoutId = timeout_add_seconds(timeoutInSeconds, self._on_once)
113 def is_running(self):
114 return self.__timeoutId is not None
117 if self.__timeoutId is not None:
118 gobject.source_remove(self.__timeoutId)
119 self.__timeoutId = None
121 def __call__(self, **kwds):
122 return self.start(**kwds)
124 @misc.log_exception(_moduleLogger)
134 _QUEUE_EMPTY = object()
137 class AsyncPool(object):
140 self.__workQueue = Queue.Queue()
141 self.__thread = threading.Thread(
142 name = type(self).__name__,
143 target = self.__consume_queue,
145 self.__isRunning = True
148 self.__thread.start()
151 self.__isRunning = False
152 for _ in algorithms.itr_available(self.__workQueue):
153 pass # eat up queue to cut down dumb work
154 self.__workQueue.put(_QUEUE_EMPTY)
156 def clear_tasks(self):
157 for _ in algorithms.itr_available(self.__workQueue):
158 pass # eat up queue to cut down dumb work
160 def add_task(self, func, args, kwds, on_success, on_error):
161 task = func, args, kwds, on_success, on_error
162 self.__workQueue.put(task)
164 @misc.log_exception(_moduleLogger)
165 def __trampoline_callback(self, on_success, on_error, isError, result):
166 if not self.__isRunning:
168 _moduleLogger.error("Masking: %s" % (result, ))
170 result = StopIteration("Cancelling all callbacks")
171 callback = on_success if not isError else on_error
175 _moduleLogger.exception("Callback errored")
178 @misc.log_exception(_moduleLogger)
179 def __consume_queue(self):
181 task = self.__workQueue.get()
182 if task is _QUEUE_EMPTY:
184 func, args, kwds, on_success, on_error = task
187 result = func(*args, **kwds)
190 _moduleLogger.error("Error, passing it back to the main thread")
193 self.__workQueue.task_done()
195 gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
196 _moduleLogger.debug("Shutting down worker thread")
199 class AsyncLinearExecution(object):
201 def __init__(self, pool, func):
206 def start(self, *args, **kwds):
207 assert self._run is None
208 self._run = self._func(*args, **kwds)
209 trampoline, args, kwds = self._run.send(None) # priming the function
218 @misc.log_exception(_moduleLogger)
219 def on_success(self, result):
220 _moduleLogger.debug("Processing success for: %r", self._func)
222 trampoline, args, kwds = self._run.send(result)
223 except StopIteration, e:
234 @misc.log_exception(_moduleLogger)
235 def on_error(self, error):
236 _moduleLogger.debug("Processing error for: %r", self._func)
238 trampoline, args, kwds = self._run.throw(error)
239 except StopIteration, e:
def throttled(minDelay, queue):
    """
    Throttle the calls to a function by queueing all the calls that happen
    before the minimum delay

    minDelay is the minimum spacing between two calls, expressed in
    milliseconds (it is compared against differences of
    ``time.time() * 1000``).

    queue holds the deferred calls as ``(func, args, kwargs)`` tuples.  It
    must be a mutable, list-like object supporting ``len()``, ``append()``
    and ``pop(0)``.

    Returns a decorator.  The decorated function always returns None: a call
    is either executed right away (its result is discarded) or queued and
    executed later from a gobject timeout.

    >>> misc.validate_decorator(throttled(0, []))
    """

    def actual_decorator(func):

        # One-element list so the nested functions can rebind the timestamp
        # (Python 2 has no ``nonlocal``).  None means "never called yet".
        lastCallTime = [None]

        def process_queue():
            # Run the oldest deferred call, if any is still pending.
            if 0 < len(queue):
                queuedFunc, args, kwargs = queue.pop(0)
                lastCallTime[0] = time.time() * 1000
                queuedFunc(*args, **kwargs)
            # A false return value makes this a one-shot timeout.
            return False

        @functools.wraps(func)
        def new_function(*args, **kwargs):
            now = time.time() * 1000
            # The timestamp lives in lastCallTime[0]; subtracting the list
            # itself from ``now`` raises TypeError.
            if (
                lastCallTime[0] is None or
                (now - lastCallTime[0] >= minDelay)
            ):
                lastCallTime[0] = now
                func(*args, **kwargs)
            else:
                queue.append((func, args, kwargs))
                lastCallDelta = now - lastCallTime[0]
                # Each queued call gets its own minDelay-wide slot after the
                # last executed call.
                processQueueTimeout = int(minDelay * len(queue) - lastCallDelta)
                gobject.timeout_add(processQueueTimeout, process_queue)

        return new_function

    return actual_decorator
def _old_timeout_add_seconds(timeout, callback):
    """
    Schedule ``callback`` through ``gobject.timeout_add``, passing
    ``timeout`` multiplied by 1000 as the interval.

    Returns whatever ``gobject.timeout_add`` returns.
    """
    scaledTimeout = timeout * 1000
    return gobject.timeout_add(scaledTimeout, callback)
def _timeout_add_seconds(timeout, callback):
    """
    Schedule ``callback`` through ``gobject.timeout_add_seconds``, passing
    ``timeout`` through unchanged.

    Returns whatever ``gobject.timeout_add_seconds`` returns.
    """
    sourceId = gobject.timeout_add_seconds(timeout, callback)
    return sourceId
301 gobject.timeout_add_seconds
302 timeout_add_seconds = _timeout_add_seconds
303 except AttributeError:
304 timeout_add_seconds = _old_timeout_add_seconds