Az ELK gyakorlati alkalmazása. Logstash beállítása

Bevezetés

Egy másik rendszer üzembe helyezése közben azzal kellett szembesülnünk, hogy nagyszámú különböző naplót kellett feldolgoznunk. Eszközként az ELK-t választották. Ez a cikk a verem beállításával kapcsolatos tapasztalatainkat tárgyalja.

Nem azt tűzzük ki célul, hogy minden képességét leírjuk, hanem kifejezetten a gyakorlati problémák megoldására kívánunk koncentrálni. Ez annak köszönhető, hogy bár elég sok a dokumentáció és a kész kép, azért elég sok a buktató, legalábbis mi találtuk.

A verem telepítését a docker-compose segítségével végeztük. Sőt, volt egy jól megírt docker-compose.yml-ünk, amivel szinte problémamentesen emelhettük a veremet. És nekünk úgy tűnt, hogy már közel van a győzelem, most kicsit finomítjuk az igényeinknek megfelelően, és ennyi.

Sajnos a rendszert úgy konfigurálni, hogy az alkalmazásunkból naplókat fogadjon és dolgozzon fel, nem járt azonnal sikerrel. Ezért úgy döntöttünk, hogy érdemes az egyes komponenseket külön-külön tanulmányozni, majd visszatérni az összefüggéseikre.

Tehát a logstash-val kezdtük.

Környezet, telepítés, Logstash futtatása tárolóban

A telepítéshez docker-compose-t használunk, az itt leírt kísérleteket MacOS és Ubuntu 18.0.4 rendszeren végeztük.

Az eredeti docker-compose.yml fájlban regisztrált logstash kép a következő: docker.elastic.co/logstash/logstash:6.3.2

Kísérletekhez fogjuk használni.

Írtunk egy külön docker-compose.yml fájlt a logstash futtatásához. Természetesen parancssorból is el lehetett indítani a képet, de egy konkrét problémát oldottunk meg, ahol mindent a docker-compose-ból futtatunk.

Röviden a konfigurációs fájlokról

Ahogy a leírásból következik, a logstash futtatható egy csatornán, amely esetben át kell adnia a *.conf fájlt, vagy több csatornán, amely esetben át kell adnia a pipelines.yml fájlt, ami viszont , az egyes csatornák .conf fájljaira hivatkozik.
A második utat választottuk. Számunkra univerzálisabbnak és skálázhatóbbnak tűnt. Ezért létrehoztuk a pipelines.yml fájlt, és létrehoztunk egy pipelines könyvtárat, amelybe minden csatornához .conf fájlokat helyezünk el.

A tárolóban van egy másik konfigurációs fájl - logstash.yml. Nem nyúlunk hozzá, úgy használjuk, ahogy van.

Tehát a mi könyvtárszerkezetünk:

Az ELK gyakorlati alkalmazása. Logstash beállítása

A bemeneti adatok fogadásához egyelőre feltételezzük, hogy ez a tcp az 5046-os porton, a kimenethez pedig az stdout-ot fogjuk használni.

Íme egy egyszerű konfiguráció az első indításhoz. Mert a kezdeti feladat az indítás.

Tehát megvan ez a docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Mit látunk itt?

  1. A hálózatok és a kötetek az eredeti docker-compose.yml-ből származnak (ahol a teljes verem indul), és úgy gondolom, hogy ezek itt nem befolyásolják nagyban az összképet.
  2. Létrehozunk egy logstash szolgáltatás(oka)t a docker.elastic.co/logstash/logstash:6.3.2 képből, és elnevezzük logstash_one_channel.
  3. A konténerben lévő 5046-os portot ugyanarra a belső portra továbbítjuk.
  4. A ./config/pipelines.yml csőkonfigurációs fájlunkat leképezzük a tárolóban lévő /usr/share/logstash/config/pipelines.yml fájlra, ahol a logstash felveszi és csak olvashatóvá teszi, minden esetre.
  5. A ./config/pipelines könyvtárat, ahol a csatornabeállításokkal rendelkező fájljaink vannak, leképezzük a /usr/share/logstash/config/pipelines könyvtárba, és csak olvashatóvá tesszük.

Az ELK gyakorlati alkalmazása. Logstash beállítása

Pipelines.yml fájl

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Itt van leírva egy HABR azonosítóval rendelkező csatorna és a konfigurációs fájl elérési útja.

És végül a „./config/pipelines/habr_pipeline.conf” fájl

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

A leírásába most ne menjünk bele, próbáljuk meg futtatni:

docker-compose up

Mit látunk?

A konténer elindult. Működését ellenőrizhetjük:

echo '13123123123123123123123213123213' | nc localhost 5046

És látjuk a választ a tárolókonzolon:

Az ELK gyakorlati alkalmazása. Logstash beállítása

De ugyanakkor azt is látjuk, hogy:

logstash_one_channel | [2019-04-29T11:28:59,790][HIBA][logstash.licensechecker.licensereader] Nem sikerült lekérni a licencadatokat a licencszerverről {:message=>„Elasticsearch elérhetetlen: [http://elasticsearch:9200/][Manticore ::ResolutionFailure] elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] A folyamat sikeresen elindult {:pipeline_id=>".monitoring-logstash", :thread=>"# "}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Futó folyamatok: {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][HIBA][logstash.inputs.metrics] Az X-Pack telepítve van a Logstash-ra, de az Elasticsearch-re nincs. Kérjük, telepítse az X-Pack-et az Elasticsearch-re a figyelési funkció használatához. Más funkciók is elérhetők lehetnek.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent ] Sikeresen elindult a Logstash API végpont {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Állapotellenőrzés, hogy ellenőrizze, működik-e az Elasticsearch kapcsolat {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN ][logstash.outputs.elasticsearch] Megpróbálta újraéleszteni a kapcsolatot egy halott ES-példánnyal, de hibaüzenetet kapott. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch nem elérhető: [http://elasticsearch:9200/][Manticore::Re]solution rugalmas keresés"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Állapotellenőrzés, hogy ellenőrizze, működik-e az Elasticsearch kapcsolat {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][WARN ][logstash.licensechecker.licensereader] Megpróbálta újraéleszteni a kapcsolatot a halott ES-példányhoz, de hibaüzenetet kapott. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch nem elérhető: [http://elasticsearch:9200/][Manticore::Re]solution rugalmas keresés"}

A naplónk pedig állandóan felkúszik.

Itt zölddel kiemeltem azt az üzenetet, hogy a csővezeték sikeresen elindult, pirossal a hibaüzenetet és sárgával a kapcsolatfelvételi kísérletről szóló üzenetet. elasticsearch: 9200.
Ez azért történik, mert a képben található logstash.conf tartalmazza az elasticsearch elérhetőségének ellenőrzését. Végül is a logstash feltételezi, hogy az Elk verem részeként működik, de mi elkülönítettük.

Lehet dolgozni, de nem kényelmes.

A megoldás az, hogy letiltja ezt az ellenőrzést az XPACK_MONITORING_ENABLED környezeti változón keresztül.

Változtassuk meg a docker-compose.yml fájlt, és futtassuk újra:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Most minden rendben van. A tartály készen áll a kísérletekre.

Újra beírhatjuk a következő konzolba:

echo '13123123123123123123123213123213' | nc localhost 5046

És nézd:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Egy csatornán belül működik

Tehát elindítottuk. Most már valóban időt szánhat magának a logstash konfigurálására. Egyelőre ne nyúljunk a pipelines.yml fájlhoz, nézzük meg, mit érhetünk el, ha egy csatornával dolgozunk.

Azt kell mondanom, hogy a csatornakonfigurációs fájllal való munka általános elve jól le van írva a hivatalos kézikönyvben, itt itt
Ha oroszul szeretne olvasni, ezt használtuk cikk(de a lekérdezés szintaxisa ott régi, ezt figyelembe kell vennünk).

Lépjünk sorban az Input részből. Láttunk már munkát a TCP-n. Mi lehet még itt érdekes?

Tesztelje az üzeneteket szívveréssel

Van egy ilyen érdekes lehetőség az automatikus tesztüzenetek generálására.
Ehhez engedélyeznie kell a szívbean plugint a beviteli részben.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Kapcsolja be, kezdje el kapni percenként egyszer

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Ha gyakrabban szeretnénk kapni, akkor hozzá kell adni az intervallum paramétert.
Így 10 másodpercenként kapunk üzenetet.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Adatok lekérése fájlból

Úgy döntöttünk, hogy megnézzük a fájlmódot is. Ha jól működik a fájllal, akkor talán nincs szükség ügynökre, legalábbis helyi használatra.

A leírás szerint az üzemmódnak a farok -f-hez hasonlónak kell lennie, azaz. beolvassa az új sorokat, vagy opcionálisan a teljes fájlt.

Tehát amit szeretnénk kapni:

  1. Olyan sorokat szeretnénk kapni, amelyek egy naplófájlhoz vannak hozzáfűzve.
  2. Több naplófájlba írt adatokat szeretnénk kapni, miközben el tudjuk különíteni, hogy mi honnan érkezik.
  3. Biztosak akarunk lenni abban, hogy a logstash újraindításakor ne kapja meg újra ezeket az adatokat.
  4. Szeretnénk ellenőrizni, hogy ha a logstash ki van kapcsolva, és az adatok továbbra is fájlba íródnak, akkor a futtatáskor ezeket az adatokat megkapjuk.

A kísérlet végrehajtásához adjunk hozzá még egy sort a docker-compose.yml fájlhoz, megnyitva azt a könyvtárat, amelybe a fájlokat helyeztük.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

És módosítsa a beviteli részt a habr_pipeline.conf fájlban

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Kezdjük:

docker-compose up

A naplófájlok létrehozásához és írásához a következő parancsot fogjuk használni:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Igen, működik!

Ugyanakkor azt látjuk, hogy automatikusan hozzáadtuk az elérési út mezőt. Ez azt jelenti, hogy a jövőben ez alapján szűrhetjük majd a rekordokat.

Próbáljuk meg újra:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

És most egy másik fájlhoz:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Nagy! A fájlt felvették, az elérési utat helyesen adták meg, minden rendben.

Állítsa le a logstash-t, és indítsa újra. Várjunk. Csend. Azok. Ezeket a feljegyzéseket többé nem kapjuk meg.

És most a legmerészebb kísérlet.

Telepítse a logstash-t és futtassa:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Futtassa újra a logstash-t, és nézze meg:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Hurrá! Mindent felvettek.

De figyelmeztetnünk kell a következőkre. Ha a logstash-t tartalmazó tároló törlődik (docker stop logstash_one_channel && docker rm logstash_one_channel), akkor a rendszer semmit sem vesz fel. A fájl pozíciója, ameddig az olvasásra került, a tárolóban volt tárolva. Ha a nulláról futtatod, akkor csak új sorokat fogad el.

Meglévő fájlok olvasása

Tegyük fel, hogy először indítjuk el a logstash-t, de már vannak naplóink, és szeretnénk azokat feldolgozni.
Ha a logstash-t a fent használt bemeneti szekcióval futtatjuk, akkor semmit nem kapunk. A logstash csak az új sorokat dolgozza fel.

A meglévő fájlok sorainak felhúzásához adjon hozzá egy további sort a beviteli szakaszhoz:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Sőt, van egy árnyalat: ez csak az új fájlokat érinti, amelyeket a logstash még nem látott. Ugyanazoknál a fájloknál, amelyek már a logstash látómezejében voltak, már megjegyezte a méretüket, és most csak új bejegyzéseket vesz fel bennük.

Álljunk meg itt, és tanulmányozzuk a beviteli részt. Sok lehetőség van még, de ez egyelőre elegendő a további kísérletekhez.

Útválasztás és adatátalakítás

Próbáljuk meg megoldani a következő problémát, tegyük fel, hogy egy csatornáról vannak üzeneteink, ezek egy része tájékoztató jellegű, és néhány hibaüzenet. Címke szerint különböznek egymástól. Egyesek INFO, mások ERROR.

El kell választanunk őket a kijáratnál. Azok. Az egyik csatornába információs üzeneteket írunk, a másikba hibaüzeneteket.

Ehhez lépjen a bemeneti részből a szűrőre és a kimenetre.

A szűrő szekció segítségével elemezni fogjuk a bejövő üzenetet, kapunk belőle egy hash-t (kulcs-érték párokat), amivel már tudunk dolgozni, pl. a feltételeknek megfelelően szétszerelni. A kimeneti részben pedig kiválasztjuk az üzeneteket, és mindegyiket elküldjük a saját csatornájára.

Üzenet elemzése grokkal

A szöveges karakterláncok elemzéséhez és belőlük mezők gyűjtéséhez van egy speciális bővítmény a szűrő részben - grok.

Anélkül, hogy célul tűztem volna ki, hogy itt részletes leírást adjak róla (erre hivatkozom hivatalos dokumentáció), hozom az egyszerű példámat.

Ehhez el kell döntenie a bemeneti karakterláncok formátumát. Nekem ilyenek vannak:

1 INFO üzenet1
2 HIBA üzenet2

Azok. Először az azonosító, majd az INFO/ERROR, majd valami szóközök nélküli szó.
Nem nehéz, de a működési elv megértéséhez elegendő.

Tehát a grok plugin szűrő részében meg kell határoznunk egy mintát a karakterláncaink elemzéséhez.

Így fog kinézni:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Lényegében ez egy reguláris kifejezés. Kész mintákat használnak, például INT, LOGLEVEL, WORD. Leírásuk, valamint egyéb minták itt találhatók itt

Most, hogy áthaladunk ezen a szűrőn, a karakterláncunk három mezőből álló hash-be fog alakulni: message_id, message_type, message_text.

Ezek a kimeneti részben jelennek meg.

Üzenetek irányítása a kimeneti szakaszba az if paranccsal

A kimeneti részben, mint emlékszünk, két folyamra akartuk felosztani az üzeneteket. Néhányat - amelyek iNFO - a konzolra küldjük, és hibák esetén egy fájlba adjuk ki.

Hogyan különítjük el ezeket az üzeneteket? A probléma feltétele máris megoldást sugall - elvégre már van egy dedikált message_type mezőnk, amely csak két értéket vehet fel: INFO és ERROR. Ennek alapján választunk az if utasítással.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

A mezőkkel és operátorokkal végzett munka leírása ebben a részben található hivatalos kézikönyv.

Most magáról a tényleges következtetésről.

Konzol kimenet, itt minden világos - stdout {}

De a kimenet egy fájlba - ne feledje, hogy mindezt egy tárolóból futtatjuk, és ahhoz, hogy a fájl, amelybe az eredményt írjuk, kívülről elérhető legyen, meg kell nyitnunk ezt a könyvtárat a docker-compose.yml-ben.

Összesen:

Fájlunk kimeneti része így néz ki:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

A docker-compose.yml fájlhoz hozzáadunk egy másik kötetet a kimenethez:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Elindítjuk, kipróbáljuk, és két folyamra oszlik.

Forrás: will.com

Hozzászólás