Vi er venner med ELK og Exchange. Del 2

Vi er venner med ELK og Exchange. Del 2

Jeg fortsætter min historie om, hvordan man får venner Exchange og ELK (begyndende her). Lad mig minde dig om, at denne kombination er i stand til at behandle et meget stort antal logfiler uden tøven. Denne gang vil vi tale om, hvordan du får Exchange til at arbejde med Logstash- og Kibana-komponenter.

Logstash i ELK-stakken bruges til intelligent at behandle logs og forberede dem til placering i Elastic i form af dokumenter, på grundlag af hvilke det er praktisk at bygge forskellige visualiseringer i Kibana.

Installation

Består af to faser:

  • Installation og konfiguration af OpenJDK-pakken.
  • Installation og konfiguration af Logstash-pakken.

Installation og konfiguration af OpenJDK-pakken

OpenJDK-pakken skal downloades og pakkes ud i en bestemt mappe. Derefter skal stien til denne mappe indtastes i variablerne $env:Path og $env:JAVA_HOME i Windows-operativsystemet:

Vi er venner med ELK og Exchange. Del 2

Vi er venner med ELK og Exchange. Del 2

Lad os tjekke Java-versionen:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Installation og konfiguration af Logstash-pakken

Download arkivfilen med Logstash-distributionen dermed. Arkivet skal pakkes ud til roden af ​​disken. Pak ud til mappe C:Program Files Det er ikke det værd, Logstash vil nægte at starte normalt. Så skal du indtaste i filen jvm.options rettelser ansvarlig for allokering af RAM til Java-processen. Jeg anbefaler at angive halvdelen af ​​serverens RAM. Hvis den har 16 GB RAM ombord, er standardtasterne:

-Xms1g
-Xmx1g

skal erstattes med:

-Xms8g
-Xmx8g

Derudover er det tilrådeligt at kommentere linjen -XX:+UseConcMarkSweepGC. Mere om dette her. Det næste trin er at oprette en standardkonfiguration i filen logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Med denne konfiguration læser Logstash data fra konsollen, sender dem gennem et tomt filter og sender dem tilbage til konsollen. Brug af denne konfiguration vil teste funktionaliteten af ​​Logstash. For at gøre dette, lad os køre det i interaktiv tilstand:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash blev lanceret med succes på port 9600.

Det sidste installationstrin: start Logstash som en Windows-tjeneste. Dette kan for eksempel gøres ved hjælp af pakken NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

fejltolerance

Sikkerheden af ​​logfiler, når de overføres fra kildeserveren, er sikret af mekanismen Persistent Queuees.

Sådan virker det

Layoutet af køer under logbehandling er: input → kø → filter + output.

Input-plugin'et modtager data fra en logkilde, skriver dem til en kø og sender en bekræftelse på, at dataene er modtaget til kilden.

Beskeder fra køen behandles af Logstash, sendes gennem filteret og output-plugin'et. Når der modtages bekræftelse fra output om, at loggen er blevet sendt, fjerner Logstash den behandlede log fra køen. Hvis Logstash stopper, forbliver alle ubehandlede beskeder og beskeder, for hvilke der ikke er modtaget en bekræftelse, i køen, og Logstash vil fortsætte med at behandle dem næste gang den starter.

justering

Justerbar med tasterne i filen C:Logstashconfiglogstash.yml:

  • queue.type: (mulige værdier - persisted и memory (default)).
  • path.queue: (sti til mappen med køfiler, som som standard er gemt i C:Logstashqueue).
  • queue.page_capacity: (maksimal køsidestørrelse, standardværdi er 64mb).
  • queue.drain: (sandt/falsk - aktiverer/deaktiverer stop af købehandling før nedlukning af Logstash. Jeg anbefaler ikke at aktivere det, fordi dette vil direkte påvirke hastigheden af ​​servernedlukning).
  • queue.max_events: (maksimalt antal hændelser i køen, standard er 0 (ubegrænset)).
  • queue.max_bytes: (maksimal køstørrelse i bytes, standard - 1024mb (1gb)).

Hvis konfigureret queue.max_events и queue.max_bytes, så stopper meddelelser med at blive accepteret i køen, når værdien af ​​en af ​​disse indstillinger er nået. Få mere at vide om vedvarende køer her.

Et eksempel på den del af logstash.yml, der er ansvarlig for opsætning af køen:

queue.type: persisted
queue.max_bytes: 10gb

justering

Logstash-konfigurationen består normalt af tre dele, der er ansvarlige for forskellige faser af behandlingen af ​​indgående logfiler: modtagelse (inputsektion), parsing (filtersektion) og afsendelse til Elastic (outputsektion). Nedenfor vil vi se nærmere på hver af dem.

Input

Vi modtager den indkommende stream med rå logfiler fra filebeat-agenter. Det er dette plugin, som vi angiver i inputsektionen:

input {
  beats {
    port => 5044
  }
}

Efter denne konfiguration begynder Logstash at lytte til port 5044, og når den modtager logs, behandler den dem i henhold til indstillingerne i filtersektionen. Hvis det er nødvendigt, kan du indpakke kanalen til modtagelse af logfiler fra filebit i SSL. Læs mere om beats plugin-indstillinger her.

filtre

Alle tekstlogfiler, der er interessante for behandling, som Exchange genererer, er i csv-format med felterne beskrevet i selve logfilen. Til at analysere csv-poster tilbyder Logstash os tre plugins: dissekere, csv og grok. Den første er den mest hurtigt, men klarer kun at parse de enkleste logfiler.
For eksempel vil den opdele følgende post i to (på grund af tilstedeværelsen af ​​et komma inde i feltet), hvilket er grunden til, at loggen vil blive parset forkert:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Det kan bruges ved parsing af logfiler, for eksempel IIS. I dette tilfælde kan filterafsnittet se sådan ud:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Logstash-konfiguration giver dig mulighed for at bruge betingede erklæringer, så vi kan kun sende logfiler, der var tagget med filebeat-tagget, til dissect-plugin'et IIS. Inde i plugin'et matcher vi feltværdierne med deres navne, slet det originale felt message, som indeholdt en post fra loggen, og vi kan tilføje et brugerdefineret felt, der for eksempel vil indeholde navnet på den applikation, som vi indsamler logs fra.

I tilfælde af sporingslogfiler er det bedre at bruge csv-pluginet; det kan behandle komplekse felter korrekt:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Inde i plugin'et matcher vi feltværdierne med deres navne, slet det originale felt message (og også felter tenant-id и schema-version), som indeholdt en post fra loggen, og vi kan tilføje et brugerdefineret felt, som for eksempel vil indeholde navnet på den applikation, som vi indsamler logs fra.

Ved udgangen fra filtreringsstadiet vil vi modtage dokumenter i en første tilnærmelse, klar til visualisering i Kibana. Vi kommer til at mangle følgende:

  • Numeriske felter genkendes som tekst, hvilket forhindrer handlinger på dem. Nemlig markerne time-taken IIS log, samt felter recipient-count и total-bites Log sporing.
  • Standard dokumenttidsstemplet vil indeholde det tidspunkt, hvor loggen blev behandlet, ikke det tidspunkt, den blev skrevet på serversiden.
  • Field recipient-address vil ligne én byggeplads, som ikke giver mulighed for analyse for at tælle modtagere af breve.

Det er tid til at tilføje lidt magi til logbehandlingsprocessen.

Konvertering af numeriske felter

Dissekeringsplugin'et har en mulighed convert_datatype, som kan bruges til at konvertere et tekstfelt til et digitalt format. For eksempel sådan her:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Det er værd at huske, at denne metode kun er egnet, hvis feltet helt sikkert vil indeholde en streng. Indstillingen behandler ikke Null-værdier fra felter og kaster en undtagelse.

Til sporing af logfiler er det bedre ikke at bruge en lignende konverteringsmetode, da felterne recipient-count и total-bites kan være tom. For at konvertere disse felter er det bedre at bruge et plugin mutere:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Opdeling af modtager_adresse i individuelle modtagere

Dette problem kan også løses ved hjælp af mutate plugin:

mutate {
  split => ["recipient_address", ";"]
}

Ændring af tidsstemplet

I tilfælde af sporingslogs løses problemet meget nemt af plugin'et dato, som vil hjælpe dig med at skrive i feltet timestamp dato og klokkeslæt i det påkrævede format fra feltet date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

I tilfælde af IIS-logfiler skal vi kombinere feltdata date и time ved hjælp af mutate-plugin'et, registrer den tidszone, vi har brug for, og placer dette tidsstempel i timestamp ved hjælp af dato plugin:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Produktion

Udgangssektionen bruges til at sende behandlede logfiler til logmodtageren. I tilfælde af at sende direkte til Elastic, bruges et plugin elastiksøgning, som angiver serveradressen og indeksnavnskabelonen til afsendelse af det genererede dokument:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Endelig konfiguration

Den endelige konfiguration vil se sådan ud:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Nyttige links:

Kilde: www.habr.com

Tilføj en kommentar