3 from __future__ import with_statement
# Logger named after this module; it is handed to misc.log_exception on the
# task callbacks and used for their debug tracing.
15 _moduleLogger = logging.getLogger(__name__)
# Keeps track of _AsyncGeneratorTask objects that are all created against one
# task pool.
18 class AsyncTaskQueue(object):
# taskPool: stored as-is and passed to every task built by add_async().
20 def __init__(self, taskPool):
22 self._taskPool = taskPool
# Wrap func in an _AsyncGeneratorTask bound to this queue's pool and record it
# in self._asyncs.
24 def add_async(self, func):
26 a = _AsyncGeneratorTask(self._taskPool, func)
27 self._asyncs.append(a)
# Rebuild the tracking list, keeping only tasks whose isDone is falsy.
31 self._asyncs = [a for a in self._asyncs if not a.isDone]
# Drives one generator-style task: the object produced by calling func is
# resumed with each result (send) or failure (throw), and every value it yields
# is unpacked as a (trampoline, args, kwds) triple.
34 class _AsyncGeneratorTask(object):
# NOTE(review): the methods below rely on self._func, and start() expects
# self._run to be None beforehand -- confirm the constructor sets both.
36 def __init__(self, pool, func):
# Create the generator by calling func(*args, **kwds) and advance it to its
# first yield. A second call trips the assertion (while assertions are enabled).
46 def start(self, *args, **kwds):
47 assert self._run is None, "Task already started"
48 self._run = self._func(*args, **kwds)
# send(None) is the only value a not-yet-started generator accepts. The
# unpacking rebinds the local names args/kwds, replacing the caller's arguments.
49 trampoline, args, kwds = self._run.send(None) # priming the function
# Resume the generator with `result` as the value of its pending yield and
# unpack the next (trampoline, args, kwds) triple it yields.
# NOTE(review): misc.log_exception presumably reports exceptions escaping this
# method to _moduleLogger -- misc is not visible here, confirm.
58 @misc.log_exception(_moduleLogger)
59 def on_success(self, result):
60 _moduleLogger.debug("Processing success for: %r", self._func)
62 trampoline, args, kwds = self._run.send(result)
# StopIteration means the generator finished instead of yielding again.
# The "except X, e" spelling is Python 2-only syntax.
63 except StopIteration, e:
# Resume the generator by raising `error` at its pending yield; if the
# generator handles it and yields again, unpack the next triple.
# NOTE(review): misc.log_exception presumably reports exceptions escaping this
# method to _moduleLogger -- misc is not visible here, confirm.
74 @misc.log_exception(_moduleLogger)
75 def on_error(self, error):
76 _moduleLogger.debug("Processing error for: %r", self._func)
78 trampoline, args, kwds = self._run.throw(error)
# StopIteration means the generator finished instead of yielding again.
# The "except X, e" spelling is Python 2-only syntax.
79 except StopIteration, e:
# Debug-friendly label: the wrapped function's __name__ plus this object's
# id() in hexadecimal.
91 return "<async %s at 0x%x>" % (self._func.__name__, id(self))
# Hash by the wrapped function, so tasks that compare equal also hash equal.
94 return hash(self._func)
# Two tasks are equal when their wrapped functions compare equal. `other` must
# expose a _func attribute; any other object raises AttributeError.
96 def __eq__(self, other):
97 return self._func == other._func
# Explicit inverse of __eq__ (Python 2 does not derive != from ==). Has the
# same requirement that `other` expose _func.
99 def __ne__(self, other):
100 return self._func != other._func
# Decorator factory: synchronized(lock) produces a decorator (see the doctest,
# which passes its result to misc.validate_decorator). The wrapper it installs,
# newFunction, forwards every positional and keyword argument to the wrapped f.
# NOTE(review): how `lock` is acquired and released around the call is not
# shown alongside these lines -- confirm before relying on its semantics.
103 def synchronized(lock):
105 Synchronization decorator.
108 >>> misc.validate_decorator(synchronized(object()))
# Wrapper that stands in for the decorated function.
114 def newFunction(*args, **kw):
# Pass the call through unchanged and hand back f's return value.
117 return f(*args, **kw)
# Context manager that takes one item out of `queue` and later puts that same
# item back; the doctest uses it as `with qlock(lock) as i`.
# gblock/gtimeout are forwarded to queue.get(); pblock/ptimeout are forwarded
# to queue.put().
124 @contextlib.contextmanager
125 def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
127 Locking with a queue, good for when you want to lock an item passed around
131 >>> lock = Queue.Queue()
133 >>> with qlock(lock) as i:
# Remove the item from the queue using the "get" blocking/timeout settings.
137 item = queue.get(gblock, gtimeout)
# Return the same item to the queue using the "put" blocking/timeout settings.
141 queue.put(item, pblock, ptimeout)
# Context manager that uses a file at `path` as a lock: the lock is taken by
# exclusively creating that file.
# timeout: compared against the time spent waiting; WAIT_FOREVER lifts the limit.
# NOTE(review): the default of -1 presumably equals WAIT_FOREVER, which is
# defined elsewhere -- confirm.
144 @contextlib.contextmanager
145 def flock(path, timeout=-1):
# Keep trying while within the time budget, or indefinitely for WAIT_FOREVER.
152 while timeSpent <= timeout or timeout == WAIT_FOREVER:
# O_CREAT | O_EXCL makes the open fail when the file already exists, so only
# one creator can succeed.
154 fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
# EEXIST means the lock file is already present; any other errno is singled
# out here as a different kind of failure.
158 if e.errno != errno.EEXIST:
# NOTE(review): assert statements are removed under `python -O`, so this
# timeout failure would go unreported in optimized runs.
163 assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)