Following up on something Christian Stocker brought up in "Fetching a looot of feeds with PHP", this is a hack for an RSS aggregator built with Twisted that adds Conditional GET support.
from twisted.internet import reactor, protocol, defer from twisted.web import client from twisted.web.client import HTTPPageGetter, HTTPClientFactory from twisted.python import failure from twisted.web import error import feedparser, time, sys import md5, os # You can get this file from http://xoomer.virgilio.it/dialtone/out.py # Since it's only a list of URLs made in this way: # out = [ ( 'URL', 'EXTRA_INFOS'), # ( 'URL', 'EXTRA_INFOS') # ... # ] import out try: import cStringIO as _StringIO except ImportError: import StringIO as _StringIO rss_feeds = out.rss_feed # This is the HUGE feed list (730 feeds) DEFERRED_GROUPS = 60 # Number of simultaneous connections INTER_QUERY_TIME = 300 # Max Age (in seconds) of each feed in the cache TIMEOUT = 30 # Timeout in seconds for the web request # This dict structure will be the following: # { 'URL': (TIMESTAMP, value) } cache = {} class ConditionalHTTPPageGetter(HTTPPageGetter): def handleStatus_200(self): # If we're good, try recording the last-modified header if self.headers.has_key('last-modified'): self.factory.lastModified(self.headers['last-modified']) def handleStatus_304(self): # Close connection self.factory.notModified() self.transport.loseConnection() class ConditionalHTTPClientFactory(HTTPClientFactory): protocol = ConditionalHTTPPageGetter def __init__(self, cacheDir, url, method='GET', postdata=None, headers=None, agent="Twisted ConditionalPageGetter", timeout=0, cookies=None, followRedirect=1): self.cachefile = cacheDir+os.path.sep+self.getHashForUrl(url) if os.path.exists(self.cachefile): lastModified = open(self.cachefile).readline().strip() if headers is not None: headers['last-modified'] = lastModified else: headers = {'last-modified': lastModified } HTTPClientFactory.__init__(self, url, method=method, postdata=postdata, headers=headers, agent=agent, timeout=timeout, cookies=cookies, followRedirect=followRedirect ) self.waiting = 1 self.deferred = defer.Deferred() def lastModified(self, modtime): f = 
open(self.cachefile,'w') f.write(modtime[0]) def getHashForUrl(self, url): hash = md5.new() hash.update(url) return hash.hexdigest() def notModified(): if self.waiting: self.waiting = 0 def conditionalGetPage(cacheDir, url, contextFactory=None, *args, **kwargs): scheme, host, port, path = client._parse(url) factory = ConditionalHTTPClientFactory(cacheDir, url, *args, **kwargs) if scheme == 'https': from twisted.internet import ssl if contextFactory is None: contextFactory = ssl.ClientContextFactory() reactor.connectSSL(host, port, factory, contextFactory) else: reactor.connectTCP(host, port, factory) return factory.deferred class FeederProtocol(object): def __init__(self): self.parsed = 1 self.with_errors = 0 self.error_list = [] def isCached(self, site): # Try to get the tuple (TIMESTAMP, FEED_STRUCT) from the dict if it has # already been downloaded. Otherwise assign None to already_got already_got = cache.get(site[0], None) # Ok guys, we got it cached, let's see what we will do if already_got: # Well, it's cached, but will it be recent enough? elapsed_time = time.time() - already_got[0] # Woooohooo it is, elapsed_time is less than INTER_QUERY_TIME so I # can get the page from the memory, recent enough if elapsed_time < INTER_QUERY_TIME: return True else: # Uhmmm... actually it's a bit old, I'm going to get it from the # Net then, then I'll parse it and then I'll try to memoize it # again return False else: # Well... We hadn't it cached in, so we need to get it from the Net # now, It's useless to check if it's recent enough, it's not there. return False def gotError(self, traceback, extra_args): # An Error as occurred, print traceback infos and go on print traceback, extra_args self.with_errors += 1 self.error_list.append(extra_args) print "="*20 print "Trying to go on..." 
def getPageFromMemory(self, data, key=None): # Getting the second element of the tuple which is the parsed structure # of the feed at address key, the first element of the tuple is the # timestamp print "Getting from memory..." return defer.succeed(cache.get(key,key)[1]) def parseFeed(self, feed): # This is self explaining :) print "parsing..." try: feed+'' parsed = feedparser.parse(_StringIO.StringIO(feed)) except TypeError: parsed = feedparser.parse(_StringIO.StringIO(str(feed))) print "parsed feed" return parsed def memoize(self, feed, addr): # feed is the raw structure, just as returned from feedparser.parse() # while addr is the address from which the feed was got. print "Memoizing",addr,"..." if cache.get(addr, None): cache[addr] = (time.time(), feed) else: cache.setdefault(addr, (time.time(),feed)) return feed def workOnPage(self, parsed_feed, addr): # As usual, addr is the feed address and file is the file in # which you can eventually save the structure. print "-"*20 print "finished retrieving" print "Feed Version:",parsed_feed.get('version','Unknown') # # Uncomment the following if you want to print the feeds # chan = parsed_feed.get('channel', None) if chan: print chan.get('title', '') print chan.get('link', '') #print chan.get('tagline', '') print chan.get('description','') print "-"*20 items = parsed_feed.get('items', None) if items: for item in items: print '\tTitle: ', item.get('title','') print '\tDate: ', item.get('date', '') print '\tLink: ', item.get('link', '') print '\tDescription: ', item.get('description', '') print '\tSummary: ', item.get('summary','') print "-"*20 print "got",addr print "="*40 return parsed_feed def stopWorking(self, data=None): print "Closing connection number %d..."%(self.parsed,) print "=-"*20 # This is here only for testing. When a protocol/interface will be # created to communicate with this rss-aggregator server, we won't need # to die after we parsed some feeds just one time. 
self.parsed += 1 print self.parsed, self.END_VALUE if self.parsed > self.END_VALUE: # print "Closing all..." # for i in self.error_list: # Just for testing sake print i # print len(self.error_list) # reactor.stop() # def getPage(self, data, args): return conditionalGetPage(self.cacheDir,args,timeout=TIMEOUT) def printStatus(self, data=None): print "Starting feed group..." def start(self, data=None, cacheDir='.', std_alone=True): self.cacheDir = cacheDir d = defer.succeed(self.printStatus()) for feed in data: # Now we start telling the reactor that it has # to get all the feeds one by one... cached = self.isCached(feed) if not cached: # When the feed is not cached, it's time to # go and get it from the web directly d.addCallback(self.getPage, feed[0]) d.addErrback(self.gotError, (feed[0], 'getting')) # Parse the feed and if there's some errors call self.gotError d.addCallback(self.parseFeed) d.addErrback(self.gotError, (feed[0], 'parsing')) # Now memoize it, if there's some error call self.getError d.addCallback(self.memoize, feed[0]) d.addErrback(self.gotError, (feed[0], 'memoizing')) else: # If it's cached d.addCallback(self.getPageFromMemory, feed[0]) d.addErrback(self.gotError, (feed[0], 'getting from memory')) # When you get the raw structure you can work on it # to format in the best way you can think of. # For any error call self.gotError. 
d.addCallback(self.workOnPage, feed[0]) d.addErrback(self.gotError, (feed[0], 'working on page')) # And when the for loop is ended we put # stopWorking on the callback for the last # feed gathered # This is only for testing purposes if std_alone: d.addCallback(self.stopWorking) d.addErrback(self.gotError, (feed[0], 'while stopping')) if not std_alone: return d class FeederFactory(protocol.ClientFactory): protocol = FeederProtocol() def __init__(self, cacheDir, std_alone=False): self.feeds = self.getFeeds() self.cacheDir = cacheDir self.std_alone = std_alone self.protocol.factory = self self.protocol.END_VALUE = len(self.feeds) # this is just for testing if std_alone: self.start(self.feeds) def start(self, addresses): # Divide into groups all the feeds to download if len(addresses) > DEFERRED_GROUPS: url_groups = [[] for x in xrange(DEFERRED_GROUPS)] for i, addr in enumerate(addresses): url_groups[i%DEFERRED_GROUPS].append(addr) else: url_groups = [[addr] for addr in addresses] for group in url_groups: if not self.std_alone: return self.protocol.start(group, self.cacheDir, self.std_alone) else: self.protocol.start(group, self.cacheDir, self.std_alone) def getFeeds(self, where=None): # This is used when you call a COMPLETE refresh of the feeds, # or for testing purposes #print "getting feeds" # This is to get the feeds we want if not where: # We don't have a database, then we use the local # variabile rss_feeds return rss_feeds else: return None if __name__=="__main__": f = FeederFactory(cacheDir='.',std_alone=True) reactor.run()
Right now this is barely tested: it runs without any error messages, but does it really work?