Prijatelji smo z ELK in Exchange. 2. del

Prijatelji smo z ELK in Exchange. 2. del

Nadaljujem svojo zgodbo o tem, kako se spoprijateljiti z Exchange in ELK (začetek tukaj). Naj vas spomnim, da je ta kombinacija sposobna brez zadržkov obdelati zelo veliko število hlodov. Tokrat bomo govorili o tem, kako omogočiti, da Exchange deluje s komponentama Logstash in Kibana.

Logstash v skladu ELK se uporablja za inteligentno obdelavo dnevnikov in njihovo pripravo za postavitev v Elastic v obliki dokumentov, na podlagi katerih je priročno graditi različne vizualizacije v Kibani.

Namestitev

Sestavljen je iz dveh stopenj:

  • Namestitev in konfiguracija paketa OpenJDK.
  • Namestitev in konfiguracija paketa Logstash.

Namestitev in konfiguracija paketa OpenJDK

Paket OpenJDK je treba prenesti in razpakirati v določen imenik. Nato je treba pot do tega imenika vnesti v spremenljivki $env:Path in $env:JAVA_HOME operacijskega sistema Windows:

Prijatelji smo z ELK in Exchange. 2. del

Prijatelji smo z ELK in Exchange. 2. del

Preverimo različico Jave:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Namestitev in konfiguracija paketa Logstash

Prenesite arhivsko datoteko z distribucijo Logstash zato. Arhiv mora biti razpakiran v koren diska. Razpakirajte v mapo C:Program Files Ni vredno, Logstash se ne bo hotel normalno zagnati. Nato morate vstopiti v datoteko jvm.options popravki, odgovorni za dodeljevanje RAM-a za proces Java. Priporočam, da določite polovico RAM-a strežnika. Če ima vgrajen 16 GB RAM-a, so privzeti ključi:

-Xms1g
-Xmx1g

je treba nadomestiti z:

-Xms8g
-Xmx8g

Poleg tega je priporočljivo vrstico komentirati -XX:+UseConcMarkSweepGC. Več o tem tukaj. Naslednji korak je ustvariti privzeto konfiguracijo v datoteki logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

S to konfiguracijo Logstash prebere podatke iz konzole, jih prenese skozi prazen filter in jih vrne v konzolo. Z uporabo te konfiguracije boste preizkusili funkcionalnost Logstasha. Če želite to narediti, ga zaženite v interaktivnem načinu:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash se je uspešno zagnal na vratih 9600.

Zadnji korak namestitve: zaženite Logstash kot storitev Windows. To je mogoče storiti na primer z uporabo paketa NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

toleranca napak

Varnost dnevnikov pri prenosu iz izvornega strežnika zagotavlja mehanizem Persistent Queues.

Kako deluje?

Postavitev čakalnih vrst med obdelavo dnevnika je: vnos → čakalna vrsta → filter + izhod.

Vnosni vtičnik prejme podatke iz vira dnevnika, jih zapiše v čakalno vrsto in pošlje potrdilo, da so bili podatki prejeti viru.

Sporočila iz čakalne vrste obdela Logstash, jih prenese skozi filter in izhodni vtičnik. Ko iz izhoda prejme potrditev, da je bil dnevnik poslan, Logstash obdelani dnevnik odstrani iz čakalne vrste. Če se Logstash ustavi, ostanejo v čakalni vrsti vsa neobdelana sporočila in sporočila, za katera ni bila prejeta potrditev, Logstash pa jih bo ob naslednjem zagonu nadaljeval z obdelavo.

prilagoditev

Nastavljivo s tipkami v datoteki C:Logstashconfiglogstash.yml:

  • queue.type: (možne vrednosti - persisted и memory (default)).
  • path.queue: (pot do mape z datotekami čakalne vrste, ki so privzeto shranjene v C:Logstashqueue).
  • queue.page_capacity: (največja velikost strani čakalne vrste, privzeta vrednost je 64 mb).
  • queue.drain: (true/false - omogoči/onemogoči zaustavitev obdelave čakalne vrste pred zaustavitvijo Logstash. Ne priporočam, da jo omogočite, ker bo to neposredno vplivalo na hitrost zaustavitve strežnika).
  • queue.max_events: (največje število dogodkov v čakalni vrsti, privzeto je 0 (neomejeno)).
  • queue.max_bytes: (največja velikost čakalne vrste v bajtih, privzeto - 1024mb (1gb)).

Če je konfiguriran queue.max_events и queue.max_bytes, se sporočila prenehajo sprejemati v čakalno vrsto, ko je dosežena vrednost katere koli od teh nastavitev. Izvedite več o trajnih čakalnih vrstah tukaj.

Primer dela logstash.yml, odgovornega za nastavitev čakalne vrste:

queue.type: persisted
queue.max_bytes: 10gb

prilagoditev

Konfiguracija Logstash je običajno sestavljena iz treh delov, ki so odgovorni za različne faze obdelave dohodnih dnevnikov: prejemanje (vhodni del), razčlenjevanje (filtrirni del) in pošiljanje v Elastic (izhodni del). Spodaj si bomo podrobneje ogledali vsakega od njih.

vhod

Prejmemo dohodni tok z neobdelanimi dnevniki od agentov filebeat. To je ta vtičnik, ki ga navedemo v razdelku za vnos:

input {
  beats {
    port => 5044
  }
}

Po tej konfiguraciji začne Logstash poslušati vrata 5044 in jih ob prejemu dnevnikov obdela v skladu z nastavitvami razdelka filtra. Po potrebi lahko kanal za prejemanje dnevnikov iz filebita zavijete v SSL. Preberite več o nastavitvah vtičnika Beats tukaj.

filter

Vsi besedilni dnevniki, zanimivi za obdelavo, ki jih ustvari Exchange, so v obliki csv s polji, opisanimi v sami dnevniški datoteki. Za razčlenjevanje zapisov csv nam Logstash ponuja tri vtičnike: secirati, csv in grok. Prvi je najbolj hitro, vendar se spopada z razčlenjevanjem le najpreprostejših dnevnikov.
Naslednji zapis bo na primer razdelil na dva (zaradi vejice v polju), zaradi česar bo dnevnik nepravilno razčlenjen:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Uporablja se lahko pri razčlenjevanju dnevnikov, na primer IIS. V tem primeru bi lahko razdelek filtra izgledal takole:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Konfiguracija Logstash vam omogoča uporabo pogojne izjave, zato lahko v vtičnik dissect pošljemo samo dnevnike, ki so bili označeni z oznako filebeat IIS. Znotraj vtičnika povežemo vrednosti polj z njihovimi imeni, izbrišemo izvirno polje message, ki je vseboval vnos iz dnevnika, dodamo pa lahko polje po meri, ki bo na primer vsebovalo ime aplikacije, iz katere zbiramo dnevnike.

V primeru sledenja dnevnikov je bolje uporabiti vtičnik csv, ki lahko pravilno obdela kompleksna polja:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Znotraj vtičnika povežemo vrednosti polj z njihovimi imeni, izbrišemo izvirno polje message (in tudi polja tenant-id и schema-version), ki je vseboval vnos iz dnevnika, dodamo pa lahko polje po meri, ki bo na primer vsebovalo ime aplikacije, iz katere zbiramo dnevnike.

Na izhodu iz stopnje filtriranja bomo prejeli dokumente v prvem približku, pripravljene za vizualizacijo v Kibani. Pogrešali bomo naslednje:

  • Številska polja bodo prepoznana kot besedilo, kar preprečuje operacije z njimi. Polja namreč time-taken Dnevnik IIS in polja recipient-count и total-bites Sledenje dnevniku.
  • Standardni časovni žig dokumenta bo vseboval čas obdelave dnevnika in ne časa, ko je bil zapisan na strani strežnika.
  • Polje recipient-address bo videti kot eno gradbišče, ki ne omogoča analize prešteti prejemnikov pisem.

Čas je, da procesu obdelave dnevnikov dodamo malo čarovnije.

Pretvarjanje številskih polj

Vtičnik dissect ima možnost convert_datatype, ki se lahko uporablja za pretvorbo besedilnega polja v digitalno obliko. Na primer takole:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Ne smemo pozabiti, da je ta metoda primerna le, če bo polje zagotovo vsebovalo niz. Možnost ne obdeluje ničelnih vrednosti iz polj in vrže izjemo.

Za sledenje dnevnikom je bolje, da ne uporabljate podobne metode pretvorbe, saj polja recipient-count и total-bites lahko prazen. Za pretvorbo teh polj je bolje uporabiti vtičnik mutirati:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Razdelitev naslova prejemnika na posamezne prejemnike

To težavo je mogoče rešiti tudi z uporabo vtičnika mutate:

mutate {
  split => ["recipient_address", ";"]
}

Spreminjanje časovnega žiga

V primeru sledenja dnevnikov je težavo zelo enostavno rešiti z dodatkom Datum, ki vam bo v pomoč pri pisanju na terenu timestamp datum in čas v zahtevani obliki iz polja date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

V primeru dnevnikov IIS bomo morali združiti podatke polja date и time z uporabo vtičnika mutate registrirajte časovni pas, ki ga potrebujemo, in vanj vstavite ta časovni žig timestamp z uporabo datumskega dodatka:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

izhod

Izhodni del se uporablja za pošiljanje obdelanih dnevnikov sprejemniku dnevnika. V primeru pošiljanja neposredno na Elastic se uporabi vtičnik elastično iskanje, ki določa naslov strežnika in predlogo imena indeksa za pošiljanje ustvarjenega dokumenta:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Končna konfiguracija

Končna konfiguracija bo videti takole:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Koristne povezave:

Vir: www.habr.com

Dodaj komentar