Mir si Frënn mat ELK an Exchange. Deel 2

Mir si Frënn mat ELK an Exchange. Deel 2

Ech weider meng Geschicht iwwer wéi Frënn Exchange an ELK ze maachen (Ufank hei). Loosst mech Iech drun erënneren datt dës Kombinatioun fäeg ass eng ganz grouss Zuel vu Logbicher ouni ze zécken. Dës Kéier schwätze mir iwwer wéi een Exchange mat Logstash a Kibana Komponenten funktionnéiert.

Logstash am ELK Stack gëtt benotzt fir d'Protokoller intelligent ze veraarbechten an se fir d'Placement an Elastic a Form vun Dokumenter virzebereeden, op Basis vun deenen et bequem ass verschidde Visualiséierungen zu Kibana ze bauen.

Kader

Et besteet aus zwou Etappen:

  • Installatioun a Konfiguratioun vum OpenJDK Package.
  • De Logstash Package installéieren an konfiguréieren.

Installatioun a Konfiguratioun vum OpenJDK Package

Den OpenJDK Package muss erofgeluede ginn an an e spezifesche Verzeichnis ausgepackt ginn. Da muss de Wee zu dësem Verzeichnis an den $env:Path an $env:JAVA_HOME Variabelen vum Windows Betriebssystem aginn ginn:

Mir si Frënn mat ELK an Exchange. Deel 2

Mir si Frënn mat ELK an Exchange. Deel 2

Loosst eis d'Java Versioun kontrolléieren:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

De Logstash Package installéieren an konfiguréieren

Luet d'Archivdatei mat der Logstash Verdeelung erof vun hei. D'Archiv muss op d'Root vun der Disk ausgepackt ginn. Auspacken an den Dossier C:Program Files Et ass net wäert et, Logstash wäert refuséieren normalerweis unzefänken. Da musst Dir an d'Datei aginn jvm.options fixéiert verantwortlech fir RAM fir den Java Prozess ze verdeelen. Ech recommandéieren d'Halschent vum RAM vum Server ze spezifizéieren. Wann et 16 GB RAM u Bord huet, da sinn d'Standardschlësselen:

-Xms1g
-Xmx1g

muss ersat ginn duerch:

-Xms8g
-Xmx8g

Zousätzlech ass et unzeroden d'Linn ze kommentéieren -XX:+UseConcMarkSweepGC. Méi iwwer dëst hei. De nächste Schrëtt ass eng Standardkonfiguratioun an der logstash.conf Datei ze kreéieren:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Mat dëser Konfiguratioun liest Logstash Daten aus der Konsole, passt se duerch en eidele Filter, a gitt se zréck an d'Konsole. Mat dëser Konfiguratioun wäert d'Funktionalitéit vu Logstash testen. Fir dëst ze maachen, loosst eis et am interaktiven Modus lafen:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash erfollegräich op Port 9600 gestart.

Déi lescht Installatiounsschrëtt: Start Logstash als Windows Service. Dëst kann zum Beispill mam Package gemaach ginn NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

Feeler Toleranz

D'Sécherheet vu Logbicher wann se vum Quellserver transferéiert ginn ass vum Persistent Queues Mechanismus gesuergt.

Wéi et schafft

De Layout vun de Schlaangen während der Logveraarbechtung ass: Input → Schlaang → Filter + Ausgang.

Den Input Plugin kritt Daten aus enger Logquell, schreift se an eng Schlaang a schéckt d'Bestätegung datt d'Donnéeën un d'Quell kritt goufen.

Messagen aus der Schlaang gi vu Logstash veraarbecht, duerch de Filter an den Output Plugin passéiert. Wann Dir d'Bestätegung vum Output kritt datt de Log geschéckt gouf, läscht Logstash de veraarbechte Log aus der Schlaang. Wann Logstash stoppt, bleiwen all onveraarbechtte Messagen a Messagen, fir déi keng Bestätegung kritt gouf, an der Schlaang, a Logstash wäert se weider veraarbecht déi nächst Kéier wann et ufänkt.

Upassung

Upassbar duerch Schlësselen an der Datei C:Logstashconfiglogstash.yml:

  • queue.type: (méiglech Wäerter - persisted и memory (default)).
  • path.queue: (Wee an den Dossier mat Schlaangdateien, déi als Standard an C:Logstashqueue gespäichert sinn).
  • queue.page_capacity: (maximal Schlaang Säit Gréisst, Standardwäert ass 64mb).
  • queue.drain: (true/false - aktivéiert/deaktivéiert d'Stopp vun der Schlaangveraarbechtung ier de Logstash auszeschalten. Ech recommandéieren et net z'aktivéieren, well dëst direkt d'Geschwindegkeet vum Serverausschaltung beaflosst).
  • queue.max_events: (maximal Unzuel vun Eventer an der Schlaang, Standard ass 0 (onlimitéiert)).
  • queue.max_bytes: (maximal Schlaanggréisst an Bytes, Standard - 1024mb (1gb)).

Wann konfiguréiert queue.max_events и queue.max_bytes, da stoppen d'Messagen an d'Schlaang akzeptéiert ze ginn wann de Wäert vun enger vun dësen Astellungen erreecht gëtt. Léiert méi iwwer Persistent Schlaangen hei.

E Beispill vum Deel vun logstash.yml verantwortlech fir d'Schlaang opzestellen:

queue.type: persisted
queue.max_bytes: 10gb

Upassung

D'Logstash Konfiguratioun besteet normalerweis aus dräi Deeler, verantwortlech fir verschidde Phasen vun der Veraarbechtung vun erakommen Logbicher: Empfang (Input Sektioun), Parsing (Filter Sektioun) a Schécken op Elastic (Output Sektioun). Drënner wäerte mir jiddereng vun hinnen méi genau kucken.

Input

Mir kréien den erakommende Stream mat roude Logbicher vu Filebeat Agenten. Et ass dëse Plugin dee mir an der Input Sektioun uginn:

input {
  beats {
    port => 5044
  }
}

No dëser Konfiguratioun fänkt Logstash un den Hafen 5044 ze lauschteren, a wann Dir Logbicher kritt, veraarbecht se no den Astellunge vun der Filtersektioun. Wann néideg, kënnt Dir de Kanal wéckelen fir Logbicher vum Filebit an SSL ze kréien. Liest méi iwwer Beats Plugin Astellungen hei.

Filter

All Textprotokoller déi interessant sinn fir d'Veraarbechtung déi Exchange generéiert sinn am Csv-Format mat de Felder, déi an der Logdatei selwer beschriwwe sinn. Fir Csv Records ze analyséieren, bitt Logstash eis dräi Plugins: dissekéieren, csv und grok. Déi éischt ass am meeschten schnell, mee copes mat Parsing nëmmen déi einfachst Logbicher.
Zum Beispill gëtt de folgende Rekord an zwee opgedeelt (wéinst der Präsenz vun engem Komma am Feld), dofir gëtt de Logbuch falsch parséiert:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Et kann benotzt ginn wann Dir Logbicher parséiert, zum Beispill IIS. An dësem Fall kann d'Filter Sektioun esou ausgesinn:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Logstash Konfiguratioun erlaabt Iech ze benotzen bedingt Aussoen, also kënne mir nëmmen Logbicher schécken, déi mam Filebeat-Tag markéiert goufen, an den dissect Plugin IIS. Am Plugin passen mir d'Feldwäerter mat hiren Nimm, läscht d'Originalfeld message, déi en Entrée aus dem Logbuch enthält, a mir kënnen e personaliséierte Feld derbäisetzen, dat zum Beispill den Numm vun der Applikatioun enthält, aus där mir Logbicher sammelen.

Am Fall vun Tracking Logbicher ass et besser den csv Plugin ze benotzen; et kann komplex Felder korrekt veraarbecht:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Am Plugin passen mir d'Feldwäerter mat hiren Nimm, läscht d'Originalfeld message (an och Felder tenant-id и schema-version), déi eng Entrée aus dem Logbuch enthält, a mir kënnen e personaliséierte Feld derbäisetzen, dat zum Beispill den Numm vun der Applikatioun enthält, aus där mir Logbicher sammelen.

Beim Sortie vun der Filterstadium kréie mir Dokumenter an enger éischter Approximatioun, prett fir Visualiséierung zu Kibana. Mir wäerten déi folgend feelen:

  • Numeresch Felder ginn als Text unerkannt, wat Operatiounen op hinnen verhënnert. Nämlech d'Felder time-taken IIS Log, souwéi Felder recipient-count и total-bites Log Tracking.
  • De Standard Dokument Zäitstempel enthält d'Zäit wou de Log veraarbecht gouf, net d'Zäit wou et op der Serversäit geschriwwe gouf.
  • Beräich recipient-address wäert ausgesinn wéi ee Chantier, deen d'Analyse net erlaabt fir d'Empfänger vu Bréiwer ze zielen.

Et ass Zäit e bësse Magie un de Logveraarbechtungsprozess ze addéieren.

Konvertéieren numeresch Felder

D'Dissect Plugin huet eng Optioun convert_datatype, déi benotzt ka ginn fir en Textfeld an en digitale Format ze konvertéieren. Zum Beispill, wéi dëst:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Et ass derwäert ze erënneren datt dës Method nëmme gëeegent ass wann d'Feld definitiv e String enthält. D'Optioun veraarbecht net Null Wäerter vu Felder a werft eng Ausnam.

Fir Tracking Logbicher ass et besser net eng ähnlech Konvertéierungsmethod ze benotzen, well d'Felder recipient-count и total-bites kann eidel sinn. Fir dës Felder ze konvertéieren ass et besser e Plugin ze benotzen mutéieren:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Recipient_address an eenzel Empfänger opzedeelen

Dëse Problem kann och mat dem mutate Plugin geléist ginn:

mutate {
  split => ["recipient_address", ";"]
}

Änneren vum Zäitstempel

Am Fall vun Tracking Logbicher ass de Problem ganz einfach vum Plugin geléist Datum, déi Iech hëllefen am Feld ze schreiwen timestamp Datum an Zäit am néideg Format vum Terrain date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

Am Fall vun IIS Logbicher musse mir Felddaten kombinéieren date и time benotzt de mutate Plugin, registréiert d'Zäitzone déi mir brauchen a setzt dësen Zäitstempel an timestamp benotzt den Datum Plugin:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Däischterheet

D'Output Sektioun gëtt benotzt fir veraarbechte Logbicher un de Log Receiver ze schécken. Am Fall wou Dir direkt op Elastic schéckt, gëtt e Plugin benotzt elastesch Sich, déi d'Serveradress an den Indexnumm Schabloun spezifizéiert fir dat generéiert Dokument ze schécken:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Finale Konfiguratioun

Déi lescht Konfiguratioun wäert esou ausgesinn:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Nëtzlech Adressen:

Source: will.com

Setzt e Commentaire