X-Git-Url: http://git.maemo.org/git/?p=theonering;a=blobdiff_plain;f=src%2Fgvoice%2Fstate_machine.py;h=6d184d07412323c8d71fc3edf5c473433915202b;hp=8e81cb71dea5778e136e24b4fab390f18ea51bbb;hb=074b7adedaeba92d5993f8015be26c684f8bbfd8;hpb=bc433598796afa36b5d75b7c40af19abdb573f67 diff --git a/src/gvoice/state_machine.py b/src/gvoice/state_machine.py index 8e81cb7..6d184d0 100644 --- a/src/gvoice/state_machine.py +++ b/src/gvoice/state_machine.py @@ -1,155 +1,343 @@ #!/usr/bin/env python -""" -@todo Look into switching from POLL_TIME = min(F * 2^n, MAX) to POLL_TIME = min(CONST + F * 2^n, MAX) -@todo Look into supporting more states that have a different F and MAX -""" - -import Queue -import threading import logging -import gobject - -import util.algorithms as algorithms +import util.go_utils as gobject_utils import util.coroutines as coroutines +import util.misc as misc_utils -_moduleLogger = logging.getLogger("gvoice.state_machine") +_moduleLogger = logging.getLogger(__name__) -def _to_milliseconds(**kwd): +def to_milliseconds(**kwd): if "milliseconds" in kwd: return kwd["milliseconds"] elif "seconds" in kwd: return kwd["seconds"] * 1000 elif "minutes" in kwd: return kwd["minutes"] * 1000 * 60 + elif "hours" in kwd: + return kwd["hours"] * 1000 * 60 * 60 raise KeyError("Unknown arg: %r" % kwd) +def to_seconds(**kwd): + if "milliseconds" in kwd: + return kwd["milliseconds"] / 1000 + elif "seconds" in kwd: + return kwd["seconds"] + elif "minutes" in kwd: + return kwd["minutes"] * 60 + elif "hours" in kwd: + return kwd["hours"] * 60 * 60 + raise KeyError("Unknown arg: %r" % kwd) + + +class NopStateStrategy(object): + + def __init__(self): + pass + + def initialize_state(self): + pass + + def reinitialize_state(self): + pass + + def increment_state(self): + pass + + @property + def timeout(self): + return UpdateStateMachine.INFINITE_PERIOD + + def __repr__(self): + return "NopStateStrategy()" + + +class ConstantStateStrategy(object): + + def __init__(self, timeout): + assert 0 < timeout or timeout == UpdateStateMachine.INFINITE_PERIOD + self._timeout = timeout + + def initialize_state(self): + pass + + def reinitialize_state(self): + pass + + def increment_state(self): + pass + + @property + def timeout(self): + return self._timeout + + def __repr__(self): + return "ConstantStateStrategy(timeout=%r)" % self._timeout + + +class NTimesStateStrategy(object): + + def __init__(self, timeouts, postTimeout): + assert 0 < len(timeouts) + for timeout in timeouts: + assert 0 < timeout or timeout == UpdateStateMachine.INFINITE_PERIOD + assert 0 < postTimeout or postTimeout == UpdateStateMachine.INFINITE_PERIOD + self._timeouts = timeouts + self._postTimeout = postTimeout + + self._attemptCount = 0 + + def initialize_state(self): + self._attemptCount = len(self._timeouts) + + def reinitialize_state(self): + self._attemptCount = 0 + + def increment_state(self): + self._attemptCount += 1 + + @property + def timeout(self): + try: + return self._timeouts[self._attemptCount] + except IndexError: + return self._postTimeout + + def __str__(self): + return "NTimesStateStrategy(timeout=%r)" % ( + self.timeout, + ) + + def __repr__(self): + return "NTimesStateStrategy(timeouts=%r, postTimeout=%r)" % ( + self._timeouts, + self._postTimeout, + ) + + +class GeometricStateStrategy(object): + + def __init__(self, init, min, max): + assert 0 < init and init < max or init == UpdateStateMachine.INFINITE_PERIOD + assert 0 < min or min == UpdateStateMachine.INFINITE_PERIOD + assert min < max or max == UpdateStateMachine.INFINITE_PERIOD + self._min = min + self._max = max + self._init = init + self._current = 0 + + def initialize_state(self): + self._current = self._max + + def reinitialize_state(self): + self._current = self._min + + def increment_state(self): + if self._current == UpdateStateMachine.INFINITE_PERIOD: + pass + if self._init == UpdateStateMachine.INFINITE_PERIOD: + self._current = UpdateStateMachine.INFINITE_PERIOD + elif self._max == UpdateStateMachine.INFINITE_PERIOD: + self._current *= 2 + else: + self._current = min(2 * self._current, self._max - self._init) + + @property + def timeout(self): + if UpdateStateMachine.INFINITE_PERIOD in (self._init, self._current): + timeout = UpdateStateMachine.INFINITE_PERIOD + else: + timeout = self._init + self._current + return timeout + + def __str__(self): + return "GeometricStateStrategy(timeout=%r)" % ( + self.timeout + ) + + def __repr__(self): + return "GeometricStateStrategy(init=%r, min=%r, max=%r)" % ( + self._init, self._min, self._max + ) + + class StateMachine(object): - STATE_ACTIVE = "active" - STATE_IDLE = "idle" - STATE_DND = "dnd" + STATE_ACTIVE = 0, "active" + STATE_IDLE = 1, "idle" + STATE_DND = 2, "dnd" - _ACTION_UPDATE = "update" - _ACTION_RESET = "reset" - _ACTION_STOP = "stop" + def start(self): + raise NotImplementedError("Abstract") - _INITIAL_ACTIVE_PERIOD = int(_to_milliseconds(seconds=5)) - _FINAL_ACTIVE_PERIOD = int(_to_milliseconds(minutes=2)) - _IDLE_PERIOD = int(_to_milliseconds(minutes=10)) - _INFINITE_PERIOD = -1 + def stop(self): + raise NotImplementedError("Abstract") - _IS_DAEMON = True + def close(self): + raise NotImplementedError("Abstract") - def __init__(self, initItems, updateItems): - self._initItems = initItems - self._updateItems = updateItems + def set_state(self, state): + raise NotImplementedError("Abstract") + + @property + def state(self): + raise NotImplementedError("Abstract") + + +class MasterStateMachine(StateMachine): - self._actions = Queue.Queue() + def __init__(self): + self._machines = [] self._state = self.STATE_ACTIVE - self._timeoutId = None - self._thread = None - self._currentPeriod = self._INITIAL_ACTIVE_PERIOD - self._set_initial_period() + + def append_machine(self, machine): + self._machines.append(machine) def start(self): - assert self._thread is None - self._thread = threading.Thread(target=self._run) - self._thread.setDaemon(self._IS_DAEMON) - self._thread.start() + # Confirm we are all on the same page + for machine in self._machines: + machine.set_state(self._state) + for machine in self._machines: + machine.start() def stop(self): - if self._thread is not None: - self._actions.put(self._ACTION_STOP) - self._thread = None - else: - _moduleLogger.info("Stopping an already stopped state machine") + for machine in self._machines: + machine.stop() + + def close(self): + for machine in self._machines: + machine.close() def set_state(self, state): self._state = state - self.reset_timers() + for machine in self._machines: + machine.set_state(state) - def get_state(self): + @property + def state(self): return self._state - def reset_timers(self): - self._actions.put(self._ACTION_RESET) - @coroutines.func_sink - def request_reset_timers(self, args): - self.reset_timers() +class UpdateStateMachine(StateMachine): + # Making sure the it is initialized is finicky, be careful - def _run(self): - logging.basicConfig(level=logging.DEBUG) - _moduleLogger.info("Starting State Machine") - for item in self._initItems: - try: - item.update() - except Exception: - _moduleLogger.exception("Initial update failed for %r" % item) + INFINITE_PERIOD = -1 + DEFAULT_MAX_TIMEOUT = to_seconds(hours=24) - # empty the task queue - actions = list(algorithms.itr_available(self._actions, initiallyBlock = False)) - self._schedule_update() - if len(self._updateItems) == 0: - self.stop() - - while True: - # block till we get a task, or get all the tasks if we were late - actions = list(algorithms.itr_available(self._actions, initiallyBlock = True)) - - if self._ACTION_STOP in actions: - _moduleLogger.info("Requested to stop") - self._stop_update() - break - elif self._ACTION_RESET in actions: - _moduleLogger.info("Reseting timers") - self._reset_timers() - elif self._ACTION_UPDATE in actions: - _moduleLogger.info("Update") - for item in self._updateItems: - try: - item.update(force=True) - except Exception: - _moduleLogger.exception("Update failed for %r" % item) - self._schedule_update() - - def _set_initial_period(self): - self._currentPeriod = self._INITIAL_ACTIVE_PERIOD / 2 # We will double it later - - def _reset_timers(self): - self._stop_update() - self._set_initial_period() + _IS_DAEMON = True + + def __init__(self, updateItems, name="", maxTime = DEFAULT_MAX_TIMEOUT): + self._name = name + self._updateItems = updateItems + self._maxTime = maxTime + self._isActive = False + + self._state = self.STATE_ACTIVE + self._onTimeout = gobject_utils.Timeout(self._on_timeout) + + self._strategies = {} + self._callback = coroutines.func_sink( + coroutines.expand_positional( + self._request_reset_timers + ) + ) + + def __str__(self): + return """UpdateStateMachine( + name=%r, + strategie=%s, + isActive=%r, + isPolling=%r, +)""" % (self._name, self._strategy, self._isActive, self._onTimeout.is_running()) + + def __repr__(self): + return """UpdateStateMachine( + name=%r, + strategie=%r, +)""" % (self._name, self._strategies) + + def set_state_strategy(self, state, strategy): + self._strategies[state] = strategy + + def start(self): + for strategy in self._strategies.itervalues(): + strategy.initialize_state() + if self._strategy.timeout != self.INFINITE_PERIOD: + self._onTimeout.start(seconds=0) + self._isActive = True + _moduleLogger.info("%s Starting State Machine" % (self._name, )) + + def stop(self): + _moduleLogger.info("%s Stopping State Machine" % (self._name, )) + self._isActive = False + self._onTimeout.cancel() + + def close(self): + self._onTimeout.cancel() + self._callback = None + + def set_state(self, newState): + if self._state == newState: + return + oldState = self._state + _moduleLogger.info("%s Transitioning from %s to %s" % (self._name, oldState, newState)) + + self._state = newState + self._reset_timers(initialize=True) + + @property + def state(self): + return self._state + + def reset_timers(self): + self._reset_timers() + + @property + def request_reset_timers(self): + return self._callback + + @property + def _strategy(self): + return self._strategies[self._state] + + @property + def maxTime(self): + return self._maxTime + + @misc_utils.log_exception(_moduleLogger) + def _request_reset_timers(self, *args): + self._reset_timers() + + def _reset_timers(self, initialize=False): + if not self._isActive: + return # not started yet + _moduleLogger.info("%s Resetting State Machine" % (self._name, )) + self._onTimeout.cancel() + if initialize: + self._strategy.initialize_state() + else: + self._strategy.reinitialize_state() self._schedule_update() def _schedule_update(self): - nextTimeout = self._calculate_step(self._state, self._currentPeriod) - nextTimeout = int(nextTimeout) - if nextTimeout != self._INFINITE_PERIOD: - self._timeoutId = gobject.timeout_add(nextTimeout, self._on_timeout) - self._currentPeriod = nextTimeout - - def _stop_update(self): - if self._timeoutId is None: - return - gobject.source_remove(self._timeoutId) - self._timeoutId = None + self._strategy.increment_state() + nextTimeout = self._strategy.timeout + if nextTimeout != self.INFINITE_PERIOD and nextTimeout < self._maxTime: + assert 0 < nextTimeout + self._onTimeout.start(seconds=nextTimeout) + _moduleLogger.info("%s Next update in %s seconds" % (self._name, nextTimeout, )) + else: + _moduleLogger.info("%s No further updates (timeout is %s seconds)" % (self._name, nextTimeout, )) + @misc_utils.log_exception(_moduleLogger) def _on_timeout(self): - self._actions.put(self._ACTION_UPDATE) - return False # do not continue - - @classmethod - def _calculate_step(cls, state, period): - if state == cls.STATE_ACTIVE: - return min(period * 2, cls._FINAL_ACTIVE_PERIOD) - elif state == cls.STATE_IDLE: - return cls._IDLE_PERIOD - elif state == cls.STATE_DND: - return cls._INFINITE_PERIOD - else: - raise RuntimeError("Unknown state: %r" % (state, )) + self._schedule_update() + for item in self._updateItems: + try: + item.update(force=True) + except Exception: + _moduleLogger.exception("Update failed for %r" % item)