Sme priatelia s ELK a Exchange. Časť 2

Sme priatelia s ELK a Exchange. Časť 2

Pokračujem v mojom príbehu o tom, ako si nájsť priateľov Exchange a ELK (zač tu). Dovoľte mi pripomenúť, že táto kombinácia je schopná bez váhania spracovať veľmi veľké množstvo guľatiny. Tentoraz si povieme, ako prinútiť Exchange pracovať s komponentmi Logstash a Kibana.

Logstash v zásobníku ELK slúži na inteligentné spracovanie protokolov a ich prípravu na umiestnenie v Elastic vo forme dokumentov, na základe ktorých je vhodné v Kibane zostaviť rôzne vizualizácie.

Inštalácia

Pozostáva z dvoch etáp:

  • Inštalácia a konfigurácia balíka OpenJDK.
  • Inštalácia a konfigurácia balíka Logstash.

Inštalácia a konfigurácia balíka OpenJDK

Balík OpenJDK je potrebné stiahnuť a rozbaliť do konkrétneho adresára. Potom je potrebné zadať cestu k tomuto adresáru do premenných $env:Path a $env:JAVA_HOME operačného systému Windows:

Sme priatelia s ELK a Exchange. Časť 2

Sme priatelia s ELK a Exchange. Časť 2

Pozrime sa na verziu Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Inštalácia a konfigurácia balíka Logstash

Stiahnite si archívny súbor s distribúciou Logstash preto. Archív musí byť rozbalený do koreňového adresára disku. Rozbaľte do priečinka C:Program Files Nestojí to za to, Logstash odmietne normálne začať. Potom musíte vstúpiť do súboru jvm.options opravy zodpovedné za prideľovanie pamäte RAM pre proces Java. Odporúčam zadať polovicu RAM servera. Ak má na doske 16 GB RAM, predvolené klávesy sú:

-Xms1g
-Xmx1g

musí byť nahradený:

-Xms8g
-Xmx8g

Okrem toho je vhodné komentovať riadok -XX:+UseConcMarkSweepGC. Viac o tomto tu. Ďalším krokom je vytvorenie predvolenej konfigurácie v súbore logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

S touto konfiguráciou Logstash načíta údaje z konzoly, prejde ich cez prázdny filter a odošle ich späť do konzoly. Použitím tejto konfigurácie sa otestuje funkčnosť Logstash. Ak to chcete urobiť, spustite ho v interaktívnom režime:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash bol úspešne spustený na porte 9600.

Posledný krok inštalácie: spustite Logstash ako službu Windows. Dá sa to urobiť napríklad pomocou balíka NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

odolnosť proti chybám

Bezpečnosť protokolov pri prenose zo zdrojového servera je zabezpečená mechanizmom Persistent Queues.

Ako to funguje

Rozloženie frontov pri spracovaní protokolu je: vstup → front → filter + výstup.

Vstupný modul prijíma údaje zo zdroja denníka, zapisuje ich do frontu a odosiela potvrdenie o prijatí údajov do zdroja.

Správy z frontu spracuje Logstash, prejdú cez filter a výstupný plugin. Po prijatí potvrdenia z výstupu, že protokol bol odoslaný, Logstash odstráni spracovaný protokol z frontu. Ak sa Logstash zastaví, všetky nespracované správy a správy, pre ktoré nebolo prijaté žiadne potvrdenie, zostanú vo fronte a Logstash bude pokračovať v ich spracovaní pri ďalšom spustení.

nastavenie

Nastaviteľné klávesmi v súbore C:Logstashconfiglogstash.yml:

  • queue.type: (možné hodnoty - persisted и memory (default)).
  • path.queue: (cesta k priečinku so súbormi frontu, ktoré sú štandardne uložené v C:Logstashqueue).
  • queue.page_capacity: (maximálna veľkosť stránky vo fronte, predvolená hodnota je 64 MB).
  • queue.drain: (pravda/nepravda - povolí/zakáže zastavenie spracovania frontu pred vypnutím Logstash. Neodporúčam to zapínať, pretože to priamo ovplyvní rýchlosť vypínania servera).
  • queue.max_events: (maximálny počet udalostí vo fronte, predvolená hodnota je 0 (neobmedzené)).
  • queue.max_bytes: (maximálna veľkosť frontu v bajtoch, predvolená - 1024 MB (1 gb)).

Ak je nakonfigurovaný queue.max_events и queue.max_bytes, potom sa správy prestanú prijímať do frontu, keď sa dosiahne hodnota ktoréhokoľvek z týchto nastavení. Prečítajte si viac o trvalých frontoch tu.

Príklad časti logstash.yml zodpovednej za nastavenie frontu:

queue.type: persisted
queue.max_bytes: 10gb

nastavenie

Konfigurácia Logstash sa zvyčajne skladá z troch častí, ktoré sú zodpovedné za rôzne fázy spracovania prichádzajúcich protokolov: príjem (vstupná sekcia), analýza (filtračná sekcia) a odosielanie do Elastic (výstupná sekcia). Nižšie sa bližšie pozrieme na každý z nich.

Vstup

Prijímame prichádzajúci prúd s nespracovanými protokolmi od agentov filebeat. Je to tento doplnok, ktorý uvádzame vo vstupnej časti:

input {
  beats {
    port => 5044
  }
}

Po tejto konfigurácii začne Logstash počúvať port 5044 a pri prijímaní protokolov ich spracováva podľa nastavení sekcie filtra. V prípade potreby môžete kanál na prijímanie protokolov z filebitu zabaliť do SSL. Prečítajte si viac o nastaveniach doplnku beats tu.

filter

Všetky textové protokoly, ktoré sú zaujímavé na spracovanie a ktoré Exchange generuje, sú vo formáte csv s poľami popísanými v samotnom protokolovom súbore. Na analýzu záznamov csv nám Logstash ponúka tri doplnky: pitvať, csv a grok. Prvý je najviac быстрый, ale zvláda analýzu iba najjednoduchších protokolov.
Napríklad rozdelí nasledujúci záznam na dva (kvôli prítomnosti čiarky vo vnútri poľa), čo je dôvod, prečo bude protokol analyzovaný nesprávne:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Môže sa použiť pri analýze protokolov, napríklad IIS. V tomto prípade môže časť filtra vyzerať takto:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Konfigurácia Logstash vám umožňuje používať podmienené príkazy, takže do doplnku dissect môžeme posielať iba protokoly, ktoré boli označené značkou filebeat IIS. Vo vnútri doplnku porovnáme hodnoty polí s ich názvami, odstránime pôvodné pole message, ktorý obsahoval záznam z logu a môžeme pridať vlastné pole, ktoré bude napríklad obsahovať názov aplikácie, z ktorej logy zbierame.

V prípade protokolov sledovania je lepšie použiť doplnok csv, ktorý dokáže správne spracovať zložité polia:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Vo vnútri doplnku porovnáme hodnoty polí s ich názvami, odstránime pôvodné pole message (a tiež polia tenant-id и schema-version), ktorý obsahoval záznam z logu a môžeme pridať vlastné pole, ktoré bude napríklad obsahovať názov aplikácie, z ktorej logy zbierame.

Na výstupe z fázy filtrovania dostaneme dokumenty v prvej aproximácii, pripravené na vizualizáciu v Kibane. Chýbať nám bude nasledovné:

  • Číselné polia budú rozpoznané ako text, čo zabráni operáciám s nimi. Totiž polia time-taken Protokol IIS, ako aj polia recipient-count и total-bites Sledovanie denníka.
  • Štandardná časová pečiatka dokumentu bude obsahovať čas spracovania protokolu, nie čas, kedy bol zapísaný na strane servera.
  • Pole recipient-address bude vyzerať ako jedno stavenisko, ktoré neumožňuje analýzou spočítať príjemcov listov.

Je čas pridať trochu kúzla do procesu spracovania denníka.

Konverzia číselných polí

Doplnok pitvy má možnosť convert_datatype, ktorý možno použiť na prevod textového poľa do digitálneho formátu. Napríklad takto:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Stojí za to pamätať, že táto metóda je vhodná len vtedy, ak pole bude určite obsahovať reťazec. Možnosť nespracováva hodnoty Null z polí a vyvoláva výnimku.

Pre sledovanie protokolov je lepšie nepoužívať podobnú metódu prevodu, pretože polia recipient-count и total-bites môže byť prázdny. Na konverziu týchto polí je lepšie použiť plugin mutovať:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Rozdelenie adresy príjemcu na jednotlivých príjemcov

Tento problém je možné vyriešiť aj pomocou doplnku mutate:

mutate {
  split => ["recipient_address", ";"]
}

Zmena časovej pečiatky

V prípade logov sledovania problém veľmi jednoducho vyrieši plugin dáta, ktorý vám pomôže zapísať do poľa timestamp dátum a čas v požadovanom formáte z poľa date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

V prípade denníkov IIS budeme musieť skombinovať údaje z polí date и time pomocou doplnku mutate zaregistrujte časové pásmo, ktoré potrebujeme, a umiestnite doň túto časovú pečiatku timestamp pomocou doplnku dátumu:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Výkon

Výstupná sekcia sa používa na odosielanie spracovaných protokolov do prijímača protokolov. V prípade zaslania priamo do Elastic sa používa plugin ElasticSearch, ktorá špecifikuje adresu servera a šablónu indexového názvu na odoslanie vygenerovaného dokumentu:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Konečná konfigurácia

Konečná konfigurácia bude vyzerať takto:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Užitočné odkazy:

Zdroj: hab.com

Pridať komentár