Дружимо ELK та Exchange. Частина 2

Дружимо ELK та Exchange. Частина 2

Я продовжую свою розповідь про те, як подружити Exchange та ELK (початок тут). Нагадаю, що ця комбінація здатна без вагань обробляти дуже багато логів. На цей раз ми поговоримо про те, як налагодити роботу Exchange з компонентами Logstash та Kibana.

Logstash в стеку ELK використовується для інтелектуальної обробки логів та їх підготовки до розміщення в Elastic у вигляді документів, на основі яких зручно будувати різні візуалізації в Kibana.

Встановлення

Складається з двох етапів:

  • Встановлення та налаштування пакету OpenJDK.
  • Встановлення та налаштування пакета Logstash.

Встановлення та налаштування пакету OpenJDK

Пакет OpenJDK необхідно завантажити та розпакувати у певну директорію. Потім шлях до цієї директорії необхідно внести до змінних $env:Path і $env:JAVA_HOME операційної системи Windows:

Дружимо ELK та Exchange. Частина 2

Дружимо ELK та Exchange. Частина 2

Перевіримо версію Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Встановлення та налаштування пакету Logstash

Файл-архів з дистрибутивом Logstash звідси. Архів потрібно розпакувати в корінь диска. Розпаковувати в папку C:Program Files не варто, Logstash відмовиться нормально запускатись. Потім необхідно внести файл jvm.options редагування, що відповідають за виділення оперативної пам'яті для процесу Java. Рекомендую вказати половину оперативної пам'яті сервера. Якщо у нього на борту 16 Гб оперативної пам'яті, то ключі за замовчуванням:

-Xms1g
-Xmx1g

необхідно замінити на:

-Xms8g
-Xmx8g

Крім цього, доцільно закоментувати рядок -XX:+UseConcMarkSweepGC. Докладніше про це тут. Наступним кроком є ​​створення конфігурації за промовчанням у файлі logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

При використанні цієї конфігурації Logstash зчитує дані з консолі, пропускає через порожній фільтр та виводить назад у консоль. Використання цієї конфігурації дозволить перевірити працездатність Logstash. Для цього запустимо його в інтерактивному режимі:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash успішно запустився на порту 9600.

Фінальний крок інсталяції: запуск Logstash у вигляді сервісу Windows. Це можна зробити, наприклад, за допомогою пакета NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

Відмовостійкість

Збереження логів передачі з вихідного сервера забезпечується механізмом Persistent Queues.

Як працює

Схема розташування черг у процесі обробки логів: input → queue → filter + output.

p align="justify"> Плагін input отримує дані від джерела логів, записує їх у чергу і відправляє джерелу підтвердження отримання даних.

Повідомлення з черги обробляються Logstash, проходять фільтр та плагін output. При отриманні від output підтвердження відправлення лога Logstash видаляє оброблений лог із черги. Якщо Logstash зупиняється, всі необроблені повідомлення та повідомлення, за якими не отримано підтвердження про відправлення, залишаються в черзі, і Logstash продовжить їх обробку при наступному запуску.

Налаштування

Регулюється ключами у файлі C:Logstashconfiglogstash.yml:

  • queue.type: (можливі значення - persisted и memory (default)).
  • path.queue: (шлях до папки з файлами черг, які за замовчуванням зберігаються у C:Logstashqueue).
  • queue.page_capacity: (максимальний розмір сторінки черги, значення за замовчуванням – 64mb).
  • queue.drain: (true/false — включає/вимикає зупинку обробки черги перед вимкненням Logstash. Не рекомендую включати, тому що це прямо позначиться на швидкості вимкнення сервера).
  • queue.max_events: (максимально кількість подій у черзі, за замовчуванням – 0 (не обмежено)).
  • queue.max_bytes: (максимальний розмір черги в байтах, за замовчуванням – 1024mb (1gb)).

Якщо налаштовані queue.max_events и queue.max_bytes, то повідомлення перестають прийматися в чергу при досягненні значення будь-якої з цих установок. Докладніше про Persistent Queues розказано тут.

Приклад частини logstash.yml, що відповідає за налаштування черги:

queue.type: persisted
queue.max_bytes: 10gb

Налаштування

Конфігурація Logstash зазвичай складається з трьох частин, що відповідають за різні фази обробки вхідних логів: прийом (секція input), парсинг (секція filter) і відправка в Elastic (секція output). Нижче ми розглянемо кожну з них.

вхід

Вхідний потік із сирими логами приймаємо від агентів файлубагата. Саме цей плагін ми і вказуємо в секції input:

input {
  beats {
    port => 5044
  }
}

Після такого налаштування Logstash починає прослуховувати порт 5044, і при отриманні логів обробляє їх згідно з налаштуваннями секції filter. При необхідності можна канал отримання логів від filebit загорнути в SSL. Детальніше про налаштування плагіна beats написано тут.

фільтр

Всі цікаві для обробки текстові логи, які генерує Exchange, мають csv-формат із описаними у самому файлі логів полями. Для парсингу csv-записів Logstash пропонує нам три плагіни: розсікати, csv та grok. Перший - самий швидкийАле справляється з парсингом тільки найпростіших логів.
Наприклад, наступний запис він розіб'є на дві (через наявність усередині поля комою), через що лог буде розібраний неправильно:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Його можна використовувати при парсингу логів, наприклад IIS. У цьому випадку секція filter може виглядати так:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Конфігурація Logstash дозволяє використовувати умовні оператори, тому ми в плагін dissect можемо направити тільки логи, які були позначені filebeat тегом IIS. Усередині плагіна ми зіставляємо значення полів із їхніми назвами, видаляємо вихідне поле message, що містило запис з лога, і можемо додати довільне поле, яке, наприклад, міститиме ім'я програми з якого ми збираємо логи.

У випадку з логами трекінгу краще використовувати плагін CSV, він вміє коректно обробляти складні поля:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Усередині плагіна ми зіставляємо значення полів із їхніми назвами, видаляємо вихідне поле message (а також поля tenant-id и schema-version), яке містило запис з лога, і можемо додати довільне поле, яке, наприклад, міститиме ім'я програми з якого ми збираємо логи.

На виході зі стадії фільтрації ми отримаємо документи у першому наближенні, готові до візуалізації в Kibana. Не вистачатиме нам наступного:

  • Числові поля будуть розпізнані як текст, що дозволяє виконувати операції з ними. А саме, поля time-taken лога IIS, а також поля recipient-count и total-bites логу Tracking.
  • Стандартний часовий штамп документа міститиме час обробки лога, а не час його запису на стороні сервера.
  • Поле recipient-address виглядатиме однією будовою, що дозволяє проводити аналіз із підрахунком одержувачів листів.

Настав час додати трохи магії у процес обробки логів.

Конвертація числових полів

Плагін dissect має опцію convert_datatype, яку можна використовувати для конвертації текстового поля у цифровий формат. Наприклад, так:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Варто пам'ятати, що цей метод підходить тільки в тому випадку, якщо поле точно міститиме рядок. Null-значення з полів опція не обробляє і вивалюється у виняток.

Для логів трекінгу аналогічний метод convert краще не використовувати, тому що поля recipient-count и total-bites можуть бути порожніми. Для конвертації цих полів краще використовувати плагін мутувати:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Розбиття recipient_address на окремих одержувачів

Це завдання також можна вирішити за допомогою плагіна mutate:

mutate {
  split => ["recipient_address", ";"]
}

Змінюємо timestamp

У випадку з логами трекінгу завдання просто вирішується плагіном дата, який допоможе прописати у полі timestamp дату та час у потрібному форматі з поля date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

У разі логів IIS нам буде необхідно об'єднати дані полів date и time за допомогою плагіна mutate, прописати потрібну нам тимчасову зону та помістити цей тимчасовий штамп у timestamp за допомогою плагіна date:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Вихід

Секція output використовується для відправки оброблених логів у приймач логів. У разі відправки безпосередньо до Elastic використовується плагін еластичний пошук, в якому вказується адреса сервера та шаблон імені індексу для надсилання сформованого документа:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Підсумкова конфігурація

Підсумкова конфігурація буде виглядати так:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Корисні посилання:

Джерело: habr.com

Додати коментар або відгук