CheckPoint Syslog Data to Elastic Stack

Recently I had an opportunity to get some exposure Elastic Stack (previously ELK). I had some downtime and a possible need for this and an app team was looking at replacing splunk with it. I will not be going into the install of it here but there are plenty of how-to guides on it and possibly another article.

We produce a ton of CheckPoint logs that were previously going to both the Management Server and proxied to Microsoft OMS via syslog relay. The problem with OMS is it was not indexed by field and at the time of implementation, there was not an easy way to do it. Integrating this into a stack that non infrastructure support staff may have access to was a bonus.

For those not familiar with Elastic Stack, it is primarily made up of Elastic Search (search engine), Logstash (data flow manipulator) and Kibana (web front end). The later versions also implemented beats as a light weight mechanism for pulling in syslog data, file data and a few others without having to load Logstash where the logs reside as it has some beefy memory requirements.

With Logstash, it is very easy to filter CheckPoint data that 1) gets a syslog header wrapped around it due to the proxy and 2) has embedded key value pairs.

Here is a sample of the log

29:51--7:00 1.1.1.1 CP-GW - Log [[email protected] Action="accept" UUid="{0x5da7ee3f,0x4,0x5679710a,0xc0000005}" rule="42" rule_uid="{0F0D6B41-C4CC-45E1-A059-0753CBAB43E1}" rule_name="Allowed Traffic" src="2.2.2.2" dst="3.3.3.3" proto="6" product="VPN-1 & FireWall-1" service="1234" s_port="4321" product_family="Network"]

And here is the logstash config to go with it.

  if [type] == "syslog" and "checkpoint" in [tags] {
    grok {
      match => { "message" => "<%{POSINT:priority}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{IPORHOST:checkpoint.cluster}  %{DATA:checkpoint.timestamp} %{IPORHOST:checkpoint.node} %{DATA:checkpoint.product_type} - %{DATA:checkpoint.log_type} \[[email protected]%{DATA:checkpoint.field_id} %{DATA:[@metadata][checkpoint.data]}\]" }
      add_field => {
         "received_at" => "%{@timestamp}"
         "received_from" => "%{host}"
      }
      remove_field => [ "host" ]
    }
    mutate {
      gsub => [ "checkpoint.timestamp", "--", "-" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      #This is not quite ISO8601 because it has the timezone on it
      #match => [ "checkpoint.timestamp", "ISO8601" ]
    }
    kv {
        prefix => "checkpoint."
        source => "[@metadata][checkpoint.data]"
        transform_key => "lowercase"
    }
    mutate {
      rename => { "checkpoint.name" => "checkpoint.protection_name" }
      rename => { "checkpoint.type" => "checkpoint.protection_type" }
      rename => { "checkpoint.level" => "checkpoint.confidence_level" }
      rename => { "checkpoint.profile" => "checkpoint.smartdefense_profile" }
      rename => { "checkpoint.impact" => "checkpoint.performance_impact" }
      rename => { "checkpoint.info" => "checkpoint.attack_info" }
      rename => { "checkpoint.src" => "source.ip" }
      rename => { "checkpoint.s_port" => "source.port" }
      rename => { "checkpoint.dst" => "destination.ip" }
      rename => { "checkpoint.service" => "destination.port" }
      rename => { "checkpoint.node" => "hostname" }
    }
  }
}

output {
  if [type] == "syslog" and "checkpoint" in [tags] and "_grokparsefailure" in [tags] {
    file {
        path => "/var/log/logstash/checkpoint_failure.log"
#         codec => rubydebug
    }
  }

The “grok” filter seems to be a simplified regular expression where you can match data based on the type and is fairly self explanatory for those familiar with Regular Expressions

The “kv” filter is a for the Key/Values in the fields. I could do a better job of using this as the fields sometimes have spaces in them which kv doesn’t match automatically but that’s a word in progress. That’s why the mutate filter is renaming some of the fields.