Bump to 1.1.5
[gonvert] / gonvert / util / concurrent.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import os
6 import errno
7 import time
8 import functools
9 import contextlib
10 import logging
11
12 import misc
13
14
# Module-wide logger: AsyncGeneratorTask writes its debug messages to it and
# passes it to misc.log_exception for its on_success/on_error callbacks.
_moduleLogger = logging.getLogger(__name__)
16
17
class AsyncTaskQueue(object):
        """
        Keeps track of the generator tasks created for one task pool.

        Finished tasks are only dropped when flush() runs, which add_async()
        does before creating each new task.
        """

        def __init__(self, taskPool):
                self._taskPool = taskPool
                self._asyncs = []

        def add_async(self, func):
                """
                Create an AsyncGeneratorTask for func on this queue's pool.

                The new task is remembered by the queue and returned to the
                caller; this method does not call start() on it.
                """
                self.flush()
                task = AsyncGeneratorTask(self._taskPool, func)
                self._asyncs.append(task)
                return task

        def flush(self):
                """
                Forget every tracked task whose isDone flag is set.
                """
                pending = []
                for task in self._asyncs:
                        if task.isDone:
                                continue
                        pending.append(task)
                self._asyncs = pending
32
33
class AsyncGeneratorTask(object):
        """
        Runs a generator function as a chain of jobs on a task pool.

        The generator yields (trampoline, args, kwds) triples.  Each triple is
        passed to pool.add_task() together with on_success and on_error; when
        one of those callbacks is invoked, its argument is sent (or thrown)
        into the generator to obtain the next triple.  The chain ends when the
        generator finishes.

        Two tasks compare (and hash) equal when they wrap the same function.
        """

        def __init__(self, pool, func):
                self._pool = pool
                self._func = func
                self._run = None  # generator object; created by start()
                self._isDone = False

        @property
        def isDone(self):
                """
                True once the task will schedule no further work: the generator
                finished, or resuming it raised an exception.
                """
                return self._isDone

        def start(self, *args, **kwds):
                """
                Call the wrapped function with the given arguments and schedule
                the first triple its generator yields.

                A generator that returns without yielding simply marks the task
                as done.  Any other exception raised while priming propagates
                to the caller.
                """
                assert self._run is None, "Task already started"
                self._run = self._func(*args, **kwds)
                self._advance(self._run.send, None) # priming the function

        @misc.log_exception(_moduleLogger)
        def on_success(self, result):
                """Resume the generator with result as the value of its yield."""
                _moduleLogger.debug("Processing success for: %r", self._func)
                self._advance(self._run.send, result)

        @misc.log_exception(_moduleLogger)
        def on_error(self, error):
                """Resume the generator by raising error at its yield."""
                _moduleLogger.debug("Processing error for: %r", self._func)
                self._advance(self._run.throw, error)

        def _advance(self, resume, value):
                """
                Resume the generator through resume(value) and queue what it yields.

                resume is the generator's bound send or throw method.
                """
                try:
                        step = resume(value)
                except StopIteration:
                        self._isDone = True
                        return
                except Exception:
                        # Nothing will resume this task again; flag it so that
                        # AsyncTaskQueue.flush() does not keep it around forever.
                        self._isDone = True
                        raise
                trampoline, args, kwds = step
                self._pool.add_task(
                        trampoline,
                        args,
                        kwds,
                        self.on_success,
                        self.on_error,
                )

        def __repr__(self):
                # Not every callable has __name__ (functools.partial, for one).
                name = getattr(self._func, "__name__", None)
                if name is None:
                        name = repr(self._func)
                return "<async %s at 0x%x>" % (name, id(self))

        def __hash__(self):
                return hash(self._func)

        def __eq__(self, other):
                if not isinstance(other, AsyncGeneratorTask):
                        return NotImplemented
                return self._func == other._func

        def __ne__(self, other):
                if not isinstance(other, AsyncGeneratorTask):
                        return NotImplemented
                return self._func != other._func
101
102
def synchronized(lock):
        """
        Build a decorator that runs the decorated callable while holding lock.

        lock only needs acquire() and release() methods.  It is acquired
        before every call and released afterwards, whether the call returns
        or raises.

        >>> import misc
        >>> misc.validate_decorator(synchronized(object()))
        """

        def decorate(func):

                def guarded(*args, **kwds):
                        lock.acquire()
                        try:
                                result = func(*args, **kwds)
                        finally:
                                lock.release()
                        return result

                # Copy func's name, docstring, etc. onto the wrapper.
                return functools.wraps(func)(guarded)

        return decorate
122
123
@contextlib.contextmanager
def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
        """
        Use a queue as a lock around the item it carries.

        On entry an item is taken with queue.get(gblock, gtimeout) and bound
        by the with-statement; on exit the same item is handed back with
        queue.put(item, pblock, ptimeout), even if the with-body raised.

        >>> import Queue
        >>> item = 5
        >>> lock = Queue.Queue()
        >>> lock.put(item)
        >>> with qlock(lock) as i:
        ...     print i
        5
        """
        borrowed = queue.get(gblock, gtimeout)
        try:
                yield borrowed
        finally:
                # Give the item back so the next holder can take it.
                queue.put(borrowed, pblock, ptimeout)
142
143
@contextlib.contextmanager
def flock(path, timeout=-1):
        """
        Hold an exclusive lock represented by the existence of the file at path.

        The lock is taken by creating path with O_CREAT | O_EXCL; while the
        file already exists the attempt is retried every 0.1 seconds.  The
        with-statement receives the open file descriptor.  On exit the
        descriptor is closed and the file is removed.

        path -- location of the lock file
        timeout -- roughly how many seconds to keep retrying; -1 (the default)
                retries forever

        Raises AssertionError if the lock could not be taken within timeout,
        and re-raises any OSError from os.open() other than EEXIST.

        The descriptor belongs to flock: callers should not close it.
        """
        # Imported here because the module itself does not import sys.
        import sys

        WAIT_FOREVER = -1
        DELAY = 0.1
        timeSpent = 0

        fd = None

        while timeSpent <= timeout or timeout == WAIT_FOREVER:
                try:
                        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
                        break
                except OSError:
                        # sys.exc_info() instead of "except OSError, e" / "as e":
                        # this form parses on every Python version from 2.5 up.
                        if sys.exc_info()[1].errno != errno.EEXIST:
                                raise
                time.sleep(DELAY)
                timeSpent += DELAY

        # Raised explicitly (not via an assert statement) so that the check
        # still happens when Python runs with -O.
        if fd is None:
                raise AssertionError(
                        "Failed to grab file-lock %s within timeout %d" % (path, timeout)
                )

        try:
                yield fd
        finally:
                try:
                        os.close(fd)
                except OSError:
                        # Already closed by the with-body; still remove the file.
                        pass
                os.unlink(path)
167         finally:
168                 os.unlink(path)