Fixed issues with the credentials dialog
[gc-dialer] / src / util / concurrent.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import os
6 import errno
7 import time
8 import functools
9 import contextlib
10 import logging
11
12 import misc
13
14
15 _moduleLogger = logging.getLogger(__name__)
16
17
18 class AsyncLinearExecution(object):
19
20         def __init__(self, pool, func):
21                 self._pool = pool
22                 self._func = func
23                 self._run = None
24
25         def start(self, *args, **kwds):
26                 assert self._run is None
27                 self._run = self._func(*args, **kwds)
28                 trampoline, args, kwds = self._run.send(None) # priming the function
29                 self._pool.add_task(
30                         trampoline,
31                         args,
32                         kwds,
33                         self.on_success,
34                         self.on_error,
35                 )
36
37         @misc.log_exception(_moduleLogger)
38         def on_success(self, result):
39                 _moduleLogger.debug("Processing success for: %r", self._func)
40                 try:
41                         trampoline, args, kwds = self._run.send(result)
42                 except StopIteration, e:
43                         pass
44                 else:
45                         self._pool.add_task(
46                                 trampoline,
47                                 args,
48                                 kwds,
49                                 self.on_success,
50                                 self.on_error,
51                         )
52
53         @misc.log_exception(_moduleLogger)
54         def on_error(self, error):
55                 _moduleLogger.debug("Processing error for: %r", self._func)
56                 try:
57                         trampoline, args, kwds = self._run.throw(error)
58                 except StopIteration, e:
59                         pass
60                 else:
61                         self._pool.add_task(
62                                 trampoline,
63                                 args,
64                                 kwds,
65                                 self.on_success,
66                                 self.on_error,
67                         )
68
69         def __repr__(self):
70                 return "<async %s at 0x%x>" % (self._func.__name__, id(self))
71
72         def __hash__(self):
73                 return hash(self._func)
74
75         def __eq__(self, other):
76                 return self._func == other._func
77
78         def __ne__(self, other):
79                 return self._func != other._func
80
81
82 def synchronized(lock):
83         """
84         Synchronization decorator.
85
86         >>> import misc
87         >>> misc.validate_decorator(synchronized(object()))
88         """
89
90         def wrap(f):
91
92                 @functools.wraps(f)
93                 def newFunction(*args, **kw):
94                         lock.acquire()
95                         try:
96                                 return f(*args, **kw)
97                         finally:
98                                 lock.release()
99                 return newFunction
100         return wrap
101
102
103 @contextlib.contextmanager
104 def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
105         """
106         Locking with a queue, good for when you want to lock an item passed around
107
108         >>> import Queue
109         >>> item = 5
110         >>> lock = Queue.Queue()
111         >>> lock.put(item)
112         >>> with qlock(lock) as i:
113         ...     print i
114         5
115         """
116         item = queue.get(gblock, gtimeout)
117         try:
118                 yield item
119         finally:
120                 queue.put(item, pblock, ptimeout)
121
122
123 @contextlib.contextmanager
124 def flock(path, timeout=-1):
125         WAIT_FOREVER = -1
126         DELAY = 0.1
127         timeSpent = 0
128
129         acquired = False
130
131         while timeSpent <= timeout or timeout == WAIT_FOREVER:
132                 try:
133                         fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
134                         acquired = True
135                         break
136                 except OSError, e:
137                         if e.errno != errno.EEXIST:
138                                 raise
139                 time.sleep(DELAY)
140                 timeSpent += DELAY
141
142         assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)
143
144         try:
145                 yield fd
146         finally:
147                 os.unlink(path)