Add support for Woodchuck.
author Neal H. Walfield <neal@walfield.org>
Sat, 23 Jul 2011 09:13:47 +0000 (11:13 +0200)
committer Neal H. Walfield <neal@walfield.org>
Sun, 31 Jul 2011 19:58:14 +0000 (21:58 +0200)
Makefile
src/jobmanager.py
src/rss_sqlite.py
src/wc.py [new file with mode: 0644]

index 7d2ca79..f322164 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -31,6 +31,10 @@ install:
        install src/aboutdialog.py ${DESTDIR}/opt/FeedingIt
        install src/rss_sqlite.py ${DESTDIR}/opt/FeedingIt
        install src/style.py ${DESTDIR}/opt/FeedingIt
+       install src/mainthread.py ${DESTDIR}/opt/FeedingIt
+       install src/jobmanager.py ${DESTDIR}/opt/FeedingIt
+       install src/httpprogresshandler.py ${DESTDIR}/opt/FeedingIt
+       install src/wc.py ${DESTDIR}/opt/FeedingIt
        install -d ${DESTDIR}/usr/share/applications/hildon
        install src/FeedingIt.desktop ${DESTDIR}/usr/share/applications/hildon
        install -d ${DESTDIR}/usr/share/icons/hicolor/48x48/apps/
index 8ca4df8..42c7441 100644 (file)
--- a/src/jobmanager.py
+++ b/src/jobmanager.py
@@ -40,7 +40,10 @@ class JobRunner(threading.Thread):
         have_lock = True
         self.job_manager.lock.acquire ()
         try:
-            while self.job_manager.pause == 0 and not self.job_manager.do_quit:
+            while (self.job_manager.pause == 0
+                   and not self.job_manager.do_quit
+                   and (len (self.job_manager.threads)
+                        <= self.job_manager.num_threads)):
                 try:
                     _, key, job = heapq.heappop (self.job_manager.queue)
                 except IndexError:
@@ -127,7 +130,7 @@ class _JobManager(object):
         self.started = started
 
         # The maximum number of threads to use for executing jobs.
-        self.num_threads = num_threads
+        self._num_threads = num_threads
 
         # List of jobs (priority, key, job) that are queued for
         # execution.
@@ -159,6 +162,19 @@ class _JobManager(object):
                 self.lock.release()
         return wrapper
 
+    def get_num_threads(self):
+        """Return the current limit on the number of job threads."""
+        return self._num_threads
+
+    def set_num_threads(self, value):
+        """Store a new thread limit, then tickle the manager."""
+        self._num_threads = value
+        self.tickle ()
+
+    # Public attribute view of the limit: reads and writes are routed
+    # through the accessor pair above.
+    num_threads = property(fget=get_num_threads, fset=set_num_threads)
+
     @_lock
     def start(self):
         """
index e8b44a2..b3dc2b0 100644 (file)
--- a/src/rss_sqlite.py
+++ b/src/rss_sqlite.py
@@ -39,9 +39,13 @@ from calendar import timegm
 from updatedbus import get_lock, release_lock
 import threading
 import traceback
+from wc import wc, wc_init
+import woodchuck
 from jobmanager import JobManager
 import mainthread
 from httpprogresshandler import HTTPProgressHandler
+import random
+import sys
 
 def getId(string):
     return md5.new(string).hexdigest()
@@ -101,7 +105,7 @@ class Feed:
                 outf.write(f.read())
                 f.close()
                 outf.close()
-            except (urllib2.HTTPError, urllib2.URLError), exception:
+            except (urllib2.HTTPError, urllib2.URLError, IOError), exception:
                 print ("Could not download image %s: %s"
                        % (abs_url, str (exception)))
                 return None
@@ -109,7 +113,6 @@ class Feed:
                 exception = sys.exc_info()[0]
 
                 print "Downloading image: %s" % abs_url
-                import traceback
                 traceback.print_exc()
 
                 try:
@@ -165,6 +168,37 @@ class Feed:
             expiry = float(expiryTime) * 3600.
     
             currentTime = 0
+    
+            have_woodchuck = mainthread.execute (wc().available)
+
+            def wc_success():
+                """Tell Woodchuck that this feed's update succeeded.
+
+                Meant to be handed to mainthread.execute.  Does nothing
+                when Woodchuck is not available.
+                """
+                if not have_woodchuck:
+                    return
+
+                try:
+                    # The stream may be unknown to Woodchuck; register it
+                    # with a 6 hour freshness and no human readable name.
+                    wc().stream_register (self.key, "", 6 * 60 * 60)
+                except woodchuck.ObjectExistsError:
+                    pass
+                try:
+                    wc()[self.key].updated (
+                        indicator=(woodchuck.Indicator.ApplicationVisual
+                                   |woodchuck.Indicator.StreamWide),
+                        transferred_down=progress_handler.stats['received'],
+                        transferred_up=progress_handler.stats['sent'],
+                        transfer_time=download_start,
+                        transfer_duration=download_duration,
+                        new_objects=len (tmp.entries),
+                        objects_inline=len (tmp.entries))
+                except KeyError:
+                    print "Failed to register update with woodchuck!"
+    
             http_status = tmp.get ('status', 200)
     
             # Check if the parse was succesful.  If the http status code
@@ -174,6 +199,8 @@ class Feed:
             # parse fails.  But really, everything went great!  Check for
             # this first.
             if http_status == 304:
+                print "%s: No changes to feed." % (self.key,)
+                mainthread.execute (wc_success, async=True)
                 success = True
             elif len(tmp["entries"])==0 and not tmp.version:
                 # An error occured fetching or parsing the feed.  (Version
@@ -183,6 +210,26 @@ class Feed:
                        % (url, str (tmp.version),
                           str (tmp.get ('bozo_exception', 'Unknown error'))))
                 print tmp
+                if have_woodchuck:
+                    def e():
+                        print "%s: stream update failed!" % self.key
+    
+                        try:
+                            # It's not easy to get the feed's title from here.
+                            # At the latest, the next time the application is
+                            # started, we'll fix up the human readable name.
+                            wc().stream_register (self.key, "", 6 * 60 * 60)
+                        except woodchuck.ObjectExistsError:
+                            pass
+                        ec = woodchuck.TransferStatus.TransientOther
+                        if 300 <= http_status and http_status < 400:
+                            ec = woodchuck.TransferStatus.TransientNetwork
+                        if 400 <= http_status and http_status < 500:
+                            ec = woodchuck.TransferStatus.FailureGone
+                        if 500 <= http_status and http_status < 600:
+                            ec = woodchuck.TransferStatus.TransientNetwork
+                        wc()[self.key].update_failed(ec)
+                    mainthread.execute (e, async=True)
             else:
                currentTime = time.time()
                # The etag and modified value should only be updated if the content was not null
@@ -303,9 +350,49 @@ class Feed:
 #                       except:
 #                           pass
     
+                   # Register the object with Woodchuck and mark it as
+                   # downloaded.
+                   if have_woodchuck:
+                       # Snapshot what the callback needs right now.  It is
+                       # only queued with mainthread.execute (async=True),
+                       # so these names may have been rebound by the time
+                       # it actually runs.
+                       wc_title = tmpEntry["title"]
+                       # If the entry does not contain a publication
+                       # time, the attribute won't exist.
+                       wc_pubtime = entry.get ('date_parsed', None)
+                       wc_received = (progress_handler.stats['received']
+                                      - received_base)
+                       wc_sent = progress_handler.stats['sent'] - sent_base
+
+                       def e(id=id, title=wc_title, pubtime=wc_pubtime,
+                             received=wc_received, sent=wc_sent,
+                             object_size=object_size):
+                           try:
+                               obj = wc()[self.key].object_register(
+                                   object_identifier=id,
+                                   human_readable_name=title)
+                           except woodchuck.ObjectExistsError:
+                               obj = wc()[self.key][id]
+                           else:
+                               if pubtime:
+                                   obj.publication_time = time.mktime (pubtime)
+
+                               obj.transferred (
+                                   indicator=(woodchuck.Indicator.ApplicationVisual
+                                              |woodchuck.Indicator.StreamWide),
+                                   transferred_down=received,
+                                   transferred_up=sent,
+                                   object_size=object_size)
+                       mainthread.execute(e, async=True)
                self.db.commit()
 
+               print ("%s: Update successful: transferred: %d/%d; objects: %d"
+                      % (self.key,
+                         progress_handler.stats['sent'],
+                         progress_handler.stats['received'],
+                         len (tmp.entries)))
+               mainthread.execute (wc_success, async=True)
                success = True
 
             rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
@@ -331,6 +410,7 @@ class Feed:
                         #
                         #print 'Removing', file
                         #
+                        # XXX: Tell woodchuck.
                         remove(file) # commented out for testing
                         #
                     except OSError, exception:
@@ -339,6 +419,9 @@ class Feed:
             print ("updated %s: %fs in download, %fs in processing"
                    % (self.key, download_duration,
                       time.time () - process_start))
+        except:
+            print "Updating %s: %s" % (self.key, sys.exc_info()[0])
+            traceback.print_exc()
         finally:
             self.db.commit ()
 
@@ -355,7 +438,6 @@ class Feed:
                     updateTime=row[0]
             except:
                 print "Fetching update time."
-                import traceback
                 traceback.print_exc()
             finally:
                 if not success:
@@ -368,7 +450,18 @@ class Feed:
     def setEntryRead(self, id):
+        """Mark the entry ID as read and report its use to Woodchuck."""
         self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
         self.db.commit()
-        
+
+        # Tell Woodchuck that the article was used.  The callback has to
+        # be handed to mainthread.execute, otherwise it never runs.
+        def e():
+            if wc().available():
+                try:
+                    wc()[self.key][id].used()
+                except KeyError:
+                    pass
+        mainthread.execute (e, async=True)
+
     def setEntryUnread(self, id):
         self.db.execute("UPDATE feed SET read=0 WHERE id=?;", (id,) )
         self.db.commit()     
@@ -502,6 +591,16 @@ class Feed:
         self.db.execute("DELETE FROM feed WHERE id=?;", (id,) )
         self.db.execute("DELETE FROM images WHERE id=?;", (id,) )
         self.db.commit()
+
+        def e():
+            if wc().available():
+                try:
+                    wc()[self.key][id].files_deleted (
+                        woodchuck.DeletionResponse.Deleted)
+                    del wc()[self.key][id]
+                except KeyError:
+                    pass
+        mainthread.execute (e, async=True)
  
 class ArchivedArticles(Feed):    
     def addArchivedArticle(self, title, link, date, configdir):
@@ -597,6 +696,37 @@ class Listing:
         except:
             pass
 
+        # Check that Woodchuck's state is up to date with respect to our
+        # state.
+        wc_init (self)
+        if wc().available():
+            # The list of known streams.
+            streams = wc().streams_list ()
+            stream_ids = [s.identifier for s in streams]
+
+            # Register any unknown streams.  Remove known streams from
+            # STREAM_IDS.
+            for key in self.getListOfFeeds():
+                title = self.getFeedTitle(key)
+                # XXX: We should also check whether the list of
+                # articles/objects in each feed/stream is up to date.
+                if key not in stream_ids:
+                    print ("Registering previously unknown channel: %s (%s)"
+                           % (key, title,))
+                    # Use a default refresh interval of 6 hours.
+                    wc().stream_register (key, title, 6 * 60 * 60)
+                else:
+                    # Make sure the human readable name is up to date.
+                    if wc()[key].human_readable_name != title:
+                        wc()[key].human_readable_name = title
+                    stream_ids.remove (key)
+
+
+            # Unregister any streams that are no longer subscribed to.
+            for id in stream_ids:
+                print ("Unregistering %s" % (id,))
+                wc().stream_unregister (id)
+
     def importOldFormatFeeds(self):
         """This function loads feeds that are saved in an outdated format, and converts them to sqlite"""
         import rss
@@ -634,7 +764,6 @@ class Listing:
                             pass
                 self.updateUnread(id)
             except:
-                import traceback
                 traceback.print_exc()
         remove(self.configdir+"feeds.pickle")
                 
@@ -702,6 +831,13 @@ class Listing:
         else:
             self.db.execute("UPDATE feeds SET title=?, url=? WHERE id=?;", (title, url, key))
         self.db.commit()
+
+        if wc().available():
+            try:
+                wc()[key].human_readable_name = title
+            except KeyError:
+                print "Feed %s (%s) unknown." % (key, title)
+                pass
         
     def getFeedUpdateTime(self, key):
         return time.ctime(self.db.execute("SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0])
@@ -793,6 +929,14 @@ class Listing:
             self.db.commit()
             # Ask for the feed object, it will create the necessary tables
             self.getFeed(id)
+
+            if wc().available():
+                # Register the stream with Woodchuck.  Update approximately
+                # every 6 hours.
+                wc().stream_register(stream_identifier=id,
+                                     human_readable_name=title,
+                                     freshness=6*60*60)
+
             return True
         else:
             return False
@@ -808,6 +952,12 @@ class Listing:
         self.db.commit()
     
     def removeFeed(self, key):
+        if wc().available ():
+            try:
+                del wc()[key]
+            except KeyError:
+                print "Removing unregistered feed %s failed" % (key,)
+
         rank = self.db.execute("SELECT rank FROM feeds WHERE id=?;", (key,) ).fetchone()[0]
         self.db.execute("DELETE FROM feeds WHERE id=?;", (key, ))
         self.db.execute("UPDATE feeds SET rank=rank-1 WHERE rank>?;", (rank,) )
diff --git a/src/wc.py b/src/wc.py
new file mode 100644 (file)
index 0000000..15afc62
--- /dev/null
+++ b/src/wc.py
@@ -0,0 +1,103 @@
+# Copyright (c) 2011 Neal H. Walfield
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""FeedingIt's interface to Woodchuck.
+
+wc_init() creates the single, module-wide mywoodchuck instance and
+wc() returns it.  If the Woodchuck modules cannot be imported, a stub
+PyWoodchuck whose available() always returns False is used instead.
+"""
+
+import traceback
+
+# Don't fail if the Woodchuck modules are not available.  Just disable
+# Woodchuck's functionality.
+
+# Whether we imported the woodchuck modules successfully.
+woodchuck_imported = True
+try:
+    import pywoodchuck
+    from pywoodchuck import PyWoodchuck
+    from pywoodchuck import woodchuck
+except ImportError:
+    print ("Unable to load Woodchuck modules: disabling Woodchuck support: %s"
+           % traceback.format_exc ())
+    woodchuck_imported = False
+    class PyWoodchuck (object):
+        """Stand-in used when pywoodchuck cannot be imported."""
+        def __init__(self, *args, **kwargs):
+            # Accept and ignore whatever mywoodchuck.__init__ passes on;
+            # object.__init__ would reject the extra arguments.
+            pass
+
+        def available(self):
+            """Woodchuck is never available through the stand-in."""
+            return False
+    woodchuck = None
+
+# The default channel refresh interval: 6 hours.
+refresh_interval = 6 * 60 * 60
+
+class mywoodchuck (PyWoodchuck):
+    """PyWoodchuck subclass that forwards upcalls to a Listing."""
+    def __init__(self, listing, *args):
+        """Remember LISTING; pass the remaining ARGS to PyWoodchuck."""
+        PyWoodchuck.__init__ (self, *args)
+
+        self.listing = listing
+
+    # Woodchuck upcalls.
+    def stream_update_cb(self, stream):
+        """Update the feed identified by STREAM.identifier."""
+        print ("stream update called on %s (%s)"
+               % (stream.human_readable_name, stream.identifier,))
+
+        # NOTE(review): nothing here prevents a concurrent update of
+        # the same feed; presumably updateFeed takes care of that --
+        # confirm.
+        try:
+            self.listing.updateFeed(stream.identifier)
+        except Exception:
+            # Report the failure, but don't let it escape the upcall.
+            print ("Updating %s: %s"
+                   % (stream.identifier, traceback.format_exc ()))
+
+    def object_transfer_cb(self, stream, object,
+                           version, filename, quality):
+        """Log the request; no transfer is started here."""
+        print ("object transfer called on %s (%s) in stream %s (%s)"
+               % (object.human_readable_name, object.identifier,
+                  stream.human_readable_name, stream.identifier))
+
+_w = None
+def wc_init(listing):
+    """Create the module-wide mywoodchuck instance for LISTING.
+
+    Must be called exactly once, before the first call to wc().
+    """
+    global _w
+    assert _w is None
+
+    _w = mywoodchuck (listing, "FeedingIt", "org.maemo.feedingit")
+
+    if not woodchuck_imported or not _w.available ():
+        print "Unable to contact Woodchuck server."
+    else:
+        print "Woodchuck appears to be available."
+
+def wc():
+    """Return the instance created by wc_init()."""
+    assert _w is not None
+    return _w