4. ELK Stack: "L" is for Lord of the Stack

Logstash is my favorite tool. So much flexibility and so much to play with.

What is Logstash?

In my honest opinion, Logstash is the best tool of the stack and deserves the title of The Lord of the Stack.

Logstash, one of the core products of the Elastic Stack, is used to aggregate and process data before sending it to Elasticsearch or many other destinations. Logstash is an open-source, server-side data processing pipeline that lets you ingest data from multiple sources simultaneously and transform it before it is indexed in Elasticsearch. Similar to an event collector, Logstash is the agent that sits between the source of our data and the final storage location, and it is flexible enough to import data from almost any source.

Logstash can:

  • Transform, parse and filter data in real time.

  • Structure unstructured data.

  • Anonymize personal data or exclude it completely.

  • Perform geolocation searches.

  • Scale across several nodes.

  • Be used as a buffer for Elasticsearch.

A Logstash pipeline works in three stages: inputs → filters → outputs. Inputs generate events, filters modify them, and outputs send them elsewhere.

Inputs and outputs support codecs that allow you to encode or decode data as it enters or leaves the pipeline without having to use a separate filter.
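
For instance, here is a minimal sketch (the port number and the json codec are illustrative choices, not taken from this article) where a tcp input decodes incoming JSON as it arrives, with no separate json filter needed:

input {
    tcp {
        port => 5000          # illustrative port
        codec => json         # decode each incoming line as JSON as it enters the pipeline
    }
}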

Logstash is part of ETL (Extract, Transform and Load) technology.

Logstash Anatomy

Logstash uses configuration files maintained under the /etc/logstash/conf.d/ folder. In a SIEM use case, these files play the role of parsers: you define the data input, process and transform the event logs, and send them on to Elasticsearch, as sketched below.
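
As a rough skeleton (the file name, tag, and index name are illustrative assumptions), a single file under /etc/logstash/conf.d/ typically ties the three stages together:

# /etc/logstash/conf.d/10-example.conf   (illustrative name; all files in this folder are loaded together)
input {
    beats {
        port => 5044                      # receive events from Beats agents
    }
}
filter {
    mutate {
        add_tag => ["example"]            # parsing and enrichment logic goes here
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "example-index"          # illustrative destination index
    }
}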

Input plugins

The most commonly used input plugins in a centralized logging context are:

  • Syslog: Upon running, Logstash will start listening on port 514 for incoming syslog data.

input {
    syslog {
        port => 514
    }
}
  • File: Reads logs from a file.

input {
    file {
        path => "/home/elk/folder/filename.csv"
        start_position => "beginning"    # read the file from the start, not only new lines
    }
}
  • Beats: Upon running, Logstash will start listening on port 5044 for incoming data from Beats agents.

input {
    beats {
        port => 5044
    }
}
  • TCP/UDP:

input {
    tcp {
        port => 5400
    }
    udp {
        port => 5500
    }
}

Note that Logstash can't listen on the same port number in two or more inputs.

Filter plugins

This is where the magic happens: you can parse and enrich events, drop them, or remove or mask specific fields.

  • GROK: This filter builds on regular expressions and provides ready-made patterns, such as an IP address format or the syslog header, so you don't have to write them from scratch.

Event sample: LEEF:1.0|FORCEPOINT|Firewall|6.6.1|Connection_Discarded|devTimeFormat=MMM dd yyyy HH:mm:ss devTime=Apr 18 2020 21:03:07 proto=17 dstPort=137 srcPort=137 dst=10.20.65.255 src=10.10.1.79 action=Discard sender=FP_CLUSTER 1

filter {
    grok {
        match => { "message" => "<%{NUMBER:entier}>%{WORD:logformat}:%{NUMBER:format_version}\|%{WORD:observer_product}\|%{WORD:observer_type}\|%{NUMBER:observer_version}.*?\|%{DATA:event_name}\|%{GREEDYDATA:fp_msg}" }
    }
}
  • KV: This filter provides an easy way to parse any event that has a key=value format.

Event sample: date=2020-04-22 time=14:46:33 devname="fgt1" devid="FG100" logid="0000000013" type="traffic" subtype="forward" level="notice" vd="root" eventtime=1587563193 srcip=X.X.X.X srcport=42837 srcintf="PUBL" srcintfrole="DMZ" dstip=X.X.X.X dstport=53 dstintf="wan1" dstintfrole="wan" sessionid=70225713

filter {
    kv {
        value_split => "="           # character that separates a key from its value
        field_split => " "           # character that separates one key=value pair from the next
    }
}
  • MUTATE: If you need to remove, add, copy, or replace fields, or add tags to enrich your event logs, then MUTATE is the way to go. This is particularly helpful for event normalization using the ECS format.

filter {
    mutate {
        copy => { "[direction]" => "[network][direction]" }
        copy => { "[locport]" => "[source][port]" }
        copy => { "[action]" => "[event][action]" }
        add_field => { "[ecs][version]" => "1.5.0" }
        rename => { "[destination_ip]" => "[destination][ip]" }
        remove_field => [ "logformat" ]
        add_tag => ["forcepoint"]
    }
}
  • Translate: This is helpful for enrichment purposes when you need to correlate field contents with predefined dictionaries and then override them accordingly. For example, if you want to explicitly name SIDs in a Windows environment, you can use this filter.

filter {
    translate {
        field => "TargetUserSid"
        destination => "TargetUserSid"
        dictionary_path => "/etc/logstash/dictionaries/sid.yaml"
        override => true
    }
}

The /etc/logstash/dictionaries/sid.yaml file can look something like this:
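
As an illustration, the dictionary simply maps SIDs to readable names; the first three are well-known Windows SIDs, while the last entry uses a made-up domain SID:

"S-1-5-18": "SYSTEM"
"S-1-5-19": "LOCAL SERVICE"
"S-1-5-20": "NETWORK SERVICE"
"S-1-5-21-111111111-222222222-333333333-500": "Administrator"   # made-up domain SID for illustration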

  • DATE: Normalizing dates is crucial for data indexing in Elasticsearch. This filter parses a date from a field and, by default, writes the result to the event's @timestamp.

filter {
    date {
        match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
    }
}
  • CSV: Useful for ingesting data in CSV format.

filter {
        csv {
            columns => ["[event][outcome]","[event][type]","[user][name]","[server][ip]","[server][name]"]
            separator => ","
        }
}
  • JSON: Ingests incoming events in JSON format.

filter {
        json {
                source => "message"
        }
}
  • CIDR: This filter is useful for defining your networks, tagging your traffic, and enriching events with IP address locality.

filter {
    cidr {
        add_field => { "[source][locality]" => "private" }
        address => [ "%{[source][ip]}" ]
        network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
    }
}
  • TLD: This plugin is not bundled by default, so you need to install it with the following command: /usr/share/logstash/bin/logstash-plugin install logstash-filter-tld. The TLD plugin can be used to extract domain name information such as the TLD, eTLD+1, TRD, etc.

filter {
    if [alert][destination][domain] {
        tld {
            source => "[alert][destination][domain]"
            target => "[domain_dns]"
        }
        if [domain_dns][trd] {
            mutate {
                join => { "[domain_dns][trd]" => "." }
            }
        }
        mutate {
            copy => { "[domain_dns][domain]" => "[alert][dns][question][registered_domain]" }
            copy => { "[domain_dns][trd]" => "[alert][dns][question][subdomain]" }
            copy => { "[domain_dns][tld]" => "[alert][dns][question][top_level_domain]" }
            copy => { "[alert][dns][question][registered_domain]" => "[alert][related][domain]" }
        }
    }
}
  • GEOIP: Geolocation enrichment for public IP addresses.

filter {
    if [src_ip_geo_apply] {
        geoip {
            source => "[src_ip_geo_apply]"
            target => "[alert][source][geo]"
            fields => ["city_name", "continent_code", "country_code2", "country_name", "location", "region_code", "region_name"]
        }
    }
}

Output plugins

The main outputs for our use case would be Elasticsearch, Email (a rough sketch follows the Elasticsearch example below), or a pipeline (we'll cover pipelines in the next article).

  • Elasticsearch: The Elasticsearch output supports many options, which you can read more about on elastic.co.

output {
    if [host] == "10.1.1.254" {
        elasticsearch {
            hosts => ["localhost:9200"]     # Elasticsearch hostname or IP address
            index => "firewallfortigate"    # Name of destination index
        }
    }
    else if "forcepoint" in [tags] {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "forcepoint"
        }
    }
}
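
For the Email output, a rough sketch could look like the following; the tag, recipient, and SMTP settings are placeholders, and you may need to install the logstash-output-email plugin first:

output {
    if "alert" in [tags] {                     # assumed tag marking events worth mailing
        email {
            to => "soc@example.com"            # illustrative recipient
            from => "logstash@example.com"     # illustrative sender
            subject => "Logstash alert from %{host}"
            body => "Event action: %{[event][action]}"
            address => "smtp.example.com"      # illustrative SMTP relay
            port => 25
        }
    }
}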
