Draugaujame su ELK ir Exchange. 2 dalis

Draugaujame su ELK ir Exchange. 2 dalis

Tęsiu savo pasakojimą apie tai, kaip susirasti draugų Exchange ir ELK (pradžia čia). Leiskite jums priminti, kad šis derinys gali nedvejodamas apdoroti labai daug rąstų. Šį kartą kalbėsime apie tai, kaip priversti Exchange dirbti su Logstash ir Kibana komponentais.

Logstash ELK rietuvėje naudojamas išmaniai apdoroti rąstus ir paruošti juos patalpinimui į Elastic dokumentų pavidalu, kurių pagrindu patogu kurti įvairias vizualizacijas Kibanoje.

Montavimas

Susideda iš dviejų etapų:

  • OpenJDK paketo diegimas ir konfigūravimas.
  • „Logstash“ paketo diegimas ir konfigūravimas.

OpenJDK paketo diegimas ir konfigūravimas

OpenJDK paketą reikia atsisiųsti ir išpakuoti į konkretų katalogą. Tada kelias į šį katalogą turi būti įvestas į „Windows“ operacinės sistemos kintamuosius $env:Path ir $env:JAVA_HOME:

Draugaujame su ELK ir Exchange. 2 dalis

Draugaujame su ELK ir Exchange. 2 dalis

Patikrinkime Java versiją:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

„Logstash“ paketo diegimas ir konfigūravimas

Atsisiųskite archyvo failą su „Logstash“ paskirstymu taigi. Archyvas turi būti išpakuotas iki disko šaknies. Išpakuokite į aplanką C:Program Files Neverta, „Logstash“ atsisakys normaliai paleisti. Tada turite įvesti failą jvm.options pataisymai, atsakingi už RAM paskirstymą Java procesui. Rekomenduoju nurodyti pusę serverio RAM. Jei jame yra 16 GB RAM, numatytieji klavišai yra šie:

-Xms1g
-Xmx1g

turi būti pakeistas:

-Xms8g
-Xmx8g

Be to, patartina pakomentuoti eilutę -XX:+UseConcMarkSweepGC. Daugiau apie tai čia. Kitas veiksmas yra sukurti numatytąją konfigūraciją logstash.conf faile:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Naudodama šią konfigūraciją, „Logstash“ nuskaito duomenis iš konsolės, perduoda juos per tuščią filtrą ir išveda atgal į konsolę. Naudojant šią konfigūraciją bus patikrintas „Logstash“ funkcionalumas. Norėdami tai padaryti, paleiskite jį interaktyviu režimu:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash sėkmingai paleistas 9600 prievade.

Paskutinis diegimo veiksmas: paleiskite „Logstash“ kaip „Windows“ paslaugą. Tai galima padaryti, pavyzdžiui, naudojant paketą NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

atsparumas gedimams

Žurnalų saugumas perduodant iš šaltinio serverio užtikrinamas nuolatinių eilių mechanizmu.

Kaip tai veikia

Eilių išdėstymas žurnalo apdorojimo metu yra toks: įvestis → eilė → filtras + išvestis.

Įvesties papildinys gauna duomenis iš žurnalo šaltinio, įrašo juos į eilę ir siunčia patvirtinimą, kad duomenys buvo gauti į šaltinį.

Pranešimus iš eilės apdoroja „Logstash“, jie perduodami per filtrą ir išvesties papildinį. Gavęs išvesties patvirtinimą, kad žurnalas buvo išsiųstas, „Logstash“ pašalina apdorotą žurnalą iš eilės. Jei „Logstash“ sustoja, visi neapdoroti pranešimai ir pranešimai, kurių patvirtinimas nebuvo gautas, lieka eilėje, o „Logstash“ toliau juos apdoros kitą kartą paleidus.

reguliavimas

Reguliuojamas faile esančiais raktais C:Logstashconfiglogstash.yml:

  • queue.type: (galimos reikšmės - persisted и memory (default)).
  • path.queue: (kelias į aplanką su eilės failais, kurie pagal numatytuosius nustatymus saugomi C:Logstashqueue).
  • queue.page_capacity: (maksimalus eilės puslapio dydis, numatytoji reikšmė yra 64 mb).
  • queue.drain: (true/false – įjungia/išjungia eilės apdorojimo sustabdymą prieš išjungiant Logstash. Nerekomenduoju jo įjungti, nes tai tiesiogiai paveiks serverio išjungimo greitį).
  • queue.max_events: (maksimalus įvykių skaičius eilėje, numatytoji vertė yra 0 (neribota)).
  • queue.max_bytes: (maksimalus eilės dydis baitais, numatytasis - 1024mb (1gb)).

Jei sukonfigūruotas queue.max_events и queue.max_bytes, tada pranešimai nustoja būti priimami į eilę, kai pasiekiama bet kurio iš šių nustatymų reikšmė. Sužinokite daugiau apie nuolatines eiles čia.

Logstash.yml dalies, atsakingos už eilės nustatymą, pavyzdys:

queue.type: persisted
queue.max_bytes: 10gb

reguliavimas

„Logstash“ konfigūraciją paprastai sudaro trys dalys, atsakingos už skirtingus gaunamų žurnalų apdorojimo etapus: priėmimas (įvesties sekcija), analizavimas (filtro sekcija) ir siuntimas į Elastic (išvesties sekcija). Žemiau mes atidžiau pažvelgsime į kiekvieną iš jų.

Indėlis

Gaunamą srautą su neapdorotais žurnalais gauname iš „Filebeat“ agentų. Būtent šį papildinį nurodome įvesties skiltyje:

input {
  beats {
    port => 5044
  }
}

Po šios konfigūracijos „Logstash“ pradeda klausytis prievado 5044, o gavęs žurnalus apdoroja juos pagal filtrų skyriaus nustatymus. Jei reikia, žurnalų gavimo kanalą iš failo bito galite apvynioti SSL. Skaitykite daugiau apie „beats“ papildinio nustatymus čia.

Filtruoti

Visi įdomūs apdoroti tekstiniai žurnalai, kuriuos sugeneruoja „Exchange“, yra csv formatu su laukais, aprašytais pačiame žurnalo faile. CSV įrašams analizuoti Logstash siūlo tris papildinius: išskaidyti, csv ir grok. Pirmasis yra labiausiai greitas, bet susidoroja su tik paprasčiausių rąstų analize.
Pavyzdžiui, šis įrašas bus padalintas į du (dėl kablelio lauke), todėl žurnalas bus neteisingai išanalizuotas:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Jis gali būti naudojamas analizuojant žurnalus, pavyzdžiui, IIS. Šiuo atveju filtro skyrius gali atrodyti taip:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Logstash konfigūracija leidžia naudoti sąlyginiai teiginiai, todėl į dissect papildinį galime siųsti tik tuos žurnalus, kurie buvo pažymėti filebeat žyma IIS. Papildinio viduje suderiname laukų reikšmes su jų pavadinimais, ištrinkite pradinį lauką message, kuriame buvo įrašas iš žurnalo, ir galime pridėti pasirinktinį lauką, kuriame, pavyzdžiui, bus programos, iš kurios renkame žurnalus, pavadinimas.

Stebėjimo žurnalų atveju geriau naudoti csv papildinį, kuris gali teisingai apdoroti sudėtingus laukus:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Papildinio viduje suderiname laukų reikšmes su jų pavadinimais, ištrinkite pradinį lauką message (ir laukai tenant-id и schema-version), kuriame buvo įrašas iš žurnalo, ir galime pridėti pasirinktinį lauką, kuriame, pavyzdžiui, bus programos, iš kurios renkame žurnalus, pavadinimas.

Pasibaigus filtravimo etapui, mes gausime dokumentus, paruoštus vizualizuoti Kibanoje. Mums trūks šių dalykų:

  • Skaitiniai laukai bus atpažinti kaip tekstas, o tai neleidžia su jais atlikti operacijų. Būtent, laukai time-taken IIS žurnalas, taip pat laukai recipient-count и total-bites Žurnalų sekimas.
  • Standartinėje dokumento laiko žymoje bus nurodytas laikas, kai žurnalas buvo apdorotas, o ne laikas, kai jis buvo parašytas serverio pusėje.
  • Laukas recipient-address atrodys kaip viena statybvietė, kuri neleidžia analizuoti laiškų gavėjų.

Atėjo laikas pridėti šiek tiek magijos prie žurnalo apdorojimo proceso.

Skaitinių laukų konvertavimas

Dissect papildinys turi parinktį convert_datatype, kuris gali būti naudojamas konvertuoti teksto lauką į skaitmeninį formatą. Pavyzdžiui, taip:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Verta prisiminti, kad šis metodas tinka tik tuo atveju, jei lauke tikrai bus eilutė. Ši parinktis neapdoroja nulinių reikšmių iš laukų ir pateikia išimtį.

Stebėdami žurnalus, geriau nenaudoti panašaus konvertavimo metodo, nes laukai recipient-count и total-bites gali būti tuščias. Norėdami konvertuoti šiuos laukus, geriau naudoti papildinį mutuoti:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Gavėjo_adreso padalijimas į atskirus gavėjus

Šią problemą taip pat galima išspręsti naudojant mutacijos įskiepį:

mutate {
  split => ["recipient_address", ";"]
}

Laiko žymos keitimas

Stebėjimo žurnalų atveju problemą labai lengvai išsprendžia įskiepis duomenys, kuris padės rašyti lauke timestamp datą ir laiką reikiamu formatu iš lauko date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

IIS žurnalų atveju turėsime sujungti lauko duomenis date и time naudodamiesi mutate papildiniu, užregistruokite mums reikalingą laiko juostą ir įdėkite šį laiko žymą timestamp naudojant datos papildinį:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

produkcija

Išvesties sekcija naudojama apdorotiems žurnalams siųsti į žurnalo imtuvą. Siunčiant tiesiai į Elastic, naudojamas įskiepis elastinga paieška, kuris nurodo serverio adresą ir indekso pavadinimo šabloną sugeneruotam dokumentui siųsti:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Galutinė konfigūracija

Galutinė konfigūracija atrodys taip:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Naudingos nuorodos:

Šaltinis: www.habr.com

Добавить комментарий