Praktická aplikace ELK. Nastavení logstash

úvod

Při nasazování dalšího systému jsme se potýkali s nutností zpracovávat velké množství různých logů. Jako nástroj byl vybrán ELK. Tento článek bude hovořit o našich zkušenostech s nastavením tohoto zásobníku.

Neklademe si za cíl popsat všechny jeho možnosti, ale chceme se soustředit na řešení praktických problémů. Je to dáno tím, že s dostatečně velkým množstvím dokumentace a hotových obrázků je spousta úskalí, alespoň jsme je našli.

Nasadili jsme zásobník pomocí docker-compose. Navíc jsme měli dobře napsaný soubor docker-compose.yml, který nám umožnil zvýšit zásobník téměř bez problémů. A zdálo se nám, že vítězství je už blízko, teď to trochu pootočíme, aby vyhovovalo našim potřebám, a je to.

Pokus vyladit systém tak, aby přijímal a zpracovával logy z naší aplikace, bohužel nebyl hned na začátku úspěšný. Proto jsme se rozhodli, že stojí za to studovat každou součást zvlášť a poté se vrátit k jejich souvislostem.

Začněme tedy logstash.

Prostředí, nasazení, spuštění Logstashe v kontejneru

Pro nasazení používáme docker-compose, zde popsané experimenty byly provedeny na MacOS a Ubuntu 18.0.4.

Obrázek logstash, který jsme měli v našem původním docker-compose.yml, je docker.elastic.co/logstash/logstash:6.3.2

Použijeme to na experimenty.

Pro spuštění logstash jsme napsali samostatný docker-compose.yml. Samozřejmě bylo možné spustit obrázek z příkazové řádky, ale přeci jen jsme řešili konkrétní úlohu, kdy se nám spouští vše z docker-compose.

Stručně o konfiguračních souborech

Jak vyplývá z popisu, logstash lze spustit jako pro jeden kanál, v tomto případě je potřeba přenést soubor *.conf nebo pro více kanálů, v takovém případě je potřeba přenést soubor pipelines.yml, který naopak , bude odkazovat na soubory .conf pro každý kanál.
Vydali jsme se druhou cestou. Zdál se nám všestrannější a škálovatelnější. Proto jsme vytvořili pipelines.yml a vytvořili adresář pipelines, do kterého vložíme soubory .conf pro každý kanál.

Uvnitř kontejneru je další konfigurační soubor - logstash.yml. Nedotýkáme se ho, používáme ho tak, jak je.

Takže naše adresářová struktura je:

Praktická aplikace ELK. Nastavení logstash

Prozatím předpokládáme, že se jedná o tcp na portu 5046 pro příjem vstupních dat a pro výstup použijeme stdout.

Zde je taková jednoduchá konfigurace pro první spuštění. Protože prvotním úkolem je spustit.

Takže máme tento docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

co tady vidíme?

  1. Sítě a svazky byly převzaty z původního docker-compose.yml (tam, kde se spouští celý zásobník) a myslím, že celkový obrázek zde příliš neovlivňují.
  2. Vytvoříme jednu službu (služby) logstash z obrázku docker.elastic.co/logstash/logstash:6.3.2 a pojmenujeme ji logstash_one_channel.
  3. Přesměrováváme port 5046 uvnitř kontejneru na stejný interní port.
  4. Náš konfigurační soubor potrubí ./config/pipelines.yml namapujeme na soubor /usr/share/logstash/config/pipelines.yml uvnitř kontejneru, kde jej logstash vyzvedne a pro každý případ z něj udělá pouze čtení.
  5. Adresář ./config/pipelines, kde máme konfigurační soubory potrubí, namapujeme do adresáře /usr/share/logstash/config/pipelines a také jej uděláme pouze pro čtení.

Praktická aplikace ELK. Nastavení logstash

soubor potrubí.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Popisuje jeden kanál s identifikátorem HABR a cestu k jeho konfiguračnímu souboru.

A nakonec soubor "./config/pipelines/habr_pipeline.conf"

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Do jeho popisu se zatím nebudeme pouštět, pokusíme se spustit:

docker-compose up

Co vidíme?

Kontejner byl spuštěn. Můžeme zkontrolovat jeho práci:

echo '13123123123123123123123213123213' | nc localhost 5046

A vidíme odpověď v kontejnerové konzoli:

Praktická aplikace ELK. Nastavení logstash

Ale zároveň vidíme:

logstash_one_channel | [2019-04-29T11:28:59,790][CHYBA][logstash.licensechecker.licensereader] Nelze načíst licenční informace z licenčního serveru {:message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore ::ResolutionFailure]elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] Pipeline byl úspěšně spuštěn {:pipeline_id=>".monitoring-logstash", :thread=>"# »}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Potrubí běží {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][ERROR][logstash.inputs.metrics] X-Pack je nainstalován na Logstash, ale ne na Elasticsearch. Chcete-li používat funkci monitorování, nainstalujte si X-Pack na Elasticsearch. Mohou být k dispozici další funkce.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent ] Koncový bod Logstash API byl úspěšně spuštěn {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Spuštění kontroly stavu, aby se zjistilo, zda připojení Elasticsearch funguje {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN ][logstash.outputs.elasticsearch] Pokus o obnovení připojení k mrtvé instanci ES, ale došlo k chybě. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Nedosažitelné: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Spuštění kontroly stavu, aby se zjistilo, zda připojení Elasticsearch funguje {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][WARN ][logstash.licensechecker.licensereader] Pokus o obnovení připojení k mrtvé instanci ES, ale došlo k chybě. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Nedosažitelné: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}

A naše kláda se neustále plazí nahoru.

Zde jsem zeleně zvýraznil zprávu o úspěšném spuštění potrubí, červeně chybovou hlášku a žlutě zprávu o pokusu o kontakt elasticsearch: 9200.
To je způsobeno skutečností, že v souboru logstash.conf, který je součástí obrázku, je kontrola dostupnosti elasticsearch. Logstash koneckonců předpokládá, že funguje jako součást Elk stacku, a my jsme ho oddělili.

Můžete pracovat, ale není to pohodlné.

Řešením je zakázat tuto kontrolu prostřednictvím proměnné prostředí XPACK_MONITORING_ENABLED.

Udělejme změnu v docker-compose.yml a spusťte jej znovu:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Nyní je vše v pořádku. Nádoba je připravena na experimenty.

Můžeme znovu napsat do sousední konzoly:

echo '13123123123123123123123213123213' | nc localhost 5046

A vidí:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Pracujte v rámci jednoho kanálu

Tak jsme začali. Nyní si můžete udělat čas na přímou konfiguraci logstash. Zatím se nedotýkejme souboru pipelines.yml, pojďme se podívat, co můžeme získat prací s jedním kanálem.

Musím říci, že obecný princip práce s konfiguračním souborem kanálu je dobře popsán v oficiálním manuálu zde zde
Pokud chcete číst v ruštině, pak jsme použili tento článek(ale syntaxe dotazu je tam stará, s tím je třeba počítat).

Pojďme postupně ze sekce Vstup. Už jsme viděli práci na tcp. Co dalšího zde může být zajímavého?

Testování zpráv pomocí prezenčního signálu

Existuje tak zajímavá možnost generování automatických testovacích zpráv.
Chcete-li to provést, musíte do vstupní části zahrnout plugin heartbean.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Zapneme, jednou za minutu začneme přijímat

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Chceme přijímat častěji, musíme přidat parametr interval.
Takto budeme dostávat zprávu každých 10 sekund.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Získávání dat ze souboru

Rozhodli jsme se také podívat na režim souborů. Pokud to se souborem funguje dobře, je možné, že není vyžadován žádný agent, tedy alespoň pro místní použití.

Podle popisu by měl být režim provozu podobný tail -f, tzn. přečte nové řádky nebo volitelně přečte celý soubor.

Co tedy chceme získat:

  1. Chceme přijímat řádky, které jsou připojeny k jednomu souboru protokolu.
  2. Chceme přijímat data, která jsou zapsána do několika souborů protokolu, a zároveň být schopni oddělit, co bylo přijato odkud.
  3. Chceme se ujistit, že po restartu logstash tato data znovu neobdrží.
  4. Chceme zkontrolovat, že pokud je logstash deaktivován a data se nadále zapisují do souborů, pak když jej spustíme, obdržíme tato data.

Chcete-li provést experiment, přidejte další řádek do docker-compose.yml a otevřeme adresář, kam jsme uložili soubory.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

A změňte vstupní sekci v habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Začínáme:

docker-compose up

K vytvoření a zápisu souborů protokolu použijeme příkaz:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Jo, funguje to!

Zároveň vidíme, že jsme automaticky přidali pole cesty. V budoucnu tedy budeme moci filtrovat záznamy podle něj.

Pojď to zkusit znovu:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

A teď k jinému souboru:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Skvělý! Soubor byl vyzvednut, cesta byla zadána správně, vše v pořádku.

Zastavte logstash a restartujte. Počkejme. Umlčet. Tito. Tyto záznamy znovu neobdržíme.

A nyní nejodvážnější experiment.

Vložíme logstash a provedeme:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Spusťte znovu logstash a uvidíte:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Hurá! Všechno sebralo.

Je však nutné upozornit na následující. Pokud je kontejner logstash odstraněn (docker stop logstash_one_channel && docker rm logstash_one_channel), nic se nezvedne. Pozice souboru, do které byl načten, byla uložena uvnitř kontejneru. Pokud začnete od nuly, bude přijímat pouze nové řádky.

Čtení existujících souborů

Řekněme, že logstash spouštíme poprvé, ale protokoly již máme a rádi bychom je zpracovali.
Pokud spustíme logstash se vstupní sekcí, kterou jsme použili výše, nezískáme nic. Logstash zpracuje pouze nové řádky.

Chcete-li vytáhnout řádky z existujících souborů, přidejte do vstupní části další řádek:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Navíc existuje nuance, která se týká pouze nových souborů, které logstash ještě neviděl. U stejných souborů, které již byly v zorném poli logstashe, si již zapamatoval jejich velikost a nyní v nich bude brát pouze nové záznamy.

Zastavme se u toho studiem vstupní části. Možností je mnohem více, ale zatím máme dost na další experimenty.

Směrování a transformace dat

Pokusme se vyřešit následující problém, řekněme, že máme zprávy z jednoho kanálu, některé z nich jsou informativní a některé jsou chybové. Liší se tagem. Některé jsou INFO, jiné jsou ERROR.

Musíme je oddělit u východu. Tito. Do jednoho kanálu zapisujeme informační zprávy a do jiného chybové zprávy.

Chcete-li to provést, přejděte ze vstupní části na filtr a výstup.

Pomocí sekce filtru rozebereme příchozí zprávu, získáme z ní hash (páry klíč-hodnota), se kterým již můžeme pracovat, tzn. rozebrat podle podmínek. A ve výstupní části vybereme zprávy a pošleme každou na svůj vlastní kanál.

Analýza zprávy pomocí Groka

Aby bylo možné analyzovat textové řetězce a získat z nich sadu polí, existuje v sekci filtrů speciální plugin - grok.

Aniž bych si kladl za cíl to zde podrobně popsat (na to odkazuji oficiální dokumentace), uvedu svůj jednoduchý příklad.

Chcete-li to provést, musíte se rozhodnout o formátu vstupních řádků. Mám je takto:

1 INFO zpráva1
2 CHYBOVÁ zpráva2

Tito. Nejprve identifikátor, poté INFO/CHYBA, poté nějaké slovo bez mezer.
Není to těžké, ale stačí k pochopení principu fungování.

Takže v sekci filtru v pluginu grok musíme definovat vzor pro analýzu našich řetězců.

Bude to vypadat takto:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

V podstatě je to regulární výraz. Používají se hotové vzory jako INT, LOGLEVEL, WORD. Jejich popis, stejně jako další vzory, si můžete prohlédnout zde. zde

Nyní, když procházíme tímto filtrem, náš řetězec se změní na hash tří polí: message_id, message_type, message_text.

Budou zobrazeny ve výstupní části.

Směrování zpráv ve výstupní části pomocí příkazu if

Ve výstupní části, jak si pamatujeme, jsme se chystali rozdělit zprávy do dvou proudů. Některé – což jsou iNFO, vypíšeme do konzole a s chybami vypíšeme do souboru.

Jak můžeme tyto zprávy sdílet? Už stav problému naznačuje řešení – ostatně již máme vyhrazené pole message_type, které může nabývat pouze dvou hodnot INFO a ERROR. Právě na něm provedeme volbu pomocí příkazu if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

Popis práce s poli a operátory naleznete v této sekci oficiální manuál.

Nyní k samotnému závěru.

Výstup konzoly, zde je vše jasné - stdout {}

Ale výstup do souboru - pamatujte, že to vše spouštíme z kontejneru a aby byl soubor, do kterého zapisujeme výsledek, přístupný zvenčí, musíme tento adresář otevřít v docker-compose.yml.

Celkem:

Výstupní část našeho souboru vypadá takto:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

Přidejte jeden další svazek do docker-compose.yml pro výstup:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Začínáme, zkoušíme, vidíme rozdělení na dva proudy.

Zdroj: www.habr.com

Přidat komentář