There are many reasons to add RSS feed processing to an overall big data analytics solution, for example to complement the information already in your system. IBM Streams is an ideal tool to ingest and process this information.

It would be tempting to simply write a new Java operator to do the work, but it is not necessary. The IBM Streams product includes pre-built operators that can be used for this processing. In this article, we show a simple processing topology to do this. Since RSS feed documents are XML, we also include the XML processing that can be used to extract the exact fields you are interested in.

The overall processing graph is as follows:

[Figure: the RssFeed application graph: InetSource_1 → XMLParse_5 → Projection → FileSink_1]

The InetSource_1 operator (based on the InetSource operator) reads a number of RSS feeds and passes the results to the XMLParse_5 operator, which converts each XML document into its equivalent tuple representation. The tuples are then passed to the Projection operator, which extracts some of the fields before passing them on to the FileSink_1 operator, which writes each document to a different file. The rest of this article goes into more detail on the use of each operator.

The InetSource operator

This operator is found in the com.ibm.streamsx.inet toolkit. Here, the InetSource operator accesses ten Computerworld RSS feeds. By default, the operator returns its results one line at a time. To make the output easier to process, we raise that limit so that each record returned represents the result of one feed. The SPL code is:

(stream<rstring txt> InetSource_1_out0) as InetSource_1 = InetSource()
{
  param
    URIList : [ "http://www.computerworld.com/category/big-data/index.rss",
      "http://www.computerworld.com/category/cloud-computing/index.rss",
      "http://www.computerworld.com/category/data-center/index.rss",
      "http://www.computerworld.com/category/emerging-technology/index.rss",
      "http://www.computerworld.com/category/enterprise-applications/index.rss",
      "http://www.computerworld.com/category/it-management/index.rss",
      "http://www.computerworld.com/category/mobile-wireless/index.rss",
      "http://www.computerworld.com/category/networking/index.rss",
      "http://www.computerworld.com/category/operating-systems/index.rss",
      "http://www.computerworld.com/category/vertical-it/index.rss" ] ;
    fetchInterval : 60.0 ;
    incrementalFetch : false ;
    inputLinesPerRecord : 1000000u ;
}

Please consult the IBM Streams documentation for complete information on the available parameters. This example shows only a few of them.

  • fetchInterval: how often the feeds are read. In this case, it is set to 60 seconds.
  • incrementalFetch: set to false so that the data is retrieved at every interval, whether or not it has changed.
  • inputLinesPerRecord: by default, a record contains only one line. Setting this value to a large number ensures that each record contains the entire content of one feed, which makes it easier to process later.

Each record submitted by the InetSource operator contains the content of one RSS feed, which is an XML document. We can therefore use the XMLParse operator to convert the document into a tuple structure.

The XMLParse operator

We can use the XMLParse operator to implicitly match the content of the document to a tuple structure. Since the input document can be quite complex, we can use a utility program available with IBM Streams to generate the type definitions for us from an actual document. The easiest way to obtain such a document is to execute a job containing the InetSource operator and write the output to a file, then create a new file containing only the first line of that file, since each line represents one feed.
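A small capture job along the following lines is one way to do this. This is a minimal sketch, not part of the application shown above: the composite name, the single feed chosen, and the file name raw.out are all illustrative.

// Hypothetical capture job: writes each raw feed record as one line to
// raw.out. Assumes the com.ibm.streamsx.inet toolkit is on the toolkit path.
use com.ibm.streamsx.inet::InetSource ;

composite CaptureSample
{
  graph
    (stream<rstring txt> RawFeed) as RssCapture = InetSource()
    {
      param
        URIList : [ "http://www.computerworld.com/category/big-data/index.rss" ] ;
        incrementalFetch : false ;
        inputLinesPerRecord : 1000000u ;
    }

    // Write each record (one complete feed) as a single line.
    () as RawSink = FileSink(RawFeed)
    {
      param
        file : "raw.out" ;
        format : line ;
    }
}

A command such as head -1 raw.out > sample.out then keeps only the first feed document, producing the file used with spl-schema-from-xml below.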

Using the resulting file, we can execute the spl-schema-from-xml utility that comes with IBM Streams to generate the code. For this example, the following command was used:

spl-schema-from-xml --flatten="elements" --ignoreNamespaces --trigger="/rss/channel" sample.out --outfile=xxx

This indicates that we want to generate the code for the /rss/channel element and put the resulting type definitions in the file xxx. The --flatten="elements" option simply allows for shorter names in some parts of the result. The resulting SPL type definitions are:

type
    rss_channel_type = tuple<rstring title, rstring link, rstring description, rstring language, rstring pubDate, rstring lastBuildDate, list<rss_channel_image_type> image, list<rss_channel_item_type> item>;
    rss_channel_title_type = tuple<rstring _text>;
    rss_channel_link_type = tuple<rstring _text>;
    rss_channel_description_type = tuple<list<rstring>[1] _text>;
    rss_channel_language_type = tuple<rstring _text>;
    rss_channel_pubDate_type = tuple<rstring _text>;
    rss_channel_lastBuildDate_type = tuple<rstring _text>;
    rss_channel_image_type = tuple<rstring url, rstring title, rstring link, rstring width, rstring height>;
    rss_channel_image_url_type = tuple<rstring _text>;
    rss_channel_image_title_type = tuple<rstring _text>;
    rss_channel_image_link_type = tuple<rstring _text>;
    rss_channel_image_width_type = tuple<rstring _text>;
    rss_channel_image_height_type = tuple<rstring _text>;
    rss_channel_item_type = tuple<rstring title, rstring pubDate, rstring author, rstring creator, rstring description, rstring link, rss_channel_item_thumbnail_type thumbnail, rss_channel_item_content_type content, rss_channel_item_categories_type categories>;
    rss_channel_item_title_type = tuple<rstring _text>;
    rss_channel_item_pubDate_type = tuple<rstring _text>;
    rss_channel_item_author_type = tuple<rstring _text>;
    rss_channel_item_creator_type = tuple<rstring _text>;
    rss_channel_item_description_type = tuple<rstring _text>;
    rss_channel_item_link_type = tuple<rstring _text>;
    rss_channel_item_thumbnail_type = tuple<map<rstring, rstring> _attrs>;
    rss_channel_item_content_type = tuple<map<rstring, rstring> _attrs>;
    rss_channel_item_categories_type = tuple<list<rstring> category>;
    rss_channel_item_categories_category_type = tuple<rstring _text>;

The SPL code to process the tuples into the appropriate structure is:

(stream<rss_channel_type> XMLParse_5_out0) as XMLParse_5 = XMLParse(InetSource_1_out0)
{
  param
    trigger : "/rss/channel" ;
    xmlInput : txt ;
    flatten : elements ;
}

It is important to notice that the output stream is declared with only rss_channel_type. This means the schema of the output stream is rss_channel_type, the type generated for the trigger element.

The parameters used are:

  • trigger: used to match the desired element. It has to be the same as the trigger used in the spl-schema-from-xml command.
  • xmlInput: the name of the input attribute that contains the XML text to process.
  • flatten: use a simpler naming convention for element names. Again, this has to match the option used with spl-schema-from-xml.

The Projection (Custom) operator

In this example, we collect the categories of each article into a comma-delimited string. It might be possible to do this with another operator, but a Custom operator is used here because it also shows how to manipulate the tuple type produced by XMLParse:

(stream<rstring title, rstring author, rstring link, rstring categories> Custom_2_out0) as Projection =
       Custom(XMLParse_5_out0 as inputStream)
{
  logic
    state :
    {
      mutable rstring categories ;
    }
  onTuple inputStream : {
    for(rss_channel_item_type article in item)
    {
      categories = "" ;
      for(rstring cat in article.categories.category)
      {
        if(length(categories) > 0) categories = categories + ", " ;
        categories = categories + cat ;
      }
      submit({ title = article.title, author = article.author,
               link = article.link, categories = categories }, Custom_2_out0) ;
    }
  }
}

The content of this operator should be easy to understand. It is included here mainly to show how the elements of the rss_channel_type are accessed.
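One wrinkle worth noting is how XML attributes are accessed. spl-schema-from-xml mapped attribute-only elements, such as the item thumbnail, to an _attrs map of attribute names to values. A minimal sketch of reading one, for use inside the onTuple logic above, follows; the "url" key is an assumption about how the feed encodes its thumbnail element:

    // Hypothetical fragment: reading an XML attribute that was flattened
    // into the _attrs map. The "url" key is assumed, not taken from the
    // original application.
    mutable rstring thumbUrl = "" ;
    if(has(article.thumbnail._attrs, "url"))
      thumbUrl = article.thumbnail._attrs [ "url" ] ;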

The FileSink operator

The FileSink operator is not the focus of this article. Still, for completeness, here is the code:

() as FileSink_1 = FileSink(Custom_2_out0 as inputStream)
{
  param
    file : "File_{id}.out" ;
    flush : 1u ;
    quoteStrings : false ;
    format : csv ;
    separator : "|" ;
}

For each tuple sent to FileSink, a new file name is generated from the {id} substitution in the file parameter. This is just one of the conveniences of this operator; please consult the documentation for more information.
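For instance, with the file parameter above, and assuming the counter behind {id} starts at zero, the output files would be named along these lines:

File_0.out
File_1.out
File_2.out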

Conclusion

This article showed how to ingest RSS feeds and convert the XML results into tuples for further processing. The amount of code is small and easy to understand, which makes it easy to maintain. IBM Streams is an ideal tool to ingest RSS feeds and do any pre-processing or analytics, including text analytics, that your business needs require.
