Сябруем ELK і Exchange. Частка 2

Сябруем ELK і Exchange. Частка 2

Я працягваю свой аповяд аб тым, як пасябраваць Exchange і ELK (пачатак тут). Нагадаю, што гэтая камбінацыя здольная без ваганняў апрацоўваць вельмі вялікую колькасць логаў. На гэты раз мы пагаворым аб тым, як наладзіць працу Exchange з кампанентамі Logstash і Kibana.

Logstash у стэку ELK выкарыстоўваецца для інтэлектуальнай апрацоўкі логаў і іх падрыхтоўкі да размяшчэння ў Elastic у выглядзе дакументаў, на аснове якіх зручна будаваць розныя візуалізацыі ў Kibana.

Ўстаноўка

Складаецца з двух этапаў:

  • Усталяванне і настройка пакета OpenJDK.
  • Ўстаноўка і настройка пакета Logstash.

Усталяванне і настройка пакета OpenJDK

Пакет OpenJDK неабходна спампаваць і распакаваць у пэўную дырэкторыю. Затым шлях да гэтай дырэкторыі неабходна занесці ў зменныя $env:Path і $env:JAVA_HOME аперацыйнай сістэмы Windows:

Сябруем ELK і Exchange. Частка 2

Сябруем ELK і Exchange. Частка 2

Праверым версію Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Устаноўка і настройка пакета Logstash

Файл-архіў з дыстрыбутывам Logstash запампуйце адсюль. Архіў трэба распакаваць у корань дыска. Распакоўваць у тэчку C:Program Files не варта, Logstash адмовіцца нармальна запускацца. Затым неабходна ўнесці ў файл jvm.options праўкі, якія адказваюць за вылучэнне аператыўнай памяці для працэсу Java. Рэкамендую пазначыць палову аператыўнай памяці сервера. Калі ў яго на борце 16 Гб аператыўкі, то ключы па змаўчанні:

-Xms1g
-Xmx1g

неабходна замяніць на:

-Xms8g
-Xmx8g

Акрамя гэтага, мэтазгодна закамэнтаваць радок -XX:+UseConcMarkSweepGC. Больш падрабязна пра гэта тут. Наступны крок - стварэнне канфігурацыі па змаўчанні ў файле logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Пры выкарыстанні гэтай канфігурацыі Logstash счытвае дадзеныя з кансолі, прапускае праз пусты фільтр і выводзіць зваротна ў кансоль. Ужыванне гэтай канфігурацыі дазволіць праверыць працаздольнасць Logstash. Для гэтага запусцім яго ў інтэрактыўным рэжыме:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash паспяхова запусціўся на порце 9600.

Фінальны крок усталёўкі: запуск Logstash у выглядзе сэрвісу Windows. Гэта можна зрабіць, напрыклад, з дапамогай пакета НССМ:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

адмоваўстойлівасць

Захаванасць логаў пры перадачы з зыходнага сервера забяспечваецца механізмам Persistent Queues.

Як працуе

Схема размяшчэння чэргаў у працэсе апрацоўкі логаў: input → queue → filter + output.

Убудова input атрымлівае дадзеныя ад крыніцы логаў, запісвае іх у чаргу і адпраўляе крыніцы пацверджанне атрымання дадзеных.

Паведамленні з чаргі апрацоўваюцца Logstash, праходзяць фільтр і плягін output. Пры атрыманні ад output пацверджання адпраўкі лога Logstash выдаляе апрацаваны лог з чаргі. Калі Logstash спыняецца, то ўсе неапрацаваныя паведамленні і паведамленні, па якіх не атрымана пацверджанне аб адпраўцы, застаюцца ў чарзе, і Logstash працягне іх апрацоўку пры наступным запуску.

Настройка

Рэгулюецца ключамі ў файле C:Logstashconfiglogstash.yml:

  • queue.type: (магчымыя значэнні persisted и memory (default)).
  • path.queue: (шлях да тэчкі з файламі чэргаў, якія па змаўчанні захоўваюцца ў C:Logstashqueue).
  • queue.page_capacity: (максімальны памер старонкі чаргі, значэнне па змаўчанні - 64mb).
  • queue.drain: (true/false - уключае/выключае прыпынак апрацоўкі чаргі перад выключэннем Logstash. Не рэкамендую ўключаць, таму што гэта прама адаб'ецца на хуткасці выключэння сервера).
  • queue.max_events: (максімальна колькасць падзей у чарзе, па змаўчанні - 0 (не абмежавана)).
  • queue.max_bytes: (максімальны памер чаргі ў байтах, па змаўчанні - 1024mb (1gb)).

Калі настроены queue.max_events и queue.max_bytes, то паведамленні перастаюць прымацца ў чаргу пры дасягненні значэння любой з гэтых налад. Падрабязней пра Persistent Queues расказана тут.

Прыклад часткі logstash.yml, якая адказвае за наладу чаргі:

queue.type: persisted
queue.max_bytes: 10gb

Настройка

Канфігурацыя Logstash звычайна складаецца з трох частак, якія адказваюць за розныя фазы апрацоўкі ўваходны логаў: прыём (секцыя input), парсінг (секцыя filter) і адпраўка ў Elastic (секцыя output). Ніжэй мы падрабязней разгледзім кожную з іх.

уваход

Уваходны струмень з волкімі логамі прымаем ад агентаў filebeat. Менавіта гэты плягін мы і паказваем у секцыі input:

input {
  beats {
    port => 5044
  }
}

Пасля такой налады Logstash пачынае праслухоўваць порт 5044, і пры атрыманні логаў апрацоўвае іх паводле налад секцыі filter. Пры неабходнасці можна канал атрымання логаў ад filebit загарнуць у SSL. Падрабязней аб наладах плагіна beats напісана тут.

фільтры

Усе цікавыя для апрацоўкі тэкставыя логі, якія генеруе Exchange, маюць csv-фармат з апісанымі ў самім файле логаў палямі. Для парсінгу csv-запісаў Logstash прапануе нам тры плагіны: рассякаць, csv і grok. Першы - самы хуткі, але спраўляецца з парсінгам толькі самых простых логаў.
Напрыклад, наступны запіс ён разаб'е на дзве (з-за наяўнасці ўнутры поля коскі), з-за чаго лог будзе разабраны няправільна:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Яго можна выкарыстоўваць пры парсінгу логаў, напрыклад, IIS. У гэтым выпадку секцыя filter можа выглядаць наступным чынам:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Канфігурацыя Logstash дазваляе выкарыстоўваць умоўныя аператары, таму мы ў плягін dissect можам накіраваць толькі логі, якія былі пазначаныя filebeat тэгам IIS. Усярэдзіне плагіна мы супастаўляем значэнні палёў з іх назовамі, выдаляны зыходнае поле message, якое змяшчала запіс з лога, і можам дадаць адвольнае поле, якое будзе, напрыклад, змяшчаць імя прыкладання з якога мы збіраем логі.

У выпадку з логамі трэкінга лепш выкарыстоўваць убудову csv, ён умее карэктна апрацоўваць складаныя палі:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Усярэдзіне плагіна мы супастаўляем значэнні палёў з іх назовамі, выдаляны зыходнае поле message (а таксама палі tenant-id и schema-version), якое змяшчала запіс з лога, і можам дадаць адвольнае поле, якое будзе, напрыклад, змяшчаць імя прыкладання з якога мы збіраем логі.

На выхадзе са стадыі фільтрацыі мы атрымаем дакументы ў першым набліжэнні, гатовыя да візуалізацыі ў Kibana. Бракаць нам будзе наступнага:

  • Лікавыя палі будуць распазнаны як тэкст, што не дазваляе выконваць аперацыі з імі. А менавіта, палі time-taken лога IIS, а таксама палі recipient-count и total-bites лога Tracking.
  • Стандартны часавы штамп дакумента будзе змяшчаць час апрацоўкі лога, а не час запісу яго на баку сервера.
  • Поле recipient-address будзе выглядаць адной будоўляй, што не дазваляе праводзіць аналіз з падлікам атрымальнікаў лістоў.

Настаў час дадаць крыху магіі ў працэс апрацоўкі логаў.

Канвертаванне лікавых палёў

Убудова dissect мае опцыю convert_datatype, якую можна выкарыстоўваць для канвертавання тэкставага поля ў лічбавы фармат. Напрыклад, так:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Варта памятаць, што гэты метад падыходзіць толькі ў тым выпадку, калі поле сапраўды будзе змяшчаць радок. Null-значэнні з палёў опцыя не апрацоўвае і вывальваецца ў выключэнне.

Для логаў трэкінга аналагічны метад convert лепш не выкарыстоўваць, бо палі recipient-count и total-bites могуць быць пустымі. Для канвертавання гэтых палёў лепш выкарыстоўваць убудова мутаваць:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Разбіццё recipient_address на асобных атрымальнікаў

Гэтую задачу можна таксама вырашыць з дапамогай плагіна mutate:

mutate {
  split => ["recipient_address", ";"]
}

Змяняны timestamp

У выпадку з логамі трэкінга задача вельмі проста вырашаецца плагінам. дата, які дапаможа прапісаць у полі timestamp дату і час у патрэбным фармаце з поля date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

У выпадку з логамі IIS нам будзе неабходна аб'яднаць дадзеныя палёў date и time з дапамогай плагіна mutate, прапісаць патрэбную нам часовую зону і змясціць гэты часавы штамп у timestamp з дапамогай плагіна date:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

выхад

Секцыя output выкарыстоўваецца для адпраўкі апрацаваных логаў у прымач логаў. У выпадку адпраўкі напрамую ў Elastic выкарыстоўваецца плягін эластычны пошук, у якім указваецца адрас сервера і шаблон імя індэкса для адпраўкі сфармаванага дакумента:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Выніковая канфігурацыя

Выніковая канфігурацыя будзе выглядаць наступным чынам:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Карысныя спасылкі:

Крыніца: habr.com

Дадаць каментар