We are friends with ELK and Exchange. Part 2

I continue my story about how to make Exchange and ELK friends (the beginning is here). Let me remind you that this combination is capable of processing a very large number of logs without strain. This time we'll talk about how to configure the Logstash and Kibana components to work with Exchange.

Logstash in the ELK stack is used for intelligent processing of logs and preparing them for placement in Elastic in the form of documents, on the basis of which it is convenient to build various visualizations in Kibana.

Installation

It consists of two stages:

  • Installing and configuring the OpenJDK package.
  • Installing and configuring the Logstash package.

Installing and configuring the OpenJDK package

The OpenJDK package must be downloaded and unpacked into a specific directory. Then the path to this directory must be entered in the $env:Path and $env:JAVA_HOME variables of the Windows operating system:
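
For example, this can be done with a PowerShell snippet like the one below; the directory C:\OpenJDK\jdk-13.0.1 is an assumption and should be replaced with the actual path the archive was unpacked to:

# Assumed unpacking directory; replace with the actual path to the unpacked OpenJDK
$jdkPath = "C:\OpenJDK\jdk-13.0.1"

# Set the variables for the current session
$env:JAVA_HOME = $jdkPath
$env:Path += ";$jdkPath\bin"

# Persist them at the machine level so that the future Logstash service also sees them
[Environment]::SetEnvironmentVariable("JAVA_HOME", $jdkPath, "Machine")
[Environment]::SetEnvironmentVariable("Path", [Environment]::GetEnvironmentVariable("Path", "Machine") + ";$jdkPath\bin", "Machine")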

Check Java version:

PS C:\> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Installing and configuring the Logstash package

Download the archive with the Logstash distribution from here. The archive must be unpacked to the root of the disk. Do not unpack it into the C:\Program Files folder, otherwise Logstash will refuse to start normally. Then, in the jvm.options file, you need to make the edits responsible for allocating RAM to the Java process. I recommend specifying half of the server's RAM. If it has 16 GB of RAM on board, then the default keys:

-Xms1g
-Xmx1g

need to be replaced with:

-Xms8g
-Xmx8g

In addition, it is advisable to comment out the line -XX:+UseConcMarkSweepGC. More about it here. The next step is to create a default configuration in the logstash.conf file:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

With this configuration, Logstash reads data from the console, runs it through an empty filter, and prints it back to the console. Applying this configuration lets us verify that Logstash is functional. To do this, run it in interactive mode:

PS C:\...\bin> .\logstash.bat -f .\logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash started successfully, with its API endpoint listening on port 9600.
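
If you now type an arbitrary line into the console, Logstash returns it back as an event. The output below is approximate: the timestamp will differ, and srv-log-01 is a made-up host name:

hello world
{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => 2019-12-19T08:20:13.123Z,
          "host" => "srv-log-01"
}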

The final installation step is running Logstash as a Windows service. This can be done, for example, using the NSSM package:

PS C:\...\bin> .\nssm.exe install logstash
Service "logstash" installed successfully!

Fault tolerance

The safety of the logs when transferred from the source server is ensured by the Persistent Queues mechanism.

How it works

The queue's place in the log processing pipeline: input → queue → filter + output.

The input plugin receives data from the log source, writes it to the queue, and sends an acknowledgment to the source.

Messages from the queue are processed by Logstash, passing through the filter and the output plugin. When the output plugin confirms that the log has been sent, Logstash removes the processed log from the queue. If Logstash stops, all pending and unacknowledged messages remain in the queue, and Logstash will continue processing them the next time it starts.

Settings

The queue is configured by keys in the file C:\Logstash\config\logstash.yml:

  • queue.type: (possible values are persisted and memory (default)).
  • path.queue: (path to the folder with the queue files, which are stored in C:\Logstash\queue by default).
  • queue.page_capacity: (maximum queue page size, default value is 64mb).
  • queue.drain: (true/false; enables/disables draining the queue before Logstash shuts down. I do not recommend enabling it, because it will directly affect the speed of shutting down the server).
  • queue.max_events: (maximum number of events in the queue, default is 0 (unlimited)).
  • queue.max_bytes: (maximum queue size in bytes, default is 1024mb (1gb)).

If queue.max_events and queue.max_bytes are configured, messages stop being accepted into the queue as soon as either of these limits is reached. Learn more about Persistent Queues here.

An example of the part of logstash.yml responsible for setting up the queue:

queue.type: persisted
queue.max_bytes: 10gb

Configuration

The Logstash configuration usually consists of three parts responsible for different phases of processing incoming logs: receiving (input section), parsing (filter section) and sending to Elastic (output section). Below we will take a closer look at each of them.

Input

We receive the incoming stream of raw logs from Filebeat agents, so it is the beats plugin that we specify in the input section:

input {
  beats {
    port => 5044
  }
}

With this configuration, Logstash starts listening on port 5044 and, when it receives logs, processes them according to the settings of the filter section. Read more about the beats plugin settings here. If necessary, the channel for receiving logs from Filebeat can be wrapped in SSL.
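
For illustration, a minimal sketch of such an SSL-wrapped input might look like this (the certificate and key paths are assumptions):

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "C:/Logstash/config/certs/logstash.crt"
    ssl_key => "C:/Logstash/config/certs/logstash.key"
  }
}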

Filter

All of the Exchange-generated text logs that are interesting to process are in CSV format, with the fields described in the log file itself. Logstash offers us three plugins for parsing CSV records: dissect, csv and grok. The first one is the fastest, but it only copes with parsing the simplest logs. For example, it will split the following entry into two fields (because of the comma inside the quoted field), which will cause the log to be parsed incorrectly:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

It can be used, for example, for parsing IIS logs. In this case, the filter section might look like this:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

The Logstash configuration allows the use of conditional statements, so we send to the dissect plugin only the logs that were marked in Filebeat with the IIS tag. Inside the plugin, we match the field values with their names, delete the original message field, which contained the raw log entry, and can add an arbitrary field that will, for example, contain the name of the application from which we collect the logs.
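
For reference, such a tag is assigned on the Filebeat side; a minimal sketch of the corresponding Filebeat input might look like the following (the IIS log path is an assumption):

filebeat.inputs:
- type: log
  paths:
    - C:\inetpub\logs\LogFiles\*\*.log
  tags: ["IIS"]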

In the case of the Tracking logs, it is better to use the csv plugin, since it handles complex fields correctly:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
  }
}

Inside the plugin, we match the field values with their names, delete the original message field (as well as the tenant-id and schema-version fields), which contained the raw log entry, and can add an arbitrary field that will, for example, contain the name of the application from which we collect the logs.

At the exit from the filtering stage, we get documents that are, to a first approximation, ready for visualization in Kibana. However, the following will still be lacking:

  • Numeric fields will be recognized as text, which will prevent operations from being performed on them. Namely, the time-taken field of the IIS log, as well as the recipient-count and total-bytes fields of the Tracking log.
  • The standard timestamp of the document will contain the time the log was processed, not the time it was written on the server side.
  • The recipient-address field will look like a single string, which will not allow analysis with a count of message recipients.

It's time to add some magic to the logging process.

Numeric field conversion

The dissect plugin has a convert_datatype option, which can be used to convert a text field to a numeric format. For example, like this:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

It is worth remembering that this method is only suitable if the field is guaranteed to contain a value. Null values in a field are not handled by this option and raise an exception.

For the Tracking logs, it is better not to use a similar conversion approach, since the recipient-count and total-bytes fields may be empty. To convert these fields, it is better to use the mutate plugin:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Splitting recipient-address into individual recipients

This task can also be solved using the mutate plugin:

mutate {
  split => ["recipient-address", ";"]
}

Changing timestamp

In the case of the Tracking logs, the task is solved very simply by the date plugin, which writes the date and time, in the required format, from the date-time field into the timestamp field:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

In the case of the IIS logs, we need to combine the data of the date and time fields using the mutate plugin, set the time zone we need, and place this timestamp into timestamp using the date plugin:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Output

The output section is used to send the processed logs to the log receiver. In the case of sending directly to Elastic, the elasticsearch plugin is used, in which the server addresses and the index name template for the generated document are specified:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Final configuration

The final configuration will look like this:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient-address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Source: habr.com
