2. Elastic stack: analysis of security logs. Logstash

In the previous article we got acquainted with the ELK stack and the software products it consists of. The first task an engineer faces when working with the ELK stack is sending logs to Elasticsearch for storage and further analysis. That is easier said than done: Elasticsearch stores logs as documents with specific fields and values, which means the engineer must use various tools to parse the messages sent from the end systems. There are several ways to do this: write your own program that adds documents to the database via the API, or use a ready-made solution. In this course we will consider Logstash, which is part of the ELK stack. We will see how logs can be sent from end systems to Logstash, and then set up a configuration file for parsing them and redirecting them to the Elasticsearch database. As the source system, we take logs from a Check Point firewall.

The course does not cover the installation of the ELK stack, since there are already a huge number of articles on that topic; we will focus on the configuration side.

Let's make an action plan for configuring Logstash:

  1. Check that Elasticsearch will accept logs (verify that the service is healthy and the port is open).
  2. Consider how we can send events to Logstash, choose a method, and implement it.
  3. Set up Input in the Logstash configuration file.
  4. Set up Output in the Logstash configuration file in debug mode to understand what the log message looks like.
  5. Set up Filter.
  6. Set up the correct Output to Elasticsearch.
  7. Launch Logstash.
  8. Check the logs in Kibana.

Let's consider each item in more detail:

Checking that Elasticsearch will accept logs

To do this, you can use the curl command to check access to Elasticsearch from the system on which Logstash is deployed. If authentication is configured, also pass the user and password through curl, and specify port 9200 if you have not changed it. If you receive a response like the one below, everything is in order.

[elastic@elasticsearch ~]$ curl -u <<user_name>>:<<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

If no answer comes back, there may be several kinds of errors: the Elasticsearch process is not running, the wrong port is specified, or the port is blocked by a firewall on the server where Elasticsearch is installed.
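
A few commands can help localize the problem. This is a sketch: run them on the host where Elasticsearch is installed, and adjust the service name and port if yours differ.

# Is the Elasticsearch process running?
sudo systemctl status elasticsearch

# Is anything listening on port 9200?
sudo ss -tlnp | grep 9200

# Is the port allowed by the local firewall? (firewalld example)
sudo firewall-cmd --list-ports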

Sending logs to Logstash from the Check Point firewall

From the Check Point management server, you can send logs to Logstash via syslog using the Log Exporter utility; you can read more about it in this article. Here we give only the command that creates the export:

cp_log_export add name check_point_syslog target-server < > target-port 5555 protocol tcp format generic read-mode semi-unified

< > is the address of the server on which Logstash is running; target-port 5555 is the port to which we will send logs. Sending logs via tcp can load the server, so in some cases it is more correct to use udp.
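
To make sure the export was created and is running, the Log Exporter utility has show and restart subcommands. A sketch, assuming a recent Log Exporter version; consult its documentation for the exact syntax on yours:

cp_log_export show name check_point_syslog
cp_log_export restart name check_point_syslog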

Setting up INPUT in the Logstash config file

By default, the configuration file is located in the /etc/logstash/conf.d/ directory. The configuration file consists of three meaningful parts: INPUT, FILTER, OUTPUT. In INPUT we specify where the system will take logs from; in FILTER we parse the log, that is, configure how to split the message into fields and values; in OUTPUT we configure the outgoing stream, i.e. where the parsed logs will be sent.
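
A minimal skeleton of a pipeline file therefore looks like this (the bodies are filled in over the rest of the article):

input {
    # where to take events from
}
filter {
    # how to split messages into fields and values
}
output {
    # where to send the parsed result
}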

First, let's set up INPUT and consider some of the types it can be: file, tcp and exec.

tcp:

input {
    tcp {
        port => 5555
        host => "10.10.1.205"
        type => "checkpoint"
        mode => "server"
    }
}

mode => "server"
Tells Logstash to accept connections.

port => 5555
host => "10.10.1.205"
We accept connections to the IP address 10.10.1.205 (Logstash), port 5555 - the port must be allowed by the firewall policy.

type => "checkpoint"
We tag the document; this is very convenient if you have several incoming connections. Subsequently, for each connection you can write your own filter using the logical if construct, as in the sketch below.
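
For example, with a hypothetical second source tagged openvas, the branching in the filter section could look like this:

filter {
    if [type] == "checkpoint" {
        # parse Check Point logs here
    }
    else if [type] == "openvas" {
        # parse OpenVAS reports here
    }
}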

file:

input {
    file {
        path => "/var/log/openvas_report/*"
        type => "openvas"
        start_position => "beginning"
    }
}

Description of settings:
path => "/var/log/openvas_report/*"
Specify the path to the files to be read; a wildcard mask is allowed, as here.

type => "openvas"
Event type.

start_position => "beginning"
With "beginning", the file is read from the start; if you set "end", the system only waits for new records to appear at the end of the file.
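
Note that the file input remembers its read position between runs in a so-called sincedb file. For repeatable tests it can be handy to disable that persistence, a sketch:

input {
    file {
        path => "/var/log/openvas_report/*"
        type => "openvas"
        start_position => "beginning"
        # do not persist the read position between runs (testing only)
        sincedb_path => "/dev/null"
    }
}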

exec:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

With this input, a single shell command (only one!) is launched periodically and its output is wrapped in a log message.

command => "ls -alh"
The command whose output we are interested in.

interval => 30
Command call interval in seconds.

In order to receive logs from the firewall, we configure a tcp or udp input, depending on how the logs are sent to Logstash.
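
If the Check Point export was created with protocol udp, the input would look like this sketch (note that the udp input has no mode option, it simply listens):

input {
    udp {
        port => 5555
        host => "10.10.1.205"
        type => "checkpoint"
    }
}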

Setting up Output in the Logstash configuration file in debug mode to understand what the log message looks like

After INPUT is configured, we need to understand what the log message will look like and which methods should be used to configure the log filter (parser).

To do this, we will use an output that prints the result to stdout so we can view the original message. The full configuration file at the moment looks like this:

input {
    tcp {
        port => 5555
        type => "checkpoint"
        mode => "server"
        host => "10.10.1.205"
    }
}

output {
    if [type] == "checkpoint" {
        stdout { codec => json }
    }
}

Run the command to check:
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/checkpoint.conf
The result, copied as text, looks like this:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

Looking at this message, we see that the logs have the form field = value or key = value, which means that a filter called kv is suitable. In order to choose the right filter for each specific case, it is worth familiarizing yourself with them in the technical documentation, or asking a friend.

Setting up the Filter

In the previous step we chose kv; below is the configuration of this filter:

filter {
    if [type] == "checkpoint" {
        kv {
            value_split => "="
            allow_duplicate_values => false
        }
    }
}

With value_split we select the symbol that divides the field from the value, here "=". With allow_duplicate_values => false, if there are identical entries in the log we keep only one instance in the database; otherwise you would get an array of identical values. That is, if the message contains "foo = some foo = some", only foo = some is written.
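
To illustrate on a fragment of the message above, the kv filter turns flat text into separate document fields, roughly like this (simplified sketch):

# incoming message (fragment)
action="Accept" src="10.10.1.180" service="53" proto="17"

# resulting document fields
action => Accept
src => 10.10.1.180
service => 53
proto => 17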

Setting up the correct Output to Elasticsearch

After the Filter is configured, you can send logs to the Elasticsearch database:

output {
    if [type] == "checkpoint" {
        elasticsearch {
            hosts => ["10.10.1.200:9200"]
            index => "checkpoint-%{+YYYY.MM.dd}"
            user => "tssolution"
            password => "cool"
        }
    }
}

If the document is tagged with the checkpoint type, we save the event to the Elasticsearch database, which by default accepts connections on 10.10.1.200, port 9200. Each document is saved to a specific index; in this case we save to the "checkpoint-" index plus the current date. Each index can have a certain set of fields, or it is created automatically when a new field appears in a message; field settings and their types can be viewed in the mappings.

If you have authentication configured (we will look at it later), credentials must be specified for writing to a specific index; in this example it is "tssolution" with the password "cool". You can restrict a user's rights to write logs only to a certain index and nothing more.
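
Once events start flowing, a quick way to verify that the daily index is actually being created is the standard _cat API (a sketch using the credentials above):

curl -u tssolution:cool -XGET "10.10.1.200:9200/_cat/indices/checkpoint-*?v"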

Launching Logstash

Logstash config file:

input {
    tcp {
        port => 5555
        type => "checkpoint"
        mode => "server"
        host => "10.10.1.205"
    }
}

filter {
    if [type] == "checkpoint" {
        kv {
            value_split => "="
            allow_duplicate_values => false
        }
    }
}

output {
    if [type] == "checkpoint" {
        elasticsearch {
            hosts => ["10.10.1.200:9200"]
            index => "checkpoint-%{+YYYY.MM.dd}"
            user => "tssolution"
            password => "cool"
        }
    }
}

Check that the configuration file compiles correctly:
/usr/share/logstash/bin/logstash -f checkpoint.conf
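
Alternatively, Logstash has a built-in flag that validates the file and exits without starting the pipeline:

sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/checkpoint.conf --config.test_and_exit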

Start the Logstash process:
sudo systemctl start logstash

Check that the process has started:
sudo systemctl status logstash

Check if the socket is up:
netstat -nat | grep 5555
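
As an end-to-end smoke test, you can push a hand-made key=value line into the tcp input and then look for it in Elasticsearch. A sketch, assuming the nc utility is installed:

echo 'time="1576766483" action="Accept" src="10.10.1.180"' | nc 10.10.1.205 5555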

Checking the logs in Kibana

After everything is running, go to Discover in Kibana and make sure that everything is set up correctly.

All logs are in place and we can see all the fields and their values!

Conclusion

We looked at how to write a Logstash configuration file and, as a result, got a parser for all fields and values. Now we can search on specific fields and plot charts from them. Later in the course we will look at visualization in Kibana and create a simple dashboard. It is worth mentioning that the Logstash configuration file needs to be updated regularly in certain situations, for example, when we want to change a field's type from a number to a string. In subsequent articles we will do this constantly.

So stay tuned (Telegram, Facebook, VK, TS Solution Blog, Yandex Zen).

Source: habr.com
