Praktische Anwendung von ELK. Logstash einrichten

Einführung

Bei der Bereitstellung eines anderen Systems standen wir vor der Notwendigkeit, eine große Anzahl unterschiedlicher Protokolle zu verarbeiten. Als Instrument wurde ELK gewählt. In diesem Artikel werden wir über unsere Erfahrungen beim Einrichten dieses Stapels sprechen.

Unser Ziel ist es nicht, alle Möglichkeiten zu beschreiben, sondern wir wollen uns auf die Lösung praktischer Probleme konzentrieren. Das liegt daran, dass es bei einer ausreichend großen Menge an Dokumentation und vorgefertigten Bildern viele Fallstricke gibt, zumindest haben wir sie gefunden.

Wir haben den Stack über Docker-Compose bereitgestellt. Darüber hinaus verfügten wir über eine gut geschriebene docker-compose.yml, die es uns ermöglichte, den Stack nahezu problemlos zu erhöhen. Und es kam uns so vor, als wäre der Sieg schon nahe, jetzt werden wir ihn noch ein wenig verdrehen, um ihn an unsere Bedürfnisse anzupassen, und das war's.

Leider war ein Versuch, das System auf den Empfang und die Verarbeitung von Protokollen unserer Anwendung abzustimmen, nicht auf Anhieb erfolgreich. Daher haben wir beschlossen, dass es sich lohnt, jede Komponente einzeln zu untersuchen und dann auf ihre Zusammenhänge zurückzukommen.

Beginnen wir also mit Logstash.

Umgebung, Bereitstellung, Ausführen von Logstash in einem Container

Zur Bereitstellung nutzen wir Docker-Compose, die hier beschriebenen Experimente wurden unter MacOS und Ubuntu 18.0.4 durchgeführt.

Das Logstash-Image, das wir in unserer ursprünglichen docker-compose.yml hatten, ist docker.elastic.co/logstash/logstash:6.3.2

Wir werden es für Experimente verwenden.

Um logstash auszuführen, haben wir eine separate docker-compose.yml geschrieben. Natürlich war es möglich, das Image über die Befehlszeile zu starten, aber schließlich haben wir eine bestimmte Aufgabe gelöst, bei der alles von Docker-Compose für uns gestartet wird.

Kurz zu Konfigurationsdateien

Wie aus der Beschreibung hervorgeht, kann logstash für einen Kanal ausgeführt werden. In diesem Fall muss die *.conf-Datei übertragen werden, oder für mehrere Kanäle. In diesem Fall muss die Datei „pipelines.yml“ übertragen werden, die wiederum die Datei „pipelines.yml“ übertragen muss , verweist auf die .conf-Dateien für jeden Kanal.
Wir haben den zweiten Weg gewählt. Es erschien uns vielseitiger und skalierbarer. Deshalb haben wir „pipelines.yml“ erstellt und ein Pipelines-Verzeichnis erstellt, in dem wir .conf-Dateien für jeden Kanal ablegen.

Im Container befindet sich eine weitere Konfigurationsdatei – logstash.yml. Wir berühren es nicht, wir verwenden es so wie es ist.

Unsere Verzeichnisstruktur ist also:

Praktische Anwendung von ELK. Logstash einrichten

Wir gehen vorerst davon aus, dass es sich um TCP auf Port 5046 handelt, um Eingabedaten zu empfangen, und verwenden für die Ausgabe stdout.

Hier ist eine so einfache Konfiguration für den ersten Start. Denn die erste Aufgabe besteht darin, zu starten.

Wir haben also diese docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Was sehen wir hier?

  1. Netzwerke und Volumes wurden aus der ursprünglichen docker-compose.yml (der Datei, in der der gesamte Stack gestartet wird) übernommen und ich denke, dass sie hier keinen großen Einfluss auf das Gesamtbild haben.
  2. Wir erstellen einen Dienst-Logstash aus dem Bild docker.elastic.co/logstash/logstash:6.3.2 und geben ihm den Namen logstash_one_channel.
  3. Wir leiten Port 5046 im Container an denselben internen Port weiter.
  4. Wir ordnen unsere Pipe-Konfigurationsdatei ./config/pipelines.yml der Datei /usr/share/logstash/config/pipelines.yml im Container zu, wo Logstash sie aufnimmt und für alle Fälle schreibgeschützt macht.
  5. Wir ordnen das Verzeichnis ./config/pipelines, in dem wir die Pipe-Konfigurationsdateien haben, dem Verzeichnis /usr/share/logstash/config/pipelines zu und machen es außerdem schreibgeschützt.

Praktische Anwendung von ELK. Logstash einrichten

piping.yml-Datei

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Es beschreibt einen Kanal mit der HABR-Kennung und dem Pfad zu seiner Konfigurationsdatei.

Und schließlich die Datei „./config/pipelines/habr_pipeline.conf“

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Wir werden vorerst nicht auf die Beschreibung eingehen, sondern versuchen Folgendes auszuführen:

docker-compose up

Was sehen wir?

Der Container wurde gestartet. Wir können seine Arbeit überprüfen:

echo '13123123123123123123123213123213' | nc localhost 5046

Und wir sehen die Antwort in der Containerkonsole:

Praktische Anwendung von ELK. Logstash einrichten

Aber gleichzeitig sehen wir auch:

logstash_one_channel | [2019T04:29:11][FEHLER][logstash.licensechecker.licensereader] Lizenzinformationen können nicht vom Lizenzserver abgerufen werden {:message=>"Elasticsearch Unreachable: [http://elasticsearch:28/][Manticore ::ResolutionFailure]elasticsearch", ...

logstash_one_channel | [2019T04:29:11][INFO][logstash.pipeline] Pipeline erfolgreich gestartet {:pipeline_id=>".monitoring-logstash", :thread=>"# »}

logstash_one_channel | [2019T04:29:11][INFO][logstash.agent] Pipelines, die {:count=>28, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019T04:29:11][FEHLER][logstash.inputs.metrics] X-Pack ist auf Logstash installiert, aber nicht auf Elasticsearch. Bitte installieren Sie X-Pack auf Elasticsearch, um die Überwachungsfunktion zu nutzen. Möglicherweise sind weitere Funktionen verfügbar.
logstash_one_channel | [2019T04:29:11][INFO][logstash.agent] Logstash API-Endpunkt {:port=>29} erfolgreich gestartet
logstash_one_channel | [2019T04:29:11][INFO][logstash.outputs.elasticsearch] Ausführen einer Gesundheitsprüfung, um zu sehen, ob eine Elasticsearch-Verbindung funktioniert {:healthcheck_url=>http://elasticsearch:29/, :path=> "/"}
logstash_one_channel | [2019T04:29:11][WARNUNG][logstash.outputs.elasticsearch] Es wurde versucht, die Verbindung zur toten ES-Instanz wiederherzustellen, es wurde jedoch ein Fehler angezeigt. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elastische Suche"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO][logstash.licensechecker.licensereader] Ausführen einer Gesundheitsprüfung, um zu sehen, ob eine Elasticsearch-Verbindung funktioniert {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019T04:29:11][WARNUNG][logstash.licensechecker.licensereader] Es wurde versucht, die Verbindung zur toten ES-Instanz wiederherzustellen, es wurde jedoch ein Fehler angezeigt. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elastische Suche"}

Und unser Protokoll kriecht ständig nach oben.

Hier habe ich die Meldung, dass die Pipeline erfolgreich gestartet wurde, grün hervorgehoben, die Fehlermeldung rot und die Meldung über den Kontaktversuch gelb elasticsearch: 9200.
Dies liegt daran, dass in der im Image enthaltenen logstash.conf eine Prüfung auf Verfügbarkeit von Elasticsearch erfolgt. Schließlich geht Logstash davon aus, dass es als Teil des Elk-Stacks funktioniert, und wir haben es getrennt.

Sie können arbeiten, aber es ist nicht bequem.

Die Lösung besteht darin, diese Prüfung über die Umgebungsvariable XPACK_MONITORING_ENABLED zu deaktivieren.

Nehmen wir eine Änderung an docker-compose.yml vor und führen es erneut aus:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Jetzt ist alles in Ordnung. Der Behälter ist bereit für Experimente.

Wir können noch einmal in die nebenstehende Konsole eingeben:

echo '13123123123123123123123213123213' | nc localhost 5046

Und sehen:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Arbeiten Sie innerhalb eines Kanals

Also fingen wir an. Jetzt können Sie sich tatsächlich die Zeit nehmen, Logstash direkt zu konfigurieren. Lassen Sie uns die Datei „pipelines.yml“ vorerst nicht berühren, sondern sehen, was wir durch die Arbeit mit einem Kanal erreichen können.

Ich muss sagen, dass das allgemeine Prinzip der Arbeit mit der Kanalkonfigurationsdatei im offiziellen Handbuch hier gut beschrieben ist hier
Wenn Sie auf Russisch lesen möchten, haben wir dieses verwendet ein Artikel(aber die Abfragesyntax ist dort alt, das müssen Sie berücksichtigen).

Gehen wir nacheinander vom Abschnitt „Eingabe“ aus. Wir haben die Arbeit an TCP bereits gesehen. Was kann hier noch interessant sein?

Testen Sie Nachrichten mit Heartbeat

Es gibt so eine interessante Möglichkeit, automatische Testnachrichten zu generieren.
Dazu müssen Sie das Heartbean-Plugin in den Eingabebereich einbinden.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Wir schalten es ein und beginnen einmal pro Minute mit dem Empfang

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Wenn wir häufiger empfangen möchten, müssen wir den Intervallparameter hinzufügen.
Auf diese Weise erhalten wir alle 10 Sekunden eine Nachricht.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Daten aus einer Datei abrufen

Wir haben uns auch für den Dateimodus entschieden. Wenn es mit der Datei einwandfrei funktioniert, ist möglicherweise kein Agent erforderlich, zumindest für die lokale Verwendung.

Die Funktionsweise soll laut Beschreibung ähnlich wie tail -f sein, also liest Zeilenumbrüche oder optional die gesamte Datei.

Was wir also erreichen wollen:

  1. Wir möchten Zeilen erhalten, die an eine Protokolldatei angehängt werden.
  2. Wir möchten Daten empfangen, die in mehrere Protokolldateien geschrieben werden, und dabei unterscheiden können, was von wo empfangen wurde.
  3. Wir möchten sicherstellen, dass Logstash diese Daten beim Neustart nicht erneut erhält.
  4. Wir möchten überprüfen, ob wir diese Daten erhalten, wenn Logstash deaktiviert ist und weiterhin Daten in Dateien geschrieben werden, wenn wir es ausführen.

Um das Experiment durchzuführen, fügen wir eine weitere Zeile zu docker-compose.yml hinzu und öffnen das Verzeichnis, in dem wir die Dateien ablegen.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

Und ändern Sie den Eingabeabschnitt in habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Wir starten:

docker-compose up

Um Protokolldateien zu erstellen und zu schreiben, verwenden wir den Befehl:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Ja, es funktioniert!

Gleichzeitig sehen wir, dass wir das Pfadfeld automatisch hinzugefügt haben. Daher werden wir in Zukunft Datensätze danach filtern können.

Lass es uns erneut versuchen:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Und nun zu einer anderen Datei:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Großartig! Die Datei wurde abgeholt, der Pfad wurde korrekt angegeben, alles ist in Ordnung.

Stoppen Sie Logstash und starten Sie neu. Lass uns warten. Schweigen. Diese. Wir erhalten diese Aufzeichnungen nicht erneut.

Und jetzt das gewagteste Experiment.

Wir legen Logstash ab und führen Folgendes aus:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Führen Sie logstash erneut aus und sehen Sie:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Hurra! Alles nahm Fahrt auf.

Es ist jedoch notwendig, vor Folgendem zu warnen. Wenn der Logstash-Container entfernt wird (docker stop logstash_one_channel && docker rm logstash_one_channel), wird nichts abgeholt. Die Position der Datei, bis zu der sie gelesen wurde, wurde im Container gespeichert. Wenn Sie bei Null anfangen, werden nur neue Zeilen akzeptiert.

Vorhandene Dateien lesen

Nehmen wir an, wir führen logstash zum ersten Mal aus, haben aber bereits Protokolle und möchten diese verarbeiten.
Wenn wir logstash mit dem oben verwendeten Eingabeabschnitt ausführen, erhalten wir nichts. Nur Zeilenumbrüche werden von logstash verarbeitet.

Um Zeilen aus vorhandenen Dateien abzurufen, fügen Sie dem Eingabeabschnitt eine zusätzliche Zeile hinzu:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Darüber hinaus gibt es eine Nuance: Dies betrifft nur neue Dateien, die logstash noch nicht gesehen hat. Für dieselben Dateien, die sich bereits im Sichtfeld von Logstash befanden, hat es sich bereits deren Größe gemerkt und nimmt nun nur noch neue Datensätze in sie auf.

Lassen Sie uns damit aufhören, indem wir den Eingabeabschnitt studieren. Es gibt noch viel mehr Möglichkeiten, aber im Moment haben wir genug für weitere Experimente.

Routing und Datentransformation

Versuchen wir, das folgende Problem zu lösen. Nehmen wir an, wir haben Nachrichten von einem Kanal, einige davon dienen der Information und andere sind Fehlermeldungen. Sie unterscheiden sich im Tag. Einige sind INFO, andere sind FEHLER.

Wir müssen sie am Ausgang trennen. Diese. Wir schreiben Informationsnachrichten in einen Kanal und Fehlermeldungen in einen anderen.

Gehen Sie dazu vom Eingabebereich zu Filter und Ausgabe.

Mithilfe des Filterabschnitts analysieren wir die eingehende Nachricht und erhalten daraus einen Hash (Schlüssel-Wert-Paare), mit dem wir bereits arbeiten können, d. h. entsprechend den Bedingungen analysieren. Und im Ausgabebereich wählen wir Nachrichten aus und senden jede an ihren eigenen Kanal.

Eine Nachricht mit grok analysieren

Um Textzeichenfolgen zu analysieren und daraus eine Reihe von Feldern zu erhalten, gibt es im Filterbereich ein spezielles Plugin – grok.

Ohne mir das Ziel zu setzen, hier eine ausführliche Beschreibung zu geben (hierzu verweise ich auf amtliche Dokumentation), werde ich mein einfaches Beispiel geben.

Dazu müssen Sie das Format der Eingabezeilen festlegen. Ich habe sie so:

1 INFO-Nachricht1
2 FEHLERmeldung2

Diese. Zuerst die Kennung, dann INFO/ERROR, dann ein Wort ohne Leerzeichen.
Nicht schwierig, aber ausreichend, um das Funktionsprinzip zu verstehen.

Daher müssen wir im Filterabschnitt des Grok-Plugins ein Muster zum Parsen unserer Strings definieren.

Es wird so aussehen:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Im Grunde handelt es sich um einen regulären Ausdruck. Es werden vorgefertigte Muster wie INT, LOGLEVEL, WORD verwendet. Ihre Beschreibung sowie andere Muster können hier eingesehen werden. hier

Wenn wir nun diesen Filter durchlaufen, wird unser String zu einem Hash aus drei Feldern: message_id, message_type, message_text.

Sie werden im Ausgabebereich angezeigt.

Weiterleiten von Nachrichten im Ausgabeabschnitt mit dem if-Befehl

Wie wir uns erinnern, wollten wir im Ausgabeabschnitt die Nachrichten in zwei Streams aufteilen. Einige, bei denen es sich um iNFO handelt, werden auf der Konsole ausgegeben, und bei Fehlern werden wir sie in eine Datei ausgeben.

Wie können wir diese Botschaften teilen? Der Zustand des Problems deutet bereits auf eine Lösung hin – schließlich haben wir bereits ein dediziertes message_type-Feld, das nur zwei Werte INFO und ERROR annehmen kann. Daraufhin werden wir mithilfe der if-Anweisung eine Auswahl treffen.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

Eine Beschreibung der Arbeit mit Feldern und Operatoren finden Sie in diesem Abschnitt offizielles Handbuch.

Nun zur Schlussfolgerung selbst.

Konsolenausgabe, hier ist alles klar - stdout {}

Aber die Ausgabe in die Datei – denken Sie daran, dass wir das alles aus dem Container ausführen und damit die Datei, in die wir das Ergebnis schreiben, von außen zugänglich ist, müssen wir dieses Verzeichnis in docker-compose.yml öffnen.

Total:

Der Ausgabeabschnitt unserer Datei sieht folgendermaßen aus:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

Fügen Sie für die Ausgabe ein weiteres Volume zu docker-compose.yml hinzu:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Wir fangen an, wir versuchen es, wir sehen die Aufteilung in zwei Ströme.

Source: habr.com

Kommentar hinzufügen