Barátok vagyunk az ELK-val és az Exchange-vel. 2. rész

Barátok vagyunk az ELK-val és az Exchange-vel. 2. rész

Folytatom a történetemet arról, hogyan lehet barátokat szerezni Exchange és ELK (eleje itt). Hadd emlékeztesselek arra, hogy ez a kombináció nagyon sok rönk habozás nélküli feldolgozására képes. Ezúttal arról fogunk beszélni, hogyan lehet az Exchange-et a Logstash és a Kibana összetevőivel együttműködni.

Az ELK veremben található Logstash a rönkök intelligens feldolgozására és az Elasticban való elhelyezésre való előkészítésére szolgál dokumentumok formájában, amelyek alapján kényelmes a Kibana különféle vizualizációinak elkészítése.

Telepítés

Két szakaszból áll:

  • Az OpenJDK csomag telepítése és konfigurálása.
  • A Logstash csomag telepítése és konfigurálása.

Az OpenJDK csomag telepítése és konfigurálása

Az OpenJDK csomagot le kell tölteni és ki kell csomagolni egy adott könyvtárba. Ezután a könyvtár elérési útját be kell írni a Windows operációs rendszer $env:Path és $env:JAVA_HOME változóiba:

Barátok vagyunk az ELK-val és az Exchange-vel. 2. rész

Barátok vagyunk az ELK-val és az Exchange-vel. 2. rész

Nézzük meg a Java verziót:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

A Logstash csomag telepítése és konfigurálása

Töltse le az archív fájlt a Logstash disztribúcióval ezért. Az archívumot ki kell csomagolni a lemez gyökeréig. Csomagolja ki a mappába C:Program Files Nem éri meg, a Logstash nem hajlandó normálisan elindulni. Ezután be kell lépnie a fájlba jvm.options javítások, amelyek felelősek a RAM kiosztásáért a Java folyamat számára. Azt javaslom, hogy adja meg a szerver RAM felét. Ha 16 GB RAM van a fedélzeten, akkor az alapértelmezett kulcsok:

-Xms1g
-Xmx1g

a következőre kell cserélni:

-Xms8g
-Xmx8g

Ezen kívül célszerű kommentálni a sort -XX:+UseConcMarkSweepGC. Erről bővebben itt. A következő lépés az alapértelmezett konfiguráció létrehozása a logstash.conf fájlban:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Ezzel a konfigurációval a Logstash beolvassa az adatokat a konzolról, átengedi egy üres szűrőn, és visszaküldi a konzolra. Ezzel a konfigurációval teszteljük a Logstash működését. Ehhez futtassuk interaktív módban:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

A Logstash sikeresen elindult a 9600-as porton.

Az utolsó telepítési lépés: indítsa el a Logstash-t Windows-szolgáltatásként. Ez megtehető például a csomag használatával NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

hibatűrés

A forráskiszolgálóról történő átvitel során a naplók biztonságát a Persistent Queues mechanizmus biztosítja.

Hogyan működik

A sorok elrendezése a naplófeldolgozás során: bemenet → sor → szűrő + kimenet.

A bemeneti beépülő modul adatokat fogad egy naplóforrásból, beírja egy sorba, és visszaigazolást küld arról, hogy az adatok megérkeztek a forráshoz.

A sorból érkező üzeneteket a Logstash dolgozza fel, és továbbítja a szűrőn és a kimeneti bővítményen. Amikor a kimenetről megerősítést kap a napló elküldéséről, a Logstash eltávolítja a feldolgozott naplót a sorból. Ha a Logstash leáll, minden feldolgozatlan üzenet és üzenet, amelyre nem érkezett visszaigazolás, a sorban marad, és a Logstash a következő indításkor folytatja ezek feldolgozását.

beállítás

A fájlban található gombokkal állítható C:Logstashconfiglogstash.yml:

  • queue.type: (lehetséges értékek - persisted и memory (default)).
  • path.queue: (az alapértelmezés szerint a C:Logstashqueue-ban tárolt sorfájlokat tartalmazó mappa elérési útja).
  • queue.page_capacity: (a sor oldalának maximális mérete, az alapértelmezett érték 64 mb).
  • queue.drain: (igaz/hamis - engedélyezi/letiltja a sorfeldolgozás leállítását a Logstash leállítása előtt. Nem javaslom az engedélyezését, mert ez közvetlenül befolyásolja a szerver leállási sebességét).
  • queue.max_events: (az események maximális száma a sorban, az alapértelmezett érték 0 (korlátlan)).
  • queue.max_bytes: (maximális sorméret bájtban, alapértelmezett - 1024 mb (1 GB)).

Ha be van állítva queue.max_events и queue.max_bytes, akkor az üzenetek fogadása leáll a várakozási sorba, ha ezek a beállítások elérik valamelyik értékét. További információ az állandó várólistákról itt.

Példa a logstash.yml várólista beállításáért felelős részére:

queue.type: persisted
queue.max_bytes: 10gb

beállítás

A Logstash konfiguráció általában három részből áll, amelyek a bejövő naplók feldolgozásának különböző fázisaiért felelősek: fogadás (bemeneti szakasz), elemzés (szűrő szakasz) és elküldés Elastic-nak (kimeneti szakasz). Az alábbiakban mindegyiket közelebbről megvizsgáljuk.

Bemenet

A bejövő adatfolyamot nyers naplókkal fogadjuk a filebeat ügynököktől. Ezt a beépülő modult jelezzük a beviteli részben:

input {
  beats {
    port => 5044
  }
}

A konfiguráció után a Logstash elkezdi figyelni az 5044-es portot, és a naplók fogadásakor a szűrő szakasz beállításai szerint feldolgozza azokat. Ha szükséges, csomagolhatja a csatornát a naplók fogadásához a filebitből SSL-be. További információ a beats beépülő modul beállításairól itt.

Szűrő

Az Exchange által generált összes feldolgozásra érdekes szöveges napló csv formátumú, magában a naplófájlban leírt mezőkkel. A csv rekordok elemzéséhez a Logstash három bővítményt kínál: elemez, csv és grok. Az első a legtöbb быстрый, de csak a legegyszerűbb naplók elemzésével birkózik meg.
Például a következő rekordot két részre osztja (a mezőben lévő vessző miatt), ezért a napló elemzése helytelenül történik:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Használható naplók, például IIS elemzéséhez. Ebben az esetben a szűrő rész így nézhet ki:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

A Logstash konfiguráció lehetővé teszi a használatát feltételes állítások, így csak a filebeat címkével ellátott naplókat tudjuk elküldeni a dissect bővítménynek IIS. A beépülő modulon belül a mezőértékeket a nevükkel egyeztetjük, töröljük az eredeti mezőt message, amely egy bejegyzést tartalmazott a naplóból, és hozzáadhatunk egy egyéni mezőt, amely például tartalmazza annak az alkalmazásnak a nevét, amelyből naplókat gyűjtünk.

Nyomon követési naplók esetén jobb a csv beépülő modul használata, amely képes megfelelően feldolgozni az összetett mezőket:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

A beépülő modulon belül a mezőértékeket a nevükkel egyeztetjük, töröljük az eredeti mezőt message (és mezőket is tenant-id и schema-version), amely egy bejegyzést tartalmazott a naplóból, és hozzáadhatunk egy egyéni mezőt, amely például tartalmazza annak az alkalmazásnak a nevét, amelyből naplókat gyűjtünk.

A szűrési szakaszból való kilépéskor első közelítésben megkapjuk a dokumentumokat, amelyek készen állnak a Kibana megjelenítésére. A következők hiányoznak majd:

  • A numerikus mezőket a rendszer szövegként ismeri fel, ami megakadályozza a rajtuk végzett műveleteket. Mégpedig a mezők time-taken IIS naplót, valamint mezőket recipient-count и total-bites Naplókövetés.
  • A szabványos dokumentum időbélyegzője a napló feldolgozási idejét fogja tartalmazni, nem pedig azt, hogy a szerver oldalon megírták.
  • Mező recipient-address úgy fog kinézni, mint egy építkezés, amely nem teszi lehetővé a levelek címzettjeinek megszámlálását.

Itt az ideje, hogy egy kis varázslatot adjunk a naplófeldolgozási folyamathoz.

Numerikus mezők konvertálása

A dissect pluginnak van egy opciója convert_datatype, amellyel egy szövegmezőt digitális formátumba lehet konvertálni. Például így:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Érdemes megjegyezni, hogy ez a módszer csak akkor alkalmas, ha a mező határozottan tartalmaz egy karakterláncot. Az opció nem dolgozza fel a mezők null értékeit, és kivételt dob.

A naplók nyomon követéséhez jobb, ha nem használ hasonló konvertálási módszert, mivel a mezők recipient-count и total-bites üres lehet. E mezők konvertálásához jobb, ha egy plugint használ mutálódik:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

A címzett_cím felosztása egyedi címzettekre

Ez a probléma a Mute plugin használatával is megoldható:

mutate {
  split => ["recipient_address", ";"]
}

Az időbélyeg módosítása

A nyomkövetési naplók esetében a problémát nagyon könnyen megoldja a plugin adat, ami segít a terepen való írásban timestamp dátumot és időt a kívánt formátumban a mezőből date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

Az IIS-naplók esetében a terepi adatokat kell kombinálnunk date и time a Mute plugin segítségével regisztrálja a szükséges időzónát, és helyezze be ezt az időbélyeget timestamp a dátum plugin használatával:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

teljesítmény

A kimeneti szakasz a feldolgozott naplók küldésére szolgál a naplófogadónak. Közvetlenül az Elastic-nak történő küldés esetén egy plugint használnak elasticsearch, amely megadja a szerver címét és az indexnév sablont a generált dokumentum elküldéséhez:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Végső konfiguráció

A végső konfiguráció így fog kinézni:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Hasznos linkek:

Forrás: will.com

Hozzászólás