Parsing RSS

From IronPython Cookbook

Parsing XML with System.Xml.Linq.XDocument

See Parsing XML with XDocument.

Parsing XML with System.Xml.XmlDocument

There are a few great Python modules for parsing RSS. Unfortunately, I hit issues with these on IronPython and decided to put something together myself. This little module needs some work and tidying up, but it is surprisingly robust (it gets lots of real-world testing, almost daily!).

See the `if __name__ == "__main__":` block at the end of the module for an example of usage.

import sys

import clr
clr.AddReference('System.Xml')

from System.Net import WebClient
from System.Xml import XmlDocument, XmlTextReader
from System.IO import StreamReader

class RSSFeedItem(object):
    """One entry (<item>) of an RSS feed.

    Attributes:
        Title: text of the item's <title> element.
        Description: text of the item's <description> element.
        Link: text of the item's <link> element.
        GUID: text of the item's <guid> element, "" when the item has none.
        Channel: the fetcher object that produced this item, or None when
            the item was created on its own.
    """

    def __init__(self):
        self.Title = ""
        self.Description = ""
        self.Link = ""
        self.GUID = ""
        # The fetcher assigns this after construction; declaring it here
        # means reading .Channel never raises AttributeError.
        self.Channel = None

class RSSFeedFetcher(object):
    """Download a single RSS feed and parse its items.

    Attributes:
        FeedURL: address the feed is downloaded from.
        ChannelName: text of the channel's <title>; "" until one is parsed.
        Items: RSSFeedItem objects parsed so far. Fetch only ever appends,
            so calling it repeatedly accumulates items.
        Loaded: False after a failed Fetch, True otherwise (including
            before the first Fetch).
        BadCount: number of <item> elements that could not be parsed.
    """

    def __init__(self,URL):
        self.FeedURL = URL
        self.ChannelName = ""
        self.Items = []
        self.Loaded = True
        self.BadCount = 0

    def _ReportError(self):
        """Print the exception currently being handled, tagged with FeedURL."""
        errorInfo = sys.exc_info()
        print("%s Error \n%s" % (self.FeedURL, errorInfo[0]))
        for part in errorInfo:
            print(part)

    def _ParseItem(self, itemNode):
        """Build an RSSFeedItem from one <item> node.

        Raises AttributeError when <title>, <description> or <link> is
        missing (SelectSingleNode returns None); Fetch counts such items
        in BadCount and skips them.
        """
        newItem = RSSFeedItem()
        newItem.Title = str(itemNode.SelectSingleNode("title").InnerText)
        newItem.Description = str(itemNode.SelectSingleNode("description").InnerText)
        newItem.Link = str(itemNode.SelectSingleNode("link").InnerText)
        newItem.Channel = self
        # <guid> is optional, so look it up once and test explicitly.
        guidNode = itemNode.SelectSingleNode("guid")
        if guidNode is not None:
            newItem.GUID = str(guidNode.InnerText)
        return newItem

    def Fetch(self):
        """Download FeedURL, parse it and append the items to self.Items.

        Returns:
            True when the feed was downloaded and parsed, False when the
            download or the document-level parsing failed. Individual
            malformed items do not fail the fetch; they are reported and
            counted in BadCount.
        """
        # Start every attempt from a clean state so that a retry after an
        # earlier failure can succeed.
        self.Loaded = True
        webClient = None
        rssStream = None
        try:
            webClient = WebClient()
            rssStream = webClient.OpenRead(self.FeedURL)
            xmlDoc = XmlDocument()
            xmlDoc.Load(XmlTextReader(StreamReader(rssStream)))

            rssNode = xmlDoc.SelectSingleNode("rss")
            # Select only <channel> elements: other children of <rss>
            # (comments, extension elements) have no <title> and used to
            # abort the whole fetch.
            for channelNode in rssNode.SelectNodes("channel"):
                titleNode = channelNode.SelectSingleNode("title")
                if titleNode is not None:
                    self.ChannelName = titleNode.InnerText

                for itemNode in channelNode.SelectNodes("item"):
                    try:
                        self.Items.append(self._ParseItem(itemNode))
                    except Exception:
                        self._ReportError()
                        self.BadCount += 1
        except Exception:
            self._ReportError()
            self.Loaded = False
        finally:
            # Release the network resources whether or not parsing worked.
            if rssStream is not None:
                rssStream.Close()
            if webClient is not None:
                webClient.Dispose()
        # Returned after (not inside) the finally block so that exceptions
        # not handled above are not silently discarded.
        return self.Loaded

class RSSFeedSubscriptions(object):
    """A collection of RSS feeds and the combined items downloaded from them.

    Attributes:
        Feeds: feed fetcher objects registered through Load or LoadFeed.
        AllItems: items gathered from every feed that Download fetched
            successfully.
    """

    def __init__(self):
        self._Reset()

    def _Reset(self):
        """Forget all feeds, items and download counters."""
        self.Feeds = []
        self.AllItems = []
        self._GoodDownloads = 0
        self._BadDownloads = 0

    def _ReportError(self):
        """Print the exception currently being handled."""
        errorInfo = sys.exc_info()
        print(" Error \n%s" % (errorInfo[0],))
        for part in errorInfo:
            print(part)

    @property
    def ItemCount(self):
        """Number of items currently held in AllItems."""
        return len(self.AllItems)

    @property
    def FeedCount(self):
        """Number of registered feeds."""
        return len(self.Feeds)

    @property
    def Downloaded(self):
        """Number of feeds whose Fetch succeeded during Download."""
        return self._GoodDownloads

    @property
    def Problems(self):
        """Number of feeds whose Fetch failed during Download."""
        return self._BadDownloads

    def Load(self, filename):
        """Replace the current subscriptions with the URLs listed in a file.

        The file is read one URL per line; surrounding whitespace is
        stripped and blank lines are ignored. Each registered URL is
        printed. Errors while reading are reported and end the load early.
        """
        self._Reset()
        ListReader = StreamReader(filename)
        try:
            while not ListReader.EndOfStream:
                URL = ListReader.ReadLine()
                if URL is None:
                    break
                URL = URL.strip()
                if not URL:
                    # A blank line is not a feed; skip it rather than
                    # registering a feed that can only fail.
                    continue
                feed = RSSFeedFetcher(URL)
                self.Feeds.append(feed)
                print(feed.FeedURL)
        except Exception:
            self._ReportError()
        finally:
            ListReader.Close()

    def LoadFeed(self,URL):
        """Register a single feed URL without clearing existing feeds."""
        try:
            feed = RSSFeedFetcher(URL)
            self.Feeds.append(feed)
        except Exception:
            self._ReportError()

    def FilterItemsByTitle(self):
        """Drop items whose title duplicates an earlier item's title.

        The first item with each title is kept and order is preserved.
        """
        seenTitles = set()
        uniqueStories = []
        for story in self.AllItems:
            if story.Title not in seenTitles:
                seenTitles.add(story.Title)
                uniqueStories.append(story)
        # Slice-assign so the list object itself stays the same, as it did
        # when items were removed in place.
        self.AllItems[:] = uniqueStories

    def FilterItemsByQuestion(self):
        """Drop every item whose title contains a question mark.

        Each removed title is printed. The list is rebuilt instead of
        calling remove() while iterating, which skipped the item that
        followed each removal.
        """
        keptStories = []
        for story in self.AllItems:
            if "?" in story.Title:
                print("LDRSS:Removed:" + story.Title)
            else:
                keptStories.append(story)
        self.AllItems[:] = keptStories

    def Download(self):
        """Fetch every registered feed and collect the items.

        Items of feeds that fetch successfully are appended to AllItems;
        the success and failure counters are updated accordingly.
        """
        for feed in self.Feeds:
            if feed.Fetch():
                self._GoodDownloads += 1
                self.AllItems.extend(feed.Items)
            else:
                self._BadDownloads += 1

if __name__ == "__main__":
    # Usage example: subscribe to one feed, download it, and dump the
    # title, description and link of every item, one value per line.
    subscriptions = RSSFeedSubscriptions()
    subscriptions.LoadFeed("http://feeds.feedburner.com/DavyMitchellsMeanderingWeblog")
    subscriptions.Download()
    for entry in subscriptions.AllItems:
        for field in (entry.Title, entry.Description, entry.Link):
            print(field)


Back to Contents.

TOOLBOX
LANGUAGES