Applicazione pratica di ELK. Configurazione di logstash

Introduzione

Durante l'implementazione di un altro sistema, ci siamo trovati di fronte alla necessità di elaborare un gran numero di log diversi. ELK è stato scelto come strumento. Questo articolo discuterà la nostra esperienza nella configurazione di questo stack.

Non ci poniamo un obiettivo per descrivere tutte le sue capacità, ma vogliamo concentrarci specificamente sulla risoluzione di problemi pratici. Ciò è dovuto al fatto che, nonostante la quantità di documentazione e immagini già pronte, ci sono molte insidie, almeno le abbiamo trovate.

Abbiamo distribuito lo stack tramite docker-compose. Inoltre, avevamo un docker-compose.yml ben scritto, che ci ha permesso di aumentare lo stack quasi senza problemi. E ci sembrava che la vittoria fosse già vicina, ora la modifichiamo un po’ per adattarla alle nostre esigenze e basta.

Sfortunatamente, il tentativo di configurare il sistema per ricevere ed elaborare i log dalla nostra applicazione non ha avuto successo immediatamente. Pertanto, abbiamo deciso che valeva la pena studiare ciascun componente separatamente e poi tornare alle loro connessioni.

Quindi, abbiamo iniziato con logstash.

Ambiente, distribuzione, esecuzione di Logstash in un contenitore

Per il deploy utilizziamo docker-compose; gli esperimenti qui descritti sono stati condotti su MacOS e Ubuntu 18.0.4.

L'immagine logstash registrata nel nostro docker-compose.yml originale è docker.elastic.co/logstash/logstash:6.3.2

Lo useremo per esperimenti.

Abbiamo scritto un docker-compose.yml separato per eseguire logstash. Ovviamente era possibile avviare l'immagine dalla riga di comando, ma stavamo risolvendo un problema specifico, ovvero eseguivamo tutto da docker-compose.

Brevemente sui file di configurazione

Come segue dalla descrizione, logstash può essere eseguito per un canale, nel qual caso deve passare il file *.conf, o per più canali, nel qual caso deve passare il file pipelines.yml, che, a sua volta , collegherà ai file .conf per ciascun canale.
Abbiamo preso la seconda strada. Ci è sembrato più universale e scalabile. Pertanto, abbiamo creato pipelines.yml e creato una directory pipeline in cui inseriremo i file .conf per ciascun canale.

All'interno del contenitore c'è un altro file di configurazione: logstash.yml. Non lo tocchiamo, lo usiamo così com'è.

Quindi, la nostra struttura di directory:

Applicazione pratica di ELK. Configurazione di logstash

Per ricevere i dati in input, per ora assumiamo che questo sia tcp sulla porta 5046, e per l'output utilizzeremo stdout.

Ecco una semplice configurazione per il primo avvio. Perché il compito iniziale è lanciare.

Quindi, abbiamo questo docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Cosa vediamo qui?

  1. Reti e volumi sono stati presi dal docker-compose.yml originale (quello in cui viene lanciato l'intero stack) e penso che qui non influenzino molto il quadro generale.
  2. Creiamo uno o più servizi logstash dall'immagine docker.elastic.co/logstash/logstash:6.3.2 e lo denominiamo logstash_one_channel.
  3. Inoltriamo la porta 5046 all'interno del container, allo stesso porto interno.
  4. Mappiamo il nostro file di configurazione della pipe ./config/pipelines.yml al file /usr/share/logstash/config/pipelines.yml all'interno del contenitore, dove logstash lo preleverà e lo renderà di sola lettura, per ogni evenienza.
  5. Mappiamo la directory ./config/pipelines, dove abbiamo i file con le impostazioni del canale, nella directory /usr/share/logstash/config/pipelines e la rendiamo anche di sola lettura.

Applicazione pratica di ELK. Configurazione di logstash

File Pipelines.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Qui vengono descritti un canale con l'identificatore HABR e il percorso del relativo file di configurazione.

E infine il file “./config/pipelines/habr_pipeline.conf”

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Non entriamo nella sua descrizione per ora, proviamo a eseguirlo:

docker-compose up

Cosa vediamo?

Il contenitore è iniziato. Possiamo verificarne il funzionamento:

echo '13123123123123123123123213123213' | nc localhost 5046

E vediamo la risposta nella console del contenitore:

Applicazione pratica di ELK. Configurazione di logstash

Ma allo stesso tempo vediamo anche:

logstash_one_channel | [2019-04-29T11:28:59,790][ERRORE][logstash.licensechecker.licensereader] Impossibile recuperare le informazioni sulla licenza dal server delle licenze {:message=>“Elasticsearch non raggiungibile: [http://elasticsearch:9200/][Manticore ::RisoluzioneFailure] elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] La pipeline è stata avviata correttamente {:pipeline_id=>".monitoring-logstash", :thread=>"# "}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Pipeline che eseguono {:count=>2, :running_pipelines=>[:HABR, :.monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015] [ERRORE] [logstash.inputs.metrics] X-Pack è installato su Logstash ma non su Elasticsearch. Installa X-Pack su Elasticsearch per utilizzare la funzionalità di monitoraggio. Potrebbero essere disponibili altre funzionalità.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent] Endpoint API Logstash avviato correttamente {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Esecuzione del controllo dello stato per verificare se una connessione Elasticsearch funziona {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN] [logstash.outputs.elasticsearch] Tentativo di resuscitare la connessione all'istanza ES inattiva, ma si è verificato un errore. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Irraggiungibile: [http://elasticsearch:9200/][Manticore::ResolutionFailure] ricercaelastica"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Esecuzione del controllo dello stato per verificare se una connessione Elasticsearch funziona {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710] [WARN] [logstash.licensechecker.licensereader] Tentativo di resuscitare la connessione all'istanza ES inattiva, ma si è verificato un errore. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Irraggiungibile: [http://elasticsearch:9200/][Manticore::ResolutionFailure] ricercaelastica"}

E il nostro registro si insinua continuamente.

Qui ho evidenziato in verde il messaggio che la pipeline è stata avviata con successo, in rosso il messaggio di errore e in giallo il messaggio relativo a un tentativo di contatto elasticsearch: 9200.
Ciò accade perché logstash.conf, incluso nell'immagine, contiene un controllo sulla disponibilità di elasticsearch. Dopotutto, logstash presuppone che funzioni come parte dello stack Elk, ma lo abbiamo separato.

È possibile lavorare, ma non è conveniente.

La soluzione è disabilitare questo controllo tramite la variabile d'ambiente XPACK_MONITORING_ENABLED.

Apportiamo una modifica a docker-compose.yml ed eseguiamolo di nuovo:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Adesso va tutto bene. Il contenitore è pronto per gli esperimenti.

Possiamo digitare nuovamente nella console successiva:

echo '13123123123123123123123213123213' | nc localhost 5046

E vedi:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Lavorare all'interno di un canale

Quindi abbiamo lanciato. Ora puoi effettivamente prenderti il ​​tempo per configurare logstash stesso. Non tocchiamo il file pipelines.yml per ora, vediamo cosa possiamo ottenere lavorando con un canale.

Devo dire che il principio generale di lavoro con il file di configurazione del canale è ben descritto nel manuale ufficiale, qui qui
Se vuoi leggere in russo, abbiamo usato questo un articolo(ma la sintassi della query è vecchia, dobbiamo tenerne conto).

Andiamo in sequenza dalla sezione Input. Abbiamo già visto il lavoro su tcp. Cos'altro potrebbe essere interessante qui?

Testare i messaggi utilizzando il battito cardiaco

Esiste un'opportunità davvero interessante per generare messaggi di test automatici.
Per fare ciò, è necessario abilitare il plugin heartbean nella sezione di input.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Accendilo, inizia a ricevere una volta al minuto

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Se vogliamo ricevere più spesso dobbiamo aggiungere il parametro intervallo.
In questo modo riceveremo un messaggio ogni 10 secondi.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Recupero dei dati da un file

Abbiamo anche deciso di esaminare la modalità file. Se funziona correttamente con il file, forse non è necessario alcun agente, almeno per l'uso locale.

Secondo la descrizione, la modalità operativa dovrebbe essere simile a tail -f, cioè legge nuove righe o, come opzione, legge l'intero file.

Quindi cosa vogliamo ottenere:

  1. Vogliamo ricevere righe che vengono aggiunte a un file di registro.
  2. Vogliamo ricevere i dati scritti in diversi file di registro, pur essendo in grado di separare ciò che viene ricevuto da dove.
  3. Vogliamo assicurarci che quando logstash viene riavviato, non riceva nuovamente questi dati.
  4. Vogliamo verificare che se logstash è disattivato e i dati continuano a essere scritti sui file, quando lo eseguiamo riceveremo questi dati.

Per condurre l'esperimento, aggiungiamo un'altra riga a docker-compose.yml, aprendo la directory in cui inseriamo i file.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

E cambia la sezione di input in habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Iniziamo:

docker-compose up

Per creare e scrivere file di log utilizzeremo il comando:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Sì, funziona!

Allo stesso tempo, vediamo che abbiamo aggiunto automaticamente il campo percorso. Ciò significa che in futuro saremo in grado di filtrare i record in base ad esso.

Proviamo di nuovo:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

E ora passiamo ad un altro file:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Grande! Il file è stato prelevato, il percorso è stato specificato correttamente, va tutto bene.

Arresta logstash e ricomincia. Aspettiamo. Silenzio. Quelli. Non riceviamo più questi record.

E ora l'esperimento più audace.

Installa logstash ed esegui:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Esegui nuovamente logstash e vedi:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Evviva! Tutto è stato raccolto.

Ma dobbiamo avvertirti di quanto segue. Se il contenitore logstash viene eliminato (docker stop logstash_one_channel && docker rm logstash_one_channel), non verrà raccolto nulla. La posizione del file fino al quale è stato letto veniva memorizzata all'interno del contenitore. Se lo esegui da zero, accetterà solo nuove righe.

Lettura di file esistenti

Diciamo che stiamo lanciando logstash per la prima volta, ma abbiamo già dei log e vorremmo elaborarli.
Se eseguiamo logstash con la sezione di input che abbiamo usato sopra, non otterremo nulla. Solo le nuove righe verranno elaborate da logstash.

Per poter estrarre le righe dai file esistenti, è necessario aggiungere una riga aggiuntiva alla sezione di input:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Inoltre, c'è una sfumatura: riguarda solo i nuovi file che logstash non ha ancora visto. Per gli stessi file che erano già nel campo visivo di logstash, ha già ricordato la loro dimensione e ora accetterà solo nuove voci al loro interno.

Fermiamoci qui e studiamo la sezione di input. Ci sono ancora molte opzioni, ma per ora ci bastano per ulteriori esperimenti.

Routing e trasformazione dei dati

Proviamo a risolvere il seguente problema, supponiamo di avere messaggi da un canale, alcuni di essi sono informativi e altri sono messaggi di errore. Differiscono per tag. Alcuni sono INFO, altri sono ERRORE.

Dobbiamo separarli all'uscita. Quelli. Scriviamo messaggi di informazione in un canale e messaggi di errore in un altro.

Per fare ciò, spostati dalla sezione input a filter e output.

Utilizzando la sezione filtro, analizzeremo il messaggio in arrivo, ottenendo da esso un hash (coppie chiave-valore), con cui possiamo già lavorare, ad es. smontare in base alle condizioni. E nella sezione di output selezioneremo i messaggi e invieremo ciascuno al proprio canale.

Analisi di un messaggio con grok

Per analizzare stringhe di testo e ottenere da esse una serie di campi, c'è un plugin speciale nella sezione filtri: grok.

Senza pormi l’obiettivo di darne qui una descrizione dettagliata (per questo rimando a documentazione ufficiale), fornirò il mio semplice esempio.

Per fare ciò, devi decidere il formato delle stringhe di input. Li ho così:

1 messaggio INFORMAZIONE1
2 messaggio di ERRORE2

Quelli. Viene prima l'identificatore, poi INFO/ERROR, quindi una parola senza spazi.
Non è difficile, ma è sufficiente per comprendere il principio di funzionamento.

Quindi, nella sezione filtro del plugin grok, dobbiamo definire un modello per analizzare le nostre stringhe.

Apparirà così:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Essenzialmente è un'espressione regolare. Vengono utilizzati modelli già pronti, come INT, LOGLEVEL, WORD. La loro descrizione, così come altri modelli, può essere trovata qui qui

Ora, passando attraverso questo filtro, la nostra stringa si trasformerà in un hash di tre campi: message_id, message_type, message_text.

Verranno visualizzati nella sezione di output.

Instradare i messaggi alla sezione di output utilizzando il comando if

Nella sezione di output, come ricordiamo, avremmo diviso i messaggi in due flussi. Alcuni, che sono iNFO, verranno inviati alla console e, in caso di errori, verranno inviati a un file.

Come separiamo questi messaggi? La condizione del problema suggerisce già una soluzione - dopo tutto, abbiamo già un campo message_type dedicato, che può assumere solo due valori: INFO ed ERROR. È su questa base che faremo una scelta utilizzando l'istruzione if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

In questa sezione è possibile trovare una descrizione dell'utilizzo dei campi e degli operatori manuale ufficiale.

Ora, riguardo alla conclusione stessa.

Output della console, qui è tutto chiaro: stdout {}

Ma l'output in un file: ricorda che stiamo eseguendo tutto questo da un contenitore e affinché il file in cui scriviamo il risultato sia accessibile dall'esterno, dobbiamo aprire questa directory in docker-compose.yml.

Totale:

La sezione di output del nostro file è simile alla seguente:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

In docker-compose.yml aggiungiamo un altro volume per l'output:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Lo lanciamo, lo proviamo e vediamo una divisione in due flussi.

Fonte: habr.com

Aggiungi un commento