Adding a threshold where if a timeout goes above it, we just set it to infinite
[theonering] / src / gvoice / state_machine.py
index 09ed28b..ba7ce8b 100644 (file)
 #!/usr/bin/env python
 
-import Queue
-import threading
 import logging
 
 import gobject
 
-import util.algorithms as algorithms
+import util.go_utils as gobject_utils
 import util.coroutines as coroutines
+import gtk_toolbox
+
 
 _moduleLogger = logging.getLogger("gvoice.state_machine")
 
 
-def _to_milliseconds(**kwd):
+def to_milliseconds(**kwd):
        if "milliseconds" in kwd:
                return kwd["milliseconds"]
        elif "seconds" in kwd:
                return kwd["seconds"] * 1000
        elif "minutes" in kwd:
                return kwd["minutes"] * 1000 * 60
+       elif "hours" in kwd:
+               return kwd["hours"] * 1000 * 60 * 60
+       raise KeyError("Unknown arg: %r" % kwd)
+
+
+def to_seconds(**kwd):
+       if "milliseconds" in kwd:
+               return kwd["milliseconds"] / 1000
+       elif "seconds" in kwd:
+               return kwd["seconds"]
+       elif "minutes" in kwd:
+               return kwd["minutes"] * 60
+       elif "hours" in kwd:
+               return kwd["hours"] * 60 * 60
        raise KeyError("Unknown arg: %r" % kwd)
 
 
+class NopStateStrategy(object):
+
+       def __init__(self):
+               pass
+
+       def initialize_state(self):
+               pass
+
+       def increment_state(self):
+               pass
+
+       @property
+       def timeout(self):
+               return UpdateStateMachine.INFINITE_PERIOD
+
+
+class ConstantStateStrategy(object):
+
+       def __init__(self, timeout):
+               assert 0 < timeout or timeout == UpdateStateMachine.INFINITE_PERIOD
+               self._timeout = timeout
+
+       def initialize_state(self):
+               pass
+
+       def increment_state(self):
+               pass
+
+       @property
+       def timeout(self):
+               return self._timeout
+
+
+class GeometricStateStrategy(object):
+
+       def __init__(self, init, min, max):
+               assert 0 < init and init < max and init != UpdateStateMachine.INFINITE_PERIOD
+               assert 0 < min and min != UpdateStateMachine.INFINITE_PERIOD
+               assert min < max or max == UpdateStateMachine.INFINITE_PERIOD
+               self._min = min
+               self._max = max
+               self._init = init
+               self._current = 0
+
+       def initialize_state(self):
+               self._current = self._min
+
+       def increment_state(self):
+               if self._max == UpdateStateMachine.INFINITE_PERIOD:
+                       self._current *= 2
+               else:
+                       self._current = min(2 * self._current, self._max - self._init)
+
+       @property
+       def timeout(self):
+               timeout = self._init + self._current
+               return timeout
+
+
 class StateMachine(object):
 
-       STATE_ACTIVE = "active"
-       STATE_IDLE = "idle"
-       STATE_DND = "dnd"
+       STATE_ACTIVE = 0, "active"
+       STATE_IDLE = 1, "idle"
+       STATE_DND = 2, "dnd"
+
+       def start(self):
+               raise NotImplementedError("Abstract")
+
+       def stop(self):
+               raise NotImplementedError("Abstract")
+
+       def close(self):
+               raise NotImplementedError("Abstract")
+
+       def set_state(self, state):
+               raise NotImplementedError("Abstract")
 
-       _ACTION_UPDATE = "update"
-       _ACTION_RESET = "reset"
-       _ACTION_STOP = "stop"
+       @property
+       def state(self):
+               raise NotImplementedError("Abstract")
+
+
+class MasterStateMachine(StateMachine):
+
+       def __init__(self):
+               self._machines = []
+               self._state = self.STATE_ACTIVE
 
-       _INITIAL_ACTIVE_PERIOD = int(_to_milliseconds(seconds=5))
-       _FINAL_ACTIVE_PERIOD = int(_to_milliseconds(minutes=2))
-       _IDLE_PERIOD = int(_to_milliseconds(minutes=10))
-       _INFINITE_PERIOD = -1
+       def append_machine(self, machine):
+               self._machines.append(machine)
+
+       def start(self):
+               # Confirm we are all on the same page
+               for machine in self._machines:
+                       machine.set_state(self._state)
+               for machine in self._machines:
+                       machine.start()
+
+       def stop(self):
+               for machine in self._machines:
+                       machine.stop()
+
+       def close(self):
+               for machine in self._machines:
+                       machine.close()
+
+       def set_state(self, state):
+               self._state = state
+               for machine in self._machines:
+                       machine.set_state(state)
+
+       @property
+       def state(self):
+               return self._state
+
+
+class UpdateStateMachine(StateMachine):
+       # Making sure the it is initialized is finicky, be careful
+
+       INFINITE_PERIOD = -1
+       DEFAULT_MAX_TIMEOUT = to_seconds(hours=24)
 
        _IS_DAEMON = True
 
-       def __init__(self, initItems, updateItems):
-               self._initItems = initItems
+       def __init__(self, updateItems, name="", maxTime = DEFAULT_MAX_TIMEOUT):
+               self._name = name
                self._updateItems = updateItems
+               self._maxTime = maxTime
 
-               self._actions = Queue.Queue()
                self._state = self.STATE_ACTIVE
                self._timeoutId = None
-               self._thread = None
-               self._currentPeriod = self._INITIAL_ACTIVE_PERIOD
-               self._set_initial_period()
+
+               self._strategies = {}
+               self._callback = coroutines.func_sink(
+                       coroutines.expand_positional(
+                               self._request_reset_timers
+                       )
+               )
+
+       def set_state_strategy(self, state, strategy):
+               self._strategies[state] = strategy
 
        def start(self):
-               assert self._thread is None
-               self._thread = threading.Thread(target=self._run)
-               self._thread.setDaemon(self._IS_DAEMON)
-               self._thread.start()
+               assert self._timeoutId is None
+               for strategy in self._strategies.itervalues():
+                       strategy.initialize_state()
+               if self._strategy.timeout != self.INFINITE_PERIOD:
+                       self._timeoutId = gobject.idle_add(self._on_timeout)
+               _moduleLogger.info("%s Starting State Machine" % (self._name, ))
 
        def stop(self):
-               if self._thread is not None:
-                       self._actions.put(self._ACTION_STOP)
-                       self._thread = None
-               else:
-                       _moduleLogger.info("Stopping an already stopped state machine")
+               _moduleLogger.info("%s Stopping State Machine" % (self._name, ))
+               self._stop_update()
 
-       def set_state(self, state):
-               self._state = state
-               self.reset_timers()
+       def close(self):
+               assert self._timeoutId is None
+               self._callback = None
+
+       def set_state(self, newState):
+               if self._state == newState:
+                       return
+               oldState = self._state
+               _moduleLogger.info("%s Transitioning from %s to %s" % (self._name, oldState, newState))
 
-       def get_state(self):
+               self._state = newState
+               self._reset_timers()
+
+       @property
+       def state(self):
                return self._state
 
        def reset_timers(self):
-               self._actions.put(self._ACTION_RESET)
+               _moduleLogger.info("%s Resetting State Machine" % (self._name, ))
+               self._reset_timers()
 
-       @coroutines.func_sink
-       def request_reset_timers(self, args):
-               self.reset_timers()
+       @property
+       def request_reset_timers(self):
+               return self._callback
 
-       def _run(self):
-               for item in self._initItems:
-                       try:
-                               item.update()
-                       except Exception:
-                               _moduleLogger.exception("Initial update failed for %r" % item)
+       @property
+       def _strategy(self):
+               return self._strategies[self._state]
 
-               # empty the task queue
-               actions = list(algorithms.itr_available(self._actions, initiallyBlock = False))
-               self._schedule_update()
-               if len(self._updateItems) == 0:
-                       self.stop()
-
-               while True:
-                       # block till we get a task, or get all the tasks if we were late 
-                       actions = list(algorithms.itr_available(self._actions, initiallyBlock = True))
-
-                       if self._ACTION_STOP in actions:
-                               self._stop_update()
-                               break
-                       elif self._ACTION_RESET in actions:
-                               self._reset_timers()
-                       elif self._ACTION_UPDATE in actions:
-                               for item in self._updateItems:
-                                       try:
-                                               item.update(force=True)
-                                       except Exception:
-                                               _moduleLogger.exception("Update failed for %r" % item)
-                               self._schedule_update()
-
-       def _set_initial_period(self):
-               self._currentPeriod = self._INITIAL_ACTIVE_PERIOD / 2 # We will double it later
+       @gtk_toolbox.log_exception(_moduleLogger)
+       def _request_reset_timers(self, *args):
+               self._reset_timers()
 
        def _reset_timers(self):
+               if self._timeoutId is None:
+                       return # not started yet
                self._stop_update()
-               self._set_initial_period()
+               self._strategy.initialize_state()
                self._schedule_update()
 
-       def _schedule_update(self):
-               nextTimeout = self._calculate_step(self._state, self._currentPeriod)
-               nextTimeout = int(nextTimeout)
-               if nextTimeout != self._INFINITE_PERIOD:
-                       self._timeoutId = gobject.timeout_add(nextTimeout, self._on_timeout)
-               self._currentPeriod = nextTimeout
-
        def _stop_update(self):
                if self._timeoutId is None:
                        return
                gobject.source_remove(self._timeoutId)
                self._timeoutId = None
 
+       def _schedule_update(self):
+               assert self._timeoutId is None
+               self._strategy.increment_state()
+               nextTimeout = self._strategy.timeout
+               if nextTimeout != self.INFINITE_PERIOD and nextTimeout < self._maxTime:
+                       self._timeoutId = gobject_utils.timeout_add_seconds(nextTimeout, self._on_timeout)
+               _moduleLogger.info("%s Next update in %s seconds" % (self._name, nextTimeout, ))
+
+       @gtk_toolbox.log_exception(_moduleLogger)
        def _on_timeout(self):
-               self._actions.put(self._ACTION_UPDATE)
+               self._timeoutId = None
+               self._schedule_update()
+               for item in self._updateItems:
+                       try:
+                               item.update(force=True)
+                       except Exception:
+                               _moduleLogger.exception("Update failed for %r" % item)
                return False # do not continue
-
-       @classmethod
-       def _calculate_step(cls, state, period):
-               if state == cls.STATE_ACTIVE:
-                       return min(period * 2, cls._FINAL_ACTIVE_PERIOD)
-               elif state == cls.STATE_IDLE:
-                       return cls._IDLE_PERIOD
-               elif state == cls.STATE_DND:
-                       return cls._INFINITE_PERIOD
-               else:
-                       raise RuntimeError("Unknown state: %r" % (state, ))