Merge branch 'master' of https://git.maemo.org/projects/feedingit
[feedingit] / src / update_feeds.py
1 #!/usr/bin/env python2.5
2
3
4 # Copyright (c) 2007-2008 INdT.
5 # Copyright (c) 2011 Neal H. Walfield
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 #  This program is distributed in the hope that it will be useful,
12 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 #  GNU Lesser General Public License for more details.
15 #
16 #  You should have received a copy of the GNU Lesser General Public License
17 #  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 # ============================================================================
21 # Name        : update_feeds.py
22 # Author      : Yves Marcoz
23 # Version     : 0.6.1
24 # Description : Simple RSS Reader
25 # ============================================================================
26
27 from rss_sqlite import Listing
28 from config import Config
29 from updatedbus import UpdateServerObject
30
31 import os
32 import traceback
33 import sys
34 import dbus
35
36 from jobmanager import JobManager
37 import mainthread
38
39 import gobject
40 gobject.threads_init()
41
42 import logging
43 logger = logging.getLogger(__name__)
44 import debugging
45 debugging.init(dot_directory=".feedingit", program_name="update_feeds")
46
47 CONFIGDIR="/home/user/.feedingit/"
48 #DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop"
49
50 from socket import setdefaulttimeout
51 timeout = 5
52 setdefaulttimeout(timeout)
53 del timeout
54
55 class FeedUpdate(UpdateServerObject):
56     def __init__(self, bus_name):
57         UpdateServerObject.__init__(self, bus_name)
58
59         self.config = Config(self, CONFIGDIR+"config.ini")
60         self.listing = Listing(self.config, CONFIGDIR)
61
62         jm = JobManager(True)
63         jm.stats_hook_register (self.job_manager_update,
64                                 run_in_main_thread=True)
65
66         # Whether or no an update is in progress.
67         self.am_updating = False
68
69         # After an update an finished, we start the inactivity timer.
70         # If this fires before a new job arrives, we quit.
71         self.inactivity_timer = 0
72
73         # Whether we started in daemon mode, or not.
74         self.daemon = '--daemon' in sys.argv
75
76         if self.daemon:
77             logger.debug("Running in daemon mode: waiting for commands.")
78             self.inactivity_timer = gobject.timeout_add(
79                 5 * 60 * 1000, self.inactivity_cb)
80         else:
81             # Update all feeds.
82             logger.debug("Not running in daemon mode: updating all feeds.")
83             gobject.idle_add(self.UpdateAll)
84
85         # If the system becomes idle
86         bus = dbus.SystemBus()
87
88         mce_request_proxy = bus.get_object(
89             'com.nokia.mce', '/com/nokia/mce/request')
90         mce_request_iface = dbus.Interface(
91             mce_request_proxy, 'com.nokia.mce.request')
92         system_idle = mce_request_iface.get_inactivity_status()
93         # Force self.system_inactivity_ind to run: ensure that a state
94         # change occurs.
95         self.system_idle = not system_idle
96         self.system_inactivity_ind(system_idle)
97
98         mce_signal_proxy = bus.get_object(
99             'com.nokia.mce', '/com/nokia/mce/signal')
100         mce_signal_iface = dbus.Interface(
101             mce_signal_proxy, 'com.nokia.mce.signal')
102         mce_signal_iface.connect_to_signal(
103             'system_inactivity_ind', self.system_inactivity_ind)
104
105     def increase_download_parallelism(self):
106         # The system has been idle for a while.  Enable parallel
107         # downloads.
108         logger.debug("Increasing parallelism to 4 workers.")
109         JobManager().num_threads = 4
110         gobject.source_remove (self.increase_download_parallelism_id)
111         del self.increase_download_parallelism_id
112         return False
113
114     def system_inactivity_ind(self, idle):
115         # The system's idle state changed.
116         if (self.system_idle and idle) or (not self.system_idle and not idle):
117             # No change.
118             return
119
120         if not idle:
121             if hasattr (self, 'increase_download_parallelism_id'):
122                 gobject.source_remove (self.increase_download_parallelism_id)
123                 del self.increase_download_parallelism_id
124         else:
125             self.increase_download_parallelism_id = \
126                 gobject.timeout_add_seconds(
127                     60, self.increase_download_parallelism)
128
129         if not idle:
130             logger.debug("Reducing parallelism to 1 worker.")
131             JobManager().num_threads = 1
132
133         self.system_idle = idle
134
135     def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
136         queued = new_stats['jobs-queued']
137         in_progress = new_stats['jobs-in-progress']
138
139         if (queued or in_progress) and not self.am_updating:
140             logger.debug("new update started")
141             self.am_updating = True
142             self.UpdateStarted()
143             self.UpdateProgress(0, 0, in_progress, queued, 0, 0, 0, "")
144
145         if not queued and not in_progress:
146             logger.debug("update finished!")
147             self.am_updating = False
148             self.UpdateFinished()
149             self.ArticleCountUpdated()
150
151             if self.daemon:
152                 self.inactivity_timer = gobject.timeout_add(
153                     60 * 1000, self.inactivity_cb)
154             else:
155                 logger.debug("update finished, not running in daemon mode: "
156                              "quitting")
157                 mainloop.quit()
158
159         if (queued or in_progress) and self.inactivity_timer:
160             gobject.source_remove(self.inactivity_timer)
161             self.inactivity_timer = 0
162
163     def inactivity_cb(self):
164         """
165         The updater has been inactive for a while.  Quit.
166         """
167         assert self.inactivity_timer
168         self.inactivity_timer = 0
169
170         if not self.am_updating:
171             logger.info("Nothing to do for a while.  Quitting.")
172             mainloop.quit()
173
174     def StopUpdate(self):
175         """
176         Stop updating.
177         """
178         super(FeedUpdate, self).stopUpdate()
179
180         JobManager().quit()
181
182     def UpdateAll(self):
183         """
184         Update all feeds.
185         """
186         super(FeedUpdate, self).UpdateAll()
187
188         feeds = self.listing.getListOfFeeds()
189         for k in feeds:
190             self.listing.updateFeed(k)
191         logger.debug("Queued all feeds (%d) for update." % len(feeds))
192
193     def Update(self, feed):
194         """
195         Update a particular feed.
196         """
197         super(FeedUpdate, self).Update(feed)
198
199         # We got a request via dbus.  If we weren't in daemon mode
200         # before, enter it now.
201         self.daemon = True
202
203         self.listing.updateFeed(feed)
204
205
206 import dbus.mainloop.glib
207 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
208
209 mainloop = gobject.MainLoop()
210 mainthread.init()
211
212 # Acquire our name on the session bus.  If this doesn't work, most
213 # likely another update_feeds instance is already running.  In this
214 # case, just quit.
215 try:
216     bus_name = dbus.service.BusName('org.marcoz.feedingit',
217                                     bus=dbus.SessionBus(),
218                                     do_not_queue=True)
219 except Exception:
220     # We failed to acquire our bus name.  Die.
221     try:
222         dbus_proxy = dbus.SessionBus().get_object(
223             'org.freedesktop.DBus', '/org/freedesktop/DBus')
224         dbus_iface = dbus.Interface(dbus_proxy, 'org.freedesktop.DBus')
225         pid = dbus_iface.GetConnectionUnixProcessID('org.marcoz.feedingit')
226         logger.error("update_feeds already running: pid %d." % pid)
227     except Exception, e:
228         logger.error("Getting pid associated with org.marcoz.feedingit: %s"
229                      % str(e))
230         logger.error("update_feeds already running.")
231
232     sys.exit(1)
233
234 # Run the updater.  Note: we run this until feed.am_updating is false.
235 # Only is this case have all worker threads exited.  If the main
236 # thread exits before all threads have exited and the process gets a
237 # signal, the Python interpreter is unable to handle the signal and it
238 # runs really slow (rescheduling after ever single instruction instead
239 # of every few thousand).
240 feed = FeedUpdate(bus_name)
241 while True:
242     try:
243         mainloop.run()
244     except KeyboardInterrupt:
245         logger.error("Interrupted.  Quitting.")
246         JobManager().quit()
247
248     if not feed.am_updating:
249         break
250