Praktická aplikácia ELK. Nastavenie logstash

Úvod

Pri nasadzovaní iného systému sme sa stretli s potrebou spracovať veľké množstvo rôznych logov. Ako nástroj bol vybraný ELK. Tento článok bude diskutovať o našich skúsenostiach s nastavením tohto zásobníka.

Nekladieme si za cieľ popísať všetky jeho možnosti, ale chceme sa sústrediť konkrétne na riešenie praktických problémov. Je to spôsobené tým, že aj keď je dokumentácie a hotových obrázkov pomerne veľké množstvo, úskalí je tu pomerne veľa, aspoň sme ich našli.

Zásobník sme nasadili cez docker-compose. Okrem toho sme mali dobre napísaný docker-compose.yml, ktorý nám umožnil takmer bez problémov zvýšiť zásobník. A zdalo sa nám, že víťazstvo je už blízko, teraz to trochu upravíme, aby vyhovovalo našim potrebám a je to.

Bohužiaľ, pokus o konfiguráciu systému na prijímanie a spracovanie protokolov z našej aplikácie nebol okamžite úspešný. Preto sme sa rozhodli, že stojí za to študovať každý komponent samostatne a potom sa vrátiť k ich spojeniam.

Začali sme teda logstashom.

Prostredie, nasadenie, spustenie Logstash v kontajneri

Na nasadenie používame docker-compose; tu opísané experimenty boli vykonané na MacOS a Ubuntu 18.0.4.

Obrázok logstash, ktorý bol zaregistrovaný v našom pôvodnom docker-compose.yml, je docker.elastic.co/logstash/logstash:6.3.2

Použijeme ho na experimenty.

Na spustenie logstash sme napísali samostatný docker-compose.yml. Samozrejme bolo možné spustiť obrázok z príkazového riadku, no riešili sme špecifický problém, kedy všetko spúšťame z docker-compose.

Stručne o konfiguračných súboroch

Ako vyplýva z popisu, logstash môže byť spustený buď pre jeden kanál, v takom prípade potrebuje odovzdať súbor *.conf, alebo pre viacero kanálov, v takom prípade je potrebné odovzdať súbor pipelines.yml, ktorý naopak , bude odkazovať na súbory .conf pre každý kanál.
Vybrali sme sa druhou cestou. Zdal sa nám univerzálnejší a škálovateľnejší. Preto sme vytvorili pipelines.yml a vytvorili adresár pipelines, do ktorého vložíme súbory .conf pre každý kanál.

Vo vnútri kontajnera je ďalší konfiguračný súbor - logstash.yml. Nedotýkame sa ho, používame ho tak, ako je.

Takže naša adresárová štruktúra:

Praktická aplikácia ELK. Nastavenie logstash

Na príjem vstupných údajov zatiaľ predpokladáme, že ide o tcp na porte 5046 a na výstup použijeme stdout.

Tu je jednoduchá konfigurácia pre prvé spustenie. Pretože prvotnou úlohou je spustenie.

Takže máme tento docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Čo tu vidíme?

  1. Siete a zväzky boli prevzaté z pôvodného docker-compose.yml (ten, kde sa spúšťa celý zásobník) a myslím si, že celkový obraz tu veľmi neovplyvňujú.
  2. Vytvoríme jednu službu(y) logstash z obrázka docker.elastic.co/logstash/logstash:6.3.2 a pomenujeme ju logstash_one_channel.
  3. Preposielame port 5046 vo vnútri kontajnera na rovnaký interný port.
  4. Náš konfiguračný súbor potrubia ./config/pipelines.yml namapujeme na súbor /usr/share/logstash/config/pipelines.yml vo vnútri kontajnera, kde ho logstash vyberie a pre každý prípad urobí len na čítanie.
  5. Adresár ./config/pipelines, kde máme súbory s nastaveniami kanálov, namapujeme do adresára /usr/share/logstash/config/pipelines a urobíme ho iba na čítanie.

Praktická aplikácia ELK. Nastavenie logstash

Súbor Pipelines.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Tu je popísaný jeden kanál s identifikátorom HABR a cesta k jeho konfiguračnému súboru.

A nakoniec súbor „./config/pipelines/habr_pipeline.conf“

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Zatiaľ sa nebudeme zaoberať jeho popisom, skúsme ho spustiť:

docker-compose up

Čo vidíme

Kontajner sa spustil. Môžeme skontrolovať jeho fungovanie:

echo '13123123123123123123123213123213' | nc localhost 5046

A vidíme odpoveď v kontajnerovej konzole:

Praktická aplikácia ELK. Nastavenie logstash

Zároveň však vidíme aj:

logstash_one_channel | [2019-04-29T11:28:59,790][CHYBA][logstash.licensechecker.licensereader] Nepodarilo sa získať informácie o licencii z licenčného servera {:message=>“Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore ::ResolutionFailure] elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] Pipeline sa úspešne spustil {:pipeline_id=>".monitoring-logstash", :thread=>"# "}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Spustené potrubia {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][ERROR][logstash.inputs.metrics] X-Pack je nainštalovaný na Logstash, ale nie na Elasticsearch. Ak chcete používať funkciu monitorovania, nainštalujte si X-Pack na Elasticsearch. K dispozícii môžu byť aj ďalšie funkcie.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent ] Koncový bod rozhrania Logstash API bol úspešne spustený {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Spustenie kontroly stavu, aby sa zistilo, či pripojenie Elasticsearch funguje {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN ][logstash.outputs.elasticsearch] Pokus o obnovenie pripojenia k mŕtvej inštancii ES, ale vyskytla sa chyba. {:url=>“ElasticSearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Nedosiahnuteľné: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Spustenie kontroly stavu, aby sa zistilo, či pripojenie Elasticsearch funguje {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][WARN ][logstash.licensechecker.licensereader] Pokus o obnovenie pripojenia k mŕtvej inštancii ES, ale vyskytla sa chyba. {:url=>“ElasticSearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Nedosiahnuteľné: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}

A naše poleno sa neustále plazí.

Tu som zelenou farbou zvýraznil hlásenie, že sa potrubie úspešne spustilo, červenou chybové hlásenie a žltou hlásenie o pokuse o kontakt ElasticSearch: 9200.
Stáva sa to preto, že logstash.conf, zahrnutý v obrázku, obsahuje kontrolu dostupnosti elasticsearch. Koniec koncov, logstash predpokladá, že funguje ako súčasť zásobníka Elk, ale oddelili sme ho.

Je možné pracovať, ale nie je to pohodlné.

Riešením je zakázať túto kontrolu prostredníctvom premennej prostredia XPACK_MONITORING_ENABLED.

Zmeňte súbor docker-compose.yml a spustite ho znova:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Teraz je všetko v poriadku. Nádoba je pripravená na experimenty.

V ďalšej konzole môžeme znova zadať:

echo '13123123123123123123123213123213' | nc localhost 5046

A pozri:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Práca v rámci jedného kanála

Tak sme spustili. Teraz si môžete skutočne nájsť čas na konfiguráciu samotného logstash. Zatiaľ sa nedotýkajme súboru pipelines.yml, pozrime sa, čo môžeme získať prácou s jedným kanálom.

Musím povedať, že všeobecný princíp práce s konfiguračným súborom kanála je dobre popísaný v oficiálnom manuáli tu tu
Ak chcete čítať v ruštine, použili sme tento článok(ale syntax dopytu je stará, musíme to vziať do úvahy).

Poďme postupne zo sekcie Vstup. Už sme videli prácu na TCP. Čo tu ešte môže byť zaujímavé?

Otestujte správy pomocou srdcového tepu

Existuje taká zaujímavá príležitosť na generovanie automatických testovacích správ.
Ak to chcete urobiť, musíte povoliť doplnok heartbean v sekcii vstupu.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Zapnite ho a začnite prijímať raz za minútu

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Ak chceme prijímať častejšie, musíme pridať parameter interval.
Takto dostaneme správu každých 10 sekúnd.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Načítanie údajov zo súboru

Rozhodli sme sa pozrieť aj na režim súborov. Ak to so súborom funguje dobre, potom možno nie je potrebný žiadny agent, aspoň na lokálne použitie.

Podľa popisu by mal byť prevádzkový režim podobný ako tail -f, t.j. prečíta nové riadky alebo voliteľne prečíta celý súbor.

Čo teda chceme získať:

  1. Chceme prijímať riadky, ktoré sú pripojené k jednému súboru denníka.
  2. Chceme prijímať údaje, ktoré sú zapísané do niekoľkých protokolových súborov, a zároveň byť schopní oddeliť to, čo je prijaté odkiaľ.
  3. Chceme sa uistiť, že keď sa logstash reštartuje, už tieto údaje nedostane.
  4. Chceme skontrolovať, že ak je logstash vypnutý a údaje sa naďalej zapisujú do súborov, potom keď ho spustíme, dostaneme tieto údaje.

Ak chcete vykonať experiment, pridajte ďalší riadok do docker-compose.yml, čím otvoríme adresár, do ktorého sme uložili súbory.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

A zmeňte vstupnú sekciu v habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Začnime:

docker-compose up

Na vytvorenie a zápis log súborov použijeme príkaz:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Áno, funguje to!

Zároveň vidíme, že sme automaticky pridali pole cesty. To znamená, že v budúcnosti budeme môcť podľa nej filtrovať záznamy.

Skúsme to opäť:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

A teraz k inému súboru:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Skvelé! Súbor bol vybratý, cesta bola zadaná správne, všetko je v poriadku.

Zastavte logstash a začnite znova. Počkajme. Ticho. Tie. Tieto záznamy už nedostávame.

A teraz najodvážnejší experiment.

Nainštalujte logstash a vykonajte:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Znova spustite logstash a uvidíte:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Hurá! Všetko bolo vyzdvihnuté.

Musíme vás však upozorniť na nasledujúce. Ak sa odstráni kontajner logstash (docker stop logstash_one_channel && docker rm logstash_one_channel), nič sa nevyzdvihne. Pozícia súboru, do ktorej bol načítaný, bola uložená vo vnútri kontajnera. Ak ho spustíte od začiatku, bude akceptovať iba nové riadky.

Čítanie existujúcich súborov

Povedzme, že spúšťame logstash prvýkrát, no už máme logy a chceli by sme ich spracovať.
Ak spustíme logstash so vstupnou sekciou, ktorú sme použili vyššie, nedostaneme nič. Logstash spracuje iba nové riadky.

Aby sa mohli vytiahnuť riadky z existujúcich súborov, mali by ste do vstupnej sekcie pridať ďalší riadok:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Okrem toho je tu nuansa: týka sa to iba nových súborov, ktoré logstash ešte nevidel. Pre tie isté súbory, ktoré už boli v zornom poli logstash, si už zapamätal ich veľkosť a teraz v nich bude brať len nové záznamy.

Zastavme sa tu a preštudujme si vstupnú časť. Možností je ešte veľa, ale na ďalšie experimenty nám to zatiaľ stačí.

Smerovanie a transformácia údajov

Pokúsme sa vyriešiť nasledujúci problém, povedzme, že máme správy z jedného kanála, niektoré z nich sú informačné a niektoré chybové. Líšia sa štítkom. Niektoré sú INFO, iné sú ERROR.

Musíme ich oddeliť pri východe. Tie. Do jedného kanála zapisujeme informačné správy a do iného chybové hlásenia.

Ak to chcete urobiť, prejdite zo vstupnej časti na filter a výstup.

Pomocou sekcie filter rozoberieme prichádzajúcu správu, pričom z nej získame hash (páry kľúč – hodnota), s ktorým už vieme pracovať, t.j. rozoberať podľa podmienok. A vo výstupnej sekcii vyberieme správy a pošleme každú na vlastný kanál.

Analýza správy pomocou groku

Aby ste mohli analyzovať textové reťazce a získať z nich množinu polí, v sekcii filtrov existuje špeciálny plugin - grok.

Bez toho, aby som si stanovil za cieľ uviesť to tu podrobne (na to odkazujem oficiálna dokumentácia), uvediem svoj jednoduchý príklad.

Aby ste to dosiahli, musíte sa rozhodnúť pre formát vstupných reťazcov. Mám ich takto:

1 INFO správa1
2 CHYBOVÉ hlásenie2

Tie. Na prvom mieste je identifikátor, potom INFO/CHYBA, potom nejaké slovo bez medzier.
Nie je to ťažké, ale stačí pochopiť princíp fungovania.

Takže v sekcii filtrovania doplnku grok musíme definovať vzor na analýzu našich reťazcov.

Bude to vyzerať takto:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

V podstate ide o regulárny výraz. Používajú sa hotové vzory ako INT, LOGLEVEL, WORD. Ich popis, ako aj ďalšie vzory nájdete tu tu

Teraz, keď prejdeme týmto filtrom, náš reťazec sa zmení na hash troch polí: message_id, message_type, message_text.

Zobrazia sa vo výstupnej sekcii.

Smerovanie správ do výstupnej sekcie pomocou príkazu if

Vo výstupnej časti, ako si pamätáme, sme sa chystali rozdeliť správy do dvoch prúdov. Niektoré - ktoré sú iNFO, sa vypíšu do konzoly a s chybami sa vypíšu do súboru.

Ako oddeľujeme tieto správy? Už stav problému naznačuje riešenie – veď už máme vyhradené pole message_type, ktoré môže nadobudnúť iba dve hodnoty: INFO a ERROR. Na tomto základe urobíme výber pomocou príkazu if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

Popis práce s poľami a operátormi nájdete v tejto časti oficiálny manuál.

Teraz k samotnému záveru.

Výstup konzoly, tu je všetko jasné - stdout {}

Ale výstup do súboru - nezabudnite, že toto všetko spúšťame z kontajnera a aby bol súbor, do ktorého zapisujeme výsledok, prístupný zvonku, musíme tento adresár otvoriť v docker-compose.yml.

Celkom:

Výstupná časť nášho súboru vyzerá takto:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

V docker-compose.yml pridáme ďalší zväzok pre výstup:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Spustíme to, vyskúšame a uvidíme rozdelenie na dva prúdy.

Zdroj: hab.com

Pridať komentár