When Logstash And Syslog Go Wrong

This is another post triggered by writing The Art of Monitoring. You can join the mailing list on that site for further information and updates. See also my post on structured logging.

One of the challenges of centralized logging is that log formats blossom like umbrellas in cheap cocktails. One of the few apparent exceptions to this is Syslog. I mean, it’s governed by an RFC, right? It’s a standard in logging, right? At this point some of you in the know are sniggering: “Syslog has an RFC… Syslog standards… hahahahaha…”

So what’s so funny? Well Syslog IS a standard. It’s just that some of the log output that vendors create and then call Syslog doesn’t quite match that standard. Whilst:

#include <syslog.h>

looks simple, it often isn’t implemented perfectly.

These implementation failures aren’t representative of all vendors, but given the wide-ranging use of Syslog across operating systems, network devices, data center infrastructure and applications, subtle variants have been introduced. Indeed, many of these variants are pretty close to each other and to the standard. Unfortunately, for a lot of log processing tools, even subtle differences can throw off their parsing of events.

Logstash and Syslog

We can use Logstash[1] to demonstrate some of the challenges of Syslog variants. Logstash has an input plugin called syslog.

input {
  syslog { }
}
. . .

This plugin combines a TCP/UDP listener on port 514 and listens for RFC 3164-compliant events like:

Sep 12 23:19:02 docker syslog-ng[25389]: syslog-ng starting up; version='3.5.3'

It parses incoming events using the following match:

"match" => { "message" => "<%{POSINT:priority}>%{SYSLOGLINE}" }

With the SYSLOGLINE regular expression expanding to:

SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
SYSLOGLINE %{SYSLOGBASE2} %{GREEDYDATA:message}

This works for a whole lot of Syslog output, but if that output isn’t generating RFC 3164-compliant messages then Logstash will choke on it. Let’s take a little example. Here is a message from a to-remain-nameless firewall vendor.

Local7.Notice 10.0.1.2 id=firewall time='2007-05-11 14:08:53' fw=FGT-602038033716 pri=5 log_id=0000010002 type=traffic subtype=session SN=2170882 duration=20 rule=0 policyid=0 proto=icmp service=icmp status=accept src=10.0.1.96 srcname=10.0.1.96 dst=10.0.1.2 dstname=10.0.1.2 src_int=n/a dst_int=internal sent=30 rcvd=30 sent_pkt=1 rcvd_pkt=1 src_port=0 dst_port=0 vpn=n/a tran_ip=0.0.0.0 tran_port=0

And here is what happens when it hits our example Logstash syslog input configuration.

{
           "message" => "<5>Sep 14 23:29:24 tab: Local7.Notice 10.0.1.2 id=firewall time=2007-05-11 14:08:53 fw=FGT-602038033716 pri=5 log_id=0000010002 type=traffic subtype=session SN=2170882 duration=20 rule=0 policyid=0 proto=icmp service=icmp status=accept src=10.0.1.96 srcname=10.0.1.96 dst=10.0.1.2 dstname=10.0.1.2 src_int=n/a dst_int=internal sent=30 rcvd=30 sent_pkt=1 rcvd_pkt=1 src_port=0 dst_port=0 vpn=n/a tran_ip=0.0.0.0 tran_port=0\u0000",
          "@version" => "1",
        "@timestamp" => "2014-09-15T03:29:24.721Z",
              "host" => "96.126.127.108",
              "tags" => [
        [0] "_grokparsefailure"
    ],
          "priority" => 13,
          "severity" => 5,
          "facility" => 1,
    "facility_label" => "user-level",
    "severity_label" => "Notice"
}

You can see the dreaded _grokparsefailure tag, which means Logstash couldn’t “grok” the incoming log message: it isn’t what Logstash thinks a Syslog message should be.

Replacing the Syslog input

So what do we do about this and messages like it? Well, at first glance, the syslog input is a pretty blunt instrument. Indeed, the Logstash community has repeatedly talked about removing it over the last couple of years. However, as we’re going to discover, replacing the syslog input is actually pretty easy using a combination of some different plugins.

To make this replacement we need to handle two parts of the syslog input plugin’s behavior:

  1. The TCP/UDP listener.
  2. The message parsing.

Let’s start with replacing the network listener using the tcp and udp input plugins. We’d replace our existing syslog block in our Logstash configuration with:

input {
  tcp {
    port => 514
    type => syslog
  }
  udp {
    port => 514
    type => syslog
  }
}

Here we’ve specified two network input plugins. Both are configured to listen on port 514, the first via TCP and the second via UDP. Both assign incoming messages a type of syslog. This provides listeners on both protocols that are waiting for messages on the default Syslog port.
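While testing these listeners it can be useful to see exactly what Logstash receives. One way to do that, shown here as a temporary sketch rather than part of our final configuration, is to add a stdout output using the standard rubydebug codec:

```
output {
  # Temporary debugging output: prints each event to the console
  # as a readable Ruby-style hash so we can inspect its fields.
  stdout { codec => rubydebug }
}
```

We can then fire a test message at port 514, for example with the logger utility or netcat, and watch the resulting event appear on the console.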

Next, let’s replace the parsing element of our syslog input plugin using a grok filter plugin.

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
  }
}

Here we’ve added a filter section and populated it with a grok filter. Our grok filter mimics the syslog input plugin’s existing parsing behavior, which caters for any appropriately formatted Syslog messages we might receive. The surrounding conditional uses the type field to match any incoming events tagged with syslog (most likely by our tcp and udp input plugins) and passes them to the filter. Let’s try it with a Syslog message now:

<1>Sep 20 02:58:12 porridge3 puppet-master[27025]: Compiled catalog for bigbopper.adm.intranet in environment production in 1.31 seconds

We should get back a Logstash event structured like this.

{
             "message" => "<1>Sep 20 02:58:12 porridge3 puppet-master[27025]: Compiled catalog for bigbopper.adm.intranet in environment production in 1.31 seconds\n",
            "@version" => "1",
          "@timestamp" => "2014-09-20T02:58:12.775Z",
                "type" => "syslog",
                "host" => "127.0.0.1",
          "syslog_pri" => "1",
    "syslog_timestamp" => "Sep 20 02:58:12",
     "syslog_hostname" => "porridge3",
      "syslog_program" => "puppet-master",
          "syslog_pid" => "27025",
      "syslog_message" => "Compiled catalog for bigbopper.adm.intranet in environment production in 1.31 seconds"
}
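One thing worth noting: the syslog_timestamp field captured by grok is still just a string, and @timestamp reflects when Logstash received the event rather than when it happened. A common follow-up, sketched below, is to add a date filter so the event’s timestamp comes from the message itself:

```
filter {
  if [type] == "syslog" {
    date {
      # RFC 3164 timestamps pad single-digit days with a space,
      # hence the two patterns.
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
```

With this in place the @timestamp of the earlier example event would be derived from "Sep 20 02:58:12" rather than the time of receipt.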

Capturing Failed Syslog messages

But this doesn’t solve our problem of incorrectly formatted Syslog messages. So we need to add some logic to handle failed parsing and then do something with those failed messages.

filter {
  if [type] == "syslog" {
    grok {
      match => [ "message", "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ]
    }
  }
}
output {
  if [type] == "syslog" and "_grokparsefailure" in [tags] {
    file { path => "/var/log/failed_syslog_events-%{+YYYY-MM-dd}" }
  }
}

Here we’ve added a catch-all for failed Syslog messages. If an event fails to parse via our grok plugin then it gets a _grokparsefailure tag. We’ve specified a new output section that captures events with a type of syslog and _grokparsefailure in their tags. Each of these events is then written out using the file plugin. The capture file is located at /var/log/failed_syslog_events-%{+YYYY-MM-dd}; the %{+YYYY-MM-dd} suffix appends a date to the filename to help with log rotation. Instead of a file we could generate instant messages, push events to a destination like IRC or Campfire, or use any one of the numerous other Logstash output options. We could also add metrics or aggregation to track the volume of failed events.
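As an illustration of one of those alternatives, pushing failed events to IRC with the irc output plugin might look like the following sketch (the server and channel names here are hypothetical):

```
output {
  if [type] == "syslog" and "_grokparsefailure" in [tags] {
    irc {
      host     => "irc.example.com"   # hypothetical IRC server
      channels => [ "#logging" ]      # hypothetical channel
      nick     => "logstash"
    }
  }
}
```

This trades the durable capture file for immediate visibility, so in practice you might run both outputs side by side.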

We can then use this data to add additional parsing configuration to Logstash to process Syslog event variants.

Varying the Syslog Parsing

Let’s say we have an incoming failed event. We can use the earlier event that failed to parse.

Local7.Notice 10.0.1.2 id=firewall time='2007-05-11 14:08:53' fw=FGT-602038033716 pri=5 log_id=0000010002 type=traffic subtype=session SN=2170882 duration=20 rule=0 policyid=0 proto=icmp service=icmp status=accept src=10.0.1.96 srcname=10.0.1.96 dst=10.0.1.2 dstname=10.0.1.2 src_int=n/a dst_int=internal sent=30 rcvd=30 sent_pkt=1 rcvd_pkt=1 src_port=0 dst_port=0 vpn=n/a tran_ip=0.0.0.0 tran_port=0

Let’s add some additional parsing to our filter section to process this specific class of event.

filter {
  if [type] == "syslog" and [message] =~ "id=firewall" {
      grok {
        match => [ "message", "%{WORD:facility}.%{WORD:priority} %{HOSTNAME:hostname} id=%{WORD:class} time='%{TIMESTAMP_ISO8601:timestamp}' %{GREEDYDATA:syslog_message}" ]
      }
  } else if [type] == "syslog" {
      grok {
        match => [ "message",  "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ]
      }
  }
}

In our new filter section we’ve specified two parsing mechanisms. The first tests for events with a type of syslog and which contain id=firewall in the event’s message. This will pick up all of our firewall events. We’ve then created a grok plugin with a regular expression match for our event. We could also use the kv plugin here as much of the data is in the form key=value.
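A sketch of that kv approach, assuming the grok match above has already isolated the key=value pairs into syslog_message:

```
filter {
  if [type] == "syslog" and [message] =~ "id=firewall" {
    kv {
      # Split the remaining key=value pairs into individual event
      # fields, e.g. src, dst, proto and status.
      source => "syslog_message"
    }
  }
}
```

This saves us writing a long regular expression for every field and copes better if the vendor adds or reorders keys.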

So now if we sent our firewall event it’d be correctly parsed and would result in an event that looked like:

{
           "message" => "Local7.Notice 10.0.1.2 id=firewall time='2007-05-11 14:08:53' fw=FGT-602038033716 pri=5 log_id=0000010002 type=traffic subtype=session SN=2170882 duration=20 rule=0 policyid=0 proto=icmp service=icmp status=accept src=10.0.1.96 srcname=10.0.1.96 dst=10.0.1.2 dstname=10.0.1.2 src_int=n/a dst_int=internal sent=30 rcvd=30 sent_pkt=1 rcvd_pkt=1 src_port=0 dst_port=0 vpn=n/a tran_ip=0.0.0.0 tran_port=0\n",
          "@version" => "1",
        "@timestamp" => "2014-09-21T01:37:01.708Z",
              "type" => "syslog",
              "host" => "127.0.0.1",
          "facility" => "Local7",
          "priority" => "Notice",
          "hostname" => "10.0.1.2",
             "class" => "firewall",
         "timestamp" => "2007-05-11 14:08:53",
    "syslog_message" => "fw=FGT-602038033716 pri=5 log_id=0000010002 type=traffic subtype=session SN=2170882 duration=20 rule=0 policyid=0 proto=icmp service=icmp status=accept src=10.0.1.96 srcname=10.0.1.96 dst=10.0.1.2 dstname=10.0.1.2 src_int=n/a dst_int=internal sent=30 rcvd=30 sent_pkt=1 rcvd_pkt=1 src_port=0 dst_port=0 vpn=n/a tran_ip=0.0.0.0 tran_port=0"
}

We could also add further conditionals that match other variant events.

Finally, if the first conditional isn’t matched, then Logstash uses the next conditional to try our default Syslog parsing on the event. In our case, if that doesn’t match either, the event triggers our catch-all for failed parsing and ends up in our /var/log/failed_syslog_events-* file.

Summary

So here we’ve seen how to manage poorly formatted or variant Syslog messages using Logstash. We’ve developed a method for detecting and cataloguing errant messages, and we’ve demonstrated a way to parse multiple Syslog variants.


  1. I’ve also written a book about Logstash.