X-Git-Url: http://git.maemo.org/git/?a=blobdiff_plain;f=src%2Frss_sqlite.py;h=75c0f3461a29b0e06747dc2083d85ba789c255b8;hb=bbf326b51e054992cfb00b1e8c69738adb28d418;hp=ecc2ed01b0b4e8baddbc20205dc30c5ec1df5397;hpb=5f2372dc0feb5c52cba644554f50c3bb225f5e97;p=feedingit diff --git a/src/rss_sqlite.py b/src/rss_sqlite.py index ecc2ed0..75c0f34 100644 --- a/src/rss_sqlite.py +++ b/src/rss_sqlite.py @@ -24,6 +24,8 @@ # Description : Simple RSS Reader # ============================================================================ +from __future__ import with_statement + import sqlite3 from os.path import isfile, isdir from shutil import rmtree @@ -36,16 +38,20 @@ import urllib2 from BeautifulSoup import BeautifulSoup from urlparse import urljoin from calendar import timegm -from updatedbus import get_lock, release_lock import threading import traceback from wc import wc, wc_init, woodchuck +import subprocess +import dbus +from updatedbus import update_server_object from jobmanager import JobManager import mainthread from httpprogresshandler import HTTPProgressHandler import random import sys +import logging +logger = logging.getLogger(__name__) def getId(string): return md5.new(string).hexdigest() @@ -57,15 +63,23 @@ def download_callback(connection): def downloader(progress_handler=None, proxy=None): openers = [] - if progress_handler: - openers.append (progress_handler) + if progress_handler is not None: + openers.append(progress_handler) else: openers.append(HTTPProgressHandler(download_callback)) if proxy: - openers.append (proxy) + openers.append(proxy) + + return urllib2.build_opener(*openers) + +# If not None, a subprocess.Popen object corresponding to a +# update_feeds.py process. +update_feed_process = None + +update_feeds_iface = None - return urllib2.build_opener (*openers) +jobs_at_start = 0 class Feed: serial_execution_lock = threading.Lock() @@ -101,20 +115,21 @@ class Feed: abs_url = urljoin(baseurl,url) f = opener.open(abs_url) - outf = open(filename, "w") - outf.write(f.read()) - f.close() - outf.close() + try: + with open(filename, "w") as outf: + for data in f: + outf.write(data) + finally: + f.close() except (urllib2.HTTPError, urllib2.URLError, IOError), exception: - print ("Could not download image %s: %s" - % (abs_url, str (exception))) + logger.info("Could not download image %s: %s" + % (abs_url, str (exception))) return None except: exception = sys.exc_info()[0] - print "Downloading image: %s" % abs_url - traceback.print_exc() - + logger.info("Downloading image %s: %s" % + (abs_url, traceback.format_exc())) try: remove(filename) except OSError: @@ -129,22 +144,61 @@ class Feed: return filename def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs): - def doit(): - def it(): - self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs) - return it - JobManager().execute(doit(), self.key, priority=priority) + if (os.path.basename(sys.argv[0]) == 'update_feeds.py'): + def doit(): + def it(): + self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs) + return it + JobManager().execute(doit(), self.key, priority=priority) + else: + def send_update_request(): + global update_feeds_iface + if update_feeds_iface is None: + bus=dbus.SessionBus() + remote_object = bus.get_object( + "org.marcoz.feedingit", # Connection name + 
"/org/marcoz/feedingit/update" # Object's path + ) + update_feeds_iface = dbus.Interface( + remote_object, 'org.marcoz.feedingit') + + try: + update_feeds_iface.Update(self.key) + except Exception, e: + logger.error("Invoking org.marcoz.feedingit.Update: %s" + % str(e)) + update_feeds_iface = None + else: + return True + + if send_update_request(): + # Success! It seems we were able to start the update + # daemon via dbus (or, it was already running). + return + + global update_feed_process + if (update_feed_process is None + or update_feed_process.poll() is not None): + # The update_feeds process is not running. Start it. + update_feeds = os.path.join(os.path.dirname(__file__), + 'update_feeds.py') + argv = ['/usr/bin/env', 'python', update_feeds, '--daemon' ] + logger.debug("Starting update_feeds: running %s" + % (str(argv),)) + update_feed_process = subprocess.Popen(argv) + # Make sure the dbus calls go to the right process: + # rebind. + update_feeds_iface = None + + for _ in xrange(5): + if send_update_request(): + break + time.sleep(1) def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs): success = False have_serial_execution_lock = False try: - update_lock = None - update_lock = get_lock("key") - if not update_lock: - # Someone else is doing an update. - return - download_start = time.time () progress_handler = HTTPProgressHandler(download_callback) @@ -169,8 +223,6 @@ class Feed: currentTime = 0 - have_woodchuck = mainthread.execute (wc().available) - def wc_success(): try: wc().stream_register (self.key, "", 6 * 60 * 60) @@ -187,8 +239,9 @@ class Feed: new_objects=len (tmp.entries), objects_inline=len (tmp.entries)) except KeyError: - print "Failed to register update with woodchuck!" - pass + logger.warn( + "Failed to register update of %s with woodchuck!" + % (self.key)) http_status = tmp.get ('status', 200) @@ -199,20 +252,21 @@ class Feed: # parse fails. But really, everything went great! Check for # this first. if http_status == 304: - print "%s: No changes to feed." % (self.key,) - mainthread.execute (wc_success, async=True) + logger.debug("%s: No changes to feed." % (self.key,)) + mainthread.execute(wc_success, async=True) success = True elif len(tmp["entries"])==0 and not tmp.version: # An error occured fetching or parsing the feed. (Version # will be either None if e.g. the connection timed our or # '' if the data is not a proper feed) - print ("Error fetching %s: version is: %s: error: %s" - % (url, str (tmp.version), - str (tmp.get ('bozo_exception', 'Unknown error')))) - print tmp - if have_woodchuck: - def e(): - print "%s: stream update failed!" % self.key + logger.error( + "Error fetching %s: version is: %s: error: %s" + % (url, str (tmp.version), + str (tmp.get ('bozo_exception', 'Unknown error')))) + logger.debug(tmp) + def register_stream_update_failed(http_status): + def doit(): + logger.debug("%s: stream update failed!" % self.key) try: # It's not easy to get the feed's title from here. 
@@ -229,7 +283,12 @@ class Feed: if 500 <= http_status and http_status < 600: ec = woodchuck.TransferStatus.TransientNetwork wc()[self.key].update_failed(ec) - mainthread.execute (e, async=True) + return doit + if wc().available: + mainthread.execute( + register_stream_update_failed( + http_status=http_status), + async=True) else: currentTime = time.time() # The etag and modified value should only be updated if the content was not null @@ -251,8 +310,8 @@ class Feed: outf.close() del data except (urllib2.HTTPError, urllib2.URLError), exception: - print ("Could not download favicon %s: %s" - % (abs_url, str (exception))) + logger.debug("Could not download favicon %s: %s" + % (abs_url, str (exception))) self.serial_execution_lock.acquire () have_serial_execution_lock = True @@ -306,8 +365,10 @@ class Feed: self.serial_execution_lock.release () have_serial_execution_lock = False for img in images: - filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy) - if filename: + filename = self.addImage( + configdir, self.key, baseurl, img['src'], + opener=opener) + if filename: img['src']="file://%s" %filename count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0] if count == 0: @@ -317,9 +378,8 @@ class Feed: try: object_size += os.path.getsize (filename) except os.error, exception: - print ("Error getting size of %s: %s" - % (filename, exception)) - pass + logger.error ("Error getting size of %s: %s" + % (filename, exception)) self.serial_execution_lock.acquire () have_serial_execution_lock = True @@ -352,38 +412,57 @@ class Feed: # Register the object with Woodchuck and mark it as # downloaded. - if have_woodchuck: - def e(): + def register_object_transferred( + id, title, publication_time, + sent, received, object_size): + def doit(): + logger.debug("Registering transfer of object %s" + % title) try: obj = wc()[self.key].object_register( object_identifier=id, - human_readable_name=tmpEntry["title"]) + human_readable_name=title) except woodchuck.ObjectExistsError: obj = wc()[self.key][id] else: - # If the entry does not contain a publication - # time, the attribute won't exist. - pubtime = entry.get ('date_parsed', None) - if pubtime: - obj.publication_time = time.mktime (pubtime) - - received = (progress_handler.stats['received'] - - received_base) - sent = progress_handler.stats['sent'] - sent_base - obj.transferred ( - indicator=(woodchuck.Indicator.ApplicationVisual - |woodchuck.Indicator.StreamWide), + obj.publication_time = publication_time + obj.transferred( + indicator=( + woodchuck.Indicator.ApplicationVisual + |woodchuck.Indicator.StreamWide), transferred_down=received, transferred_up=sent, object_size=object_size) - mainthread.execute(e, async=True) + return doit + if wc().available: + # If the entry does not contain a publication + # time, the attribute won't exist. 
+ pubtime = entry.get('date_parsed', None) + if pubtime: + publication_time = time.mktime (pubtime) + else: + publication_time = None + + sent = progress_handler.stats['sent'] - sent_base + received = (progress_handler.stats['received'] + - received_base) + + mainthread.execute( + register_object_transferred( + id=id, + title=tmpEntry["title"], + publication_time=publication_time, + sent=sent, received=received, + object_size=object_size), + async=True) self.db.commit() - print ("%s: Update successful: transferred: %d/%d; objects: %d)" - % (self.key, - progress_handler.stats['sent'], - progress_handler.stats['received'], - len (tmp.entries))) + logger.debug ( + "%s: Update successful: transferred: %d/%d; objects: %d)" + % (self.key, + progress_handler.stats['sent'], + progress_handler.stats['received'], + len (tmp.entries))) mainthread.execute (wc_success, async=True) success = True @@ -415,30 +494,27 @@ class Feed: # except OSError, exception: # - print 'Could not remove %s: %s' % (file, str (exception)) - print ("updated %s: %fs in download, %fs in processing" - % (self.key, download_duration, - time.time () - process_start)) + logger.error('Could not remove %s: %s' + % (file, str (exception))) + logger.debug("updated %s: %fs in download, %fs in processing" + % (self.key, download_duration, + time.time () - process_start)) except: - print "Updating %s: %s" % (self.key, sys.exc_info()[0]) - traceback.print_exc() + logger.error("Updating %s: %s" % (self.key, traceback.format_exc())) finally: self.db.commit () if have_serial_execution_lock: self.serial_execution_lock.release () - if update_lock is not None: - release_lock (update_lock) - updateTime = 0 try: rows = self.db.execute("SELECT MAX(date) FROM feed;") for row in rows: updateTime=row[0] - except: - print "Fetching update time." 
- traceback.print_exc() + except Exception, e: + logger.error("Fetching update time: %s: %s" + % (str(e), traceback.format_exc())) finally: if not success: etag = None @@ -456,12 +532,13 @@ class Feed: self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) ) self.db.commit() - def e(): - if wc().available(): - try: - wc()[self.key][id].used() - except KeyError: - pass + def doit(): + try: + wc()[self.key][id].used() + except KeyError: + pass + if wc().available(): + mainthread.execute(doit, async=True) def setEntryUnread(self, id): self.db.execute("UPDATE feed SET read=0 WHERE id=?;", (id,) ) @@ -532,15 +609,17 @@ class Feed: #ids.reverse() return ids - def getNextId(self, id): + def getNextId(self, id, forward=True): + if forward: + delta = 1 + else: + delta = -1 ids = self.getIds() index = ids.index(id) - return ids[(index+1)%len(ids)] + return ids[(index + delta) % len(ids)] def getPreviousId(self, id): - ids = self.getIds() - index = ids.index(id) - return ids[(index-1)%len(ids)] + return self.getNextId(id, forward=False) def getNumberOfUnreadItems(self): return self.db.execute("SELECT count(*) FROM feed WHERE read=0;").fetchone()[0] @@ -608,20 +687,20 @@ class Feed: try: remove(contentLink) except OSError, exception: - print "Deleting %s: %s" % (contentLink, str (exception)) + logger.error("Deleting %s: %s" % (contentLink, str (exception))) self.db.execute("DELETE FROM feed WHERE id=?;", (id,) ) self.db.execute("DELETE FROM images WHERE id=?;", (id,) ) self.db.commit() - def e(): - if wc().available(): - try: - wc()[self.key][id].files_deleted ( - woodchuck.DeletionResponse.Deleted) - del wc()[self.key][id] - except KeyError: - pass - mainthread.execute (e, async=True) + def doit(): + try: + wc()[self.key][id].files_deleted ( + woodchuck.DeletionResponse.Deleted) + del wc()[self.key][id] + except KeyError: + pass + if wc().available(): + mainthread.execute (doit, async=True) class ArchivedArticles(Feed): def addArchivedArticle(self, title, link, date, configdir): @@ -719,8 +798,9 @@ class Listing: # Check that Woodchuck's state is up to date with respect our # state. - wc_init (self) - if wc().available(): + updater = os.path.basename(sys.argv[0]) == 'update_feeds.py' + wc_init (self, True if updater else False) + if wc().available() and updater: # The list of known streams. streams = wc().streams_list () stream_ids = [s.identifier for s in streams] @@ -732,8 +812,9 @@ class Listing: # XXX: We should also check whether the list of # articles/objects in each feed/stream is up to date. if key not in stream_ids: - print ("Registering previously unknown channel: %s (%s)" - % (key, title,)) + logger.debug( + "Registering previously unknown channel: %s (%s)" + % (key, title,)) # Use a default refresh interval of 6 hours. wc().stream_register (key, title, 6 * 60 * 60) else: @@ -745,7 +826,7 @@ class Listing: # Unregister any streams that are no longer subscribed to. 
for id in stream_ids: - print ("Unregistering %s" % (id,)) + logger.debug("Unregistering %s" % (id,)) w.stream_unregister (id) def importOldFormatFeeds(self): @@ -785,7 +866,8 @@ class Listing: pass self.updateUnread(id) except: - traceback.print_exc() + logger.error("importOldFormatFeeds: %s" + % (traceback.format_exc(),)) remove(self.configdir+"feeds.pickle") @@ -844,6 +926,23 @@ class Listing: (title, key)) self.db.commit() self.updateUnread(key) + + update_server_object().ArticleCountUpdated() + + stats = JobManager().stats() + global jobs_at_start + completed = stats['jobs-completed'] - jobs_at_start + in_progress = stats['jobs-in-progress'] + queued = stats['jobs-queued'] + + percent = (100 * ((completed + in_progress / 2.)) + / (completed + in_progress + queued)) + + update_server_object().UpdateProgress( + percent, completed, in_progress, queued, 0, 0, 0, key) + + if in_progress == 0 and queued == 0: + jobs_at_start = stats['jobs-completed'] def getFeed(self, key): if key == "ArchivedArticles": @@ -861,11 +960,14 @@ class Listing: try: wc()[key].human_readable_name = title except KeyError: - print "Feed %s (%s) unknown." % (key, title) - pass + logger.debug("Feed %s (%s) unknown." % (key, title)) def getFeedUpdateTime(self, key): - return time.ctime(self.db.execute("SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0]) + update_time = self.db.execute( + "SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0] + if not update_time: + return "Never" + return time.ctime(update_time) def getFeedNumberOfUnreadItems(self, key): return self.db.execute("SELECT unread FROM feeds WHERE id=?;", (key,)).fetchone()[0] @@ -984,7 +1086,7 @@ class Listing: try: del wc()[key] except KeyError: - print "Removing unregistered feed %s failed" % (key,) + logger.debug("Removing unregistered feed %s failed" % (key,)) rank = self.db.execute("SELECT rank FROM feeds WHERE id=?;", (key,) ).fetchone()[0] self.db.execute("DELETE FROM feeds WHERE id=?;", (key, ))
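Note on the D-Bus interface used above: the client-side calls introduced in this diff (bus.get_object("org.marcoz.feedingit", "/org/marcoz/feedingit/update"), dbus.Interface(..., 'org.marcoz.feedingit'), update_feeds_iface.Update(key), and the update_server_object().ArticleCountUpdated() / UpdateProgress(...) notifications) assume a service object exported by the update_feeds.py daemon through updatedbus.py, neither of which appears in this diff. The sketch below is a minimal, illustrative dbus-python service object that would satisfy those call sites; the method and signal signatures are inferred from the callers and may not match the real updatedbus implementation.

import dbus
import dbus.service
import gobject
from dbus.mainloop.glib import DBusGMainLoop

class UpdateServerObject(dbus.service.Object):
    """Hypothetical server-side counterpart to the calls made in rss_sqlite.py."""

    def __init__(self, bus):
        # Object path matches the one requested by send_update_request().
        dbus.service.Object.__init__(self, bus, '/org/marcoz/feedingit/update')

    @dbus.service.method('org.marcoz.feedingit')
    def Update(self, key):
        # Called by Feed.updateFeed() in the UI process to ask the daemon
        # to refresh the feed identified by key.
        pass

    @dbus.service.signal('org.marcoz.feedingit')
    def ArticleCountUpdated(self):
        # Emitted from Listing.updateUnread() so the UI can refresh its counters.
        pass

    @dbus.service.signal('org.marcoz.feedingit')
    def UpdateProgress(self, percent, completed, in_progress, queued,
                       arg5, arg6, arg7, key):
        # Emitted with JobManager() statistics; the diff passes zeros for the
        # three middle arguments, so their meaning is not visible here.
        pass

if __name__ == '__main__':
    # Acquire the well-known bus name the client connects to and serve requests.
    DBusGMainLoop(set_as_default=True)
    session_bus = dbus.SessionBus()
    bus_name = dbus.service.BusName('org.marcoz.feedingit', session_bus)
    server = UpdateServerObject(session_bus)
    gobject.MainLoop().run()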