Starting the work on the polling state machine
authorEd Page <eopage@byu.net>
Wed, 16 Dec 2009 04:03:34 +0000 (22:03 -0600)
committerEd Page <eopage@byu.net>
Wed, 16 Dec 2009 04:03:34 +0000 (22:03 -0600)
hand_tests/sm.py [new file with mode: 0755]
src/gvoice/state_machine.py [new file with mode: 0644]
src/util/algorithms.py

diff --git a/hand_tests/sm.py b/hand_tests/sm.py
new file mode 100755 (executable)
index 0000000..f3c1e3b
--- /dev/null
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+
+import threading
+import datetime
+import time
+
+import gtk
+
+import sys
+sys.path.insert(0,"../src")
+import gvoice.state_machine as state_machine
+
+
+class _I(object):
+
+       def __init__(self, startTime):
+               self._startTime = startTime
+
+       def update(self, force = False):
+               print "%s\t%r: force=%r" % (datetime.datetime.now() - self._startTime, self, force)
+
+
+def loop(state):
+
+       def actual():
+               while state[0]:
+                       gtk.main_iteration(block=False)
+                       time.sleep(0.1)
+
+       return actual
+
+
+def main():
+       startTime = datetime.datetime.now()
+
+       state = [True]
+       mainLoop = threading.Thread(target=loop(state))
+       mainLoop.setDaemon(False)
+       mainLoop.start()
+       try:
+               state_machine.StateMachine._IS_DAEMON = False
+
+               initial = _I(startTime)
+               print "Initial:", initial
+               regular = _I(startTime)
+               print "Regular:", regular
+
+               sm = state_machine.StateMachine([initial], [regular])
+               print "Starting", datetime.datetime.now() - startTime
+               sm.start()
+               time.sleep(60.0) # seconds
+               print "Reseting timers", datetime.datetime.now() - startTime
+               sm.reset_timers()
+               time.sleep(60.0) # seconds
+               print "Switching to IDLE", datetime.datetime.now() - startTime
+               sm.set_state(state_machine.StateMachine.STATE_IDLE)
+               time.sleep(10.0) # seconds
+               print "Stopping", datetime.datetime.now() - startTime
+               sm.stop()
+       finally:
+               state[0] = False
+
+
+if __name__ == "__main__":
+       print state_machine.StateMachine._INITIAL_ACTIVE_PERIOD
+       print state_machine.StateMachine._FINAL_ACTIVE_PERIOD
+       print state_machine.StateMachine._IDLE_PERIOD
+       main()
diff --git a/src/gvoice/state_machine.py b/src/gvoice/state_machine.py
new file mode 100644 (file)
index 0000000..09ed28b
--- /dev/null
@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+
+import Queue
+import threading
+import logging
+
+import gobject
+
+import util.algorithms as algorithms
+import util.coroutines as coroutines
+
+_moduleLogger = logging.getLogger("gvoice.state_machine")
+
+
+def _to_milliseconds(**kwd):
+       if "milliseconds" in kwd:
+               return kwd["milliseconds"]
+       elif "seconds" in kwd:
+               return kwd["seconds"] * 1000
+       elif "minutes" in kwd:
+               return kwd["minutes"] * 1000 * 60
+       raise KeyError("Unknown arg: %r" % kwd)
+
+
+class StateMachine(object):
+
+       STATE_ACTIVE = "active"
+       STATE_IDLE = "idle"
+       STATE_DND = "dnd"
+
+       _ACTION_UPDATE = "update"
+       _ACTION_RESET = "reset"
+       _ACTION_STOP = "stop"
+
+       _INITIAL_ACTIVE_PERIOD = int(_to_milliseconds(seconds=5))
+       _FINAL_ACTIVE_PERIOD = int(_to_milliseconds(minutes=2))
+       _IDLE_PERIOD = int(_to_milliseconds(minutes=10))
+       _INFINITE_PERIOD = -1
+
+       _IS_DAEMON = True
+
+       def __init__(self, initItems, updateItems):
+               self._initItems = initItems
+               self._updateItems = updateItems
+
+               self._actions = Queue.Queue()
+               self._state = self.STATE_ACTIVE
+               self._timeoutId = None
+               self._thread = None
+               self._currentPeriod = self._INITIAL_ACTIVE_PERIOD
+               self._set_initial_period()
+
+       def start(self):
+               assert self._thread is None
+               self._thread = threading.Thread(target=self._run)
+               self._thread.setDaemon(self._IS_DAEMON)
+               self._thread.start()
+
+       def stop(self):
+               if self._thread is not None:
+                       self._actions.put(self._ACTION_STOP)
+                       self._thread = None
+               else:
+                       _moduleLogger.info("Stopping an already stopped state machine")
+
+       def set_state(self, state):
+               self._state = state
+               self.reset_timers()
+
+       def get_state(self):
+               return self._state
+
+       def reset_timers(self):
+               self._actions.put(self._ACTION_RESET)
+
+       @coroutines.func_sink
+       def request_reset_timers(self, args):
+               self.reset_timers()
+
+       def _run(self):
+               for item in self._initItems:
+                       try:
+                               item.update()
+                       except Exception:
+                               _moduleLogger.exception("Initial update failed for %r" % item)
+
+               # empty the task queue
+               actions = list(algorithms.itr_available(self._actions, initiallyBlock = False))
+               self._schedule_update()
+               if len(self._updateItems) == 0:
+                       self.stop()
+
+               while True:
+                       # block till we get a task, or get all the tasks if we were late 
+                       actions = list(algorithms.itr_available(self._actions, initiallyBlock = True))
+
+                       if self._ACTION_STOP in actions:
+                               self._stop_update()
+                               break
+                       elif self._ACTION_RESET in actions:
+                               self._reset_timers()
+                       elif self._ACTION_UPDATE in actions:
+                               for item in self._updateItems:
+                                       try:
+                                               item.update(force=True)
+                                       except Exception:
+                                               _moduleLogger.exception("Update failed for %r" % item)
+                               self._schedule_update()
+
+       def _set_initial_period(self):
+               self._currentPeriod = self._INITIAL_ACTIVE_PERIOD / 2 # We will double it later
+
+       def _reset_timers(self):
+               self._stop_update()
+               self._set_initial_period()
+               self._schedule_update()
+
+       def _schedule_update(self):
+               nextTimeout = self._calculate_step(self._state, self._currentPeriod)
+               nextTimeout = int(nextTimeout)
+               if nextTimeout != self._INFINITE_PERIOD:
+                       self._timeoutId = gobject.timeout_add(nextTimeout, self._on_timeout)
+               self._currentPeriod = nextTimeout
+
+       def _stop_update(self):
+               if self._timeoutId is None:
+                       return
+               gobject.source_remove(self._timeoutId)
+               self._timeoutId = None
+
+       def _on_timeout(self):
+               self._actions.put(self._ACTION_UPDATE)
+               return False # do not continue
+
+       @classmethod
+       def _calculate_step(cls, state, period):
+               if state == cls.STATE_ACTIVE:
+                       return min(period * 2, cls._FINAL_ACTIVE_PERIOD)
+               elif state == cls.STATE_IDLE:
+                       return cls._IDLE_PERIOD
+               elif state == cls.STATE_DND:
+                       return cls._INFINITE_PERIOD
+               else:
+                       raise RuntimeError("Unknown state: %r" % (state, ))
index 040967a..7052b98 100644 (file)
@@ -572,6 +572,13 @@ def pushback_itr(itr):
                                maybePushedBack = yield item
 
 
+def itr_available(queue, initiallyBlock = False):
+       if initiallyBlock:
+               yield queue.get()
+       while not queue.empty():
+               yield queue.get_nowait()
+
+
 if __name__ == "__main__":
        import doctest
        print doctest.testmod()