Aplicarea practică a ELK. Configurarea logstash

Introducere

În timpul implementării unui alt sistem, ne-am confruntat cu nevoia de a procesa un număr mare de jurnale diferite. ELK a fost ales ca instrument. Acest articol va discuta despre experiența noastră în configurarea acestei stive.

Nu ne stabilim un obiectiv de a descrie toate capacitățile sale, dar dorim să ne concentrăm în mod special pe rezolvarea problemelor practice. Acest lucru se datorează faptului că, deși există o cantitate destul de mare de documentație și imagini gata făcute, există destul de multe capcane, cel puțin noi le-am găsit.

Am implementat stiva prin docker-compose. Mai mult, am avut un docker-compose.yml bine scris, care ne-a permis să ridicăm stiva aproape fără probleme. Și ni s-a părut că victoria era deja aproape, acum o vom ajusta puțin pentru a se potrivi nevoilor noastre și gata.

Din păcate, încercarea de a configura sistemul pentru a primi și procesa jurnalele din aplicația noastră nu a avut succes imediat. Prin urmare, am decis că merită să studiem fiecare componentă separat și apoi să revenim la conexiunile lor.

Deci, am început cu logstash.

Mediu, implementare, rulare Logstash într-un container

Pentru implementare folosim docker-compose; experimentele descrise aici au fost efectuate pe MacOS și Ubuntu 18.0.4.

Imaginea logstash care a fost înregistrată în docker-compose.yml original este docker.elastic.co/logstash/logstash:6.3.2

Îl vom folosi pentru experimente.

Am scris un docker-compose.yml separat pentru a rula logstash. Desigur, era posibil să lansăm imaginea din linia de comandă, dar rezolvăm o problemă specifică, în care rulăm totul de la docker-compose.

Pe scurt despre fișierele de configurare

După cum rezultă din descriere, logstash poate fi rulat fie pentru un canal, caz în care trebuie să treacă fișierul *.conf, fie pentru mai multe canale, caz în care trebuie să treacă fișierul pipelines.yml, care, la rândul său , va conecta la fișierele .conf pentru fiecare canal.
Am luat-o pe a doua cale. Ni s-a părut mai universal și mai scalabil. Prin urmare, am creat pipelines.yml și am creat un director pipelines în care vom pune fișiere .conf pentru fiecare canal.

În interiorul containerului există un alt fișier de configurare - logstash.yml. Nu îl atingem, îl folosim așa cum este.

Deci, structura noastră de directoare:

Aplicarea practică a ELK. Configurarea logstash

Pentru a primi date de intrare, deocamdată presupunem că acesta este tcp pe portul 5046, iar pentru ieșire vom folosi stdout.

Iată o configurație simplă pentru prima lansare. Pentru că sarcina inițială este de a lansa.

Deci, avem acest docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Ce vedem aici?

  1. Rețelele și volumele au fost preluate din docker-compose.yml original (cel în care este lansat întreaga stivă) și cred că nu afectează foarte mult imaginea de ansamblu aici.
  2. Creăm un serviciu (servicii) logstash din imaginea docker.elastic.co/logstash/logstash:6.3.2 și îi denumim logstash_one_channel.
  3. Redirecționăm portul 5046 în interiorul containerului, către același port intern.
  4. Mapăm fișierul nostru de configurare pipe ./config/pipelines.yml la fișierul /usr/share/logstash/config/pipelines.yml din interiorul containerului, de unde logstash îl va ridica și îl va face doar în citire, pentru orice eventualitate.
  5. Mapăm directorul ./config/pipelines, unde avem fișiere cu setări de canal, în directorul /usr/share/logstash/config/pipelines și îl facem, de asemenea, doar pentru citire.

Aplicarea practică a ELK. Configurarea logstash

Fișierul Pipelines.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Un canal cu identificatorul HABR și calea către fișierul său de configurare sunt descrise aici.

Și în sfârșit fișierul „./config/pipelines/habr_pipeline.conf”

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Să nu intrăm în descrierea sa deocamdată, să încercăm să o rulăm:

docker-compose up

Ce vedem?

Containerul a pornit. Putem verifica funcționarea acestuia:

echo '13123123123123123123123213123213' | nc localhost 5046

Și vedem răspunsul în consola containerului:

Aplicarea practică a ELK. Configurarea logstash

Dar, în același timp, vedem și:

logstash_one_channel | [2019-04-29T11:28:59,790][EROARE][logstash.licensechecker.licensereader] Nu se pot prelua informațiile de licență de pe serverul de licență {:message=>„Elasticsearch inaccesibil: [http://elasticsearch:9200/][Manticore ::ResolutionFailure] elasticsearch",...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] Conducta a început cu succes {:pipeline_id=>".monitoring-logstash", :thread=>"# "}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Conducte care rulează {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][EROARE][logstash.inputs.metrics] X-Pack este instalat pe Logstash, dar nu pe Elasticsearch. Instalați X-Pack pe Elasticsearch pentru a utiliza funcția de monitorizare. Alte caracteristici pot fi disponibile.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent ] Punctul final API Logstash a pornit cu succes {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Se execută verificarea stării de sănătate pentru a vedea dacă o conexiune Elasticsearch funcționează {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN ][logstash.outputs.elasticsearch] S-a încercat să reînvie conexiunea la instanța ES moartă, dar a primit o eroare. {:url =>“elastic căutare:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Se execută verificarea stării de sănătate pentru a vedea dacă o conexiune Elasticsearch funcționează {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][WARN ][logstash.licensechecker.licensereader] S-a încercat să reînvie conexiunea la instanța ES moartă, dar a primit o eroare. {:url =>“elastic căutare:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}

Și jurnalul nostru se târăște tot timpul.

Aici am evidențiat cu verde mesajul că conducta s-a lansat cu succes, cu roșu mesajul de eroare și cu galben mesajul despre o încercare de a contacta elastic căutare: 9200.
Acest lucru se întâmplă deoarece logstash.conf, inclus în imagine, conține o verificare a disponibilității elasticsearch. La urma urmei, logstash presupune că funcționează ca parte a stivei Elk, dar l-am separat.

Se poate lucra, dar nu este convenabil.

Soluția este să dezactivați această verificare prin variabila de mediu XPACK_MONITORING_ENABLED.

Să facem o modificare la docker-compose.yml și să-l rulăm din nou:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Acum, totul este bine. Recipientul este pregătit pentru experimente.

Putem scrie din nou în următoarea consolă:

echo '13123123123123123123123213123213' | nc localhost 5046

Si vezi:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Lucrul pe un singur canal

Așa că ne-am lansat. Acum vă puteți face timp pentru a configura logstash în sine. Să nu atingem fișierul pipelines.yml deocamdată, să vedem ce putem obține lucrând cu un singur canal.

Trebuie să spun că principiul general de lucru cu fișierul de configurare a canalului este bine descris în manualul oficial, aici aici
Dacă vrei să citești în rusă, l-am folosit pe acesta articol(dar sintaxa interogării de acolo este veche, trebuie să luăm în considerare acest lucru).

Să mergem secvenţial din secţiunea Intrare. Am văzut deja lucrări pe TCP. Ce altceva ar putea fi interesant aici?

Testați mesajele folosind bătăile inimii

Există o oportunitate atât de interesantă de a genera mesaje automate de testare.
Pentru a face acest lucru, trebuie să activați pluginul heartbean în secțiunea de introducere.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Porniți-l, începeți să primiți o dată pe minut

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Dacă vrem să primim mai des, trebuie să adăugăm parametrul interval.
Așa vom primi un mesaj la fiecare 10 secunde.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Preluarea datelor dintr-un fișier

Am decis să ne uităm și la modul fișier. Dacă funcționează bine cu fișierul, atunci poate că nu este necesar niciun agent, cel puțin pentru uz local.

Conform descrierii, modul de operare ar trebui să fie similar cu tail -f, adică. citește linii noi sau, opțional, citește întregul fișier.

Deci, ce vrem să obținem:

  1. Dorim să primim linii care sunt atașate unui fișier jurnal.
  2. Dorim să primim date care sunt scrise în mai multe fișiere jurnal, putând în același timp să separă ceea ce este primit de unde.
  3. Dorim să ne asigurăm că atunci când logstash este repornit, nu primește din nou aceste date.
  4. Vrem să verificăm că dacă logstash este dezactivat și datele continuă să fie scrise în fișiere, atunci când îl rulăm, vom primi aceste date.

Pentru a efectua experimentul, să adăugăm o altă linie la docker-compose.yml, deschizând directorul în care am pus fișierele.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

Și modificați secțiunea de intrare în habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Să începem:

docker-compose up

Pentru a crea și scrie fișiere jurnal vom folosi comanda:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Da, funcționează!

În același timp, vedem că am adăugat automat câmpul de cale. Aceasta înseamnă că, în viitor, vom putea filtra înregistrările după aceasta.

Hai sa incercam din nou:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Și acum la alt fișier:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Grozav! Fișierul a fost preluat, calea a fost specificată corect, totul este în regulă.

Opriți logstash și începeți din nou. Să așteptăm. Tăcere. Acestea. Nu mai primim aceste înregistrări.

Și acum cel mai îndrăzneț experiment.

Instalați logstash și executați:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Rulați logstash din nou și vedeți:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Ura! Totul a fost ridicat.

Dar trebuie să vă avertizăm despre următoarele. Dacă containerul cu logstash este șters (docker stop logstash_one_channel && docker rm logstash_one_channel), atunci nimic nu va fi ridicat. Poziția fișierului până la care a fost citit a fost stocată în interiorul containerului. Dacă îl rulați de la zero, va accepta doar linii noi.

Citirea fișierelor existente

Să presupunem că lansăm logstash pentru prima dată, dar avem deja jurnalele și am dori să le procesăm.
Dacă rulăm logstash cu secțiunea de intrare pe care am folosit-o mai sus, nu vom obține nimic. Numai liniile noi vor fi procesate de logstash.

Pentru ca liniile din fișierele existente să fie extrase, ar trebui să adăugați o linie suplimentară la secțiunea de introducere:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Mai mult, există o nuanță: aceasta afectează doar fișierele noi pe care logstash nu le-a văzut încă. Pentru aceleași fișiere care se aflau deja în câmpul vizual al logstash, acesta și-a amintit deja dimensiunea lor și acum va primi doar intrări noi în ele.

Să ne oprim aici și să studiem secțiunea de intrare. Există încă multe opțiuni, dar asta este suficient pentru noi pentru experimente ulterioare deocamdată.

Rutarea și transformarea datelor

Să încercăm să rezolvăm următoarea problemă, să presupunem că avem mesaje de la un canal, unele dintre ele sunt informaționale, iar altele sunt mesaje de eroare. Ele diferă prin etichetă. Unele sunt INFO, altele sunt EROARE.

Trebuie să le separăm la ieșire. Acestea. Scriem mesaje informative pe un canal, iar mesaje de eroare pe altul.

Pentru a face acest lucru, treceți de la secțiunea de intrare la filtru și ieșire.

Folosind secțiunea de filtrare, vom analiza mesajul primit, obținând din acesta un hash (perechi cheie-valoare), cu care putem deja să lucrăm, i.e. demontați în funcție de condiții. Și în secțiunea de ieșire, vom selecta mesaje și le vom trimite pe fiecare pe propriul canal.

Analizând un mesaj cu grok

Pentru a analiza șirurile de text și a obține un set de câmpuri din ele, există un plugin special în secțiunea de filtrare - grok.

Fără a-mi stabili scopul de a oferi o descriere detaliată a acesteia aici (pentru aceasta mă refer la documentație oficială), voi da exemplul meu simplu.

Pentru a face acest lucru, trebuie să decideți asupra formatului șirurilor de intrare. Le am asa:

1 mesaj INFO1
2 Mesaj de EROARE2

Acestea. Mai întâi vine identificatorul, apoi INFO/EROARE, apoi un cuvânt fără spații.
Nu este dificil, dar este suficient pentru a înțelege principiul de funcționare.

Deci, în secțiunea de filtrare a pluginului grok, trebuie să definim un model pentru analizarea șirurilor noastre.

Va arata asa:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

În esență, este o expresie regulată. Sunt utilizate modele gata făcute, cum ar fi INT, LOGLEVEL, WORD. Descrierea lor, precum și alte modele, pot fi găsite aici aici

Acum, trecând prin acest filtru, șirul nostru se va transforma într-un hash de trei câmpuri: message_id, message_type, message_text.

Acestea vor fi afișate în secțiunea de ieșire.

Dirijarea mesajelor către secțiunea de ieșire folosind comanda if

În secțiunea de ieșire, după cum ne amintim, urma să împărțim mesajele în două fluxuri. Unele - care sunt iNFO, vor fi scoase pe consolă, iar cu erori, vom scoate într-un fișier.

Cum separăm aceste mesaje? Starea problemei sugerează deja o soluție - la urma urmei, avem deja un câmp dedicat tip_mesaj, care poate lua doar două valori: INFO și ERROR. Pe această bază vom face o alegere folosind declarația if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

O descriere a lucrului cu câmpuri și operatori poate fi găsită în această secțiune manualul oficial.

Acum, despre concluzia propriu-zisă.

Ieșire din consolă, totul este clar aici - stdout {}

Dar ieșirea într-un fișier - rețineți că rulăm toate acestea dintr-un container și pentru ca fișierul în care scriem rezultatul să fie accesibil din exterior, trebuie să deschidem acest director în docker-compose.yml.

Total:

Secțiunea de ieșire a fișierului nostru arată astfel:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

În docker-compose.yml adăugăm un alt volum pentru ieșire:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Îl lansăm, îl încercăm și vedem o împărțire în două fluxuri.

Sursa: www.habr.com

Adauga un comentariu