3 from __future__ import with_statement
15 _moduleLogger = logging.getLogger("gtk_toolbox")
18 @contextlib.contextmanager
19 def flock(path, timeout=-1):
26 while timeSpent <= timeout or timeout == WAIT_FOREVER:
28 fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
32 if e.errno != errno.EEXIST:
37 assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)
47 Decorator that makes a generator-function into a function that will continue execution on next call
51 @functools.wraps(func)
52 def decorated_func(*args, **kwds):
54 a.append(func(*args, **kwds))
68 ... def grep_sink(pattern):
69 ... print "Looking for %s" % pattern
72 ... if pattern in line:
74 >>> g = grep_sink("python")
76 >>> g.send("Yeah but no but yeah but no")
77 >>> g.send("A series of tubes")
78 >>> g.send("python generators rock!")
79 python generators rock!
83 @functools.wraps(func)
84 def start(*args, **kwargs):
85 cr = func(*args, **kwargs)
93 def printer_sink(format = "%s"):
95 >>> pr = printer_sink("%r")
102 >>> p = printer_sink()
107 >>> # p.throw(RuntimeError, "Goodbye")
113 print format % (item, )
119 Good for uses like with cochain to pick up any slack
126 def comap(function, target):
128 >>> p = printer_sink()
129 >>> cm = comap(lambda x: x+1, p)
140 mappedItem = function(*item)
141 target.send(mappedItem)
143 _moduleLogger.exception("Forwarding exception!")
144 target.throw(e.__class__, str(e))
147 def _flush_queue(queue):
148 while not queue.empty():
153 def queue_sink(queue):
155 >>> q = Queue.Queue()
156 >>> qs = queue_sink(q)
159 >>> qs.throw(RuntimeError, "Goodbye")
162 >>> print [i for i in _flush_queue(q)]
163 [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
168 queue.put((None, item))
170 queue.put((e.__class__, str(e)))
171 except GeneratorExit:
172 queue.put((GeneratorExit, None))
176 def decode_item(item, target):
180 elif item[0] is GeneratorExit:
184 target.throw(item[0], item[1])
188 def nonqueue_source(queue, target):
192 isDone = decode_item(item, target)
193 while not queue.empty():
197 def threaded_stage(target, thread_factory = threading.Thread):
198 messages = Queue.Queue()
200 run_source = functools.partial(nonqueue_source, messages, target)
201 thread = thread_factory(target=run_source)
202 thread.setDaemon(True)
205 # Sink running in current thread
206 return queue_sink(messages)
209 def safecall(f, errorDisplay=None, default=None, exception=Exception):
211 Returns modified f. When the modified f is called and throws an
212 exception, the default value is returned
214 def _safecall(*args, **argv):
216 return f(*args,**argv)
218 if errorDisplay is not None:
219 errorDisplay.push_exception(e)