psa: adding missing opml_lib library
[feedingit] / src / jobmanager.py
1 #!/usr/bin/env python2.5
2
3 # Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import threading
19 import thread
20 import traceback
21 import heapq
22 import sys
23 import mainthread
24 import logging
25 logger = logging.getLogger(__name__)
26
27 def debug(*args):
28     if False:
29         logger.debug(' '.join(args))
30
31 # The default priority.  Like nice(), a smaller numeric priority
32 # corresponds to a higher priority class.
33 default_priority = 0
34
35 class JobRunner(threading.Thread):
36     def __init__(self, job_manager):
37         threading.Thread.__init__(self)
38         self.job_manager = job_manager
39
40     def run (self):
41         have_lock = True
42         self.job_manager.lock.acquire ()
43         try:
44             while (self.job_manager.pause == 0
45                    and not self.job_manager.do_quit
46                    and (len (self.job_manager.threads)
47                         <= self.job_manager.num_threads)):
48                 try:
49                     _, key, job = heapq.heappop (self.job_manager.queue)
50                 except IndexError:
51                     return
52     
53                 try:
54                     self.job_manager.in_progress.append (key)
55                     self.job_manager.lock.release ()
56                     have_lock = False
57         
58                     # Execute the job.
59                     try:
60                         job ()
61                     except KeyboardInterrupt:
62                         # This is handled below and doesn't require a
63                         # traceback.
64                         raise
65                     except:
66                         print ("Executing job %s (%s) from thread %s: %s"
67                                % (str (key), str (job),
68                                   threading.currentThread(),
69                                   traceback.format_exc ()))
70         
71                     self.job_manager.lock.acquire ()
72                     have_lock = True
73     
74                     assert key in self.job_manager.in_progress
75                 finally:
76                     try:
77                         self.job_manager.in_progress.remove (key)
78                     except ValueError:
79                         pass
80     
81                 debug("Finished executing job %s (%s)" % (key, job,))
82     
83                 self.job_manager._stats_hooks_run ({'job':job, 'key':key})
84         except KeyboardInterrupt:
85             debug("%s: KeyboardInterrupt" % threading.currentThread())
86             thread.interrupt_main()
87             debug("%s: Forwarded KeyboardInterrupt to main thread"
88                   % threading.currentThread())
89         finally:
90             if have_lock:
91                 self.job_manager.lock.release ()
92
93             assert self in self.job_manager.threads
94             self.job_manager.threads.remove (self)
95
96             debug ("Job runner %s (%d left) exiting."
97                    % (threading.currentThread(),
98                       len (self.job_manager.threads)))
99
100 _jm = None
101 def JobManager(start=False):
102     """
103     Return the job manager instance.  The job manager will not start
104     executing jobs until this is called with start set to True.  Note:
105     you can still queue jobs.
106     """
107     global _jm
108     if _jm is None:
109         _jm = _JobManager ()
110     if start and not _jm.started:
111         _jm.started = True
112         if _jm.jobs > 0:
113             _jm._stats_hooks_run ()
114         _jm.tickle ()
115
116     return _jm
117
118 class _JobManager(object):
119     def __init__(self, started=False, num_threads=4):
120         """
121         Initialize the job manager.
122
123         If started is false, jobs may be queued, but jobs will not be
124         started until start() is called.
125         """
126         # A reentrant lock so that a job runner can call stat without
127         # dropping the lock.
128         self.lock = threading.RLock()
129
130         # If we can start executing jobs.
131         self.started = started
132
133         # The maximum number of threads to use for executing jobs.
134         self._num_threads = num_threads
135
136         # List of jobs (priority, key, job) that are queued for
137         # execution.
138         self.queue = []
139         # List of keys of the jobs that are being executed.
140         self.in_progress = []
141         # List of threads.
142         self.threads = []
143
144         # If 0, jobs may execute, otherwise, job execution is paused.
145         self.pause = 0
146
147         # The total number of jobs that this manager ever executed.
148         self.jobs = 0
149
150         # A list of status hooks to execute when the stats change.
151         self._stats_hooks = []
152         self._current_stats = self.stats ()
153
154         self.do_quit = False
155
156     def _lock(f):
157         def wrapper(*args, **kwargs):
158             self = args[0]
159             self.lock.acquire ()
160             try:
161                 return f(*args, **kwargs)
162             finally:
163                 self.lock.release()
164         return wrapper
165
166     def get_num_threads(self):
167         return self._num_threads
168     def set_num_threads(self, value):
169         self._num_threads = value
170         self.tickle ()
171     num_threads = property(get_num_threads, set_num_threads)
172
173     @_lock
174     def start(self):
175         """
176         Start executing jobs.
177         """
178         if self.started:
179             return
180         if self.jobs > 0:
181             self._stats_hooks_run ()
182         self.tickle ()
183
184     @_lock
185     def tickle(self):
186         """
187         Ensure that there are enough job runners for the number of
188         pending jobs.
189         """
190         if self.do_quit:
191             debug("%s.quit called, not creating new threads."
192                   % self.__class__.__name__)
193             return
194
195         if self.pause > 0:
196             # Job execution is paused.  Don't start any new threads.
197             debug("%s.tickle(): Not doing anything: paused"
198                   % (self.__class__.__name__))
199             return
200
201         debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
202               % (self.__class__.__name__,
203                  len (self.threads), self.num_threads, len (self.queue)))
204         if len (self.threads) < self.num_threads:
205             for _ in range (min (len (self.queue),
206                                  self.num_threads - len (self.threads))):
207                 thread = JobRunner (self)
208                 # Setting threads as daemons means faster shutdown
209                 # when the main thread exists, but it results in
210                 # exceptions and occassional setfaults.
211                 # thread.setDaemon(True)
212                 self.threads.append (thread)
213                 thread.start ()
214                 debug("Now have %d threads" % len (self.threads))
215
216     @_lock
217     def execute(self, job, key=None, priority=default_priority):
218         """
219         Enqueue a job for execution.  job is a function to execute.
220         If key is not None, the job is only enqueued if there is no
221         job that is inprogress or enqueued with the same key.
222         priority is the job's priority.  Like nice(), a smaller
223         numeric priority corresponds to a higher priority class.  Jobs
224         are executed highest priority first, in the order that they
225         were added.
226         """
227         if self.do_quit:
228             debug("%s.quit called, not enqueuing new jobs."
229                   % self.__class__.__name__)
230
231         if key is not None:
232             if key in self.in_progress:
233                 return
234             for item in self.queue:
235                 if item[1] == key:
236                     if item[0][0] < priority:
237                         # Priority raised.
238                         item[0][0] = priority
239                         self.queue = heapq.heapify (self.queue)
240                     return
241
242         # To ensure that jobs with the same priority are executed
243         # in the order they are added, we set the priority to
244         # [priority, next (monotomic counter)].
245         self.jobs += 1
246         heapq.heappush (self.queue, [[priority, self.jobs], key, job])
247
248         if self.started:
249             self._stats_hooks_run ()
250             self.tickle ()
251         else:
252             debug("%s not initialized. delaying execution of %s (%s)"
253                   % (self.__class__.__name__, key, str (job),))
254
255     @_lock
256     def pause(self):
257         """
258         Increasement the pause count.  When the pause count is greater
259         than 0, job execution is suspended.
260         """
261         self.pause += 1
262
263         if self.pause == 1:
264             self._stats_hooks_run ()
265
266     @_lock
267     def resume(self):
268         """
269         Decrement the pause count.  If the pause count is greater than
270         0 and this decrement brings it to 0, enqueued jobs are
271         resumed.
272         """
273         assert self.pause > 0
274         self.pause -= 1
275         if not self.paused():
276             self._stats_hooks_run ()
277             self.tickle ()
278
279     @_lock
280     def paused(self):
281         """
282         Returns whether job execution is paused.
283         """
284         return self.pause > 0
285
286     @_lock
287     def cancel(self):
288         """
289         Cancel any pending jobs.
290         """
291         self.queue = []
292         self._stats_hooks_run ()
293
294     def quit(self):
295         self.cancel ()
296         self.do_quit = True
297
298     @_lock
299     def stats(self):
300         """
301         Return a dictionary consisting of:
302
303           - 'paused': whether execution is paused
304           - 'jobs': the total number of jobs this manager has
305               executed, is executing or are queued
306           - 'jobs-completed': the numer of jobs that have completed
307           - 'jobs-in-progress': the number of jobs in progress
308           - 'jobs-queued': the number of jobs currently queued
309         """
310         return {'paused': self.paused(),
311                 'jobs': self.jobs,
312                 'jobs-completed':
313                   self.jobs - len (self.in_progress) - len (self.queue),
314                 'jobs-in-progress': len (self.in_progress),
315                 'jobs-queued': len (self.queue)
316                 }
317
318     def stats_hook_register(self, func, *args, **kwargs):
319         """
320         Registers a function to be called when the job status changes.
321         Passed the following parameters:
322
323           - the JobManager instance.
324           - the previous stats (as returned by stats)
325           - the current stats
326           - the job that was completed (or None)
327
328         Note: the hook may not be run in the main thread!
329         """
330         mainthread=False
331         try:
332             mainthread = kwargs['run_in_main_thread']
333             del kwargs['run_in_main_thread']
334         except KeyError:
335             pass
336         self._stats_hooks.append ([func, mainthread, args, kwargs])
337
338     def _stats_hooks_run(self, completed_job=None):
339         """
340         Run the stats hooks.
341         """
342         # if not self._stats_hooks:
343         #     return
344
345         self.lock.acquire ()
346         try:
347             old_stats = self._current_stats
348             self._current_stats = self.stats ()
349             current_stats = self._current_stats
350         finally:
351             self.lock.release ()
352
353         debug("%s -> %s" % (str (old_stats), str (current_stats)))
354
355         for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
356             if run_in_main_thread:
357                 debug("JobManager._stats_hooks_run: Running %s in main thread"
358                       % f)
359                 mainthread.execute(
360                     f, self, old_stats, current_stats, completed_job,
361                     async=True, *args, **kwargs)
362             else:
363                 debug("JobManager._stats_hooks_run: Running %s in any thread"
364                       % f)
365                 f(self, old_stats, current_stats, completed_job,
366                   *args, **kwargs)