У минулій
У рамках курсу не розглядається установка ELK stack, оскільки існує безліч статей на цю тему, будемо розглядати конфігураційну складову.
Складемо план дій щодо конфігурації Logstash:
- Перевірка того, що еластичнийдослідження прийматиме логи (перевірка працездатності та відкритості порту).
- Розглядаємо, яким чином ми можемо відправити події в Logstash, вибираємо спосіб, реалізуємо.
- Налаштовуємо Input у конфігураційному файлі Logstash.
- Налаштовуємо Output у конфігураційному файлі Logstash в режимі дебагу, щоб зрозуміти як виглядає лог повідомлення.
- Налаштовуємо Filter.
- Налаштовуємо коректний Output у ElasticSearch.
- Запуск Logstash.
- Перевіряємо логи в Kibana.
Розглянемо детальніше кожен пункт:
Перевірка, що еластичнийдослідження прийматиме логи
Для цього можна перевірити командою curl доступ до Elasticsearch із системи, на якій розгорнуть Logstash. Якщо у вас налаштована аутентифікація, то також через curl передаємо користувач/пароль, вказуємо порт 9200, якщо ви його не змінювали. Якщо приходить відповідь на кшталт як зазначено нижче, значить усе гаразд.
[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
"name" : "elastic-1",
"cluster_name" : "project",
"cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
"version" : {
"number" : "7.4.1",
"build_flavor" : "default",
"build_type" : "rpm",
"build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
"build_date" : "2019-10-22T17:16:35.176724Z",
"build_snapshot" : false,
"lucene_version" : "8.2.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$
Якщо відповідь не надійшла, то може бути кілька видів помилок: не працює процес elasticsearch, вказаний не той порт, або порт заблокований фаєрволом на сервері, де стоїть elasticsearch.
Розглядаємо як можна надіслати логи в Logstash з міжмережевого екрану check point
З Check Point management server можна відправити логи в Logstash через syslog, використовуючи утиліту log_exporter, докладніше можна ознайомитися з нею по даній
cp_log_export add name check_point_syslog target-server < > target-port 5555 protocol tcp format generic read-mode semi-unified
< > - адреса сервера на якому крутиться Logstash, target-port 5555 - порт на який будемо відправляти логи, відправлення логів по tcp може навантажити сервер, тому в деяких випадках правильніше ставити udp.
Налаштовуємо INPUT у конфігураційному файлі Logstash
За замовчуванням файл конфігурації знаходиться в директорії /etc/logstash/conf.d/. Конфігураційний файл складається з трьох осмислених елементів: INPUT, FILTER, OUTPUT. У ВХІД ми вказуємо звідки система братиме логи, ФІЛЬТР парсим лог - налаштовуємо як поділити повідомлення на поля та значення, ВИХІД налаштовуємо потік, що виходить - куди розпарені логи будуть відправлятися.
Спочатку налаштуємо INPUT, розглянемо деякі типи, які можуть бути - file, tcp і exe.
TCP:
input {
tcp {
port => 5555
host => “10.10.1.205”
type => "checkpoint"
mode => "server"
}
}
mode => «server»
Говорить про те, що Logstash приймає з'єднання.
port => 5555
host => "10.10.1.205"
Приймаємо з'єднання за IP-адресою 10.10.1.205 (Logstash), порт 5555 - порт повинен бути дозволений політикою фаєрволу.
type => "checkpoint"
Маркуємо документ, дуже зручно, якщо у вас кілька вхідних з'єднань. Надалі для кожного з'єднання можна написати свій фільтр за допомогою логічної конструкції if.
Файл:
input {
file {
path => "/var/log/openvas_report/*"
type => "openvas"
start_position => "beginning"
}
}
Опис налаштувань:
path => "/var/log/openvas_report/*"
Вказуємо директорію, файли в якій потрібно рахувати.
type => «openvas»
Тип події.
start_position => "beginning"
При зміні файлу зчитує файл повністю, якщо виставити «end», то система чекає появи нових записів в кінці файлу.
Виконавець:
input {
exec {
command => "ls -alh"
interval => 30
}
}
За даним інпутом запускається (тільки!) shell команда і її виведення обертається в лог повідомлення.
command => «ls-alh»
Команда, висновок якої нас цікавить.
interval => 30
Інтервал виклику команди за секунди.
Для того, щоб приймати логи з міжмережевого екрану, прописуємо фільтр. тпп або udp, залежно від того, як надсилаються логи в Logstash.
Налаштовуємо Output в конфігураційному файлі Logstash в режимі дебагу, щоб зрозуміти як виглядає лог повідомлення
Після того як ми налаштували INPUT, необхідно зрозуміти як виглядатиме лог повідомлення, які методи необхідно використовувати для налаштування фільтра (парсеру) логів.
Для цього будемо використовувати фільтр, який видає результат у stdout для того, щоб переглянути вихідне повідомлення, повністю конфігураційний файл на поточний момент виглядатиме так:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}
output
{
if [type] == "checkpoint"
{
stdout { codec=> json }
}
}
Запуск команду для перевірки:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Бачимо результат, картинка клікабельна:
Якщо скопіювати це буде виглядати так:
action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,
Дивлячись на дані повідомлення, розуміємо, що логи мають вигляд: поле = значення або key = value, а значить підходить фільтр, який називається kv. Для того, щоб правильно вибрати фільтр до кожного конкретного випадку, було б непогано з ними ознайомитися в технічній документації, або запитати друга.
Налаштовуємо Filter
На минулому етапі вибрали kv, далі представлена конфігурація цього фільтра:
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
Вибираємо символ за яким ділитимемо поле і значення - "=". Якщо у нас є в лозі однакові записи, зберігаємо в базі лише один екземпляр, інакше у вас вийде масив з однакових значень, тобто якщо маємо повідомлення "foo = some foo = some" записуємо тільки foo = some.
Налаштовуємо коректний Output у ElasticSearch
Після налаштування Filter можна вивантажувати логи в базу еластичний пошук:
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
Якщо документ підписаний типом checkpoint, зберігаємо подію в базі elasticsearch, яка приймає з'єднання на 10.10.1.200 на порт 9200 по дефолту. Кожен документ зберігається в певний індекс, в даному випадку зберігаємо в індекс checkpoint + поточна тимчасова дата. Кожен індекс може володіти певним набором полів, або створюється автоматично з появою нового поля в повідомленні, налаштування полів та їх тип можна подивитися в mappings.
Якщо у вас налаштована аутентифікація (розглянемо пізніше), обов'язково повинні бути вказані кредити для запису в конкретний індекс, в даному прикладі це «tssolution» з паролем «cool». Можна розмежувати права користувачів для запису логів тільки в певний індекс і більше.
Запускаємо Logstash.
Конфігураційний файл Logstash:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
Перевіряємо конфігураційний файл на правильність складання:
/usr/share/logstash/bin//logstash -f checkpoint.conf
Запускаємо процес Logstash:
sudo systemctl start logstash
Перевіряємо, що процес запустився:
sudo systemctl status logstash
Перевіряємо чи піднявся сокет:
netstat-nat |grep 5555
Перевіряємо логи в Kibana.
Після того як все запущено, заходимо в Kibana - Discover, переконуємося, що все налаштовано правильно, картинка клікабельна!
Всі логі на місці і ми можемо бачити всі поля та їх значення!
Висновок
Ми розглянули як писати конфігураційний файл Logstash, у результаті отримали парсер усіх полів та значень. Тепер ми можемо працювати з пошуком та складанням графіків за певними полями. Далі в курсі ми розглянемо візуалізацію до Kibana, створимо простенький дашбоард. Варто згадати, що конфігураційний файл Logstash потрібно постійно дописувати в певних ситуаціях, наприклад, коли хочемо замінити значення поля з цифри на слово. У наступних статтях ми постійно це робитимемо.
Так що слідкуйте за оновленнями (
Джерело: habr.com