+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-Copyright (C) 2007 Christoph Würstle
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License version 2 as
-published by the Free Software Foundation.
-"""
-
-
-import os
-import sys
-import logging
-
-
-_moduleLogger = logging.getLogger(__name__)
-sys.path.append("/opt/REPLACEME/lib")
-
-
-import constants
-import REPLACEME_gtk
-
-
-if __name__ == "__main__":
- try:
- os.makedirs(constants._data_path_)
- except OSError, e:
- if e.errno != 17:
- raise
-
- try:
- os.makedirs(constants._cache_path_)
- except OSError, e:
- if e.errno != 17:
- raise
-
- logFormat = '(%(asctime)s) %(levelname)-5s %(threadName)s.%(name)s: %(message)s'
- logging.basicConfig(level=logging.DEBUG, filename=constants._user_logpath_, format=logFormat)
- _moduleLogger.info("%s %s-%s" % (constants.__app_name__, constants.__version__, constants.__build__))
- _moduleLogger.info("OS: %s" % (os.uname()[0], ))
- _moduleLogger.info("Kernel: %s (%s) for %s" % os.uname()[2:])
- _moduleLogger.info("Hostname: %s" % os.uname()[1])
-
- REPLACEME_gtk.run()
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from __future__ import with_statement
-
-import gc
-import logging
-import ConfigParser
-
-import gobject
-import dbus
-import gtk
-
-try:
- import osso
-except ImportError:
- osso = None
-
-import constants
-import hildonize
-import util.misc as misc_utils
-
-
-_moduleLogger = logging.getLogger(__name__)
-PROFILE_STARTUP = False
-
-
-class REPLACEMEProgram(hildonize.get_app_class()):
-
- def __init__(self):
- super(REPLACEMEProgram, self).__init__()
-
- if not hildonize.IS_HILDON_SUPPORTED:
- _moduleLogger.info("No hildonization support")
-
- if osso is not None:
- self._osso_c = osso.Context(constants.__app_name__, constants.__version__, False)
- self._deviceState = osso.DeviceState(self._osso_c)
- self._deviceState.set_device_state_callback(self._on_device_state_change, 0)
- else:
- _moduleLogger.info("No osso support")
- self._osso_c = None
- self._deviceState = None
-
- def _save_settings(self):
- config = ConfigParser.SafeConfigParser()
-
- self._REPLACEME.save_settings(config, "Windows")
-
- with open(constants._user_settings_, "wb") as configFile:
- config.write(configFile)
-
- def _load_settings(self):
- config = ConfigParser.SafeConfigParser()
- config.read(constants._user_settings_)
-
- self._REPLACEME.load_settings(config, "Windows")
-
- @misc_utils.log_exception(_moduleLogger)
- def _on_device_state_change(self, shutdown, save_unsaved_data, memory_low, system_inactivity, message, userData):
- """
- For system_inactivity, we have no background tasks to pause
-
- @note Hildon specific
- """
- if memory_low:
- gc.collect()
-
- if save_unsaved_data or shutdown:
- self._save_settings()
-
- @misc_utils.log_exception(_moduleLogger)
- def _on_destroy(self, widget = None, data = None):
- try:
- self.quit()
- finally:
- gtk.main_quit()
-
- def quit(self):
- try:
- self._save_settings()
- except Exception:
- _moduleLogger.exception("Error saving settigns")
-
- try:
- self._deviceState.close()
- except AttributeError:
- pass # Either None or close was removed (in Fremantle)
- except Exception:
- _moduleLogger.exception("Error closing device state")
- try:
- self._osso_c.close()
- except AttributeError:
- pass # Either None or close was removed (in Fremantle)
- except Exception:
- _moduleLogger.exception("Error closing osso state")
-
-
-def run():
- gobject.threads_init()
- gtk.gdk.threads_init()
- l = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
-
- # HACK Playback while silent on Maemo 5
- hildonize.set_application_name("FMRadio")
-
- app = REPLACEMElProgram()
- if not PROFILE_STARTUP:
- try:
- gtk.main()
- except KeyboardInterrupt:
- app.quit()
- raise
- else:
- app.quit()
-
-
-if __name__ == "__main__":
- logging.basicConfig(level=logging.DEBUG)
- run()
-#!/usr/bin/python
+#!/usr/bin/env python
import os
import sys
import logging
-_moduleLogger = logging.getLogger("dialcentral")
-sys.path.append("/usr/lib/ejpi/")
+_moduleLogger = logging.getLogger(__name__)
+sys.path.append("/opt/ejpi/lib")
import constants
import ejpi_glade
-try:
- os.makedirs(constants._data_path_)
-except OSError, e:
- if e.errno != 17:
- raise
+if __name__ == "__main__":
+ try:
+ os.makedirs(constants._data_path_)
+ except OSError, e:
+ if e.errno != 17:
+ raise
-logging.basicConfig(level=logging.DEBUG, filename=constants._user_logpath_)
-_moduleLogger.info("ejpi %s-%s" % (constants.__version__, constants.__build__))
-_moduleLogger.info("OS: %s" % (os.uname()[0], ))
-_moduleLogger.info("Kernel: %s (%s) for %s" % os.uname()[2:])
-_moduleLogger.info("Hostname: %s" % os.uname()[1])
+ logFormat = '(%(asctime)s) %(levelname)-5s %(threadName)s.%(name)s: %(message)s'
+ logging.basicConfig(level=logging.DEBUG, filename=constants._user_logpath_, format=logFormat)
+ _moduleLogger.info("%s %s-%s" % (constants.__app_name__, constants.__version__, constants.__build__))
+ _moduleLogger.info("OS: %s" % (os.uname()[0], ))
+ _moduleLogger.info("Kernel: %s (%s) for %s" % os.uname()[2:])
+ _moduleLogger.info("Hostname: %s" % os.uname()[1])
-
-ejpi_glade.run_calculator()
+ ejpi_glade.run()
import gtk.glade
import hildonize
+import gtk_toolbox
+import constants
from libraries import gtkpie
from libraries import gtkpieboard
+import util.misc as misc_utils
import plugin_utils
import history
import gtkhistory
-import gtk_toolbox
-import constants
-_moduleLogger = logging.getLogger("ejpi_glade")
+_moduleLogger = logging.getLogger(__name__)
PLUGIN_SEARCH_PATHS = [
os.path.join(os.path.dirname(__file__), "plugins/"),
self._widgetTree.get_widget("entryView").connect("activate", self._on_push)
self.__pluginButton.connect("clicked", self._on_kb_plugin_selection_button)
- hildonize.set_application_title(self._window, "%s" % constants.__pretty_app_name__)
+ hildonize.set_application_name("%s" % constants.__pretty_app_name__)
self._window.connect("destroy", self._on_close)
self._window.show_all()
"pluginKeyboard": pluginKeyboard,
})
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_kb_plugin_selection_button(self, *args):
pluginNames = [plugin["pluginName"] for plugin in self.__activeKeyboards]
oldIndex = pluginNames.index(self.__pluginButton.get_label())
line = " ".join(data for data in lineData)
f.write("%s\n" % line)
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_device_state_change(self, shutdown, save_unsaved_data, memory_low, system_inactivity, message, userData):
"""
For system_inactivity, we have no background tasks to pause
if save_unsaved_data or shutdown:
self.__save_history()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_window_state_change(self, widget, event, *args):
if event.new_window_state & gtk.gdk.WINDOW_STATE_FULLSCREEN:
self._isFullScreen = True
else:
self._isFullScreen = False
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_close(self, *args, **kwds):
try:
self.__save_history()
finally:
gtk.main_quit()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_copy(self, *args):
equationNode = self.__history.history.peek()
result = str(equationNode.evaluate())
self._clipboard.set_text(result)
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_copy_equation(self, *args):
equationNode = self.__history.history.peek()
equation = str(equationNode)
self._clipboard.set_text(equation)
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_paste(self, *args):
contents = self._clipboard.wait_for_text()
self.__userEntry.append(contents)
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_key_press(self, widget, event, *args):
RETURN_TYPES = (gtk.keysyms.Return, gtk.keysyms.ISO_Enter, gtk.keysyms.KP_Enter)
if (
elif event.keyval in RETURN_TYPES:
self.__history.push_entry()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_push(self, *args):
self.__history.push_entry()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_unpush(self, *args):
self.__historyStore.unpush()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_entry_direct(self, keys, modifiers):
if "shift" in modifiers:
keys = keys.upper()
self.__userEntry.append(keys)
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_entry_backspace(self, *args):
self.__userEntry.pop()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_entry_clear(self, *args):
self.__userEntry.clear()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_clear_all(self, *args):
self.__history.clear()
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_about_activate(self, *args):
dlg = gtk.AboutDialog()
dlg.set_name(constants.__pretty_app_name__)
sys.exit(1)
-def run_calculator():
+def run():
gtk.gdk.threads_init()
gtkpie.IMAGES.add_path(os.path.join(os.path.dirname(__file__), "libraries/images"), )
if commandOptions.test:
run_doctest()
else:
- run_calculator()
+ run()
import gtk
-import gtk_toolbox
+import util.misc as misc_utils
import hildonize
import history
import operation
-_moduleLogger = logging.getLogger("gtkhistory")
+_moduleLogger = logging.getLogger(__name__)
class GtkCalcHistory(history.AbstractHistory):
data = row[self.DATA_IDX]
yield data
- @gtk_toolbox.log_exception(_moduleLogger)
+ @misc_utils.log_exception(_moduleLogger)
def _on_close_activated(self, treeView, path, viewColumn):
if viewColumn is self.__closeColumn:
del self.__historyStore[path[0]]
import weakref
-import warnings
-from libraries.recipes import algorithms
+from util import algorithms
import operation
+++ /dev/null
-#!/usr/bin/env python
+++ /dev/null
-#!/usr/bin/env python
-
-"""
-@note Source http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66448
-"""
-
-import itertools
-import functools
-import datetime
-import types
-
-
-def ordered_itr(collection):
- """
- >>> [v for v in ordered_itr({"a": 1, "b": 2})]
- [('a', 1), ('b', 2)]
- >>> [v for v in ordered_itr([3, 1, 10, -20])]
- [-20, 1, 3, 10]
- """
- if isinstance(collection, types.DictType):
- keys = list(collection.iterkeys())
- keys.sort()
- for key in keys:
- yield key, collection[key]
- else:
- values = list(collection)
- values.sort()
- for value in values:
- yield value
-
-
-def itercat(*iterators):
- """
- Concatenate several iterators into one.
-
- >>> [v for v in itercat([1, 2, 3], [4, 1, 3])]
- [1, 2, 3, 4, 1, 3]
- """
- for i in iterators:
- for x in i:
- yield x
-
-
-def iterwhile(func, iterator):
- """
- Iterate for as long as func(value) returns true.
- >>> through = lambda b: b
- >>> [v for v in iterwhile(through, [True, True, False])]
- [True, True]
- """
- iterator = iter(iterator)
- while 1:
- next = iterator.next()
- if not func(next):
- raise StopIteration
- yield next
-
-
-def iterfirst(iterator, count=1):
- """
- Iterate through 'count' first values.
-
- >>> [v for v in iterfirst([1, 2, 3, 4, 5], 3)]
- [1, 2, 3]
- """
- iterator = iter(iterator)
- for i in xrange(count):
- yield iterator.next()
-
-
-def iterstep(iterator, n):
- """
- Iterate every nth value.
-
- >>> [v for v in iterstep([1, 2, 3, 4, 5], 1)]
- [1, 2, 3, 4, 5]
- >>> [v for v in iterstep([1, 2, 3, 4, 5], 2)]
- [1, 3, 5]
- >>> [v for v in iterstep([1, 2, 3, 4, 5], 3)]
- [1, 4]
- """
- iterator = iter(iterator)
- while True:
- yield iterator.next()
- # skip n-1 values
- for dummy in xrange(n-1):
- iterator.next()
-
-
-def itergroup(iterator, count, padValue = None):
- """
- Iterate in groups of 'count' values. If there
- aren't enough values, the last result is padded with
- None.
-
- >>> for val in itergroup([1, 2, 3, 4, 5, 6], 3):
- ... print tuple(val)
- (1, 2, 3)
- (4, 5, 6)
- >>> for val in itergroup([1, 2, 3, 4, 5, 6], 3):
- ... print list(val)
- [1, 2, 3]
- [4, 5, 6]
- >>> for val in itergroup([1, 2, 3, 4, 5, 6, 7], 3):
- ... print tuple(val)
- (1, 2, 3)
- (4, 5, 6)
- (7, None, None)
- >>> for val in itergroup("123456", 3):
- ... print tuple(val)
- ('1', '2', '3')
- ('4', '5', '6')
- >>> for val in itergroup("123456", 3):
- ... print repr("".join(val))
- '123'
- '456'
- """
- paddedIterator = itertools.chain(iterator, itertools.repeat(padValue, count-1))
- nIterators = (paddedIterator, ) * count
- return itertools.izip(*nIterators)
-
-
-def xzip(*iterators):
- """Iterative version of builtin 'zip'."""
- iterators = itertools.imap(iter, iterators)
- while 1:
- yield tuple([x.next() for x in iterators])
-
-
-def xmap(func, *iterators):
- """Iterative version of builtin 'map'."""
- iterators = itertools.imap(iter, iterators)
- values_left = [1]
-
- def values():
- # Emulate map behaviour, i.e. shorter
- # sequences are padded with None when
- # they run out of values.
- values_left[0] = 0
- for i in range(len(iterators)):
- iterator = iterators[i]
- if iterator is None:
- yield None
- else:
- try:
- yield iterator.next()
- values_left[0] = 1
- except StopIteration:
- iterators[i] = None
- yield None
- while 1:
- args = tuple(values())
- if not values_left[0]:
- raise StopIteration
- yield func(*args)
-
-
-def xfilter(func, iterator):
- """Iterative version of builtin 'filter'."""
- iterator = iter(iterator)
- while 1:
- next = iterator.next()
- if func(next):
- yield next
-
-
-def xreduce(func, iterator, default=None):
- """Iterative version of builtin 'reduce'."""
- iterator = iter(iterator)
- try:
- prev = iterator.next()
- except StopIteration:
- return default
- single = 1
- for next in iterator:
- single = 0
- prev = func(prev, next)
- if single:
- return func(prev, default)
- return prev
-
-
-def daterange(begin, end, delta = datetime.timedelta(1)):
- """
- Form a range of dates and iterate over them.
-
- Arguments:
- begin -- a date (or datetime) object; the beginning of the range.
- end -- a date (or datetime) object; the end of the range.
- delta -- (optional) a datetime.timedelta object; how much to step each iteration.
- Default step is 1 day.
-
- Usage:
- """
- if not isinstance(delta, datetime.timedelta):
- delta = datetime.timedelta(delta)
-
- ZERO = datetime.timedelta(0)
-
- if begin < end:
- if delta <= ZERO:
- raise StopIteration
- test = end.__gt__
- else:
- if delta >= ZERO:
- raise StopIteration
- test = end.__lt__
-
- while test(begin):
- yield begin
- begin += delta
-
-
-class LazyList(object):
- """
- A Sequence whose values are computed lazily by an iterator.
-
- Module for the creation and use of iterator-based lazy lists.
- this module defines a class LazyList which can be used to represent sequences
- of values generated lazily. One can also create recursively defined lazy lists
- that generate their values based on ones previously generated.
-
- Backport to python 2.5 by Michael Pust
- """
-
- __author__ = 'Dan Spitz'
-
- def __init__(self, iterable):
- self._exhausted = False
- self._iterator = iter(iterable)
- self._data = []
-
- def __len__(self):
- """Get the length of a LazyList's computed data."""
- return len(self._data)
-
- def __getitem__(self, i):
- """Get an item from a LazyList.
- i should be a positive integer or a slice object."""
- if isinstance(i, int):
- #index has not yet been yielded by iterator (or iterator exhausted
- #before reaching that index)
- if i >= len(self):
- self.exhaust(i)
- elif i < 0:
- raise ValueError('cannot index LazyList with negative number')
- return self._data[i]
-
- #LazyList slices are iterators over a portion of the list.
- elif isinstance(i, slice):
- start, stop, step = i.start, i.stop, i.step
- if any(x is not None and x < 0 for x in (start, stop, step)):
- raise ValueError('cannot index or step through a LazyList with'
- 'a negative number')
- #set start and step to their integer defaults if they are None.
- if start is None:
- start = 0
- if step is None:
- step = 1
-
- def LazyListIterator():
- count = start
- predicate = (
- (lambda: True)
- if stop is None
- else (lambda: count < stop)
- )
- while predicate():
- try:
- yield self[count]
- #slices can go out of actual index range without raising an
- #error
- except IndexError:
- break
- count += step
- return LazyListIterator()
-
- raise TypeError('i must be an integer or slice')
-
- def __iter__(self):
- """return an iterator over each value in the sequence,
- whether it has been computed yet or not."""
- return self[:]
-
- def computed(self):
- """Return an iterator over the values in a LazyList that have
- already been computed."""
- return self[:len(self)]
-
- def exhaust(self, index = None):
- """Exhaust the iterator generating this LazyList's values.
- if index is None, this will exhaust the iterator completely.
- Otherwise, it will iterate over the iterator until either the list
- has a value for index or the iterator is exhausted.
- """
- if self._exhausted:
- return
- if index is None:
- ind_range = itertools.count(len(self))
- else:
- ind_range = range(len(self), index + 1)
-
- for ind in ind_range:
- try:
- self._data.append(self._iterator.next())
- except StopIteration: #iterator is fully exhausted
- self._exhausted = True
- break
-
-
-class RecursiveLazyList(LazyList):
-
- def __init__(self, prod, *args, **kwds):
- super(RecursiveLazyList, self).__init__(prod(self, *args, **kwds))
-
-
-class RecursiveLazyListFactory:
-
- def __init__(self, producer):
- self._gen = producer
-
- def __call__(self, *a, **kw):
- return RecursiveLazyList(self._gen, *a, **kw)
-
-
-def lazylist(gen):
- """
- Decorator for creating a RecursiveLazyList subclass.
- This should decorate a generator function taking the LazyList object as its
- first argument which yields the contents of the list in order.
-
- >>> #fibonnacci sequence in a lazy list.
- >>> @lazylist
- ... def fibgen(lst):
- ... yield 0
- ... yield 1
- ... for a, b in itertools.izip(lst, lst[1:]):
- ... yield a + b
- ...
- >>> #now fibs can be indexed or iterated over as if it were an infinitely long list containing the fibonnaci sequence
- >>> fibs = fibgen()
- >>>
- >>> #prime numbers in a lazy list.
- >>> @lazylist
- ... def primegen(lst):
- ... yield 2
- ... for candidate in itertools.count(3): #start at next number after 2
- ... #if candidate is not divisible by any smaller prime numbers,
- ... #it is a prime.
- ... if all(candidate % p for p in lst.computed()):
- ... yield candidate
- ...
- >>> #same for primes- treat it like an infinitely long list containing all prime numbers.
- >>> primes = primegen()
- >>> print fibs[0], fibs[1], fibs[2], primes[0], primes[1], primes[2]
- 0 1 1 2 3 5
- >>> print list(fibs[:10]), list(primes[:10])
- [0, 1, 1, 2, 3, 5, 8, 13, 21, 34] [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
- """
- return RecursiveLazyListFactory(gen)
-
-
-def map_func(f):
- """
- >>> import misc
- >>> misc.validate_decorator(map_func)
- """
-
- @functools.wraps(f)
- def wrapper(*args):
- result = itertools.imap(f, args)
- return result
- return wrapper
-
-
-def reduce_func(function):
- """
- >>> import misc
- >>> misc.validate_decorator(reduce_func(lambda x: x))
- """
-
- def decorator(f):
-
- @functools.wraps(f)
- def wrapper(*args):
- result = reduce(function, f(args))
- return result
- return wrapper
- return decorator
-
-
-def any_(iterable):
- """
- @note Python Version <2.5
-
- >>> any_([True, True])
- True
- >>> any_([True, False])
- True
- >>> any_([False, False])
- False
- """
-
- for element in iterable:
- if element:
- return True
- return False
-
-
-def all_(iterable):
- """
- @note Python Version <2.5
-
- >>> all_([True, True])
- True
- >>> all_([True, False])
- False
- >>> all_([False, False])
- False
- """
-
- for element in iterable:
- if not element:
- return False
- return True
-
-
-def for_every(pred, seq):
- """
- for_every takes a one argument predicate function and a sequence.
- @param pred The predicate function should return true or false.
- @returns true if every element in seq returns true for predicate, else returns false.
-
- >>> for_every (lambda c: c > 5,(6,7,8,9))
- True
-
- @author Source:http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52907
- """
-
- for i in seq:
- if not pred(i):
- return False
- return True
-
-
-def there_exists(pred, seq):
- """
- there_exists takes a one argument predicate function and a sequence.
- @param pred The predicate function should return true or false.
- @returns true if any element in seq returns true for predicate, else returns false.
-
- >>> there_exists (lambda c: c > 5,(6,7,8,9))
- True
-
- @author Source:http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52907
- """
-
- for i in seq:
- if pred(i):
- return True
- return False
-
-
-def func_repeat(quantity, func, *args, **kwd):
- """
- Meant to be in connection with "reduce"
- """
- for i in xrange(quantity):
- yield func(*args, **kwd)
-
-
-def function_map(preds, item):
- """
- Meant to be in connection with "reduce"
- """
- results = (pred(item) for pred in preds)
-
- return results
-
-
-def functional_if(combiner, preds, item):
- """
- Combines the result of a list of predicates applied to item according to combiner
-
- @see any, every for example combiners
- """
- pass_bool = lambda b: b
-
- bool_results = function_map(preds, item)
- return combiner(pass_bool, bool_results)
-
-
-def pushback_itr(itr):
- """
- >>> list(pushback_itr(xrange(5)))
- [0, 1, 2, 3, 4]
- >>>
- >>> first = True
- >>> itr = pushback_itr(xrange(5))
- >>> for i in itr:
- ... print i
- ... if first and i == 2:
- ... first = False
- ... print itr.send(i)
- 0
- 1
- 2
- None
- 2
- 3
- 4
- >>>
- >>> first = True
- >>> itr = pushback_itr(xrange(5))
- >>> for i in itr:
- ... print i
- ... if first and i == 2:
- ... first = False
- ... print itr.send(i)
- ... print itr.send(i)
- 0
- 1
- 2
- None
- None
- 2
- 2
- 3
- 4
- >>>
- >>> itr = pushback_itr(xrange(5))
- >>> print itr.next()
- 0
- >>> print itr.next()
- 1
- >>> print itr.send(10)
- None
- >>> print itr.next()
- 10
- >>> print itr.next()
- 2
- >>> print itr.send(20)
- None
- >>> print itr.send(30)
- None
- >>> print itr.send(40)
- None
- >>> print itr.next()
- 40
- >>> print itr.next()
- 30
- >>> print itr.send(50)
- None
- >>> print itr.next()
- 50
- >>> print itr.next()
- 20
- >>> print itr.next()
- 3
- >>> print itr.next()
- 4
- """
- for item in itr:
- maybePushedBack = yield item
- queue = []
- while queue or maybePushedBack is not None:
- if maybePushedBack is not None:
- queue.append(maybePushedBack)
- maybePushedBack = yield None
- else:
- item = queue.pop()
- maybePushedBack = yield item
-
-
-if __name__ == "__main__":
- import doctest
- print doctest.testmod()
+++ /dev/null
-#!/usr/bin/env python
-
-from __future__ import with_statement
-
-import os
-import sys
-import cPickle
-import weakref
-import threading
-import errno
-import time
-import functools
-import contextlib
-
-
-def synchronized(lock):
- """
- Synchronization decorator.
-
- >>> import misc
- >>> misc.validate_decorator(synchronized(object()))
- """
-
- def wrap(f):
-
- @functools.wraps(f)
- def newFunction(*args, **kw):
- lock.acquire()
- try:
- return f(*args, **kw)
- finally:
- lock.release()
- return newFunction
- return wrap
-
-
-@contextlib.contextmanager
-def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
- """
- Locking with a queue, good for when you want to lock an item passed around
-
- >>> import Queue
- >>> item = 5
- >>> lock = Queue.Queue()
- >>> lock.put(item)
- >>> with qlock(lock) as i:
- ... print i
- 5
- """
- item = queue.get(gblock, gtimeout)
- try:
- yield item
- finally:
- queue.put(item, pblock, ptimeout)
-
-
-@contextlib.contextmanager
-def flock(path, timeout=-1):
- WAIT_FOREVER = -1
- DELAY = 0.1
- timeSpent = 0
-
- acquired = False
-
- while timeSpent <= timeout or timeout == WAIT_FOREVER:
- try:
- fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
- acquired = True
- break
- except OSError, e:
- if e.errno != errno.EEXIST:
- raise
- time.sleep(DELAY)
- timeSpent += DELAY
-
- assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)
-
- try:
- yield fd
- finally:
- os.unlink(path)
-
-
-def threaded(f):
- """
- This decorator calls the method in a new thread, so execution returns straight away
-
- >>> import misc
- >>> misc.validate_decorator(threaded)
- """
-
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- t = threading.Thread(target=f, args=args, kwargs=kwargs)
- t.setDaemon(True)
- t.start()
- return wrapper
-
-
-def fork(f):
- """
- Fork a function into a seperate process and block on it, for forcing reclaiming of resources for highly intensive functions
- @return The original value through pickling. If it is unable to be pickled, then the pickling exception is passed through
- @throws Through pickling, exceptions are passed back and re-raised
- @note source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474
-
- >>> import misc
- >>> misc.validate_decorator(fork)
- """
-
- @functools.wraps(f)
- def wrapper(*args, **kwds):
- pread, pwrite = os.pipe()
- pid = os.fork()
- if pid > 0:
- os.close(pwrite)
- with os.fdopen(pread, 'rb') as f:
- status, result = cPickle.load(f)
- os.waitpid(pid, 0)
- if status == 0:
- return result
- else:
- raise result
- else:
- os.close(pread)
- try:
- result = f(*args, **kwds)
- status = 0
- except Exception, exc:
- result = exc
- status = 1
- with os.fdopen(pwrite, 'wb') as f:
- try:
- cPickle.dump((status, result), f, cPickle.HIGHEST_PROTOCOL)
- except cPickle.PicklingError, exc:
- cPickle.dump((2, exc), f, cPickle.HIGHEST_PROTOCOL)
- f.close()
- sys.exit(0)
- return wrapper
-
-
-@contextlib.contextmanager
-def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
- """
- Locking with a queue, good for when you want to lock an item passed around
-
- >>> import Queue
- >>> item = 5
- >>> lock = Queue.Queue()
- >>> lock.put(item)
- >>> with qlock(lock) as i:
- ... print i
- 5
- """
- item = queue.get(gblock, gtimeout)
- yield item
- queue.put(item, pblock, ptimeout)
-
-
-class EventSource(object):
- """
- Asynchronous implementation of the observer pattern
-
- >>> sourceRoot = EventSource()
- >>> sourceChild1 = EventSource()
- >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
- >>> sourceChild2 = EventSource()
- >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
- >>> sourceRoot.add_children(sourceChild1, sourceChild2)
- """
-
- def __init__(self):
- """
- @warning Not thread safe
- """
-
- self.__callbackQueues = {}
- self.__children = []
-
- def add_children(self, *childrenSources):
- """
- @warning Not thread safe
- """
-
- self.__children.extend(childrenSources)
-
- def remove_children(self, *childrenSources):
- """
- @warning Not thread safe
- """
-
- for child in childrenSources:
- self.__children.remove(child)
-
- def register_provided_events(self, *events):
- """
- @warning Not thread safe
- """
-
- self.__callbackQueues.update(dict((event, []) for event in events))
-
- def notify_observers(self, event, message):
- """
- @warning As threadsafe as the queue used. qlock is recommended for the message if it needs locking
- """
-
- for queue in self.__callbackQueues[event]:
- queue.put(message)
-
- def _register_queue(self, event, queue):
- """
- @warning Not thread safe
- """
-
- if event in self.__callbackQueues:
- self.__callbackQueues[event].append(queue)
- return self
- else:
- for child in self.__children:
- source = child._register_queue(event, queue)
- if source is not None:
- return source
- else:
- return None
-
- def _unregister_queue(self, event, queue):
- """
- @warning Not thread safe
- """
-
- if event in self.__callbackQueues:
- self.__callbackQueues[event].remove(queue)
- return self
- else:
- for child in self.__children:
- source = child._unregister_queue(event, queue)
- if source is not None:
- return source
- else:
- return None
-
-
-class StrongEventSourceProxy(object):
-
- def __init__(self, source):
- """
- @warning Not thread safe
- """
-
- self.source = source
-
- def register(self, event, queue):
- """
- @warning Not thread safe
- """
-
- actualSource = self.source._register_queue(event, queue)
- ActualType = type(self)
- return ActualType(actualSource)
-
- def unregister(self, event, queue):
- """
- @warning Not thread safe
- """
-
- actualSource = self.source._unregister_queue(event, queue)
- ActualType = type(self)
- return ActualType(actualSource)
-
-
-class WeakEventSourceProxy(object):
-
- def __init__(self, source):
- """
- @warning Not thread safe
- """
-
- self.source = weakref.ref(source)
-
- def register(self, event, queue):
- """
- @warning Not thread safe
- """
-
- actualSource = self.source()._register_queue(event, queue)
- ActualType = type(self)
- return ActualType(actualSource)
-
- def unregister(self, event, queue):
- """
- @warning Not thread safe
- """
-
- actualSource = self.source()._unregister_queue(event, queue)
- ActualType = type(self)
- return ActualType(actualSource)
-
-
-class EventObserver(object):
- """
-
- >>> import Queue
- >>> class Observer(EventObserver):
- ... def connect_to_source(self, eventSourceRoot):
- ... self.queue = Queue.Queue()
- ... self.source = eventSourceRoot.register("1-event-0", self.queue)
- >>>
- >>> sourceRoot = EventSource()
- >>> sourceChild1 = EventSource()
- >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
- >>> sourceChild2 = EventSource()
- >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
- >>> sourceRoot.add_children(sourceChild1, sourceChild2)
- >>>
- >>> o1 = Observer()
- >>> o1.connect_to_source(StrongEventSourceProxy(sourceRoot))
- >>> o2 = Observer()
- >>> o2.connect_to_source(WeakEventSourceProxy(sourceRoot))
- >>>
- >>> sourceChild1.notify_observers("1-event-0", "Hello World")
- >>> o1.queue.get(False)
- 'Hello World'
- >>> o2.queue.get(False)
- 'Hello World'
- """
-
- def connect_to_source(self, eventSourceRoot):
- raise NotImplementedError
+++ /dev/null
-#!/usr/bin/env python\r
-\r
-"""\r
-Uses for generators\r
-* Pull pipelining (iterators)\r
-* Push pipelining (coroutines)\r
-* State machines (coroutines)\r
-* "Cooperative multitasking" (coroutines)\r
-* Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)\r
-\r
-Design considerations\r
-* When should a stage pass on exceptions or have it thrown within it?\r
-* When should a stage pass on GeneratorExits?\r
-* Is there a way to either turn a push generator into a iterator or to use\r
- comprehensions syntax for push generators (I doubt it)\r
-* When should the stage try and send data in both directions\r
-* Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines\r
-** If so, make s* and co* implementation of functions\r
-"""\r
-\r
-import threading\r
-import Queue\r
-import pickle\r
-import functools\r
-import itertools\r
-import xml.sax\r
-import xml.parsers.expat\r
-\r
-\r
-def autostart(func):\r
- """\r
- >>> @autostart\r
- ... def grep_sink(pattern):\r
- ... print "Looking for %s" % pattern\r
- ... while True:\r
- ... line = yield\r
- ... if pattern in line:\r
- ... print line,\r
- >>> g = grep_sink("python")\r
- Looking for python\r
- >>> g.send("Yeah but no but yeah but no")\r
- >>> g.send("A series of tubes")\r
- >>> g.send("python generators rock!")\r
- python generators rock!\r
- >>> g.close()\r
- """\r
-\r
- @functools.wraps(func)\r
- def start(*args, **kwargs):\r
- cr = func(*args, **kwargs)\r
- cr.next()\r
- return cr\r
-\r
- return start\r
-\r
-\r
@autostart
def printer_sink(format = "%s"):
	"""
	Coroutine sink that prints every item it receives, rendered with *format*.

	>>> pr = printer_sink("%r")
	>>> pr.send("Hello")
	'Hello'
	>>> pr.send("5")
	'5'
	>>> pr.send(5)
	5
	>>> p = printer_sink()
	>>> p.send("Hello")
	Hello
	>>> p.send("World")
	World
	>>> # p.throw(RuntimeError, "Goodbye")
	>>> # p.send("Meh")
	>>> # p.close()
	"""
	while True:
		received = yield
		# Single-argument paren-print behaves identically on Python 2 and 3.
		print(format % (received, ))
-\r
-\r
@autostart
def null_sink():
	"""
	Coroutine sink that silently discards everything sent to it.

	Good for uses like with cochain to pick up any slack
	"""
	while True:
		_ignored = yield
-\r
-\r
def itr_source(itr, target):
	"""
	Pump every element of the iterable *itr* into the coroutine *target*.

	>>> itr_source(xrange(2), printer_sink())
	0
	1
	"""
	send = target.send
	for element in itr:
		send(element)
-\r
-\r
@autostart
def cofilter(predicate, target):
	"""
	Coroutine filter: forward to *target* only items for which
	predicate(item) is true; predicate None means bool (like builtin filter).

	>>> p = printer_sink()
	>>> cf = cofilter(None, p)
	>>> cf.send("")
	>>> cf.send("Hello")
	Hello
	>>> cf.send([])
	>>> cf.send([1, 2])
	[1, 2]
	>>> cf.send(False)
	>>> cf.send(True)
	True
	>>> cf.send(0)
	>>> cf.send(1)
	1
	>>> # cf.throw(RuntimeError, "Goodbye")
	>>> # cf.send(False)
	>>> # cf.send(True)
	>>> # cf.close()
	"""
	if predicate is None:
		predicate = bool

	while True:
		try:
			item = yield
			if predicate(item):
				target.send(item)
		except StandardError, e:
			# Exceptions thrown into this stage are forwarded downstream
			# rather than unwinding this coroutine (GeneratorExit still ends it).
			target.throw(e.__class__, e.message)
-\r
-\r
@autostart
def comap(function, target):
	"""
	Coroutine map: send function(item) on to *target* for each item received.

	>>> p = printer_sink()
	>>> cm = comap(lambda x: x+1, p)
	>>> cm.send(0)
	1
	>>> cm.send(1.0)
	2.0
	>>> cm.send(-2)
	-1
	>>> # cm.throw(RuntimeError, "Goodbye")
	>>> # cm.send(0)
	>>> # cm.send(1.0)
	>>> # cm.close()
	"""
	while True:
		try:
			item = yield
			mappedItem = function(item)
			target.send(mappedItem)
		except StandardError, e:
			# Forward thrown exceptions downstream instead of dying here.
			target.throw(e.__class__, e.message)
-\r
-\r
def func_sink(function):
	# Sink that invokes *function* on each item, discarding the result.
	terminator = null_sink()
	return comap(function, terminator)
-\r
-\r
def expand_positional(function):
	# Adapt *function* so it accepts one tuple of positional arguments
	# instead of separate arguments (useful downstream of coenumerate).

	@functools.wraps(function)
	def unpacker(packedArgs):
		return function(*packedArgs)

	return unpacker
-\r
-\r
@autostart
def append_sink(l):
	"""
	Coroutine sink that appends every received item to the list *l*.

	>>> l = []
	>>> apps = append_sink(l)
	>>> apps.send(1)
	>>> apps.send(2)
	>>> apps.send(3)
	>>> print l
	[1, 2, 3]
	"""
	push = l.append
	while True:
		push((yield))
-\r
-\r
@autostart
def last_n_sink(l, n = 1):
	"""
	Coroutine sink that keeps only the most recent *n* items in the list *l*.

	>>> l = []
	>>> lns = last_n_sink(l)
	>>> lns.send(1)
	>>> lns.send(2)
	>>> lns.send(3)
	>>> print l
	[3]
	"""
	del l[:]
	while True:
		value = yield
		# Trim from the front BEFORE appending, leaving room for the new item.
		excess = len(l) - n + 1
		if excess > 0:
			del l[0:excess]
		l.append(value)
-\r
-\r
@autostart
def coreduce(target, function, initializer = None):
	"""
	Coroutine reduce: fold received items with *function*, sending the
	running accumulator to *target* after every item.

	>>> reduceResult = []
	>>> lns = last_n_sink(reduceResult)
	>>> cr = coreduce(lns, lambda x, y: x + y, 0)
	>>> cr.send(1)
	>>> cr.send(2)
	>>> cr.send(3)
	>>> print reduceResult
	[6]
	>>> cr = coreduce(lns, lambda x, y: x + y)
	>>> cr.send(1)
	>>> cr.send(2)
	>>> cr.send(3)
	>>> print reduceResult
	[6]
	"""
	isFirst = True
	cumulativeRef = initializer
	while True:
		item = yield
		# Without an initializer the very first item seeds the accumulator
		# (like builtin reduce); otherwise it is folded in immediately.
		if isFirst and initializer is None:
			cumulativeRef = item
		else:
			cumulativeRef = function(cumulativeRef, item)
		target.send(cumulativeRef)
		isFirst = False
-\r
-\r
@autostart
def cotee(targets):
	"""
	Takes a sequence of coroutines and sends the received items to all of them

	>>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
	>>> ct.send("Hello")
	1 Hello
	2 Hello
	>>> ct.send("World")
	1 World
	2 World
	>>> # ct.throw(RuntimeError, "Goodbye")
	>>> # ct.send("Meh")
	>>> # ct.close()
	"""
	while True:
		try:
			item = yield
			for target in targets:
				target.send(item)
		except StandardError, e:
			# Broadcast thrown exceptions to every target as well.
			for target in targets:
				target.throw(e.__class__, e.message)
-\r
-\r
class CoTee(object):
	"""
	Like cotee, but sinks can be (un)registered while the stage is running.

	>>> ct = CoTee()
	>>> ct.register_sink(printer_sink("1 %s"))
	>>> ct.register_sink(printer_sink("2 %s"))
	>>> ct.stage.send("Hello")
	1 Hello
	2 Hello
	>>> ct.stage.send("World")
	1 World
	2 World
	>>> ct.register_sink(printer_sink("3 %s"))
	>>> ct.stage.send("Foo")
	1 Foo
	2 Foo
	3 Foo
	>>> # ct.stage.throw(RuntimeError, "Goodbye")
	>>> # ct.stage.send("Meh")
	>>> # ct.stage.close()
	"""

	def __init__(self):
		# The running stage; _stage re-reads self._targets each item, so
		# registration after start takes effect immediately.
		self.stage = self._stage()
		self._targets = []

	def register_sink(self, sink):
		self._targets.append(sink)

	def unregister_sink(self, sink):
		self._targets.remove(sink)

	def restart(self):
		# Replace a closed/broken stage with a fresh one (targets are kept).
		self.stage = self._stage()

	@autostart
	def _stage(self):
		while True:
			try:
				item = yield
				for target in self._targets:
					target.send(item)
			except StandardError, e:
				# Forward thrown exceptions to every registered sink.
				for target in self._targets:
					target.throw(e.__class__, e.message)
-\r
-\r
def _flush_queue(queue):
	# Lazily drain *queue* until it reports empty.
	# NOTE(review): empty()/get() is racy if another thread consumes too.
	while not queue.empty():
		yield queue.get()
-\r
-\r
@autostart
def cocount(target, start = 0):
	"""
	Send an incrementing counter to *target*, advancing once per item
	received (the received items themselves are discarded).

	>>> cc = cocount(printer_sink("%s"))
	>>> cc.send("a")
	0
	>>> cc.send(None)
	1
	>>> cc.send([])
	2
	>>> cc.send(0)
	3
	"""
	tick = start
	while True:
		_ignored = yield
		target.send(tick)
		tick += 1
-\r
-\r
@autostart
def coenumerate(target, start = 0):
	"""
	Like builtin enumerate, but push-based: forward (index, item) pairs.

	>>> ce = coenumerate(printer_sink("%r"))
	>>> ce.send("a")
	(0, 'a')
	>>> ce.send(None)
	(1, None)
	>>> ce.send([])
	(2, [])
	>>> ce.send(0)
	(3, 0)
	"""
	index = start
	while True:
		item = yield
		target.send((index, item))
		index += 1
-\r
-\r
@autostart
def corepeat(target, elem):
	"""
	Forward the constant *elem* once for every item received.

	>>> cr = corepeat(printer_sink("%s"), "Hello World")
	>>> cr.send("a")
	Hello World
	>>> cr.send(None)
	Hello World
	>>> cr.send([])
	Hello World
	>>> cr.send(0)
	Hello World
	"""
	while True:
		_ignored = yield
		target.send(elem)
-\r
-\r
@autostart
def cointercept(target, elems):
	"""
	Replace each received item with the next element of *elems*.  Once
	*elems* is exhausted the generator returns, so the following send()
	deliberately raises StopIteration (see cochain, which relies on this).

	>>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
	>>> cr.send("a")
	1
	>>> cr.send(None)
	2
	>>> cr.send([])
	3
	>>> cr.send(0)
	4
	>>> cr.send("Bye")
	Traceback (most recent call last):
	  File "/usr/lib/python2.5/doctest.py", line 1228, in __run
	    compileflags, 1) in test.globs
	  File "<doctest __main__.cointercept[5]>", line 1, in <module>
	    cr.send("Bye")
	StopIteration
	"""
	item = yield
	for elem in elems:
		target.send(elem)
		item = yield
-\r
-\r
@autostart
def codropwhile(target, pred):
	"""
	Push-based itertools.dropwhile: swallow items while *pred* holds, then
	forward everything that follows.  Note the first failing item is itself
	consumed, not forwarded.

	>>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
	>>> cdw.send([0, 1, 2])
	>>> cdw.send(1)
	>>> cdw.send(True)
	>>> cdw.send(False)
	>>> cdw.send([0, 1, 2])
	[0, 1, 2]
	>>> cdw.send(1)
	1
	>>> cdw.send(True)
	True
	"""
	# Phase 1: discard until the predicate first fails.
	while True:
		item = yield
		if not pred(item):
			break

	# Phase 2: pass everything straight through.
	while True:
		target.send((yield))
-\r
-\r
@autostart
def cotakewhile(target, pred):
	"""
	Push-based itertools.takewhile: forward items while *pred* holds, then
	silently consume everything afterwards (the first failing item included).

	>>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
	>>> ctw.send([0, 1, 2])
	[0, 1, 2]
	>>> ctw.send(1)
	1
	>>> ctw.send(True)
	True
	>>> ctw.send(False)
	>>> ctw.send([0, 1, 2])
	>>> ctw.send(1)
	>>> ctw.send(True)
	"""
	while True:
		item = yield
		if not pred(item):
			break
		target.send(item)

	# Predicate failed once: drop the rest forever.
	while True:
		_ignored = yield
-\r
-\r
@autostart
def coslice(target, lower, upper):
	"""
	Forward only the items numbered [lower, upper); drop everything else.

	>>> cs = coslice(printer_sink("%r"), 3, 5)
	>>> cs.send("0")
	>>> cs.send("1")
	>>> cs.send("2")
	>>> cs.send("3")
	'3'
	>>> cs.send("4")
	'4'
	>>> cs.send("5")
	>>> cs.send("6")
	"""
	for _skipped in xrange(lower):
		_ignored = yield
	for _passed in xrange(upper - lower):
		target.send((yield))
	while True:
		_ignored = yield
-\r
-\r
@autostart
def cochain(targets):
	"""
	Feed items to each target in turn, switching to the next target when the
	current one raises StopIteration (e.g. an exhausted cointercept).

	>>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
	>>> cc = cochain([cr, printer_sink("end %s")])
	>>> cc.send("a")
	good 1
	>>> cc.send(None)
	good 2
	>>> cc.send([])
	good 3
	>>> cc.send(0)
	good 4
	>>> cc.send("Bye")
	end Bye
	"""
	behind = []
	for target in targets:
		try:
			# Re-deliver any item the previous (exhausted) target dropped.
			while behind:
				item = behind.pop()
				target.send(item)
			while True:
				item = yield
				target.send(item)
		except StopIteration:
			# This target died mid-item; carry the item over to the next one.
			behind.append(item)
-\r
-\r
@autostart
def queue_sink(queue):
	"""
	Record the coroutine event stream into *queue* as (exc_class_or_None,
	payload) pairs; queue_source/decode_item replay them on the other side.

	>>> q = Queue.Queue()
	>>> qs = queue_sink(q)
	>>> qs.send("Hello")
	>>> qs.send("World")
	>>> qs.throw(RuntimeError, "Goodbye")
	>>> qs.send("Meh")
	>>> qs.close()
	>>> print [i for i in _flush_queue(q)]
	[(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
	"""
	while True:
		try:
			item = yield
			queue.put((None, item))
		except StandardError, e:
			queue.put((e.__class__, e.message))
		except GeneratorExit:
			# Record the close so the consumer knows the stream ended,
			# then re-raise to actually terminate this coroutine.
			queue.put((GeneratorExit, None))
			raise
-\r
-\r
def decode_item(item, target):
	"""
	Replay one (kind, payload) record produced by queue_sink/pickle_sink
	into *target*.  Returns True when the stream is finished (GeneratorExit).
	"""
	kind = item[0]
	if kind is GeneratorExit:
		target.close()
		return True
	if kind is None:
		target.send(item[1])
	else:
		target.throw(item[0], item[1])
	return False
-\r
-\r
def queue_source(queue, target):
	"""
	Replay records from *queue* into *target* until a GeneratorExit record
	arrives (blocking on queue.get in between).

	>>> q = Queue.Queue()
	>>> for i in [
	...     (None, 'Hello'),
	...     (None, 'World'),
	...     (GeneratorExit, None),
	...     ]:
	...     q.put(i)
	>>> qs = queue_source(q, printer_sink())
	Hello
	World
	"""
	finished = False
	while not finished:
		finished = decode_item(queue.get(), target)
-\r
-\r
def threaded_stage(target, thread_factory = threading.Thread):
	# Run *target* in its own thread, bridged through a Queue.  Returns a
	# FACTORY: call the result to obtain the sink coroutine that feeds the
	# thread from the current one.
	messages = Queue.Queue()

	run_source = functools.partial(queue_source, messages, target)
	thread_factory(target=run_source).start()

	# Sink running in current thread
	return functools.partial(queue_sink, messages)
-\r
-\r
@autostart
def pickle_sink(f):
	# Persist the event stream to file object *f* as pickled
	# (exc_class_or_None, payload) records; pickle_source replays them.
	while True:
		try:
			item = yield
			pickle.dump((None, item), f)
		except StandardError, e:
			pickle.dump((e.__class__, e.message), f)
		except GeneratorExit:
			# Record the close marker, then terminate this coroutine.
			pickle.dump((GeneratorExit, ), f)
			raise
		except StopIteration:
			# NOTE(review): thrown StopIteration closes the file and ends
			# the sink without writing a close marker — confirm intended.
			f.close()
			return
-\r
-\r
def pickle_source(f, target):
	# Replay records pickled by pickle_sink from *f* into *target*;
	# EOF (no explicit close marker) also closes the target.
	try:
		finished = False
		while not finished:
			record = pickle.load(f)
			finished = decode_item(record, target)
	except EOFError:
		target.close()
-\r
-\r
class EventHandler(xml.sax.ContentHandler, object):
	"""
	SAX ContentHandler that forwards (eventType, payload) tuples to a
	coroutine *target*.

	Fix: the base order was ``(object, xml.sax.ContentHandler)``, which
	cannot produce a consistent MRO once ContentHandler derives from
	object; the mixin must come first.
	"""

	START = "start"
	TEXT = "text"
	END = "end"

	def __init__(self, target):
		object.__init__(self)
		xml.sax.ContentHandler.__init__(self)
		self._target = target

	def startElement(self, name, attrs):
		# attrs._attrs is the raw dict inside the SAX attributes wrapper.
		self._target.send((self.START, (name, attrs._attrs)))

	def characters(self, text):
		self._target.send((self.TEXT, text))

	def endElement(self, name):
		self._target.send((self.END, name))
-\r
-\r
def expat_parse(f, target):
	# Stream ('start'|'end'|'text', payload) events from file object *f*
	# into the coroutine *target* using the (faster, non-validating) expat
	# parser instead of full SAX.
	parser = xml.parsers.expat.ParserCreate()
	# Big buffer + buffer_text coalesces character data into fewer events.
	parser.buffer_size = 65536
	parser.buffer_text = True
	parser.returns_unicode = False  # byte strings; Python-2-only expat option
	parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
	parser.EndElementHandler = lambda name: target.send(('end', name))
	parser.CharacterDataHandler = lambda data: target.send(('text', data))
	parser.ParseFile(f)
-\r
-\r
-if __name__ == "__main__":\r
- import doctest\r
- doctest.testmod()\r
+++ /dev/null
-#!/usr/bin/env python
-
-
-"""
-This module provides three types of queues, with these constructors:
- Stack([items]) -- Create a Last In First Out queue, implemented as a list
- Queue([items]) -- Create a First In First Out queue
- PriorityQueue([items]) -- Create a queue where minimum item (by <) is first
-Here [items] is an optional list of initial items; if omitted, queue is empty.
-Each type supports the following methods and functions:
- len(q) -- number of items in q (also q.__len__())
- q.append(item)-- add an item to the queue
- q.extend(items) -- add each of the items to the queue
- q.pop() -- remove and return the "first" item from the queue
-"""
-
-
-import types
-import operator
-
-
def Stack(items=None):
	"A stack, or last-in-first-out queue, is implemented as a list."
	# A truthy initial list is used directly; anything else gets a fresh list.
	if items:
		return items
	return []
-
-
class Queue(object):
	"A first-in-first-out queue."

	def __init__(self, initialItems=None):
		# ``start`` indexes the logical head; popped slots are compacted
		# away lazily so pop() stays amortized O(1).
		self.start = 0
		self.items = initialItems or []

	def __len__(self):
		return len(self.items) - self.start

	def append(self, item):
		self.items.append(item)

	def extend(self, items):
		self.items.extend(items)

	def pop(self):
		backing = self.items
		item = backing[self.start]
		self.start += 1
		# Compact once at least 100 slots are dead AND they are the majority.
		if self.start > 100 and self.start > len(backing)/2:
			del backing[:self.start]
			self.start = 0
		return item
-
-
class PriorityQueue(object):
	"A queue in which the minimum element (as determined by cmp) is first."

	def __init__(self, initialItems=None, comparator=operator.lt):
		# comparator(a, b) is True when a should come out before b.
		self.items = []
		self.cmp = comparator
		if initialItems is not None:
			self.extend(initialItems)

	def __len__(self):
		return len(self.items)

	def append(self, item):
		"""
		Insert *item*, sifting it up to restore the heap invariant.

		Bug fix: with the 0-based child layout (2*i + 1, 2*i + 2) used by
		heapify below, the parent of node i is (i - 1) // 2.  The old code
		used i // 2, which disagrees with heapify at even indexes and could
		leave the heap out of order.
		"""
		items, cmp_func = self.items, self.cmp
		items.append(item)
		i = len(items) - 1
		while i > 0:
			parent = (i - 1) // 2
			if not cmp_func(item, items[parent]):
				break
			items[i] = items[parent]
			i = parent
		items[i] = item

	def extend(self, items):
		for item in items:
			self.append(item)

	def pop(self):
		"Remove and return the minimum item; raises IndexError when empty."
		items = self.items
		if len(items) == 1:
			return items.pop()
		e = items[0]
		# Move the last leaf to the root, then sift it down.
		items[0] = items.pop()
		self.heapify(0)
		return e

	def heapify(self, i):
		"""
		Assumes the subtrees of node i are heaps; sift items[i] down into
		the correct position.  See CLR&S p. 130.
		"""
		items, cmp_func = self.items, self.cmp
		left, right, N = 2*i + 1, 2*i + 2, len(items)-1
		if left <= N and cmp_func(items[left], items[i]):
			smallest = left
		else:
			smallest = i

		if right <= N and cmp_func(items[right], items[smallest]):
			smallest = right
		if smallest != i:
			items[i], items[smallest] = items[smallest], items[i]
			self.heapify(smallest)
-
-
class AttrDict(object):
	"""
	Can act as a mixin to add dictionary access to members to ease dynamic attribute access
	or as a wrapper around a class

	>>> class Mixin (AttrDict):
	... 	def __init__ (self):
	... 		AttrDict.__init__ (self)
	... 		self.x = 5
	...
	>>> mixinExample = Mixin ()
	>>> mixinExample.x
	5
	>>> mixinExample["x"]
	5
	>>> mixinExample["x"] = 10; mixinExample.x
	10
	>>> "x" in mixinExample
	True
	>>> class Wrapper (object):
	... 	def __init__ (self):
	... 		self.y = 10
	...
	>>> wrapper = Wrapper()
	>>> wrapper.y
	10
	>>> wrapperExample = AttrDict (wrapper)
	>>> wrapperExample["y"]
	10
	>>> wrapperExample["y"] = 20; wrapper.y
	20
	>>> "y" in wrapperExample
	True
	"""

	def __init__(self, obj = None):
		# Wrapping nothing means wrapping ourselves (mixin usage).
		if obj is None:
			self.__obj = self
		else:
			self.__obj = obj

	def __getitem__(self, name):
		return getattr(self.__obj, name)

	def __setitem__(self, name, value):
		setattr(self.__obj, name, value)

	def __delitem__(self, name):
		delattr(self.__obj, name)

	def __contains__(self, name):
		return hasattr(self.__obj, name)
-
-
class Uncertain(object):
	"""
	Represents a numeric value with a known small uncertainty
	(error, standard deviation...).
	Numeric operators are overloaded to work with other Uncertain or
	numeric objects.
	The uncertainty (error) must be small. Otherwise the linearization
	employed here becomes wrong.

	>>> pie = Uncertain(3.14, 0.01)
	>>> ee = Uncertain(2.718, 0.001)
	>>> pie, repr(pie)
	(Uncertain(3.14, 0.01), 'Uncertain(3.14, 0.01)')
	>>> ee, repr(ee)
	(Uncertain(2.718, 0.001), 'Uncertain(2.718, 0.001)')
	>>> pie + ee
	Uncertain(5.858, 0.0100498756211)
	>>> pie * ee
	Uncertain(8.53452, 0.0273607748428)
	"""

	def __init__(self, value=0., error=0., *a, **t):
		self.value = value
		self.error = abs(error)  # uncertainty is a magnitude
		super(Uncertain, self).__init__(*a, **t)

	@staticmethod
	def _coerce(other):
		# Accept plain numbers wherever an Uncertain is expected (exact
		# value, zero error).  Fixes the old AttributeError raised by the
		# comparison operators on e.g. ``Uncertain(1, .1) == 1`` even
		# though the arithmetic operators already accepted numbers.
		if isinstance(other, Uncertain):
			return other
		return Uncertain(other)

	# Conversions

	def __str__(self):
		return "%g+-%g" % (self.value, self.error)

	def __repr__(self):
		return "Uncertain(%s, %s)" % (self.value, self.error)

	def __complex__(self):
		return complex(self.value)

	def __int__(self):
		return int(self.value)

	def __long__(self):
		# Python 2 only hook; ``long`` does not exist on Python 3.
		return long(self.value)

	def __float__(self):
		return self.value

	# Comparison

	def __eq__(self, other):
		# Equal when the values agree within the larger uncertainty.
		other = self._coerce(other)
		epsilon = max(self.error, other.error)
		return abs(other.value - self.value) < epsilon

	def __ne__(self, other):
		return not (self == other)

	def __hash__(self):
		return hash(self.value) ^ hash(self.error)

	def __le__(self, other):
		other = self._coerce(other)
		return self.value < other.value or self == other

	def __lt__(self, other):
		other = self._coerce(other)
		return self.value < other.value and self != other

	def __gt__(self, other):
		return not (self <= other)

	def __ge__(self, other):
		return not (self < other)

	def __nonzero__(self):
		# Truthy when the value is distinguishable from zero (Python 2 hook).
		return self.error < abs(self.value)

	# Math

	def assign(self, other):
		# In-place overwrite from an Uncertain or a plain number.
		if isinstance(other, Uncertain):
			self.value = other.value
			self.error = other.error
		else:
			self.value = other
			self.error = 0.

	def __add__(self, other):
		if isinstance(other, Uncertain):
			v = self.value + other.value
			# Independent errors add in quadrature.
			e = (self.error**2 + other.error**2) ** .5
			return Uncertain(v, e)
		else:
			return Uncertain(self.value+other, self.error)

	def __sub__(self, other):
		return self + (-other)

	def __mul__(self, other):
		if isinstance(other, Uncertain):
			v = self.value * other.value
			# First-order (linearized) error propagation for a product.
			e = ((self.error * other.value)**2 + (other.error * self.value)**2) ** .5
			return Uncertain(v, e)
		else:
			return Uncertain(self.value*other,
				self.error*other)

	def __div__(self, other):
		return self*(1./other)

	def __truediv__(self, other):
		return self*(1./other)

	def __radd__(self, other):
		return self + other

	def __rsub__(self, other):
		return -self + other

	def __rmul__(self, other):
		return self * other

	def __rdiv__(self, other):
		# NOTE(review): depends on __pow__, which is not defined — division
		# with an Uncertain divisor fails; confirm before relying on it.
		return (self/other)**-1.

	def __rtruediv__(self, other):
		return (self/other)**-1.

	def __neg__(self):
		return self*-1

	def __pos__(self):
		return self

	def __abs__(self):
		return Uncertain(abs(self.value), self.error)
-
-
-class Enumeration(object):
- """
- C-Style enumeration mapping attributes to numbers
-
- >>> Color = Enumeration("Color", ["Red", "Green", "Blue"])
- >>> Color.Red, Color.Green, Color.Blue
- (0, 1, 2)
- >>>
- >>> Color["Red"], Color.whatis(0)
- (0, 'Red')
- >>> Color.names(), Color.values()
- (['Blue', 'Green', 'Red'], [2, 1, 0])
- >>>
- >>> str(Color)
- "Color: {'Blue': 2, 'Green': 1, 'Red': 0}"
- >>>
- >>> 0 in Color, 10 in Color
- (True, False)
- >>> "Red" in Color, "Black" in Color
- (True, False)
- """
-
- def __init__(self, name, enumList):
- self.__name__ = name
- self.__doc__ = name
- lookup = { }
- reverseLookup = { }
-
- i = 0
- uniqueNames = [ ]
- uniqueValues = [ ]
- for x in enumList:
- if type(x) == types.TupleType:
- x, i = x
- if type(x) != types.StringType:
- raise TypeError("enum name is not a string: " + x)
- if type(i) != types.IntType:
- raise TypeError("enum value is not an integer: " + str(i))
- if x in uniqueNames:
- raise ValueError("enum name is not unique: " + x)
- if i in uniqueValues:
- raise ValueError("enum value is not unique for " + x)
- uniqueNames.append(x)
- uniqueValues.append(i)
- lookup[x] = i
- reverseLookup[i] = x
- i = i + 1
-
- self.__lookup = lookup
- self.__reverseLookup = reverseLookup
-
- def whatis(self, value):
- return self.__reverseLookup[value]
-
- def names(self):
- return self.__lookup.keys()
-
- def values(self):
- return self.__lookup.values()
-
- def __getattr__(self, attr):
- if attr not in self.__lookup:
- raise (AttributeError)
- return self.__lookup[attr]
-
- def __str__(self):
- return str(self.__doc__)+": "+str(self.__lookup)
-
- def __len__(self):
- return len(self.__lookup)
-
- def __contains__(self, x):
- return (x in self.__lookup) or (x in self.__reverseLookup)
-
- def __getitem__(self, attr):
- return self.__lookup[attr]
-
- def __iter__(self):
- return self.__lookup.itervalues()
-
- def iterkeys(self):
- return self.__lookup.iterkeys()
-
- def itervalues(self):
- return self.__lookup.itervalues()
-
- def iteritems(self):
- return self.__lookup.iteritems()
-
-
def make_enum(cls):
	"""
	Build an Enumeration from a class that carries __values__.

	@todo Make more object orientated (inheritance?)
	"""
	return Enumeration(cls.__name__, cls.__values__)
+++ /dev/null
-#!/usr/bin/env python
-
-
-from __future__ import with_statement
-
-import threading
-import contextlib
-import functools
-
-import gobject
-import gtk
-import gtk.glade
-
-
def make_idler(func):
	"""
	Decorator that makes a generator-function into a function that will continue execution on next call

	Returns True while the generator has more steps, False (and resets) once
	it is exhausted — the protocol expected by glib/gtk idle callbacks.

	>>> import misc
	>>> misc.validate_decorator(make_idler)

	"""
	a = []  # one-slot box holding the live generator between calls

	@functools.wraps(func)
	def decorated_func(*args, **kwds):
		if not a:
			a.append(func(*args, **kwds))
		try:
			# next() builtin works on Python 2.6+ AND 3.x; a[0].next() was 2.x-only.
			shouldBeNone = next(a[0])
			assert shouldBeNone is None, "The idle only task yield a value, %r" % shouldBeNone
			return True
		except StopIteration:
			del a[:]
			return False

	return decorated_func
-
-
@contextlib.contextmanager
def gtk_critical_section():
	# Hold the GDK global lock for the duration of the with-block, releasing
	# it even if the body raises.
	#The API changed and I hope these are the right calls
	gtk.gdk.threads_enter()
	try:
		yield
	finally:
		gtk.gdk.threads_leave()
-
-
-if __name__ == "__main__":
- #gtk.gdk.threads_init()
- pass
+++ /dev/null
-#!/usr/bin/env python
-
-
-from __future__ import with_statement
-
-import os
-import pickle
-import contextlib
-import itertools
-import functools
-
-
@contextlib.contextmanager
def change_directory(directory):
	"""
	Context manager that chdirs into *directory* for the duration of the
	with-block, always restoring the original working directory.

	Yields (previousDirectory, currentDirectory).
	"""
	original = os.getcwd()
	os.chdir(directory)
	landed = os.getcwd()

	try:
		yield original, landed
	finally:
		os.chdir(original)
-
-
@contextlib.contextmanager
def pickled(filename):
	"""
	Context manager exposing a pickle-backed store of named collections.

	Here is an example usage:
		with pickled("foo.db") as p:
			p("users", list).append(["srid", "passwd", 23])

	The store is loaded on entry and written back only on a clean exit
	(unchanged from before).  Fixes: file handles are now closed
	deterministically via ``with`` (they were leaked), and the files are
	opened in binary mode as pickle requires.
	"""

	if os.path.isfile(filename):
		with open(filename, "rb") as store:
			data = pickle.load(store)
	else:
		data = {}

	def getter(item, factory):
		# Return data[item], creating it via factory() on first access.
		if item in data:
			return data[item]
		else:
			data[item] = factory()
			return data[item]

	yield getter

	with open(filename, "wb") as store:
		pickle.dump(data, store)
-
-
@contextlib.contextmanager
def redirect(object_, attr, value):
	"""
	Temporarily replace object_.attr with *value*, restoring it on exit.

	>>> import sys
	... with redirect(sys, 'stdout', open('stdout', 'w')):
	...     print "hello"
	...
	>>> print "we're back"
	we're back
	"""
	saved = getattr(object_, attr)
	setattr(object_, attr, value)
	try:
		yield
	finally:
		setattr(object_, attr, saved)
-
-
def pathsplit(path):
	"""
	Split *path* on the platform separator into its components.

	>>> pathsplit("/a/b/c")
	['', 'a', 'b', 'c']
	>>> pathsplit("./plugins/builtins.ini")
	['.', 'plugins', 'builtins.ini']
	"""
	return path.split(os.path.sep)
-
-
def commonpath(l1, l2, common=None):
	"""
	Split two pre-split paths into (shared prefix, l1 remainder, l2 remainder).

	>>> commonpath(pathsplit('/a/b/c/d'), pathsplit('/a/b/c1/d1'))
	(['', 'a', 'b'], ['c', 'd'], ['c1', 'd1'])
	>>> commonpath(pathsplit("./plugins/"), pathsplit("./plugins/builtins.ini"))
	(['.', 'plugins'], [''], ['builtins.ini'])
	>>> commonpath(pathsplit("./plugins/builtins"), pathsplit("./plugins"))
	(['.', 'plugins'], ['builtins'], [])

	@param common Unused; kept for backward compatibility.
	"""
	if l1 == l2:
		return l1, [], []

	# Count the shared leading components.  (The previous implementation
	# read the loop variable after the loop, raising NameError whenever
	# either list was empty.)
	prefixLen = 0
	for leftDir, rightDir in zip(l1, l2):
		if leftDir != rightDir:
			break
		prefixLen += 1
	return l1[0:prefixLen], l1[prefixLen:], l2[prefixLen:]
-
-
def relpath(p1, p2):
	"""
	Compute the relative path from *p1* to *p2*.

	>>> relpath('/', '/')
	'./'
	>>> relpath('/a/b/c/d', '/')
	'../../../../'
	>>> relpath('/a/b/c/d', '/a/b/c1/d1')
	'../../c1/d1'
	>>> relpath('/a/b/c/d', '/a/b/c1/d1/')
	'../../c1/d1'
	>>> relpath("./plugins/builtins", "./plugins")
	'../'
	>>> relpath("./plugins/", "./plugins/builtins.ini")
	'builtins.ini'
	"""
	sourcePath = os.path.normpath(p1)
	destPath = os.path.normpath(p2)

	common, sourceOnly, destOnly = commonpath(pathsplit(sourcePath), pathsplit(destPath))
	if not sourceOnly and not destOnly:
		# Identical paths.
		return "."+os.sep
	# One ".." hop per component unique to the source, then descend.
	ups = ('..' + os.sep) * len(sourceOnly)
	return os.path.join(ups, *destOnly)
+++ /dev/null
-#!/usr/bin/env python
-
-from __future__ import with_statement
-
-import sys
-import cPickle
-
-import functools
-import contextlib
-import inspect
-
-import optparse
-import traceback
-import warnings
-import string
-
-
def printfmt(template):
	"""
	This hides having to create the Template object and call substitute/safe_substitute on it. For example:

	>>> num = 10
	>>> word = "spam"
	>>> printfmt("I would like to order $num units of $word, please") #doctest: +SKIP
	I would like to order 10 units of spam, please
	"""
	frame = inspect.stack()[-1][0]
	try:
		print(string.Template(template).safe_substitute(frame.f_locals))
	finally:
		# Break the frame reference cycle promptly.
		del frame
-
-
def is_special(name):
	# Dunder ("magic") names such as __init__.
	return name.endswith("__") and name.startswith("__")
-
-
def is_private(name):
	# Leading-underscore names, excluding dunders.
	if is_special(name):
		return False
	return name.startswith("_")
-
-
def privatize(clsName, attributeName):
	"""
	At runtime, make an attributeName private

	Example:
	>>> class Test(object):
	... 	pass
	...
	>>> try:
	... 	dir(Test).index("_Test__me")
	... 	print dir(Test)
	... except:
	... 	print "Not Found"
	Not Found
	>>> setattr(Test, privatize(Test.__name__, "me"), "Hello World")
	>>> try:
	... 	dir(Test).index("_Test__me")
	... 	print "Found"
	... except:
	... 	print dir(Test)
	0
	Found
	>>> print getattr(Test, obfuscate(Test.__name__, "__me"))
	Hello World
	>>>
	>>> is_private(privatize(Test.__name__, "me"))
	True
	>>> is_special(privatize(Test.__name__, "me"))
	False
	"""
	return "_%s__%s" % (clsName, attributeName)
-
-
def obfuscate(clsName, attributeName):
	"""
	At runtime, turn a private name into the obfuscated form

	Example:
	>>> class Test(object):
	... 	__me = "Hello World"
	...
	>>> try:
	... 	dir(Test).index("_Test__me")
	... 	print "Found"
	... except:
	... 	print dir(Test)
	0
	Found
	>>> print getattr(Test, obfuscate(Test.__name__, "__me"))
	Hello World
	>>> is_private(obfuscate(Test.__name__, "__me"))
	True
	>>> is_special(obfuscate(Test.__name__, "__me"))
	False
	"""
	return "_%s%s" % (clsName, attributeName)
-
-
class PAOptionParser(optparse.OptionParser, object):
	"""
	OptionParser with named positional arguments (rendered in the usage
	string and required at parse time).

	>>> if __name__ == '__main__':
	... 	#parser = PAOptionParser("My usage str")
	... 	parser = PAOptionParser()
	... 	parser.add_posarg("Foo", help="Foo usage")
	... 	parser.add_posarg("Bar", dest="bar_dest")
	... 	parser.add_posarg("Language", dest='tr_type', type="choice", choices=("Python", "Other"))
	... 	parser.add_option('--stocksym', dest='symbol')
	... 	values, args = parser.parse_args()
	... 	print values, args
	...

	python mycp.py -h
	python mycp.py
	python mycp.py foo
	python mycp.py foo bar

	python mycp.py foo bar lava
	Usage: pa.py <Foo> <Bar> <Language> [options]

	Positional Arguments:
	 Foo: Foo usage
	 Bar:
	 Language:

	pa.py: error: option --Language: invalid choice: 'lava' (choose from 'Python', 'Other')
	"""

	def __init__(self, *args, **kw):
		# (name, help) pairs, in declaration order.
		self.posargs = []
		super(PAOptionParser, self).__init__(*args, **kw)

	def add_posarg(self, *args, **kw):
		# Positional args are modeled as hidden --<name> options.
		pa_help = kw.get("help", "")
		kw["help"] = optparse.SUPPRESS_HELP
		o = self.add_option("--%s" % args[0], *args[1:], **kw)
		self.posargs.append((args[0], pa_help))

	def get_usage(self, *args, **kwargs):
		params = (' '.join(["<%s>" % arg[0] for arg in self.posargs]), '\n '.join(["%s: %s" % (arg) for arg in self.posargs]))
		self.usage = "%%prog %s [options]\n\nPositional Arguments:\n %s" % params
		return super(PAOptionParser, self).get_usage(*args, **kwargs)

	def parse_args(self, *args, **kwargs):
		# Rewrite sys.argv so the leading positionals become --name value
		# pairs the hidden options can consume; the raw positionals remain
		# in ``args`` afterwards and are counted below.
		args = sys.argv[1:]
		args0 = []
		for p, v in zip(self.posargs, args):
			args0.append("--%s" % p[0])
			args0.append(v)
		args = args0 + args
		options, args = super(PAOptionParser, self).parse_args(args, **kwargs)
		if len(args) < len(self.posargs):
			msg = 'Missing value(s) for "%s"\n' % ", ".join([arg[0] for arg in self.posargs][len(args):])
			self.error(msg)
		return options, args
-
-
def explicitly(name, stackadd=0):
	"""
	This is an alias for adding to '__all__'. Less error-prone than using
	__all__ itself, since setting __all__ directly is prone to stomping on
	things implicitly exported via L{alias}.

	@note Taken from PyExport (which could turn out pretty cool):
	@li @a http://codebrowse.launchpad.net/~glyph/
	@li @a http://glyf.livejournal.com/74356.html
	"""
	# Mutate the *caller's* namespace (stackadd skips wrapper frames).
	# NOTE(review): reliable only at module scope, where f_locals is the
	# real module namespace — confirm before calling from inside functions.
	packageVars = sys._getframe(1+stackadd).f_locals
	globalAll = packageVars.setdefault('__all__', [])
	globalAll.append(name)
-
-
def public(thunk):
	"""
	This is a decorator, for convenience. Rather than typing the name of your
	function twice, you can decorate a function with this.

	To be real, @public would need to work on methods as well, which gets into
	supporting types...

	@note Taken from PyExport (which could turn out pretty cool):
	@li @a http://codebrowse.launchpad.net/~glyph/
	@li @a http://glyf.livejournal.com/74356.html
	"""
	explicitly(thunk.__name__, stackadd=1)
	return thunk
-
-
def _append_docstring(obj, message):
	# Append to an object's docstring, tolerating a missing one (None).
	if obj.__doc__ is None:
		obj.__doc__ = message
	else:
		obj.__doc__ += message
-
-
def validate_decorator(decorator):
	# Smoke-test that *decorator* preserves __name__, __doc__ and __dict__
	# of the wrapped function; complaints are printed (doctest-friendly)
	# rather than raised.

	def simple(x):
		return x

	f = simple
	f.__name__ = "name"
	f.__doc__ = "doc"
	f.__dict__["member"] = True

	g = decorator(f)

	if f.__name__ != g.__name__:
		print f.__name__, "!=", g.__name__

	if g.__doc__ is None:
		print decorator.__name__, "has no doc string"
	elif not g.__doc__.startswith(f.__doc__):
		print g.__doc__, "didn't start with", f.__doc__

	if not ("member" in g.__dict__ and g.__dict__["member"]):
		print "'member' not in ", g.__dict__
-
-
def deprecated_api(func):
	"""
	This is a decorator which can be used to mark functions
	as deprecated. It will result in a warning being emitted
	when the function is used.

	>>> validate_decorator(deprecated_api)
	"""

	@functools.wraps(func)
	def newFunc(*args, **kwargs):
		warnings.warn("Call to deprecated function %s." % func.__name__, category=DeprecationWarning)
		return func(*args, **kwargs)

	# Append the marker to the docstring (inlined _append_docstring).
	if newFunc.__doc__ is None:
		newFunc.__doc__ = "\n@deprecated"
	else:
		newFunc.__doc__ += "\n@deprecated"
	return newFunc
-
-
def unstable_api(func):
	"""
	This is a decorator which can be used to mark functions
	whose API is not yet stable. It will result in a warning being
	emitted when the function is used.

	>>> validate_decorator(unstable_api)
	"""

	@functools.wraps(func)
	def newFunc(*args, **kwargs):
		warnings.warn("Call to unstable API function %s." % func.__name__, category=FutureWarning)
		return func(*args, **kwargs)

	# Append the marker to the docstring (inlined _append_docstring).
	if newFunc.__doc__ is None:
		newFunc.__doc__ = "\n@unstable"
	else:
		newFunc.__doc__ += "\n@unstable"
	return newFunc
-
-
def enabled(func):
	"""
	This decorator doesn't add any behavior

	>>> validate_decorator(enabled)
	"""
	# Identity decorator: hand the function back untouched.
	return func
-
-
def disabled(func):
	"""
	This decorator disables the provided function, and does nothing

	>>> validate_decorator(disabled)
	"""

	@functools.wraps(func)
	def emptyFunc(*args, **kargs):
		pass

	# Append the marker to the docstring (inlined _append_docstring).
	if emptyFunc.__doc__ is None:
		emptyFunc.__doc__ = "\n@note Temporarily Disabled"
	else:
		emptyFunc.__doc__ += "\n@note Temporarily Disabled"
	return emptyFunc
-
-
def metadata(document=True, **kwds):
	"""
	Attach arbitrary keyword metadata to a function as attributes and,
	when *document* is true, note each entry in its docstring.

	>>> validate_decorator(metadata(author="Ed"))
	"""

	def decorate(func):
		# .items() instead of the Python-2-only .iteritems(): identical
		# behavior on Python 2, and keeps the code working on Python 3.
		for k, v in kwds.items():
			setattr(func, k, v)
			if document:
				# Inlined _append_docstring.
				note = "\n@"+k+" "+v
				if func.__doc__ is None:
					func.__doc__ = note
				else:
					func.__doc__ += note
		return func
	return decorate
-
-
def prop(func):
	"""Function decorator for defining property attributes

	The decorated function is expected to return a dictionary
	containing one or more of the following pairs:
		fget - function for getting attribute value
		fset - function for setting attribute value
		fdel - function for deleting attribute
	This can be conveniently constructed by the locals() builtin
	function; see:
	http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/205183
	@author http://kbyanc.blogspot.com/2007/06/python-property-attribute-tricks.html

	Example:
	>>> #Due to transformation from function to property, does not need to be validated
	>>> #validate_decorator(prop)
	>>> class MyExampleClass(object):
	... 	@prop
	... 	def foo():
	... 		"The foo property attribute's doc-string"
	... 		def fget(self):
	... 			print "GET"
	... 			return self._foo
	... 		def fset(self, value):
	... 			print "SET"
	... 			self._foo = value
	... 		return locals()
	...
	>>> me = MyExampleClass()
	>>> me.foo = 10
	SET
	>>> print me.foo
	GET
	10
	"""
	accessors = func()
	return property(doc=func.__doc__, **accessors)
-
-
def print_handler(e):
    """
    Default ExpHandler callback: print the exception type and message.

    @param e the caught exception instance
    @see ExpHandler
    """
    # Portability fix: the parenthesized single-argument print form
    # behaves identically on Python 2 and is also valid Python 3
    # (the original used the Py2-only print statement).
    print("%s: %s" % (type(e).__name__, e))
-
-
def print_ignore(e):
    """
    ExpHandler callback: announce that the exception is being ignored.

    @param e the caught exception instance
    @see ExpHandler
    """
    # Portability fix: single-argument print() is identical on Py2 and
    # valid on Py3 (original used the Py2-only print statement).
    print('Ignoring %s exception: %s' % (type(e).__name__, e))
-
-
def print_traceback(e):
    """
    ExpHandler callback: dump the active exception's traceback to stdout.

    The *e* argument is unused; traceback.print_exc reads the exception
    currently being handled.

    @see ExpHandler
    """
    traceback.print_exc(file=sys.stdout)
-
-
def ExpHandler(handler = print_handler, *exceptions):
    """
    An exception handling idiom using decorators
    Examples
    Specify exceptions in order, first one is handled first
    last one last.

    >>> validate_decorator(ExpHandler())
    >>> @ExpHandler(print_ignore, ZeroDivisionError)
    ... @ExpHandler(None, AttributeError, ValueError)
    ... def f1():
    ...     1/0
    >>> @ExpHandler(print_traceback, ZeroDivisionError)
    ... def f2():
    ...     1/0
    >>> @ExpHandler()
    ... def f3(*pargs):
    ...     l = pargs
    ...     return l[10]
    >>> @ExpHandler(print_traceback, ZeroDivisionError)
    ... def f4():
    ...     return 1
    >>>
    >>>
    >>> f1()
    Ignoring ZeroDivisionError exception: integer division or modulo by zero
    >>> f2() # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
    ...
    ZeroDivisionError: integer division or modulo by zero
    >>> f3()
    IndexError: tuple index out of range
    >>> f4()
    1
    """

    def wrapper(f):
        # Default to catching plain Exception when no types were given.
        localExceptions = exceptions
        if not localExceptions:
            localExceptions = [Exception]
        # Pair every exception type with the (single) handler.  The list
        # is reversed so that the first-listed type ends up as the
        # outermost try/except level when the recursion below unwinds.
        t = [(ex, handler) for ex in localExceptions]
        t.reverse()

        # newfunc simulates nested try/except blocks: each recursion
        # level guards the next with its own exception type.
        def newfunc(t, *args, **kwargs):
            ex, handler = t[0]
            try:
                if len(t) == 1:
                    # Innermost level: actually invoke the wrapped function.
                    return f(*args, **kwargs)
                else:
                    #Recurse for embedded try/excepts
                    dec_func = functools.partial(newfunc, t[1:])
                    dec_func = functools.update_wrapper(dec_func, f)
                    return dec_func(*args, **kwargs)
            # NOTE(review): a None handler (as in the doctest) only works
            # because another level catches first; handler(e) would raise
            # TypeError if this level were actually reached.
            except ex, e:
                return handler(e)

        dec_func = functools.partial(newfunc, t)
        dec_func = functools.update_wrapper(dec_func, f)
        return dec_func
    return wrapper
-
-
class bindclass(object):
    """
    Method decorator that prepends the *defining class* as the first
    argument (ahead of self), analogous to how bound methods supply
    self.  Requires a cooperating metaclass (ClassBindingSupport, or
    inheriting from BoundObject) that calls bind() once the class body
    has been assembled.

    >>> validate_decorator(bindclass)
    """

    def __init__(self, f):
        self.f = f
        # Mirror the wrapped function's identity for introspection.
        self.__name__ = f.__name__
        self.__doc__ = f.__doc__
        self.__dict__.update(f.__dict__)
        # Populated by bind() once the owning class exists.
        self.m = None

    def bind(self, cls, attr):
        # Build the class-injecting wrapper; called by the metaclass.
        def bound_m(*args, **kwargs):
            return self.f(cls, *args, **kwargs)

        bound_m.__name__ = attr
        self.m = bound_m

    def __get__(self, obj, objtype=None):
        # Delegate to the plain function's own descriptor protocol so
        # the result behaves like an ordinary (instance-)bound method.
        return self.m.__get__(obj, objtype)
-
-
class ClassBindingSupport(type):
    "Metaclass that wires up bindclass members; @see bindclass"

    def __init__(mcs, name, bases, attrs):
        type.__init__(mcs, name, bases, attrs)
        # Late-bind every bindclass attribute to the class being built.
        for memberName, member in attrs.iteritems():
            if isinstance(member, bindclass):
                member.bind(mcs, memberName)
-
-
# Convenience base class: inherit from this (instead of setting the
# metaclass by hand) to make @bindclass methods work.
class BoundObject(object):
    "@see bindclass"
    __metaclass__ = ClassBindingSupport
-
-
def bindfunction(f):
    """
    Decorator handing *f* a reference to itself as its first argument.

    The wrapper passes itself ahead of the caller's arguments, so
    recursion keeps working even if the outer name is later rebound to
    something else.

    >>> @bindfunction
    ... def factorial(thisfunction, n):
    ...     return n * thisfunction(n - 1) if n > 0 else 1
    >>> factorial(3)
    6
    """

    @functools.wraps(f)
    def bound_f(*args, **kwargs):
        return f(bound_f, *args, **kwargs)

    return bound_f
-
-
class Memoize(object):
    """
    Cache wrapper: acts like *fn* but remembers results per positional
    argument tuple.  Only usable when every argument is hashable.
    @note Source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52201

    >>> validate_decorator(Memoize)
    """

    def __init__(self, fn):
        self.fn = fn
        # Masquerade as the wrapped callable for introspection.
        self.__name__ = fn.__name__
        self.__doc__ = fn.__doc__
        self.__dict__.update(fn.__dict__)
        self.memo = {}

    def __call__(self, *args):
        # EAFP lookup: compute and remember only on the first miss.
        try:
            return self.memo[args]
        except KeyError:
            result = self.fn(*args)
            self.memo[args] = result
            return result
-
-
class MemoizeMutable(object):
    """Cache wrapper like Memoize, but keys on a pickled snapshot of the
    arguments so mutable (unhashable) values work too -- at extra cost.
    @note Source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52201

    >>> validate_decorator(MemoizeMutable)
    """

    def __init__(self, fn):
        self.fn = fn
        # Masquerade as the wrapped callable for introspection.
        self.__name__ = fn.__name__
        self.__doc__ = fn.__doc__
        self.__dict__.update(fn.__dict__)
        self.memo = {}

    def __call__(self, *args, **kw):
        # Serialize the full call signature to obtain a hashable key.
        key = cPickle.dumps((args, kw))
        if key not in self.memo:
            self.memo[key] = self.fn(*args, **kw)
        return self.memo[key]
-
-
# Shared nesting depth for call_trace output (one tab per level).
callTraceIndentationLevel = 0


def call_trace(f):
    """
    Tracing decorator: prints indented Entering/Exiting (or Exception)
    lines around every call, tracking nesting depth via the module
    global callTraceIndentationLevel.

    >>> validate_decorator(call_trace)
    >>> @call_trace
    ... def a(a, b, c):
    ...     pass
    >>> a(1, 2, c=3)
    Entering a((1, 2), {'c': 3})
    Exiting a((1, 2), {'c': 3})
    """
    # Portability fix throughout: the parenthesized single-argument
    # print() form behaves identically on Python 2 and is valid Py3
    # (the original used the Py2-only print statement).

    @functools.wraps(f)
    def verboseTrace(*args, **kw):
        global callTraceIndentationLevel

        print("%sEntering %s(%s, %s)" % ("\t"*callTraceIndentationLevel, f.__name__, args, kw))
        callTraceIndentationLevel += 1
        try:
            result = f(*args, **kw)
        except:
            # Restore depth before reporting so the marker lines up
            # with the matching Entering line.
            callTraceIndentationLevel -= 1
            print("%sException %s(%s, %s)" % ("\t"*callTraceIndentationLevel, f.__name__, args, kw))
            raise
        callTraceIndentationLevel -= 1
        print("%sExiting %s(%s, %s)" % ("\t"*callTraceIndentationLevel, f.__name__, args, kw))
        return result

    @functools.wraps(f)
    def smallTrace(*args, **kw):
        global callTraceIndentationLevel

        print("%sEntering %s" % ("\t"*callTraceIndentationLevel, f.__name__))
        callTraceIndentationLevel += 1
        try:
            result = f(*args, **kw)
        except:
            callTraceIndentationLevel -= 1
            print("%sException %s" % ("\t"*callTraceIndentationLevel, f.__name__))
            raise
        callTraceIndentationLevel -= 1
        print("%sExiting %s" % ("\t"*callTraceIndentationLevel, f.__name__))
        return result

    # Swap the return value for smallTrace to get terser output.
    return verboseTrace
-
-
@contextlib.contextmanager
def lexical_scope(*args):
    """
    @note Source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/520586
    Example:
    >>> b = 0
    >>> with lexical_scope(1) as (a):
    ...     print a
    ...
    1
    >>> with lexical_scope(1,2,3) as (a,b,c):
    ...     print a,b,c
    ...
    1 2 3
    >>> with lexical_scope():
    ...     d = 10
    ...     def foo():
    ...         pass
    ...
    >>> print b
    2
    """

    # Two frames up: past the @contextmanager generator machinery to
    # the frame that contains the 'with' statement.
    frame = inspect.currentframe().f_back.f_back
    # Snapshot of the local names that existed before the block ran.
    saved = frame.f_locals.keys()
    try:
        if not args:
            yield
        elif len(args) == 1:
            yield args[0]
        else:
            yield args
    finally:
        # Delete every local name the 'with' body created, emulating a
        # lexical scope.  NOTE(review): mutating f_locals like this is
        # CPython-specific and only reliable at module scope -- confirm
        # before using inside functions.
        f_locals = frame.f_locals
        for key in (x for x in f_locals.keys() if x not in saved):
            del f_locals[key]
        del frame
+++ /dev/null
-#!/usr/bin/env python
-
-"""
-Example Operators for comparison
->>> class C(object):
-... def __init__(self, x):
-... self.x = x
-...
->>> x, y, z = C(1), C(1), C(2)
->>> x == y, hash(x) == hash(y)
-(False, False)
-"""
-
-
-import operator
-import itertools
-
-
class KeyedEqualityOperators(object):
    """
    Mixin providing == and != in terms of the inheriting class's
    __key__() result.

    @note Requires inheriting class to implement a '__key__' function
    """

    def __init__(self):
        # Placeholder; subclasses supply a real __key__ method.
        self.__key__ = None

    def __eq__(self, other):
        mine, theirs = self.__key__(), other.__key__()
        return mine == theirs

    def __ne__(self, other):
        mine, theirs = self.__key__(), other.__key__()
        return mine != theirs
-
-
class KeyedComparisonOperators(KeyedEqualityOperators):
    """
    Mixin extending KeyedEqualityOperators with the full set of
    ordering operators, all delegating to the inheriting class's
    __key__() result.

    @note Requires inheriting class to implement a '__key__' function
    """

    def __init__(self):
        # Placeholder; subclasses supply a real __key__ method.
        self.__key__ = None

    def __cmp__(self, other):
        # Python 2 three-way comparison fallback.
        return cmp(self.__key__(), other.__key__())

    def __lt__(self, other):
        mine, theirs = self.__key__(), other.__key__()
        return mine < theirs

    def __le__(self, other):
        mine, theirs = self.__key__(), other.__key__()
        return mine <= theirs

    def __gt__(self, other):
        mine, theirs = self.__key__(), other.__key__()
        return mine > theirs

    def __ge__(self, other):
        mine, theirs = self.__key__(), other.__key__()
        return mine >= theirs
-
-
class KeyedHashing(object):
    """
    Mixin deriving __hash__ from the inheriting class's __key__()
    result.

    @note Requires inheriting class to implement a '__key__' function
    """

    def __init__(self):
        # Placeholder; subclasses supply a real __key__ method.
        self.__key__ = None

    def __hash__(self):
        return hash(self.__key__())
-
-
class NotEqualOperator(object):
    """
    Mixin deriving != as the negation of the inheriting class's ==.

    @note Requires inheriting class to implement '__eq__' function
    """

    def __ne__(self, other):
        # a != b  <=>  not (a == b)
        return not (self == other)
-
-
class ComparisonOperators(NotEqualOperator):
    """
    Mixin deriving <=, > and >= (plus != via NotEqualOperator) from an
    inheriting class's __lt__ and __eq__.

    @note Requires inheriting class to implement '__lt__' function
    """

    def __le__(self, other):
        # a <= b  <=>  a < b or a == b
        return (self < other) or (self == other)

    def __gt__(self, other):
        # a > b  <=>  not (a <= b)
        return not (self <= other)

    def __ge__(self, other):
        # a >= b  <=>  not (a < b)
        return not (self < other)
-
-
class infix(object):
    """
    Wrap a binary function so it can be used as an infix operator via
    the bitwise-or syntax:  lhs |op| rhs.

    Recipe #384122
    http://code.activestate.com/recipes/384122/

    >>> import operator
    >>> x = infix(operator.mul)
    >>> 1 |x| 2 |x| 10
    20
    """

    def __init__(self, func):
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__
        try:
            self.__dict__.update(func.__dict__)
        except AttributeError:
            # Builtins may not expose __dict__; skip the trait copy.
            pass
        self.function = func

    def __ror__(self, other):
        # lhs | self: capture the left operand in a partial infix.
        return infix(lambda rhs: self.function(other, rhs))

    def __or__(self, other):
        # (partially applied) | rhs: evaluate.
        return self.function(other)

    def __call__(self, lhs, rhs):
        # Plain call syntax still works.
        return self.function(lhs, rhs)
-
-
class Just(object):
    """
    Maybe-monad wrapper marking a value as present (None marks absence).

    @see mreturn
    """

    def __init__(self, value):
        # Wrapped payload; read back by mbind via maybe.value.
        self.value = value
-
-
@infix
def mbind(maybe, func):
    """
    Maybe-monad bind, for use as:  mreturn(x) |mbind| f

    Unwraps *maybe* and feeds its value through *func*,
    short-circuiting when the chain has already produced None.
    @see mreturn
    """
    if maybe is None:
        return None
    return func(maybe.value)
-
-
def mreturn(value):
    """
    Maybe-monad unit: wrap *value* in a Just so it can be chained
    through |mbind| pipelines (where None short-circuits).

    >>> class Sheep(object):
    ...     def __init__(self, name):
    ...         self.name = name
    ...         self.mother = None
    ...         self.father = None
    ...
    >>> def father(sheep):
    ...     if sheep.father is None:
    ...         return None
    ...     else:
    ...         return Just(sheep.father)
    ...
    >>> def mother(sheep):
    ...     if sheep.mother is None:
    ...         return None
    ...     else:
    ...         return Just(sheep.mother)
    ...
    >>> def mothersFather(sheep):
    ...     return mreturn(sheep) |mbind| mother |mbind| father
    ...
    >>> def mothersPaternalGrandfather(sheep):
    ...     return mreturn(sheep) |mbind| mother |mbind| father |mbind| father
    ...
    >>> shawn = Sheep("Shawn")
    >>> gertrude = Sheep("Gertrude")
    >>> ernie = Sheep("Ernie")
    >>> frank = Sheep("Frank")
    >>>
    >>> shawn.mother = gertrude
    >>> gertrude.father = ernie
    >>> ernie.father = frank
    >>>
    >>> print mothersFather(shawn).value.name
    Ernie
    >>> print mothersPaternalGrandfather(shawn).value.name
    Frank
    >>> print mothersPaternalGrandfather(ernie)
    None
    """
    return Just(value)
-
-
def xor(*args):
    """
    Logical exclusive-or folded across all arguments: True when an odd
    number of them are truthy.

    @raises TypeError when called with no arguments
    """
    # Portability fix: builtin map + functools.reduce behave identically
    # here on Python 2.6+ and also work on Python 3 (the original used
    # the Py2-only itertools.imap and bare reduce builtins).
    import functools
    truth = map(operator.truth, args)
    return functools.reduce(operator.xor, truth)
-
-
def equiv(*args):
    """
    Logical equivalence folded pairwise across all arguments:
    ((a equiv b) equiv c) ...

    @raises TypeError when called with no arguments
    """
    # Portability fix: builtin map + functools.reduce behave identically
    # here on Python 2.6+ and also work on Python 3 (the original used
    # the Py2-only itertools.imap and bare reduce builtins).
    import functools
    truth = map(operator.truth, args)
    return functools.reduce(lambda a, b: not operator.xor(a, b), truth)
+++ /dev/null
-#!/usr/bin/env python
-import new
-
-# Make the environment more like Python 3.0
-__metaclass__ = type
-from itertools import izip as zip
-import textwrap
-import inspect
-
-
# Public API of this module.
__all__ = [
    "AnyType",
    "overloaded"
]


# Wildcard sentinel for overloaded.register(): matches an argument of
# any type, since every (new-style) class ultimately derives from object.
AnyType = object
-
-
class overloaded:
    """
    Dynamically overloaded functions.

    This is an implementation of (dynamically, or run-time) overloaded
    functions; also known as generic functions or multi-methods.

    The dispatch algorithm uses the types of all argument for dispatch,
    similar to (compile-time) overloaded functions or methods in C++ and
    Java.

    Most of the complexity in the algorithm comes from the need to support
    subclasses in call signatures. For example, if an function is
    registered for a signature (T1, T2), then a call with a signature (S1,
    S2) is acceptable, assuming that S1 is a subclass of T1, S2 a subclass
    of T2, and there are no other more specific matches (see below).

    If there are multiple matches and one of those doesn't *dominate* all
    others, the match is deemed ambiguous and an exception is raised. A
    subtlety here: if, after removing the dominated matches, there are
    still multiple matches left, but they all map to the same function,
    then the match is not deemed ambiguous and that function is used.
    Read the method find_func() below for details.

    @note Python 2.5 is required due to the use of predicates any() and all().
    @note only supports positional arguments

    @author http://www.artima.com/weblogs/viewpost.jsp?thread=155514

    >>> import misc
    >>> misc.validate_decorator (overloaded)
    >>>
    >>>
    >>>
    >>>
    >>> #################
    >>> #Basics, with reusing names and without
    >>> @overloaded
    ... def foo(x):
    ...     "prints x"
    ...     print x
    ...
    >>> @foo.register(int)
    ... def foo(x):
    ...     "prints the hex representation of x"
    ...     print hex(x)
    ...
    >>> from types import DictType
    >>> @foo.register(DictType)
    ... def foo_dict(x):
    ...     "prints the keys of x"
    ...     print [k for k in x.iterkeys()]
    ...
    >>> #combines all of the doc strings to help keep track of the specializations
    >>> foo.__doc__ # doctest: +ELLIPSIS
    "prints x\\n\\n...overloading.foo (<type 'int'>):\\n\\tprints the hex representation of x\\n\\n...overloading.foo_dict (<type 'dict'>):\\n\\tprints the keys of x"
    >>> foo ("text")
    text
    >>> foo (10) #calling the specialized foo
    0xa
    >>> foo ({3:5, 6:7}) #calling the specialization foo_dict
    [3, 6]
    >>> foo_dict ({3:5, 6:7}) #with using a unique name, you still have the option of calling the function directly
    [3, 6]
    >>>
    >>>
    >>>
    >>>
    >>> #################
    >>> #Multiple arguments, accessing the default, and function finding
    >>> @overloaded
    ... def two_arg (x, y):
    ...     print x,y
    ...
    >>> @two_arg.register(int, int)
    ... def two_arg_int_int (x, y):
    ...     print hex(x), hex(y)
    ...
    >>> @two_arg.register(float, int)
    ... def two_arg_float_int (x, y):
    ...     print x, hex(y)
    ...
    >>> @two_arg.register(int, float)
    ... def two_arg_int_float (x, y):
    ...     print hex(x), y
    ...
    >>> two_arg.__doc__ # doctest: +ELLIPSIS
    "...overloading.two_arg_int_int (<type 'int'>, <type 'int'>):\\n\\n...overloading.two_arg_float_int (<type 'float'>, <type 'int'>):\\n\\n...overloading.two_arg_int_float (<type 'int'>, <type 'float'>):"
    >>> two_arg(9, 10)
    0x9 0xa
    >>> two_arg(9.0, 10)
    9.0 0xa
    >>> two_arg(15, 16.0)
    0xf 16.0
    >>> two_arg.default_func(9, 10)
    9 10
    >>> two_arg.find_func ((int, float)) == two_arg_int_float
    True
    >>> (int, float) in two_arg
    True
    >>> (str, int) in two_arg
    False
    >>>
    >>>
    >>>
    >>> #################
    >>> #wildcard
    >>> @two_arg.register(AnyType, str)
    ... def two_arg_any_str (x, y):
    ...     print x, y.lower()
    ...
    >>> two_arg("Hello", "World")
    Hello world
    >>> two_arg(500, "World")
    500 world
    """

    def __init__(self, default_func):
        # Decorator to declare new overloaded function.
        self.registry = {}
        self.cache = {}
        self.default_func = default_func
        # Masquerade as the default implementation for introspection.
        self.__name__ = self.default_func.__name__
        self.__doc__ = self.default_func.__doc__
        self.__dict__.update (self.default_func.__dict__)

    def __get__(self, obj, type=None):
        # Descriptor support so an overloaded callable also works as a
        # method; binds the instance via the (Python 2) 'new' module.
        if obj is None:
            return self
        return new.instancemethod(self, obj)

    def register(self, *types):
        """
        Decorator to register an implementation for a specific set of types.

        .register(t1, t2)(f) is equivalent to .register_func((t1, t2), f).
        """

        def helper(func):
            self.register_func(types, func)

            # Fold the specialization's signature and docstring into the
            # overloaded function's combined docstring.
            originalDoc = self.__doc__ if self.__doc__ is not None else ""
            typeNames = ", ".join ([str(type) for type in types])
            typeNames = "".join ([func.__module__+".", func.__name__, " (", typeNames, "):"])
            overloadedDoc = ""
            if func.__doc__ is not None:
                overloadedDoc = textwrap.fill (func.__doc__, width=60, initial_indent="\t", subsequent_indent="\t")
            self.__doc__ = "\n".join ([originalDoc, "", typeNames, overloadedDoc]).strip()

            new_func = func

            #Masking the function, so we want to take on its traits
            if func.__name__ == self.__name__:
                self.__dict__.update (func.__dict__)
                new_func = self
            return new_func

        return helper

    def register_func(self, types, func):
        """Helper to register an implementation."""
        self.registry[tuple(types)] = func
        self.cache = {} # Clear the cache (later we can optimize this).

    def __call__(self, *args):
        """Call the overloaded function."""
        types = tuple(map(type, args))
        func = self.cache.get(types)
        if func is None:
            self.cache[types] = func = self.find_func(types)
        return func(*args)

    def __contains__ (self, types):
        # A signature is "in" the overload set when it resolves to
        # something more specific than the default implementation.
        return self.find_func(types) is not self.default_func

    def find_func(self, types):
        """Find the appropriate overloaded function; don't call it.

        @note This won't work for old-style classes or classes without __mro__
        """
        func = self.registry.get(types)
        if func is not None:
            # Easy case -- direct hit in registry.
            return func

        # Phillip Eby suggests to use issubclass() instead of __mro__.
        # There are advantages and disadvantages.

        # I can't help myself -- this is going to be intense functional code.
        # Find all possible candidate signatures.
        mros = tuple(inspect.getmro(t) for t in types)
        n = len(mros)
        candidates = [sig for sig in self.registry
                      if len(sig) == n and
                      all(t in mro for t, mro in zip(sig, mros))]

        if not candidates:
            # No match at all -- use the default function.
            return self.default_func
        elif len(candidates) == 1:
            # Unique match -- that's an easy case.
            return self.registry[candidates[0]]

        # More than one match -- weed out the subordinate ones.

        def dominates(dom, sub,
                      orders=tuple(dict((t, i) for i, t in enumerate(mro))
                                   for mro in mros)):
            # Predicate to decide whether dom strictly dominates sub.
            # Strict domination is defined as domination without equality.
            # The arguments dom and sub are type tuples of equal length.
            # The orders argument is a precomputed auxiliary data structure
            # giving dicts of ordering information corresponding to the
            # positions in the type tuples.
            # A type d dominates a type s iff order[d] <= order[s].
            # A type tuple (d1, d2, ...) dominates a type tuple of equal length
            # (s1, s2, ...) iff d1 dominates s1, d2 dominates s2, etc.
            if dom is sub:
                return False
            return all(order[d] <= order[s] for d, s, order in zip(dom, sub, orders))

        # I suppose I could inline dominates() but it wouldn't get any clearer.
        candidates = [cand
                      for cand in candidates
                      if not any(dominates(dom, cand) for dom in candidates)]
        if len(candidates) == 1:
            # There's exactly one candidate left.
            return self.registry[candidates[0]]

        # Perhaps these multiple candidates all have the same implementation?
        funcs = set(self.registry[cand] for cand in candidates)
        if len(funcs) == 1:
            return funcs.pop()

        # No, the situation is irreducibly ambiguous.
        # Typo fixed in the error message: "ambigous" -> "ambiguous".
        raise TypeError("ambiguous call; types=%r; candidates=%r" %
                        (types, candidates))
+++ /dev/null
-#!/usr/bin/env python
-
-
-from __future__ import with_statement
-
-import inspect
-import contextlib
-import functools
-
-
def TODO(func):
    """
    unittest test method decorator for code that is not written yet.

    Failures raised inside the wrapped test are swallowed; if the test
    unexpectedly succeeds, an AssertionError is raised instead.
    !author http://kbyanc.blogspot.com/2007/06/pythons-unittest-module-aint-that-bad.html

    Example:
    >>> import unittest
    >>> class ExampleTestCase(unittest.TestCase):
    ...     @TODO
    ...     def testToDo(self):
    ...         MyModule.DoesNotExistYet('boo')
    ...
    """

    @functools.wraps(func)
    def wrapper(*args, **kw):
        succeeded = True
        try:
            func(*args, **kw)
        except:
            succeeded = False
        assert succeeded is False, \
            "%s marked TODO but passed" % func.__name__
    return wrapper
-
-
def PlatformSpecific(platformList):
    """
    unittest test method decorator: the wrapped test runs only when
    os.name is one of *platformList*; otherwise it silently no-ops.
    !author http://kbyanc.blogspot.com/2007/06/pythons-unittest-module-aint-that-bad.html
    Example:
    >>> import unittest
    >>> class ExampleTestCase(unittest.TestCase):
    ...     @PlatformSpecific(('mac', ))
    ...     def testMacOnly(self):
    ...         MyModule.SomeMacSpecificFunction()
    ...
    """

    def decorator(func):
        import os

        @functools.wraps(func)
        def wrapper(*args, **kw):
            # Guard clause: skip silently on foreign platforms.
            if os.name not in platformList:
                return None
            return func(*args, **kw)
        return wrapper
    return decorator
-
-
def CheckReferences(func):
    """
    unittest test method decorator that re-runs the test several times
    and verifies the reference count stays stable (i.e. nothing leaks).
    !author http://kbyanc.blogspot.com/2007/06/pythons-unittest-module-aint-that-bad.html

    NOTE(review): XXXGetRefCount is a placeholder that is not defined
    anywhere in this module -- wire up a real counter (for example
    len(gc.get_objects())) before relying on this decorator.
    """

    @functools.wraps(func)
    def wrapper(*args, **kw):
        refCounts = []
        for i in range(5):
            func(*args, **kw)
            refCounts.append(XXXGetRefCount())
        # Fixed inverted check: a *stable* count (min == max) is the
        # success case; any drift means references leaked, which is
        # what the failure message has always described.
        assert min(refCounts) == max(refCounts), "Reference counts changed - %r" % refCounts

    return wrapper
-
-
@contextlib.contextmanager
def expected(exception):
    """
    Context manager asserting that its body raises *exception*.

    *exception* may be an exception class (any instance of it, or of a
    subclass, matches) or an exception instance (the raised exception
    must share its class and its message must start with the instance's
    message).

    Raises AssertionError when the body completes without raising; a
    non-matching exception is propagated unchanged.
    """
    if isinstance(exception, Exception):
        excType, excValue = type(exception), str(exception)
    elif isinstance(exception, type):
        excType, excValue = exception, ""
    # NOTE(review): any other argument leaves excType/excValue unbound
    # and triggers NameError below -- consider an explicit TypeError.

    try:
        yield
    except Exception, e:
        # Match on class (via the MRO, so base classes count) and on
        # message prefix.
        if not (excType in inspect.getmro(type(e)) and str(e).startswith(excValue)):
            raise
    else:
        # NOTE(review): mixing a str.format-style template with
        # %-interpolation produces messages like "expected {0:Foo} to
        # have been thrown" -- looks unintended; confirm before fixing.
        raise AssertionError("expected {0:%s} to have been thrown" % excType.__name__)
-
-
if __name__ == "__main__":
    # Self-test: run this module's doctests when executed directly.
    import doctest
    doctest.testmod()
+++ /dev/null
-#!/usr/bin/env python
-
-
-from __future__ import with_statement
-
-
def flatten(elem, includeTail = False):
    """
    Recursively extract text content.

    @param elem ElementTree.Element to flatten
    @param includeTail when True, also collect the tail text following
            each descendant element
    @return the concatenated text of elem and all its descendants

    @note To get rid of all subelements to a given element, and keep just the text, you can do:
            elem.text = flatten(elem); del elem[:]
    """
    text = elem.text or ""
    for e in elem:
        # Fixed: propagate includeTail so tails of grandchildren (and
        # deeper) are collected too, not only the direct children's.
        text += flatten(e, includeTail)
        if includeTail and e.tail:
            text += e.tail
    return text
-
-
def append(elem, item):
    """
    Universal append to an Element
    @param elem ElementTree.Element
    @param item Either None, Str/Unicode, or ElementTree.Element
    """
    if item is None:
        return

    if not isinstance(item, basestring):
        elem.append(item)
    elif len(elem):
        # Text following the last child lives in that child's tail.
        elem[-1].tail = (elem[-1].tail or "") + item
    else:
        # No children yet: the text belongs to the element itself.
        elem.text = (elem.text or "") + item
-
-
def indent(elem, level=0, indentation=" "):
    """
    Add indentation to the data of in an ElementTree

    >>> from xml.etree import ElementTree
    >>> xmlRoot = ElementTree.fromstring("<xml><tree><bird /></tree></xml>")
    >>> indent(xmlRoot)
    >>> ElementTree.dump(xmlRoot)
    <xml>
     <tree>
      <bird />
     </tree>
    </xml>
    """

    # Whitespace that places the *next* line at this element's depth.
    i = "\n" + level*indentation
    if len(elem):
        # Container element: open the children on a deeper-indented line.
        if not elem.text or not elem.text.strip():
            elem.text = i + indentation
        for e in elem:
            indent(e, level+1, indentation)
            if not e.tail or not e.tail.strip():
                e.tail = i + indentation
        # e still names the last child here: its tail closes the parent,
        # so it is rewritten to step back out to the parent's depth.
        if not e.tail or not e.tail.strip():
            e.tail = i
    else:
        # Leaf element: only adjust the tail, and never for the root
        # (level 0), whose tail would add stray trailing whitespace.
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = i
-
-
if __name__ == "__main__":
    import sys
    from xml.etree import ElementTree
    # Filter mode: pretty-print an XML file (argv[1] -> argv[2]), or
    # pipe stdin to stdout when run with no arguments.
    if len(sys.argv) == 3:
        xml = ElementTree.parse(sys.argv[1])
        indent(xml.getroot())
        with open(sys.argv[2], "w") as source:
            xml.write(source)
    elif len(sys.argv) == 1:
        xml = ElementTree.parse(sys.stdin)
        indent(xml.getroot())
        xml.write(sys.stdout)
import functools
import decimal
-from libraries.recipes import overloading
-from libraries.recipes import algorithms
+from util import overloading
+from util import algorithms
@overloading.overloaded
import ConfigParser
from libraries import gtkpieboard
-from libraries.recipes import io
+from util import io
import operation