Correctly set the update time to "Never" for feeds that have never been updated.
[feedingit] src/rss_sqlite.py
index ecc2ed0..75c0f34 100644
@@ -24,6 +24,8 @@
 # Description : Simple RSS Reader
 # ============================================================================
 
+from __future__ import with_statement
+
 import sqlite3
 from os.path import isfile, isdir
 from shutil import rmtree
@@ -36,16 +38,20 @@ import urllib2
 from BeautifulSoup import BeautifulSoup
 from urlparse import urljoin
 from calendar import timegm
-from updatedbus import get_lock, release_lock
 import threading
 import traceback
 from wc import wc, wc_init, woodchuck
+import subprocess
+import dbus
+from updatedbus import update_server_object
 
 from jobmanager import JobManager
 import mainthread
 from httpprogresshandler import HTTPProgressHandler
 import random
 import sys
+import logging
+logger = logging.getLogger(__name__)
 
 def getId(string):
     return md5.new(string).hexdigest()
@@ -57,15 +63,23 @@ def download_callback(connection):
 def downloader(progress_handler=None, proxy=None):
     openers = []
 
-    if progress_handler:
-        openers.append (progress_handler)
+    if progress_handler is not None:
+        openers.append(progress_handler)
     else:
         openers.append(HTTPProgressHandler(download_callback))
 
     if proxy:
-        openers.append (proxy)
+        openers.append(proxy)
+
+    return urllib2.build_opener(*openers)
+
+# If not None, a subprocess.Popen object corresponding to an
+# update_feeds.py process.
+update_feed_process = None
+
+update_feeds_iface = None
 
-    return urllib2.build_opener (*openers)
+jobs_at_start = 0
 
 class Feed:
     serial_execution_lock = threading.Lock()
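The hunk above turns downloader() into a plain urllib2 opener factory and introduces three module globals for the new out-of-process updater: update_feed_process (the spawned update_feeds.py child, if any), update_feeds_iface (a lazily bound D-Bus proxy, reset to None whenever it must be rebound), and jobs_at_start (JobManager's completed-job count when the current batch began, used later for progress reporting). For reference, a minimal sketch of how the returned opener might be used; the URL and proxy address are placeholders, and proxy, when supplied, would typically be a urllib2.ProxyHandler:

```python
# Sketch only: the URL and proxy address are hypothetical.
opener = downloader(proxy=urllib2.ProxyHandler({'http': 'http://proxy:3128'}))
f = opener.open("http://example.com/feed.rss")
try:
    data = f.read()
finally:
    f.close()
```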
@@ -101,20 +115,21 @@ class Feed:
 
                 abs_url = urljoin(baseurl,url)
                 f = opener.open(abs_url)
-                outf = open(filename, "w")
-                outf.write(f.read())
-                f.close()
-                outf.close()
+                try:
+                    with open(filename, "w") as outf:
+                        for data in f:
+                            outf.write(data)
+                finally:
+                    f.close()
             except (urllib2.HTTPError, urllib2.URLError, IOError), exception:
-                print ("Could not download image %s: %s"
-                       % (abs_url, str (exception)))
+                logger.info("Could not download image %s: %s"
+                            % (abs_url, str (exception)))
                 return None
             except:
                 exception = sys.exc_info()[0]
 
-                print "Downloading image: %s" % abs_url
-                traceback.print_exc()
-
+                logger.info("Error downloading image %s: %s" %
+                            (abs_url, traceback.format_exc()))
                 try:
                     remove(filename)
                 except OSError:
@@ -129,22 +144,61 @@ class Feed:
         return filename
 
     def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
-        def doit():
-            def it():
-                self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
-            return it
-        JobManager().execute(doit(), self.key, priority=priority)
+        if (os.path.basename(sys.argv[0]) == 'update_feeds.py'):
+            def doit():
+                def it():
+                    self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
+                return it
+            JobManager().execute(doit(), self.key, priority=priority)
+        else:
+            def send_update_request():
+                global update_feeds_iface
+                if update_feeds_iface is None:
+                    bus=dbus.SessionBus()
+                    remote_object = bus.get_object(
+                        "org.marcoz.feedingit", # Connection name
+                        "/org/marcoz/feedingit/update" # Object's path
+                        )
+                    update_feeds_iface = dbus.Interface(
+                        remote_object, 'org.marcoz.feedingit')
+
+                try:
+                    update_feeds_iface.Update(self.key)
+                except Exception, e:
+                    logger.error("Error invoking org.marcoz.feedingit.Update: %s"
+                                 % str(e))
+                    update_feeds_iface = None
+                else:
+                    return True
+
+            if send_update_request():
+                # Success!  It seems we were able to start the update
+                # daemon via dbus (or, it was already running).
+                return
+
+            global update_feed_process
+            if (update_feed_process is None
+                or update_feed_process.poll() is not None):
+                # The update_feeds process is not running.  Start it.
+                update_feeds = os.path.join(os.path.dirname(__file__),
+                                            'update_feeds.py')
+                argv = ['/usr/bin/env', 'python', update_feeds, '--daemon' ]
+                logger.debug("Starting update_feeds: running %s"
+                             % (str(argv),))
+                update_feed_process = subprocess.Popen(argv)
+                # Make sure the dbus calls go to the right process:
+                # rebind.
+                update_feeds_iface = None
+
+            for _ in xrange(5):
+                if send_update_request():
+                    break
+                time.sleep(1)
 
     def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
         success = False
         have_serial_execution_lock = False
         try:
-            update_lock = None
-            update_lock = get_lock("key")
-            if not update_lock:
-                # Someone else is doing an update.
-                return
-
             download_start = time.time ()
 
             progress_handler = HTTPProgressHandler(download_callback)
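The non-daemon branch of updateFeed assumes a server end on the session bus: update_feeds.py claiming the name org.marcoz.feedingit and exporting an Update(key) method on the object at /org/marcoz/feedingit/update. That daemon is not part of this diff (updatedbus.update_server_object presumably supplies the real implementation), so the following dbus-python sketch is only an assumption about its shape:

```python
import dbus
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop

DBusGMainLoop(set_as_default=True)  # must be installed before exporting objects

class UpdateServer(dbus.service.Object):
    def __init__(self):
        bus_name = dbus.service.BusName('org.marcoz.feedingit',
                                        bus=dbus.SessionBus())
        dbus.service.Object.__init__(
            self, bus_name, '/org/marcoz/feedingit/update')

    @dbus.service.method('org.marcoz.feedingit', in_signature='s')
    def Update(self, key):
        # Hypothetical body: queue an update of the feed named by key,
        # e.g. via JobManager().execute(...) as in the branch above.
        pass
```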
@@ -169,8 +223,6 @@ class Feed:
     
             currentTime = 0
     
-            have_woodchuck = mainthread.execute (wc().available)
-
             def wc_success():
                 try:
                     wc().stream_register (self.key, "", 6 * 60 * 60)
@@ -187,8 +239,9 @@ class Feed:
                         new_objects=len (tmp.entries),
                         objects_inline=len (tmp.entries))
                 except KeyError:
-                    print "Failed to register update with woodchuck!"
-                    pass
+                    logger.warn(
+                        "Failed to register update of %s with woodchuck!"
+                        % (self.key))
     
             http_status = tmp.get ('status', 200)
     
@@ -199,20 +252,21 @@ class Feed:
             # parse fails.  But really, everything went great!  Check for
             # this first.
             if http_status == 304:
-                print "%s: No changes to feed." % (self.key,)
-                mainthread.execute (wc_success, async=True)
+                logger.debug("%s: No changes to feed." % (self.key,))
+                mainthread.execute(wc_success, async=True)
                 success = True
             elif len(tmp["entries"])==0 and not tmp.version:
                # An error occurred fetching or parsing the feed.  (Version
                # will be either None if e.g. the connection timed out or
                 # '' if the data is not a proper feed)
-                print ("Error fetching %s: version is: %s: error: %s"
-                       % (url, str (tmp.version),
-                          str (tmp.get ('bozo_exception', 'Unknown error'))))
-                print tmp
-                if have_woodchuck:
-                    def e():
-                        print "%s: stream update failed!" % self.key
+                logger.error(
+                    "Error fetching %s: version is: %s: error: %s"
+                    % (url, str (tmp.version),
+                       str (tmp.get ('bozo_exception', 'Unknown error'))))
+                logger.debug(tmp)
+                def register_stream_update_failed(http_status):
+                    def doit():
+                        logger.debug("%s: stream update failed!" % self.key)
     
                         try:
                             # It's not easy to get the feed's title from here.
@@ -229,7 +283,12 @@ class Feed:
                         if 500 <= http_status and http_status < 600:
                             ec = woodchuck.TransferStatus.TransientNetwork
                         wc()[self.key].update_failed(ec)
-                    mainthread.execute (e, async=True)
+                    return doit
+                if wc().available():
+                    mainthread.execute(
+                        register_stream_update_failed(
+                            http_status=http_status),
+                        async=True)
             else:
                currentTime = time.time()
                # The etag and modified value should only be updated if the content was not null
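The 304 branch and the etag/modified bookkeeping depend on HTTP conditional GET. The fields read from tmp (entries, version, status, bozo_exception) match feedparser's result object, so the fetch a few lines up presumably looks roughly like this (an assumption; the call itself is outside this hunk):

```python
import feedparser

# Pass the cached validators; an unchanged feed comes back with
# status 304 and no entries, which the code above treats as success.
tmp = feedparser.parse(url, etag=etag, modified=modified)
if tmp.get('status', 200) == 304:
    pass  # nothing new; keep the cached articles
elif len(tmp['entries']) == 0 and not tmp.version:
    pass  # fetch or parse error
```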
@@ -251,8 +310,8 @@ class Feed:
                    outf.close()
                    del data
                except (urllib2.HTTPError, urllib2.URLError), exception:
-                   print ("Could not download favicon %s: %s"
-                          % (abs_url, str (exception)))
+                   logger.debug("Could not download favicon %s: %s"
+                                % (abs_url, str (exception)))
     
                self.serial_execution_lock.acquire ()
                have_serial_execution_lock = True
@@ -306,8 +365,10 @@ class Feed:
                        self.serial_execution_lock.release ()
                        have_serial_execution_lock = False
                        for img in images:
-                            filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
-                            if filename:
+                           filename = self.addImage(
+                               configdir, self.key, baseurl, img['src'],
+                               opener=opener)
+                           if filename:
                                 img['src']="file://%s" %filename
                                 count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
                                 if count == 0:
@@ -317,9 +378,8 @@ class Feed:
                                 try:
                                     object_size += os.path.getsize (filename)
                                 except os.error, exception:
-                                    print ("Error getting size of %s: %s"
-                                           % (filename, exception))
-                                    pass
+                                    logger.error ("Error getting size of %s: %s"
+                                                  % (filename, exception))
                        self.serial_execution_lock.acquire ()
                        have_serial_execution_lock = True
     
@@ -352,38 +412,57 @@ class Feed:
     
                    # Register the object with Woodchuck and mark it as
                    # downloaded.
-                   if have_woodchuck:
-                       def e():
+                   def register_object_transferred(
+                           id, title, publication_time,
+                           sent, received, object_size):
+                       def doit():
+                           logger.debug("Registering transfer of object %s"
+                                        % title)
                            try:
                                obj = wc()[self.key].object_register(
                                    object_identifier=id,
-                                   human_readable_name=tmpEntry["title"])
+                                   human_readable_name=title)
                            except woodchuck.ObjectExistsError:
                                obj = wc()[self.key][id]
                            else:
-                               # If the entry does not contain a publication
-                               # time, the attribute won't exist.
-                               pubtime = entry.get ('date_parsed', None)
-                               if pubtime:
-                                   obj.publication_time = time.mktime (pubtime)
-        
-                               received = (progress_handler.stats['received']
-                                           - received_base)
-                               sent = progress_handler.stats['sent'] - sent_base
-                               obj.transferred (
-                                   indicator=(woodchuck.Indicator.ApplicationVisual
-                                              |woodchuck.Indicator.StreamWide),
+                               obj.publication_time = publication_time
+                               obj.transferred(
+                                   indicator=(
+                                       woodchuck.Indicator.ApplicationVisual
+                                       |woodchuck.Indicator.StreamWide),
                                    transferred_down=received,
                                    transferred_up=sent,
                                    object_size=object_size)
-                       mainthread.execute(e, async=True)
+                       return doit
+                   if wc().available():
+                       # If the entry does not contain a publication
+                       # time, the attribute won't exist.
+                       pubtime = entry.get('date_parsed', None)
+                       if pubtime:
+                           publication_time = time.mktime (pubtime)
+                       else:
+                           publication_time = None
+
+                       sent = progress_handler.stats['sent'] - sent_base
+                       received = (progress_handler.stats['received']
+                                   - received_base)
+
+                       mainthread.execute(
+                           register_object_transferred(
+                               id=id,
+                               title=tmpEntry["title"],
+                               publication_time=publication_time,
+                               sent=sent, received=received,
+                               object_size=object_size),
+                           async=True)
                self.db.commit()
 
-               print ("%s: Update successful: transferred: %d/%d; objects: %d)"
-                      % (self.key,
-                         progress_handler.stats['sent'],
-                         progress_handler.stats['received'],
-                         len (tmp.entries)))
+               logger.debug (
+                   "%s: Update successful: transferred: %d/%d; objects: %d"
+                   % (self.key,
+                      progress_handler.stats['sent'],
+                      progress_handler.stats['received'],
+                      len (tmp.entries)))
                mainthread.execute (wc_success, async=True)
                success = True
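The closure refactor above matters for correctness, not just style: the old nested e() read entry, tmpEntry and the progress counters when it eventually ran, but mainthread.execute(..., async=True) fires later, by which point the loop has moved on to another entry. Passing the values as arguments to register_object_transferred snapshots them per object. The bug class, distilled:

```python
# Late binding: every callback sees the final value of i.
callbacks = [lambda: i for i in range(3)]
print [f() for f in callbacks]        # [2, 2, 2]

# Snapshotting through a parameter, as the refactor does:
callbacks = [lambda i=i: i for i in range(3)]
print [f() for f in callbacks]        # [0, 1, 2]
```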
 
@@ -415,30 +494,27 @@ class Feed:
                         #
                     except OSError, exception:
                         #
-                        print 'Could not remove %s: %s' % (file, str (exception))
-            print ("updated %s: %fs in download, %fs in processing"
-                   % (self.key, download_duration,
-                      time.time () - process_start))
+                        logger.error('Could not remove %s: %s'
+                                     % (file, str (exception)))
+            logger.debug("updated %s: %fs in download, %fs in processing"
+                         % (self.key, download_duration,
+                            time.time () - process_start))
         except:
-            print "Updating %s: %s" % (self.key, sys.exc_info()[0])
-            traceback.print_exc()
+            logger.error("Error updating %s: %s" % (self.key, traceback.format_exc()))
         finally:
             self.db.commit ()
 
             if have_serial_execution_lock:
                 self.serial_execution_lock.release ()
 
-            if update_lock is not None:
-                release_lock (update_lock)
-
             updateTime = 0
             try:
                 rows = self.db.execute("SELECT MAX(date) FROM feed;")
                 for row in rows:
                     updateTime=row[0]
-            except:
-                print "Fetching update time."
-                traceback.print_exc()
+            except Exception, e:
+                logger.error("Error fetching update time: %s: %s"
+                             % (str(e), traceback.format_exc()))
             finally:
                 if not success:
                     etag = None
@@ -456,12 +532,13 @@ class Feed:
         self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
         self.db.commit()
 
-        def e():
-            if wc().available():
-                try:
-                    wc()[self.key][id].used()
-                except KeyError:
-                    pass
+        def doit():
+            try:
+                wc()[self.key][id].used()
+            except KeyError:
+                pass
+        if wc().available():
+            mainthread.execute(doit, async=True)
 
     def setEntryUnread(self, id):
         self.db.execute("UPDATE feed SET read=0 WHERE id=?;", (id,) )
@@ -532,15 +609,17 @@ class Feed:
         #ids.reverse()
         return ids
     
-    def getNextId(self, id):
+    def getNextId(self, id, forward=True):
+        if forward:
+            delta = 1
+        else:
+            delta = -1
         ids = self.getIds()
         index = ids.index(id)
-        return ids[(index+1)%len(ids)]
+        return ids[(index + delta) % len(ids)]
         
     def getPreviousId(self, id):
-        ids = self.getIds()
-        index = ids.index(id)
-        return ids[(index-1)%len(ids)]
+        return self.getNextId(id, forward=False)
     
     def getNumberOfUnreadItems(self):
         return self.db.execute("SELECT count(*) FROM feed WHERE read=0;").fetchone()[0]
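getNextId now folds both navigation directions into one method and relies on Python's modulo semantics to wrap around the id list. A worked example with hypothetical ids:

```python
ids = ['a', 'b', 'c']
ids[(ids.index('c') + 1) % len(ids)]   # forward from the last id  -> 'a'
ids[(ids.index('a') - 1) % len(ids)]   # backward from the first   -> 'c'
                                       # (because (-1) % 3 == 2)
```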
@@ -608,20 +687,20 @@ class Feed:
             try:
                 remove(contentLink)
             except OSError, exception:
-                print "Deleting %s: %s" % (contentLink, str (exception))
+                logger.error("Error deleting %s: %s" % (contentLink, str (exception)))
         self.db.execute("DELETE FROM feed WHERE id=?;", (id,) )
         self.db.execute("DELETE FROM images WHERE id=?;", (id,) )
         self.db.commit()
 
-        def e():
-            if wc().available():
-                try:
-                    wc()[self.key][id].files_deleted (
-                        woodchuck.DeletionResponse.Deleted)
-                    del wc()[self.key][id]
-                except KeyError:
-                    pass
-        mainthread.execute (e, async=True)
+        def doit():
+            try:
+                wc()[self.key][id].files_deleted (
+                    woodchuck.DeletionResponse.Deleted)
+                del wc()[self.key][id]
+            except KeyError:
+                pass
+        if wc().available():
+            mainthread.execute (doit, async=True)
  
 class ArchivedArticles(Feed):    
     def addArchivedArticle(self, title, link, date, configdir):
@@ -719,8 +798,9 @@ class Listing:
 
         # Check that Woodchuck's state is up to date with respect to
         # our state.
-        wc_init (self)
-        if wc().available():
+        updater = os.path.basename(sys.argv[0]) == 'update_feeds.py'
+        wc_init (self, updater)
+        if wc().available() and updater:
             # The list of known streams.
             streams = wc().streams_list ()
             stream_ids = [s.identifier for s in streams]
@@ -732,8 +812,9 @@ class Listing:
                 # XXX: We should also check whether the list of
                 # articles/objects in each feed/stream is up to date.
                 if key not in stream_ids:
-                    print ("Registering previously unknown channel: %s (%s)"
-                           % (key, title,))
+                    logger.debug(
+                        "Registering previously unknown channel: %s (%s)"
+                        % (key, title,))
                     # Use a default refresh interval of 6 hours.
                     wc().stream_register (key, title, 6 * 60 * 60)
                 else:
@@ -745,7 +826,7 @@ class Listing:
 
             # Unregister any streams that are no longer subscribed to.
             for id in stream_ids:
-                print ("Unregistering %s" % (id,))
+                logger.debug("Unregistering %s" % (id,))
                 w.stream_unregister (id)
 
     def importOldFormatFeeds(self):
@@ -785,7 +866,8 @@ class Listing:
                             pass
                 self.updateUnread(id)
             except:
-                traceback.print_exc()
+                logger.error("importOldFormatFeeds: %s"
+                             % (traceback.format_exc(),))
         remove(self.configdir+"feeds.pickle")
                 
         
@@ -844,6 +926,23 @@ class Listing:
                             (title, key))
         self.db.commit()
         self.updateUnread(key)
+
+        update_server_object().ArticleCountUpdated()
+
+        stats = JobManager().stats()
+        global jobs_at_start
+        completed = stats['jobs-completed'] - jobs_at_start
+        in_progress = stats['jobs-in-progress']
+        queued = stats['jobs-queued']
+
+        percent = (100 * ((completed + in_progress / 2.))
+                   / (completed + in_progress + queued))
+
+        update_server_object().UpdateProgress(
+            percent, completed, in_progress, queued, 0, 0, 0, key)
+
+        if in_progress == 0 and queued == 0:
+            jobs_at_start = stats['jobs-completed']
         
     def getFeed(self, key):
         if key == "ArchivedArticles":
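The progress figure reported above counts each in-progress job as half finished. A worked example with hypothetical counts (3 jobs completed since jobs_at_start, 2 in progress, 5 queued); the denominator is nonzero in practice because updateUnread runs right after a job has completed:

```python
completed, in_progress, queued = 3, 2, 5
percent = (100 * (completed + in_progress / 2.)
           / (completed + in_progress + queued))
# 100 * (3 + 1.0) / 10 == 40.0
```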
@@ -861,11 +960,14 @@ class Listing:
             try:
                 wc()[key].human_readable_name = title
             except KeyError:
-                print "Feed %s (%s) unknown." % (key, title)
-                pass
+                logger.debug("Feed %s (%s) unknown." % (key, title))
         
     def getFeedUpdateTime(self, key):
-        return time.ctime(self.db.execute("SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0])
+        update_time = self.db.execute(
+            "SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0]
+        if not update_time:
+            return "Never"
+        return time.ctime(update_time)
         
     def getFeedNumberOfUnreadItems(self, key):
         return self.db.execute("SELECT unread FROM feeds WHERE id=?;", (key,)).fetchone()[0]
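This is the fix the commit title describes: a feed that has never been updated stores a zero (or NULL) updateTime, and time.ctime happily formats such values as real dates, so the UI showed the epoch, or the current time, instead of "Never":

```python
import time

time.ctime(0)     # 'Thu Jan  1 00:00:00 1970' (epoch, local timezone)
time.ctime(None)  # the current time -- ctime treats None as "now"
```

With the guard above, any falsy update_time short-circuits to the literal string "Never".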
@@ -984,7 +1086,7 @@ class Listing:
             try:
                 del wc()[key]
             except KeyError:
-                print "Removing unregistered feed %s failed" % (key,)
+                logger.debug("Removing unregistered feed %s failed" % (key,))
 
         rank = self.db.execute("SELECT rank FROM feeds WHERE id=?;", (key,) ).fetchone()[0]
         self.db.execute("DELETE FROM feeds WHERE id=?;", (key, ))