In the past
The course does not cover the installation of the ELK stack, since there are a huge number of articles on this topic, we will consider the configuration component.
Let's make an action plan for configuring Logstash:
- Checking that elasticsearch will accept logs (checking the health and openness of the port).
- We consider how we can send events to Logstash, choose a method, implement it.
- Set up Input in the Logstash config file.
- We set Output in the Logstash configuration file in debug mode in order to understand what the log message looks like.
- Set up Filter.
- Set up the correct Output in ElasticSearch.
- Launching Logstash.
- Checking the logs in Kibana.
Let's consider each item in more detail:
Checking that elasticsearch will accept logs
To do this, you can use the curl command to check access to Elasticsearch from the system on which Logstash is deployed. If you have authentication configured, then we also pass the user / password through curl, specify port 9200 if you have not changed it. If you receive a response like the one below, then everything is in order.
[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
"name" : "elastic-1",
"cluster_name" : "project",
"cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
"version" : {
"number" : "7.4.1",
"build_flavor" : "default",
"build_type" : "rpm",
"build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
"build_date" : "2019-10-22T17:16:35.176724Z",
"build_snapshot" : false,
"lucene_version" : "8.2.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$
If the answer does not come, then there may be several types of errors: the elasticsearch process is not working, the wrong port is specified, or the port is blocked by a firewall on the server where elasticsearch is installed.
Considering how you can send logs to Logstash from the check point firewall
From the Check Point management server, you can send logs to Logstash via syslog using the log_exporter utility, you can read more about it at this
cp_log_export add name check_point_syslog target-server < > target-port 5555 protocol tcp format generic read-mode semi-unified
< > - the address of the server on which Logstash is running, target-port 5555 - the port to which we will send logs, sending logs via tcp can load the server, so in some cases it is more correct to set udp.
Setting up INPUT in the Logstash config file
By default, the configuration file is located in the /etc/logstash/conf.d/ directory. The configuration file consists of 3 meaningful parts: INPUT, FILTER, OUTPUT. IN INPUT we specify from where the system will take logs, in FILTERS parsim log - configure how to divide the message into fields and values, in OUTPUT configure the outgoing stream - where the parsed logs will be sent.
First, let's set up INPUT, consider some types that can be - file, tcp and exe.
tcp:
input {
tcp {
port => 5555
host => β10.10.1.205β
type => "checkpoint"
mode => "server"
}
}
mode => "server"
Tells Logstash to accept connections.
port => 5555
host => "10.10.1.205"
We accept connections to the IP address 10.10.1.205 (Logstash), port 5555 - the port must be allowed by the firewall policy.
type => "checkpoint"
We mark the document, it is very convenient if you have several incoming connections. Subsequently, for each connection, you can write your own filter using the logical if construct.
File:
input {
file {
path => "/var/log/openvas_report/*"
type => "openvas"
start_position => "beginning"
}
}
Description of settings:
path => "/var/log/openvas_report/*"
Specify the directory in which the files are to be read.
type => "openvas"
Event type.
start_position => "beginning"
When a file is changed, it reads the entire file, if you set βendβ, then the system waits for new records to appear at the end of the file.
Exec:
input {
exec {
command => "ls -alh"
interval => 30
}
}
On this input, a (only!) shell command is launched and its output is wrapped in a log message.
command => "ls -alh"
The command whose output we are interested in.
interval => 30
Command call interval in seconds.
In order to receive logs from the firewall, we prescribe a filter tcp or udp, depending on how the logs are sent to Logstash.
Set up Output in the Logstash configuration file in debug mode in order to understand what the log message looks like
After we have configured INPUT, we need to understand how the log message will look like, what methods should be used to configure the log filter (parser).
To do this, we will use a filter that gives the result to stdout in order to view the original message, the full configuration file at the moment will look like this:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => β10.10.1.205β
}
}
output
{
if [type] == "checkpoint"
{
stdout { codec=> json }
}
}
I run the command to check:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
We see the result, the picture is clickable:
If copied it will look like this:
action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,
Looking at the message data, we understand that the logs look like: field = value or key = value, which means that a filter called kv is suitable. In order to choose the right filter for each specific case, it would be nice to familiarize yourself with them in the technical documentation, or ask a friend.
Setting up the Filter
At the last stage, kv was chosen, the following is the configuration of this filter:
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
We select the symbol by which we will divide the field and value - β=β. If we have identical entries in the log, we save only one instance in the database, otherwise you will get an array of identical values, that is, if we have the message βfoo = some foo=someβ, we only write foo = some.
Setting up the correct Output in ElasticSearch
After the Filter is configured, you can upload logs to the database elasticsearch:
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
If the document is signed with the checkpoint type, we save the event to the elasticsearch database, which accepts connections on 10.10.1.200 on port 9200 by default. Each document is saved to a specific index, in this case, we save it to the βcheckpoint-β index + the current time date. Each index can have a certain set of fields, or it is created automatically when a new field appears in the message, field settings and their type can be found in mappings.
If you have authentication configured (we'll look at it later), credits must be specified for writing to a specific index, in this example it is "tssolution" with the password "cool". You can delimit the rights of users to write logs only to a certain index and no more.
Launch Logstash.
Logstash config file:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => β10.10.1.205β
}
}
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
Check the configuration file for correct compilation:
/usr/share/logstash/bin//logstash -f checkpoint.conf
Start the Logstash process:
sudo systemctl start logstash
Check that the process has started:
sudo systemctl status logstash
Check if the socket is up:
netstat -nat |grep 5555
Checking the logs in Kibana.
After everything is running, go to Kibana - Discover, make sure that everything is set up correctly, the picture is clickable!
All logs are in place and we can see all the fields and their values!
Conclusion
We looked at how to write a Logstash configuration file, as a result we got a parser for all fields and values. Now we can work with searching and plotting specific fields. Later in the course, we will look at visualization in Kibana, create a simple dashboard. It is worth mentioning that the Logstash configuration file needs to be constantly updated in certain situations, for example, when we want to change the value of a field from a number to a word. In subsequent articles, we will constantly do this.
So stay tuned
Source: habr.com