--- /dev/null
+#!/usr/bin/env python
+
+from __future__ import with_statement
+
+import os
+import errno
+import time
+import functools
+import contextlib
+import logging
+
+import misc
+
+
+_moduleLogger = logging.getLogger(__name__)
+
+
+class AsyncTaskQueue(object):
+
+ def __init__(self, taskPool):
+ self._asyncs = []
+ self._taskPool = taskPool
+
+ def add_async(self, func):
+ self.flush()
+ a = AsyncGeneratorTask(self._taskPool, func)
+ self._asyncs.append(a)
+ return a
+
+ def flush(self):
+ self._asyncs = [a for a in self._asyncs if not a.isDone]
+
+
+class AsyncGeneratorTask(object):
+
+ def __init__(self, pool, func):
+ self._pool = pool
+ self._func = func
+ self._run = None
+ self._isDone = False
+
+ @property
+ def isDone(self):
+ return self._isDone
+
+ def start(self, *args, **kwds):
+ assert self._run is None, "Task already started"
+ self._run = self._func(*args, **kwds)
+ trampoline, args, kwds = self._run.send(None) # priming the function
+ self._pool.add_task(
+ trampoline,
+ args,
+ kwds,
+ self.on_success,
+ self.on_error,
+ )
+
+ @misc.log_exception(_moduleLogger)
+ def on_success(self, result):
+ _moduleLogger.debug("Processing success for: %r", self._func)
+ try:
+ trampoline, args, kwds = self._run.send(result)
+ except StopIteration, e:
+ self._isDone = True
+ else:
+ self._pool.add_task(
+ trampoline,
+ args,
+ kwds,
+ self.on_success,
+ self.on_error,
+ )
+
+ @misc.log_exception(_moduleLogger)
+ def on_error(self, error):
+ _moduleLogger.debug("Processing error for: %r", self._func)
+ try:
+ trampoline, args, kwds = self._run.throw(error)
+ except StopIteration, e:
+ self._isDone = True
+ else:
+ self._pool.add_task(
+ trampoline,
+ args,
+ kwds,
+ self.on_success,
+ self.on_error,
+ )
+
+ def __repr__(self):
+ return "<async %s at 0x%x>" % (self._func.__name__, id(self))
+
+ def __hash__(self):
+ return hash(self._func)
+
+ def __eq__(self, other):
+ return self._func == other._func
+
+ def __ne__(self, other):
+ return self._func != other._func
+
+
+def synchronized(lock):
+ """
+ Synchronization decorator.
+
+ >>> import misc
+ >>> misc.validate_decorator(synchronized(object()))
+ """
+
+ def wrap(f):
+
+ @functools.wraps(f)
+ def newFunction(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
+ return newFunction
+ return wrap
+
+
+@contextlib.contextmanager
+def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
+ """
+ Locking with a queue, good for when you want to lock an item passed around
+
+ >>> import Queue
+ >>> item = 5
+ >>> lock = Queue.Queue()
+ >>> lock.put(item)
+ >>> with qlock(lock) as i:
+ ... print i
+ 5
+ """
+ item = queue.get(gblock, gtimeout)
+ try:
+ yield item
+ finally:
+ queue.put(item, pblock, ptimeout)
+
+
+@contextlib.contextmanager
+def flock(path, timeout=-1):
+ WAIT_FOREVER = -1
+ DELAY = 0.1
+ timeSpent = 0
+
+ acquired = False
+
+ while timeSpent <= timeout or timeout == WAIT_FOREVER:
+ try:
+ fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
+ acquired = True
+ break
+ except OSError, e:
+ if e.errno != errno.EEXIST:
+ raise
+ time.sleep(DELAY)
+ timeSpent += DELAY
+
+ assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)
+
+ try:
+ yield fd
+ finally:
+ os.unlink(path)