Starting on threading work
[gc-dialer] / src / util / concurrent.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import os
6 import errno
7 import time
8 import functools
9 import contextlib
10 import logging
11
12 import misc
13
14
15 _moduleLogger = logging.getLogger(__name__)
16
17
class AsyncLinearExecution(object):
        """
        Drives a generator so that a chain of pooled tasks can be written as
        straight-line code.

        ``func`` is called to create a generator.  Every value the generator
        yields is unpacked as ``(trampoline, args, kwds)`` and handed to
        ``pool.add_task`` together with ``on_success`` and ``on_error``.  The
        pool is expected to call one of those two back (presumably with the
        task's result or its exception -- verify against the pool
        implementation), which resumes the generator through ``send`` or
        ``throw`` respectively.  Execution ends when the generator is exhausted.
        """

        def __init__(self, pool, func):
                """
                @param pool: object providing
                        ``add_task(func, args, kwds, on_success, on_error)``
                @param func: generator function yielding
                        ``(callable, args, kwds)`` triples
                """
                self._pool = pool
                self._func = func
                # The live generator; None until start() is called
                self._run = None

        def start(self, *args, **kwds):
                """
                Create the generator from ``func(*args, **kwds)`` and queue the
                first task it yields.

                May only be called once per instance.  A generator that finishes
                without yielding anything is treated as already complete, the
                same way ``on_success`` and ``on_error`` treat exhaustion.
                """
                assert self._run is None, "Execution was already started"
                self._run = self._func(*args, **kwds)
                # Sending None is how a freshly created generator is primed
                self._advance(self._run.send, None)

        @misc.log_exception(_moduleLogger)
        def on_success(self, result):
                """
                Resume the generator with ``result`` as the value of its pending
                ``yield`` and queue whatever task it yields next.
                """
                _moduleLogger.debug("Processing success for: %r", self._func)
                self._advance(self._run.send, result)

        @misc.log_exception(_moduleLogger)
        def on_error(self, error):
                """
                Raise ``error`` inside the generator at its pending ``yield`` and
                queue whatever task it yields next (if it handles the error).
                """
                _moduleLogger.debug("Processing error for: %r", self._func)
                self._advance(self._run.throw, error)

        def _advance(self, resume, value):
                """
                Resume the generator with ``resume(value)`` and queue the task it
                yields; do nothing more once the generator is exhausted.

                @param resume: bound ``send`` or ``throw`` of the generator
                @param value: the result to send or the exception to throw
                """
                try:
                        trampoline, args, kwds = resume(value)
                except StopIteration:
                        # The generator ran to completion; nothing left to queue
                        return
                # Kept outside the try so errors raised by the pool itself are
                # never mistaken for the generator finishing
                self._pool.add_task(
                        trampoline,
                        args,
                        kwds,
                        self.on_success,
                        self.on_error,
                )
68
69
def synchronized(lock):
        """
        Build a decorator that runs the decorated function while holding ``lock``.

        ``lock.acquire()`` is called before every invocation and
        ``lock.release()`` once it finishes, whether it returned or raised.
        Only those two methods of ``lock`` are used.

        >>> import misc
        >>> misc.validate_decorator(synchronized(object()))
        """

        def decorate(func):

                @functools.wraps(func)
                def lockedCall(*args, **kw):
                        lock.acquire()
                        try:
                                result = func(*args, **kw)
                        finally:
                                # Always give the lock back, even on an exception
                                lock.release()
                        return result

                return lockedCall

        return decorate
89
90
@contextlib.contextmanager
def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
        """
        Borrow an item from a queue for the duration of a ``with`` block.

        The item is taken with ``queue.get(gblock, gtimeout)``, bound to the
        ``with`` target, and handed back with
        ``queue.put(item, pblock, ptimeout)`` when the block exits, whether it
        finished normally or raised.

        >>> import Queue
        >>> item = 5
        >>> lock = Queue.Queue()
        >>> lock.put(item)
        >>> with qlock(lock) as i:
        ...     print i
        5
        """
        borrowed = queue.get(gblock, gtimeout)
        try:
                yield borrowed
        finally:
                # Return the item even when the with-body raised
                queue.put(borrowed, pblock, ptimeout)
109
110
@contextlib.contextmanager
def flock(path, timeout=-1):
        """
        Hold an exclusive lock represented by the existence of the file ``path``.

        The lock is taken by creating ``path`` with ``O_CREAT | O_EXCL``.  While
        the file already exists the attempt is repeated every 0.1 seconds.  The
        open file descriptor is bound to the ``with`` target; when the block
        exits the descriptor is closed and the file is removed.

        ``timeout`` is the approximate number of seconds to keep retrying (only
        the delays between attempts are counted); -1, the default, retries
        forever.

        Raises AssertionError if the lock could not be taken within ``timeout``
        and OSError if creating the file fails for any reason other than the
        file already existing.
        """
        WAIT_FOREVER = -1
        DELAY = 0.1
        timeSpent = 0

        # None means "not acquired"; 0 is a valid descriptor so test with "is"
        fd = None

        while timeSpent <= timeout or timeout == WAIT_FOREVER:
                try:
                        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
                        break
                except OSError as e:
                        # EEXIST means someone else holds the lock; anything
                        # else is a genuine failure the caller must see
                        if e.errno != errno.EEXIST:
                                raise
                timeSpent += DELAY
                if timeout != WAIT_FOREVER and timeout < timeSpent:
                        # Out of time: report failure now instead of sleeping
                        # once more only to leave the loop
                        break
                time.sleep(DELAY)

        if fd is None:
                # Raised explicitly (rather than via "assert") so the check
                # survives "python -O"; AssertionError is kept for callers that
                # already catch it
                raise AssertionError(
                        "Failed to grab file-lock %s within timeout %s" % (path, timeout)
                )

        try:
                yield fd
        finally:
                try:
                        os.close(fd)
                except OSError as e:
                        # The with-body may already have closed the descriptor
                        if e.errno != errno.EBADF:
                                raise
                finally:
                        # Removing the file is what actually releases the lock
                        os.unlink(path)
134         finally:
135                 os.unlink(path)