Forgot to add some files to previous commit
[theonering] / src / gtk_toolbox.py
1 #!/usr/bin/python
2
3 from __future__ import with_statement
4
5 import os
6 import errno
7 import time
8 import functools
9 import contextlib
10 import logging
11 import threading
12 import Queue
13
14
15 @contextlib.contextmanager
16 def flock(path, timeout=-1):
17         WAIT_FOREVER = -1
18         DELAY = 0.1
19         timeSpent = 0
20
21         acquired = False
22
23         while timeSpent <= timeout or timeout == WAIT_FOREVER:
24                 try:
25                         fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
26                         acquired = True
27                         break
28                 except OSError, e:
29                         if e.errno != errno.EEXIST:
30                                 raise
31                 time.sleep(DELAY)
32                 timeSpent += DELAY
33
34         assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)
35
36         try:
37                 yield fd
38         finally:
39                 os.unlink(path)
40
41
42 def make_idler(func):
43         """
44         Decorator that makes a generator-function into a function that will continue execution on next call
45         """
46         a = []
47
48         @functools.wraps(func)
49         def decorated_func(*args, **kwds):
50                 if not a:
51                         a.append(func(*args, **kwds))
52                 try:
53                         a[0].next()
54                         return True
55                 except StopIteration:
56                         del a[:]
57                         return False
58
59         return decorated_func
60
61
62 def autostart(func):
63         """
64         >>> @autostart
65         ... def grep_sink(pattern):
66         ...     print "Looking for %s" % pattern
67         ...     while True:
68         ...             line = yield
69         ...             if pattern in line:
70         ...                     print line,
71         >>> g = grep_sink("python")
72         Looking for python
73         >>> g.send("Yeah but no but yeah but no")
74         >>> g.send("A series of tubes")
75         >>> g.send("python generators rock!")
76         python generators rock!
77         >>> g.close()
78         """
79
80         @functools.wraps(func)
81         def start(*args, **kwargs):
82                 cr = func(*args, **kwargs)
83                 cr.next()
84                 return cr
85
86         return start
87
88
89 @autostart
90 def printer_sink(format = "%s"):
91         """
92         >>> pr = printer_sink("%r")
93         >>> pr.send("Hello")
94         'Hello'
95         >>> pr.send("5")
96         '5'
97         >>> pr.send(5)
98         5
99         >>> p = printer_sink()
100         >>> p.send("Hello")
101         Hello
102         >>> p.send("World")
103         World
104         >>> # p.throw(RuntimeError, "Goodbye")
105         >>> # p.send("Meh")
106         >>> # p.close()
107         """
108         while True:
109                 item = yield
110                 print format % (item, )
111
112
113 @autostart
114 def null_sink():
115         """
116         Good for uses like with cochain to pick up any slack
117         """
118         while True:
119                 item = yield
120
121
122 @autostart
123 def comap(function, target):
124         """
125         >>> p = printer_sink()
126         >>> cm = comap(lambda x: x+1, p)
127         >>> cm.send((0, ))
128         1
129         >>> cm.send((1.0, ))
130         2.0
131         >>> cm.send((-2, ))
132         -1
133         """
134         while True:
135                 try:
136                         item = yield
137                         mappedItem = function(*item)
138                         target.send(mappedItem)
139                 except Exception, e:
140                         logging.exception("Forwarding exception!")
141                         target.throw(e.__class__, str(e))
142
143
144 def _flush_queue(queue):
145         while not queue.empty():
146                 yield queue.get()
147
148
149 @autostart
150 def queue_sink(queue):
151         """
152         >>> q = Queue.Queue()
153         >>> qs = queue_sink(q)
154         >>> qs.send("Hello")
155         >>> qs.send("World")
156         >>> qs.throw(RuntimeError, "Goodbye")
157         >>> qs.send("Meh")
158         >>> qs.close()
159         >>> print [i for i in _flush_queue(q)]
160         [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
161         """
162         while True:
163                 try:
164                         item = yield
165                         queue.put((None, item))
166                 except Exception, e:
167                         queue.put((e.__class__, str(e)))
168                 except GeneratorExit:
169                         queue.put((GeneratorExit, None))
170                         raise
171
172
173 def decode_item(item, target):
174         if item[0] is None:
175                 target.send(item[1])
176                 return False
177         elif item[0] is GeneratorExit:
178                 target.close()
179                 return True
180         else:
181                 target.throw(item[0], item[1])
182                 return False
183
184
185 def nonqueue_source(queue, target):
186         isDone = False
187         while not isDone:
188                 item = queue.get()
189                 isDone = decode_item(item, target)
190                 while not queue.empty():
191                         queue.get_nowait()
192
193
194 def threaded_stage(target, thread_factory = threading.Thread):
195         messages = Queue.Queue()
196
197         run_source = functools.partial(nonqueue_source, messages, target)
198         thread = thread_factory(target=run_source)
199         thread.setDaemon(True)
200         thread.start()
201
202         # Sink running in current thread
203         return queue_sink(messages)
204
205
206 def safecall(f, errorDisplay=None, default=None, exception=Exception):
207         '''
208         Returns modified f. When the modified f is called and throws an
209         exception, the default value is returned
210         '''
211         def _safecall(*args, **argv):
212                 try:
213                         return f(*args,**argv)
214                 except exception, e:
215                         if errorDisplay is not None:
216                                 errorDisplay.push_exception(e)
217                         return default
218         return _safecall