Prijatelji smo sa ELK-om i Exchange-om. Dio 2

Prijatelji smo sa ELK-om i Exchange-om. Dio 2

Nastavljam svoju priču o tome kako steći prijatelje Exchange i ELK (početak ovdje). Da vas podsjetim da je ova kombinacija sposobna bez oklijevanja obraditi vrlo veliki broj dnevnika. Ovaj put ćemo razgovarati o tome kako natjerati Exchange da radi sa Logstash i Kibana komponentama.

Logstash u ELK steku se koristi za inteligentnu obradu logova i njihovu pripremu za postavljanje u Elastic u obliku dokumenata, na osnovu kojih je pogodno izgraditi različite vizualizacije u Kibani.

postavljanje

Sastoji se od dvije faze:

  • Instaliranje i konfigurisanje OpenJDK paketa.
  • Instalacija i konfiguracija Logstash paketa.

Instaliranje i konfigurisanje OpenJDK paketa

OpenJDK paket mora se preuzeti i raspakirati u određeni direktorij. Zatim se putanja do ovog direktorija mora unijeti u varijable $env:Path i $env:JAVA_HOME operativnog sistema Windows:

Prijatelji smo sa ELK-om i Exchange-om. Dio 2

Prijatelji smo sa ELK-om i Exchange-om. Dio 2

Provjerimo Java verziju:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Instalacija i konfiguracija Logstash paketa

Preuzmite arhivsku datoteku sa Logstash distribucijom odavde. Arhiva mora biti raspakirana u korijenu diska. Raspakujte u folder C:Program Files Ne isplati se, Logstash će odbiti normalno pokretanje. Zatim morate ući u datoteku jvm.options popravke odgovorne za dodjelu RAM-a za Java proces. Preporučujem da navedete polovinu RAM-a servera. Ako ima 16 GB RAM-a na ploči, tada su zadani ključevi:

-Xms1g
-Xmx1g

mora se zamijeniti sa:

-Xms8g
-Xmx8g

Osim toga, preporučljivo je komentarisati liniju -XX:+UseConcMarkSweepGC. Više o ovome ovdje. Sljedeći korak je kreiranje zadane konfiguracije u datoteci logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Sa ovom konfiguracijom, Logstash čita podatke sa konzole, prolazi ih kroz prazan filter i vraća ih nazad u konzolu. Korištenje ove konfiguracije će testirati funkcionalnost Logstash-a. Da bismo to učinili, pokrenimo ga u interaktivnom načinu:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash je uspješno pokrenut na portu 9600.

Poslednji korak instalacije: pokrenite Logstash kao Windows uslugu. To se može učiniti, na primjer, pomoću paketa NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

tolerancije grešaka

Sigurnost dnevnika prilikom prijenosa sa izvornog servera je osigurana mehanizmom Persistent Queues.

Kako to funkcioniše

Raspored redova tokom obrade dnevnika je: ulaz → red → filter + izlaz.

Dodatak za unos prima podatke iz izvora dnevnika, upisuje ih u red čekanja i šalje potvrdu da su podaci primljeni izvoru.

Poruke iz reda se obrađuju od strane Logstash-a, prolaze kroz filter i izlazni dodatak. Kada od izlaza dobije potvrdu da je dnevnik poslat, Logstash uklanja obrađeni dnevnik iz reda čekanja. Ako se Logstash zaustavi, sve neobrađene poruke i poruke za koje nije primljena potvrda ostaju u redu, a Logstash će nastaviti da ih obrađuje sljedeći put kada se pokrene.

podešavanje

Podesiva tipkama u datoteci C:Logstashconfiglogstash.yml:

  • queue.type: (moguće vrijednosti - persisted и memory (default)).
  • path.queue: (putanja do fascikle sa datotekama u redu čekanja, koje su podrazumevano uskladištene u C:Logstashqueue).
  • queue.page_capacity: (maksimalna veličina stranice u redu čekanja, zadana vrijednost je 64mb).
  • queue.drain: (true/false - omogućava/onemogućava zaustavljanje obrade reda prije gašenja Logstash-a. Ne preporučujem da ga omogućite, jer će to direktno uticati na brzinu gašenja servera).
  • queue.max_events: (maksimalni broj događaja u redu čekanja, zadana vrijednost je 0 (neograničeno)).
  • queue.max_bytes: (maksimalna veličina reda u bajtovima, default - 1024mb (1gb)).

Ako je konfigurisano queue.max_events и queue.max_bytes, tada poruke prestaju da se primaju u red čekanja kada se dostigne vrijednost bilo koje od ovih postavki. Saznajte više o trajnim redovima ovdje.

Primjer dijela logstash.yml odgovornog za postavljanje reda čekanja:

queue.type: persisted
queue.max_bytes: 10gb

podešavanje

Logstash konfiguracija se obično sastoji od tri dijela, odgovornih za različite faze obrade ulaznih dnevnika: prijem (ulazni dio), raščlanjivanje (filter dio) i slanje u Elastic (izlazni odjeljak). U nastavku ćemo detaljnije pogledati svaki od njih.

ulazni

Primamo dolazni stream sa sirovim zapisnicima od filebeat agenata. Upravo ovaj dodatak navodimo u odjeljku za unos:

input {
  beats {
    port => 5044
  }
}

Nakon ove konfiguracije, Logstash počinje osluškivati ​​port 5044, a kada prima dnevnike, obrađuje ih prema postavkama odjeljka filtera. Ako je potrebno, možete umotati kanal za primanje logova iz bita fajla u SSL. Pročitajte više o postavkama dodataka za beats ovdje.

Filter

Svi tekstualni dnevnici koji su interesantni za obradu koje Exchange generiše su u csv formatu sa poljima opisanim u samoj log datoteci. Za raščlanjivanje csv zapisa, Logstash nam nudi tri dodatka: secirati, csv i grok. Prvi je najviše brza, ali se nosi sa raščlanjivanjem samo najjednostavnijih dnevnika.
Na primjer, podijelit će sljedeći zapis na dva (zbog prisustva zareza unutar polja), zbog čega će dnevnik biti pogrešno analiziran:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Može se koristiti prilikom raščlanjivanja dnevnika, na primjer, IIS. U ovom slučaju, odjeljak filtera može izgledati ovako:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Logstash konfiguracija vam omogućava da koristite uslovne izjave, tako da možemo slati samo zapise koji su označeni oznakom filebeat u dodatak za dissect IIS. Unutar dodatka povezujemo vrijednosti polja s njihovim imenima, brišemo originalno polje message, koji je sadržavao unos iz dnevnika, a možemo dodati i prilagođeno polje koje će, na primjer, sadržavati naziv aplikacije iz koje prikupljamo dnevnike.

U slučaju evidencije praćenja, bolje je koristiti csv dodatak; on može ispravno obraditi složena polja:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Unutar dodatka povezujemo vrijednosti polja s njihovim imenima, brišemo originalno polje message (a takođe i polja tenant-id и schema-version), koji je sadržavao unos iz dnevnika, a možemo dodati i prilagođeno polje koje će, na primjer, sadržavati naziv aplikacije iz koje prikupljamo dnevnike.

Na izlasku iz faze filtriranja, dobićemo dokumente u prvom aproksimaciji, spremne za vizualizaciju u Kibani. Nedostajaće nam sledeće:

  • Numerička polja će biti prepoznata kao tekst, što onemogućuje operacije na njima. Naime, polja time-taken IIS dnevnik, kao i polja recipient-count и total-bites Log Tracking.
  • Standardna vremenska oznaka dokumenta će sadržavati vrijeme obrade dnevnika, a ne vrijeme kada je napisana na strani servera.
  • polje recipient-address izgledaće kao jedno gradilište, koje ne dozvoljava analizu da se prebroje primaoci pisama.

Vrijeme je da dodate malo magije procesu obrade dnevnika.

Pretvaranje numeričkih polja

Dodatak za diseciranje ima opciju convert_datatype, koji se može koristiti za pretvaranje tekstualnog polja u digitalni format. Na primjer, ovako:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Vrijedi zapamtiti da je ova metoda prikladna samo ako će polje definitivno sadržavati niz. Opcija ne obrađuje nulte vrijednosti iz polja i izbacuje izuzetak.

Za evidencije praćenja, bolje je ne koristiti sličnu metodu konverzije, jer polja recipient-count и total-bites može biti prazan. Za konvertovanje ovih polja bolje je koristiti dodatak mutirati:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Dijeljenje recipient_address na pojedinačne primaoce

Ovaj problem se također može riješiti pomoću dodatka za mutiranje:

mutate {
  split => ["recipient_address", ";"]
}

Promjena vremenske oznake

U slučaju evidencije praćenja, problem se vrlo lako rješava pluginom datum, što će vam pomoći da pišete na terenu timestamp datum i vrijeme u traženom formatu iz polja date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

U slučaju IIS dnevnika, morat ćemo kombinirati podatke polja date и time koristeći mutate plugin, registrirajte vremensku zonu koja nam je potrebna i ubacite ovu vremensku oznaku timestamp pomoću dodatka datuma:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

izlaz

Izlazni dio se koristi za slanje obrađenih dnevnika prijemniku dnevnika. U slučaju slanja direktno na Elastic, koristi se dodatak elastična pretraga, koji specificira adresu servera i predložak imena indeksa za slanje generiranog dokumenta:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Konačna konfiguracija

Konačna konfiguracija će izgledati ovako:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Korisni linkovi:

izvor: www.habr.com

Dodajte komentar