Colourful Norwich skyline illustration

Michael Sage

IT, Digital & Culture

An old plugin back to life!

Below is the check_rss.py script for pulling RSS feeds into your monitoring platform. I’m currently using ITOpenCOCKPIT, although I have used it with nagios before that.

You do need a couple of dependencies. On ubuntu 22.04 these are:

    • python3
    • python3-feedparser

check_rss.py

#!/usr/bin/python3

"""
check_rss - A simple Nagios plugin to check an RSS feed.
Created to monitor status of cloud services.

Requires feedparser and argparse python libraries

  python-feedparser

on Debian or Redhat based systems

If you find it useful, feel free to leave me a comment/email
at http://john.wesorick.com/2011/10/nagios-plugin-checkrss.html


Copyright 2011 John Wesorick (john.wesorick.com)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

import argparse
import datetime
import sys

import feedparser


def fetch_feed_last_entry(feed_url):
    """Fetch a feed from a given string"""

    try:
        myfeed = feedparser.parse(feed_url)
    except:
        output = "Could not parse URL (%s)" % feed_url
        exitcritical(output, "")

    if myfeed.bozo != 0:
        exitcritical("Malformed feed: %s" % (myfeed.bozo_exception), "")
    if myfeed.status != 200:
        exitcritical("Status %s - %s" % (myfeed.status, myfeed.feed.summary), "")

    # feed with 0 entries are good too
    if len(myfeed.entries) == 0:
        exitok("No news == good news", "")

    return myfeed.entries[0]
def main(argv=None):
    """Gather user input and start the check"""
    description = "A simple Nagios plugin to check an RSS feed."
    epilog = """notes: If you do not specify any warning or
 critical conditions, it will always return OK.
 This will only check the newest feed entry.

 Copyright 2011 John Wesorick (http://john.wesorick.com)"""
    version = "0.3"

    # Set up our arguments
    parser = argparse.ArgumentParser(description=description, epilog=epilog)

    parser.add_argument("--version", action="version", version=version)

    parser.add_argument(
        "-H",
        dest="rssfeed",
        help="URL of RSS feed to monitor",
        action="store",
        required=True,
    )

    parser.add_argument(
        "-c",
        "--criticalif",
        dest="criticalif",
        help="critical condition if PRESENT",
        action="store",
    )

    parser.add_argument(
        "-C",
        "--criticalnot",
        dest="criticalnot",
        help="critical condition if MISSING",
        action="store",
    )

    parser.add_argument(
        "-w",
        "--warningif",
        dest="warningif",
        help="warning condition if PRESENT",
        action="store",
    )

    parser.add_argument(
        "-W",
        "--warningnot",
        dest="warningnot",
        help="warning condition if MISSING",
        action="store",
    )

    parser.add_argument(
        "-T",
        "--hours",
        dest="hours",
        help="Hours since last post. "
        "Will return critical if less than designated amount.",
        action="store",
    )

    parser.add_argument(
        "-t",
        "--titleonly",
        dest="titleonly",
        help="Search the titles only. The default is to search "
        "for strings matching in either the title or description",
        action="store_true",
        default=False,
    )

    parser.add_argument(
        "-p",
        "--perfdata",
        dest="perfdata",
        help="If used will keep very basic performance data "
        "(0 if OK, 1 if WARNING, 2 if CRITICAL, 3 if UNKNOWN)",
        action="store_true",
        default=False,
    )

    parser.add_argument(
        "-v",
        "--verbosity",
        dest="verbosity",
        help="Verbosity level. 0 = Only the title and time is returned. "
        "1 = Title, time and link are returned. "
        "2 = Title, time, link and description are returned (Default)",
        action="store",
        default="2",
    )

    try:
        args = parser.parse_args()
    except:
        # Something didn't work. We will return an unknown.
        output = ": Invalid argument(s) {usage}".format(usage=parser.format_usage())
        exitunknown(output)

    perfdata = args.perfdata

    # Parse our feed, getting title, description and link of newest entry.
    rssfeed = args.rssfeed
    if rssfeed.find("http://") != 0 and rssfeed.find("https://") != 0:
        rssfeed = "http://{rssfeed}".format(rssfeed=rssfeed)

    # we have everything we need, let's start
    last_entry = fetch_feed_last_entry(rssfeed)
    feeddate = last_entry["updated_parsed"]
    title = last_entry["title"]
    description = last_entry["description"]
    link = last_entry["link"]

    # Get the difference in time from last post
    datetime_now = datetime.datetime.now()
    datetime_feeddate = datetime.datetime(
        *feeddate[:6]
    )  # http://stackoverflow.com/a/1697838/726716
    timediff = datetime_now - datetime_feeddate
    hourssinceposted = timediff.days * 24 + timediff.seconds / 3600

    # We will form our response here based on the verbosity levels. This makes the logic below a lot easier.
    if args.verbosity == "0":
        output = "Posted %s hrs ago ; %s" % (hourssinceposted, title)
    elif args.verbosity == "1":
        output = "Posted %s hrs ago ; Title: %s; Link: %s" % (
            hourssinceposted,
            title,
            link,
        )
    elif args.verbosity == "2":
        output = "Posted %s hrs ago ; Title: %s ; Description: %s ; Link: %s" % (
            hourssinceposted,
            title,
            description,
            link,
        )

    # Check for strings that match, resulting in critical status
    if args.criticalif:
        criticalif = args.criticalif.lower().split(",")
        for search in criticalif:
            if args.titleonly:
                if title.lower().find(search) >= 0:
                    exitcritical(output, perfdata)
            else:
                if (
                    title.lower().find(search) >= 0
                    or description.lower().find(search) >= 0
                ):
                    exitcritical(output, perfdata)

    # Check for strings that are missing, resulting in critical status
    if args.criticalnot:
        criticalnot = args.criticalnot.lower().split(",")
        for search in criticalnot:
            if args.titleonly:
                if title.lower().find(search) == -1:
                    exitcritical(output, perfdata)
            else:
                if (
                    title.lower().find(search) == -1
                    and description.lower().find(search) == -1
                ):
                    exitcritical(output, perfdata)

    # Check for time difference (in hours), resulting in critical status
    if args.hours:
        if int(hourssinceposted) <= int(args.hours):
            exitcritical(output, perfdata)

    # Check for strings that match, resulting in warning status
    if args.warningif:
        warningif = args.warningif.lower().split(",")
        for search in warningif:
            if args.titleonly:
                if title.lower().find(search) >= 0:
                    exitwarning(output, perfdata)
            else:
                if (
                    title.lower().find(search) >= 0
                    or description.lower().find(search) >= 0
                ):
                    exitwarning(output, perfdata)

    # Check for strings that are missing, resulting in warning status
    if args.warningnot:
        warningnot = args.warningnot.lower().split(",")
        for search in warningnot:
            if args.titleonly:
                if title.lower().find(search) == -1:
                    exitwarning(output, perfdata)
            else:

                if (
                    title.lower().find(search) == -1
                    and description.lower().find(search) == -1
                ):
                    exitwarning(output, perfdata)

    # If we made it this far, we must be ok
    exitok(output, perfdata)


def exitok(output, perfdata):
    if perfdata:
        print("OK - %s|'RSS'=0;1;2;0;2" % output)
    else:
        print("OK - %s" % output)
    sys.exit(0)


def exitwarning(output, perfdata):
    if perfdata:
        print("WARNING - %s|'RSS'=1;1;2;0;2" % output)
    else:
        print("WARNING - %s" % output)
    sys.exit(1)


def exitcritical(output, perfdata):
    if perfdata:
        print("CRITICAL - %s|'RSS'=2;1;2;0;2" % output)
    else:
       print("CRITICAL - %s" % output)
    sys.exit(2)


def exitunknown(output):
    sys.exit(3)


if __name__ == "__main__":
    result = main(sys.argv)
    sys.exit(result)