2. Elastic stack: аналіз security логів. Logstash

2. Elastic stack: аналіз security логів. Logstash

У минулій статті ми познайомилися зі стеком ELK, З яких програмних продуктів він складається. І перше завдання, з яким стикається інженер при роботі з ELK стеком, є відправлення логів для зберігання в еластичномудослідженні для подальшого аналізу. Однак, це просто лише на словах, еластичнідослідження зберігає логи у вигляді документів з певними полями і значеннями, а значить інженер повинен використовуючи різні інструменти розпарити повідомлення, яке відправляється з кінцевих систем. Зробити це можна кількома способами - самому написати програму, яка за API додаватиме документи в базу або використовувати вже готові рішення. В рамках цього курсу ми розглядатимемо рішення Логсташяка є частиною ELK stack. Ми подивимося як можна відправити логи з кінцевих систем до Logstash, а потім будемо налаштовувати конфігураційний файл для парсингу та перенаправлення в базу даних Elasticsearch. Для цього як вхідна система беремо логи з міжмережевого екрану Check Point.

У рамках курсу не розглядається установка ELK stack, оскільки існує безліч статей на цю тему, будемо розглядати конфігураційну складову.

Складемо план дій щодо конфігурації Logstash:

  1. Перевірка того, що еластичнийдослідження прийматиме логи (перевірка працездатності та відкритості порту).
  2. Розглядаємо, яким чином ми можемо відправити події в Logstash, вибираємо спосіб, реалізуємо.
  3. Налаштовуємо Input у конфігураційному файлі Logstash.
  4. Налаштовуємо Output у конфігураційному файлі Logstash в режимі дебагу, щоб зрозуміти як виглядає лог повідомлення.
  5. Налаштовуємо Filter.
  6. Налаштовуємо коректний Output у ElasticSearch.
  7. Запуск Logstash.
  8. Перевіряємо логи в Kibana.

Розглянемо детальніше кожен пункт:

Перевірка, що еластичнийдослідження прийматиме логи

Для цього можна перевірити командою curl доступ до Elasticsearch із системи, на якій розгорнуть Logstash. Якщо у вас налаштована аутентифікація, то також через curl передаємо користувач/пароль, вказуємо порт 9200, якщо ви його не змінювали. Якщо приходить відповідь на кшталт як зазначено нижче, значить усе гаразд.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

Якщо відповідь не надійшла, то може бути кілька видів помилок: не працює процес elasticsearch, вказаний не той порт, або порт заблокований фаєрволом на сервері, де стоїть elasticsearch.

Розглядаємо як можна надіслати логи в Logstash з міжмережевого екрану check point

З Check Point management server можна відправити логи в Logstash через syslog, використовуючи утиліту log_exporter, докладніше можна ознайомитися з нею по даній статті, тут залишимо тільки команду, яка створює потік:

cp_log_export add name check_point_syslog target-server < > target-port 5555 protocol tcp format generic read-mode semi-unified

< > - адреса сервера на якому крутиться Logstash, target-port 5555 - порт на який будемо відправляти логи, відправлення логів по tcp може навантажити сервер, тому в деяких випадках правильніше ставити udp.

Налаштовуємо INPUT у конфігураційному файлі Logstash

2. Elastic stack: аналіз security логів. Logstash

За замовчуванням файл конфігурації знаходиться в директорії /etc/logstash/conf.d/. Конфігураційний файл складається з трьох осмислених елементів: INPUT, FILTER, OUTPUT. У ВХІД ми вказуємо звідки система братиме логи, ФІЛЬТР парсим лог - налаштовуємо як поділити повідомлення на поля та значення, ВИХІД налаштовуємо потік, що виходить - куди розпарені логи будуть відправлятися.

Спочатку налаштуємо INPUT, розглянемо деякі типи, які можуть бути - file, tcp і exe.

TCP:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

mode => «server»
Говорить про те, що Logstash приймає з'єднання.

port => 5555
host => "10.10.1.205"
Приймаємо з'єднання за IP-адресою 10.10.1.205 (Logstash), порт 5555 - порт повинен бути дозволений політикою фаєрволу.

type => "checkpoint"
Маркуємо документ, дуже зручно, якщо у вас кілька вхідних з'єднань. Надалі для кожного з'єднання можна написати свій фільтр за допомогою логічної конструкції if.

Файл:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

Опис налаштувань:
path => "/var/log/openvas_report/*"
Вказуємо директорію, файли в якій потрібно рахувати.

type => «openvas»
Тип події.

start_position => "beginning"
При зміні файлу зчитує файл повністю, якщо виставити «end», то система чекає появи нових записів в кінці файлу.

Виконавець:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

За даним інпутом запускається (тільки!) shell команда і її виведення обертається в лог повідомлення.

command => «ls-alh»
Команда, висновок якої нас цікавить.

interval => 30
Інтервал виклику команди за секунди.

Для того, щоб приймати логи з міжмережевого екрану, прописуємо фільтр. тпп або udp, залежно від того, як надсилаються логи в Logstash.

Налаштовуємо Output в конфігураційному файлі Logstash в режимі дебагу, щоб зрозуміти як виглядає лог повідомлення

Після того як ми налаштували INPUT, необхідно зрозуміти як виглядатиме лог повідомлення, які методи необхідно використовувати для налаштування фільтра (парсеру) логів.

Для цього будемо використовувати фільтр, який видає результат у stdout для того, щоб переглянути вихідне повідомлення, повністю конфігураційний файл на поточний момент виглядатиме так:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

Запуск команду для перевірки:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Бачимо результат, картинка клікабельна:

2. Elastic stack: аналіз security логів. Logstash

Якщо скопіювати це буде виглядати так:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

Дивлячись на дані повідомлення, розуміємо, що логи мають вигляд: поле = значення або key = value, а значить підходить фільтр, який називається kv. Для того, щоб правильно вибрати фільтр до кожного конкретного випадку, було б непогано з ними ознайомитися в технічній документації, або запитати друга.

Налаштовуємо Filter

На минулому етапі вибрали kv, далі представлена ​​конфігурація цього фільтра:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

Вибираємо символ за яким ділитимемо поле і значення - "=". Якщо у нас є в лозі однакові записи, зберігаємо в базі лише один екземпляр, інакше у вас вийде масив з однакових значень, тобто якщо маємо повідомлення "foo = some foo = some" записуємо тільки foo = some.

Налаштовуємо коректний Output у ElasticSearch

Після налаштування Filter можна вивантажувати логи в базу еластичний пошук:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Якщо документ підписаний типом checkpoint, зберігаємо подію в базі elasticsearch, яка приймає з'єднання на 10.10.1.200 на порт 9200 по дефолту. Кожен документ зберігається в певний індекс, в даному випадку зберігаємо в індекс checkpoint + поточна тимчасова дата. Кожен індекс може володіти певним набором полів, або створюється автоматично з появою нового поля в повідомленні, налаштування полів та їх тип можна подивитися в mappings.

Якщо у вас налаштована аутентифікація (розглянемо пізніше), обов'язково повинні бути вказані кредити для запису в конкретний індекс, в даному прикладі це «tssolution» з паролем «cool». Можна розмежувати права користувачів для запису логів тільки в певний індекс і більше.

Запускаємо Logstash.

Конфігураційний файл Logstash:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Перевіряємо конфігураційний файл на правильність складання:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Elastic stack: аналіз security логів. Logstash

Запускаємо процес Logstash:
sudo systemctl start logstash

Перевіряємо, що процес запустився:
sudo systemctl status logstash

2. Elastic stack: аналіз security логів. Logstash

Перевіряємо чи піднявся сокет:
netstat-nat |grep 5555

2. Elastic stack: аналіз security логів. Logstash

Перевіряємо логи в Kibana.

Після того як все запущено, заходимо в Kibana - Discover, переконуємося, що все налаштовано правильно, картинка клікабельна!

2. Elastic stack: аналіз security логів. Logstash

Всі логі на місці і ми можемо бачити всі поля та їх значення!

Висновок

Ми розглянули як писати конфігураційний файл Logstash, у результаті отримали парсер усіх полів та значень. Тепер ми можемо працювати з пошуком та складанням графіків за певними полями. Далі в курсі ми розглянемо візуалізацію до Kibana, створимо простенький дашбоард. Варто згадати, що конфігураційний файл Logstash потрібно постійно дописувати в певних ситуаціях, наприклад, коли хочемо замінити значення поля з цифри на слово. У наступних статтях ми постійно це робитимемо.

Так що слідкуйте за оновленнями (Telegram, Facebook, VK, TS Solution Blog), Яндекс.Дзен.

Джерело: habr.com

Додати коментар або відгук