5 import util.go_utils as gobject_utils
6 import util.coroutines as coroutines
7 import util.misc as misc_utils
10 _moduleLogger = logging.getLogger(__name__)
def to_milliseconds(**kwd):
    """Convert a duration given as a single keyword into milliseconds.

    Recognised keywords are ``milliseconds``, ``seconds``, ``minutes`` and
    ``hours``. When more than one is supplied, the first one in that order
    is used and the others are ignored.

    Raises:
        KeyError: if none of the recognised keywords is present.
    """
    if "milliseconds" in kwd:
        return kwd["milliseconds"]
    elif "seconds" in kwd:
        return kwd["seconds"] * 1000
    elif "minutes" in kwd:
        return kwd["minutes"] * 1000 * 60
    elif "hours" in kwd:
        # Guarded so that an unrecognised keyword falls through to the
        # KeyError below instead of failing on the "hours" lookup.
        return kwd["hours"] * 1000 * 60 * 60
    raise KeyError("Unknown arg: %r" % kwd)
def to_seconds(**kwd):
    """Convert a duration given as a single keyword into seconds.

    Recognised keywords are ``milliseconds``, ``seconds``, ``minutes`` and
    ``hours``. When more than one is supplied, the first one in that order
    is used and the others are ignored.

    Raises:
        KeyError: if none of the recognised keywords is present.
    """
    if "milliseconds" in kwd:
        # NOTE(review): with integer input on a Python 2 interpreter this
        # division truncates (500 ms -> 0) -- confirm that is acceptable.
        return kwd["milliseconds"] / 1000
    elif "seconds" in kwd:
        return kwd["seconds"]
    elif "minutes" in kwd:
        return kwd["minutes"] * 60
    elif "hours" in kwd:
        return kwd["hours"] * 60 * 60
    raise KeyError("Unknown arg: %r" % kwd)
37 class NopStateStrategy(object):
42 def initialize_state(self):
45 def reinitialize_state(self):
48 def increment_state(self):
53 return UpdateStateMachine.INFINITE_PERIOD
56 return "NopStateStrategy()"
59 class ConstantStateStrategy(object):
def __init__(self, timeout):
    """Store the single timeout value this strategy is built with.

    ``timeout`` must be positive unless it equals
    ``UpdateStateMachine.INFINITE_PERIOD``; this is checked with an
    ``assert``.
    """
    assert timeout > 0 or timeout == UpdateStateMachine.INFINITE_PERIOD
    self._timeout = timeout
65 def initialize_state(self):
68 def reinitialize_state(self):
71 def increment_state(self):
79 return "ConstantStateStrategy(timeout=%r)" % self._timeout
82 class NTimesStateStrategy(object):
def __init__(self, timeouts, postTimeout):
    """Store the timeout sequence and the timeout used after it.

    ``timeouts`` must be non-empty. Every entry, and ``postTimeout``, must
    be positive unless it equals ``UpdateStateMachine.INFINITE_PERIOD``.
    These conditions are checked with ``assert``. The attempt counter
    starts at zero.
    """
    assert len(timeouts) > 0
    assert all(
        entry > 0 or entry == UpdateStateMachine.INFINITE_PERIOD
        for entry in timeouts
    )
    assert postTimeout > 0 or postTimeout == UpdateStateMachine.INFINITE_PERIOD

    self._attemptCount = 0
    self._postTimeout = postTimeout
    self._timeouts = timeouts
def initialize_state(self):
    """Set the attempt counter to the number of configured timeouts.

    That value is one past the last valid index of ``self._timeouts``.
    """
    count = len(self._timeouts)
    self._attemptCount = count
def reinitialize_state(self):
    """Reset the attempt counter back to zero."""
    self._attemptCount = 0
def increment_state(self):
    """Advance the attempt counter by one."""
    self._attemptCount = self._attemptCount + 1
106 return self._timeouts[self._attemptCount]
108 return self._postTimeout
111 return "NTimesStateStrategy(timeout=%r)" % (
116 return "NTimesStateStrategy(timeouts=%r, postTimeout=%r)" % (
122 class GeometricStateStrategy(object):
124 def __init__(self, init, min, max):
125 assert 0 < init and init < max or init == UpdateStateMachine.INFINITE_PERIOD
126 assert 0 < min or min == UpdateStateMachine.INFINITE_PERIOD
127 assert min < max or max == UpdateStateMachine.INFINITE_PERIOD
def initialize_state(self):
    """Set the current period to the configured maximum (``self._max``)."""
    ceiling = self._max
    self._current = ceiling
def reinitialize_state(self):
    """Set the current period to the configured minimum (``self._min``)."""
    floor = self._min
    self._current = floor
139 def increment_state(self):
140 if self._current == UpdateStateMachine.INFINITE_PERIOD:
142 if self._init == UpdateStateMachine.INFINITE_PERIOD:
143 self._current = UpdateStateMachine.INFINITE_PERIOD
144 elif self._max == UpdateStateMachine.INFINITE_PERIOD:
147 self._current = min(2 * self._current, self._max - self._init)
151 if UpdateStateMachine.INFINITE_PERIOD in (self._init, self._current):
152 timeout = UpdateStateMachine.INFINITE_PERIOD
154 timeout = self._init + self._current
158 return "GeometricStateStrategy(timeout=%r)" % (
163 return "GeometricStateStrategy(init=%r, min=%r, max=%r)" % (
164 self._init, self._min, self._max
168 class StateMachine(object):
170 STATE_ACTIVE = 0, "active"
171 STATE_IDLE = 1, "idle"
175 raise NotImplementedError("Abstract")
178 raise NotImplementedError("Abstract")
181 raise NotImplementedError("Abstract")
def set_state(self, state):
    """Abstract hook; this base implementation always raises.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError("Abstract")
188 raise NotImplementedError("Abstract")
191 class MasterStateMachine(StateMachine):
195 self._state = self.STATE_ACTIVE
def append_machine(self, machine):
    """Add ``machine`` to the end of the managed machine list."""
    managed = self._machines
    managed.append(machine)
201 # Confirm we are all on the same page
202 for machine in self._machines:
203 machine.set_state(self._state)
204 for machine in self._machines:
208 for machine in self._machines:
212 for machine in self._machines:
def set_state(self, state):
    """Record ``state`` and forward it to every managed machine."""
    # Keep our own copy current: other code in this class pushes
    # self._state to the machines, so it must not go stale.
    self._state = state
    for machine in self._machines:
        machine.set_state(state)
225 class UpdateStateMachine(StateMachine):
# Making sure it is initialized is finicky, be careful
229 DEFAULT_MAX_TIMEOUT = to_seconds(hours=24)
233 def __init__(self, updateItems, name="", maxTime = DEFAULT_MAX_TIMEOUT):
235 self._updateItems = updateItems
236 self._maxTime = maxTime
237 self._isActive = False
239 self._state = self.STATE_ACTIVE
240 self._onTimeout = gobject_utils.Timeout(self._on_timeout)
242 self._strategies = {}
243 self._callback = coroutines.func_sink(
244 coroutines.expand_positional(
245 self._request_reset_timers
250 return """UpdateStateMachine(
255 )""" % (self._name, self._strategy, self._isActive, self._onTimeout.is_running())
258 return """UpdateStateMachine(
261 )""" % (self._name, self._strategies)
def set_state_strategy(self, state, strategy):
    """Register ``strategy`` for ``state``, replacing any earlier entry."""
    registry = self._strategies
    registry[state] = strategy
267 for strategy in self._strategies.itervalues():
268 strategy.initialize_state()
269 if self._strategy.timeout != self.INFINITE_PERIOD:
270 self._onTimeout.start(seconds=0)
271 self._isActive = True
272 _moduleLogger.info("%s Starting State Machine" % (self._name, ))
275 _moduleLogger.info("%s Stopping State Machine" % (self._name, ))
276 self._isActive = False
277 self._onTimeout.cancel()
280 self._onTimeout.cancel()
281 self._callback = None
def set_state(self, newState):
    """Switch to ``newState`` and restart the timers for it.

    Does nothing when ``newState`` equals the current state. Otherwise the
    transition is logged, the new state is stored, and the timers are
    reset with ``initialize=True``.
    """
    if self._state == newState:
        # Nothing changed: avoid logging a bogus transition and
        # needlessly restarting the timers.
        return
    oldState = self._state
    _moduleLogger.info("%s Transitioning from %s to %s" % (self._name, oldState, newState))

    self._state = newState
    self._reset_timers(initialize=True)
296 def reset_timers(self):
def request_reset_timers(self):
    """Return the object held in ``self._callback``."""
    sink = self._callback
    return sink
305 return self._strategies[self._state]
311 @misc_utils.log_exception(_moduleLogger)
312 def _request_reset_timers(self, *args):
def _reset_timers(self, initialize=False):
    """Cancel the pending timeout and restart scheduling.

    Does nothing while the machine is not active.

    Args:
        initialize: when true the current strategy's ``initialize_state()``
            is used; otherwise its ``reinitialize_state()`` is used.
    """
    if not self._isActive:
        return # not started yet
    _moduleLogger.info("%s Resetting State Machine" % (self._name, ))
    self._onTimeout.cancel()
    # Exactly one of the two resets must run; calling both would let the
    # second silently override the first.
    if initialize:
        self._strategy.initialize_state()
    else:
        self._strategy.reinitialize_state()
    self._schedule_update()
def _schedule_update(self):
    """Advance the current strategy and arm the timer for its next timeout.

    The timer is started only when the strategy's timeout is neither
    ``INFINITE_PERIOD`` nor at/above ``self._maxTime``; otherwise a log
    entry records that no further update is scheduled.
    """
    self._strategy.increment_state()
    nextTimeout = self._strategy.timeout
    if nextTimeout != self.INFINITE_PERIOD and nextTimeout < self._maxTime:
        assert 0 < nextTimeout
        self._onTimeout.start(seconds=nextTimeout)
        _moduleLogger.info("%s Next update in %s seconds" % (self._name, nextTimeout, ))
    else:
        _moduleLogger.info("%s No further updates (timeout is %s seconds)" % (self._name, nextTimeout, ))
@misc_utils.log_exception(_moduleLogger)
def _on_timeout(self):
    """Timer callback: schedule the next update, then refresh every item.

    Each item in ``self._updateItems`` is updated with ``force=True``. A
    failure in one item is logged with its traceback and does not prevent
    the remaining items from being updated.
    """
    self._schedule_update()
    for item in self._updateItems:
        try:
            item.update(force=True)
        except Exception:
            # Wrap in a 1-tuple so an item that is itself a tuple cannot
            # break the %-formatting of the log message.
            _moduleLogger.exception("Update failed for %r" % (item, ))