Praktyczne zastosowanie ELK. Konfigurowanie logstasha

Wprowadzenie

Wdrażając kolejny system, stanęliśmy przed koniecznością przetworzenia dużej liczby różnych logów. Jako instrument wybrano ELK. W tym artykule omówimy nasze doświadczenia w tworzeniu tego stosu.

Nie stawiamy sobie za cel opisywania wszystkich jego możliwości, ale chcemy skoncentrować się na rozwiązywaniu praktycznych problemów. Wynika to z faktu, że przy odpowiednio dużej ilości dokumentacji i gotowych obrazów pułapek jest sporo, przynajmniej my je znaleźliśmy.

Wdrożyliśmy stos za pomocą docker-compose. Co więcej, mieliśmy dobrze napisany plik docker-compose.yml, który pozwalał nam podnosić stos prawie bez problemów. A wydawało nam się, że zwycięstwo już blisko, teraz trochę to przekręcimy, żeby pasowało do naszych potrzeb i tyle.

Niestety próba dostrojenia systemu do odbierania i przetwarzania logów z naszej aplikacji nie powiodła się od samego początku. Dlatego uznaliśmy, że warto przestudiować każdy komponent z osobna, a następnie wrócić do ich połączeń.

Zacznijmy więc od logstasha.

Środowisko, wdrożenie, uruchomienie Logstash w kontenerze

Do wdrożenia używamy docker-compose, opisane tutaj eksperymenty przeprowadzono na MacOS i Ubuntu 18.0.4.

Obraz logstash, który mieliśmy w naszym oryginalnym pliku docker-compose.yml, to docker.elastic.co/logstash/logstash:6.3.2

Wykorzystamy go do eksperymentów.

Aby uruchomić logstash, napisaliśmy osobny plik docker-compose.yml. Oczywiście można było uruchomić obraz z wiersza poleceń, ale mimo wszystko rozwiązaliśmy konkretne zadanie, w którym wszystko od docker-compose jest uruchamiane za nas.

Krótko o plikach konfiguracyjnych

Jak wynika z opisu, logstash można uruchomić jak dla jednego kanału, w tym przypadku musi przesłać plik *.conf lub dla kilku kanałów, w którym to przypadku musi przesłać plik pipelines.yml, który z kolei , będzie odnosić się do plików .conf dla każdego kanału.
Wybraliśmy drugą ścieżkę. Wydało nam się to bardziej wszechstronne i skalowalne. Dlatego stworzyliśmy potoki.yml i zrobiliśmy katalog potoków, w którym umieścimy pliki .conf dla każdego kanału.

Wewnątrz kontenera znajduje się kolejny plik konfiguracyjny - logstash.yml. Nie dotykamy go, używamy go takim, jaki jest.

Więc nasza struktura katalogów to:

Praktyczne zastosowanie ELK. Konfigurowanie logstasha

Na razie zakładamy, że jest to tcp na porcie 5046 do odbierania danych wejściowych, a do wyjścia użyjemy stdout.

Oto taka prosta konfiguracja do pierwszego uruchomienia. Ponieważ początkowym zadaniem jest uruchomienie.

Mamy więc plik docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Co tu widzimy?

  1. Sieci i wolumeny zostały wzięte z oryginalnego docker-compose.yml (tego, w którym uruchamiany jest cały stos) i myślę, że nie wpływają one znacząco na ogólny obraz tutaj.
  2. Tworzymy jedną usługę (usługi) logstash, z obrazu docker.elastic.co/logstash/logstash:6.3.2 i nadajemy jej nazwę logstash_one_channel.
  3. Przekierowujemy port 5046 wewnątrz kontenera na ten sam port wewnętrzny.
  4. Odwzorowujemy nasz plik konfiguracyjny potoku ./config/pipelines.yml na plik /usr/share/logstash/config/pipelines.yml w kontenerze, skąd logstash pobierze go i uczyni go tylko do odczytu, na wszelki wypadek.
  5. Odwzorowujemy katalog ./config/pipelines, w którym znajdują się pliki konfiguracyjne potoków, na katalog /usr/share/logstash/config/pipelines, a także ustawiamy go jako tylko do odczytu.

Praktyczne zastosowanie ELK. Konfigurowanie logstasha

plik piping.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Opisuje on jeden kanał identyfikatorem HABR i ścieżką do jego pliku konfiguracyjnego.

I wreszcie plik „./config/pipelines/habr_pipeline.conf”

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Nie będziemy na razie wchodzić w jego opis, spróbujemy uruchomić:

docker-compose up

Co widzimy

Kontener został uruchomiony. Możemy sprawdzić jego działanie:

echo '13123123123123123123123213123213' | nc localhost 5046

I widzimy odpowiedź w konsoli kontenera:

Praktyczne zastosowanie ELK. Konfigurowanie logstasha

Ale jednocześnie widzimy też:

logstash_one_channel | [2019-04-29T11:28:59,790][BŁĄD][logstash.licensechecker.licensereader] Nie można pobrać informacji o licencji z serwera licencji {:message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore ::ResolutionFailure]elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] Potok został pomyślnie uruchomiony {:pipeline_id=>.monitoring-logstash", :thread=>"# »}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Uruchomione potoki {:count=>2, :running_pipelines=>[:HABR, :.monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][BŁĄD][logstash.inputs.metrics ] X-Pack jest zainstalowany na Logstash, ale nie na Elasticsearch. Zainstaluj X-Pack na Elasticsearch, aby korzystać z funkcji monitorowania. Mogą być dostępne inne funkcje.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent ] Pomyślnie uruchomiono punkt końcowy interfejsu API Logstash {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Uruchamianie kontroli stanu, aby sprawdzić, czy połączenie Elasticsearch działa {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][OSTRZEŻENIE ][logstash.outputs.elasticsearch] Próbowano przywrócić połączenie z martwą instancją ES, ale wystąpił błąd. {:url=>"wyszukiwanie elastyczne:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch nieosiągalny: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elastyczne wyszukiwanie"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Uruchamianie kontroli stanu, aby sprawdzić, czy połączenie Elasticsearch działa {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][OSTRZEŻENIE ][logstash.licensechecker.licensereader] Próbowano przywrócić połączenie z martwą instancją ES, ale wystąpił błąd. {:url=>"wyszukiwanie elastyczne:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch nieosiągalny: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elastyczne wyszukiwanie"}

A nasz dziennik cały czas się indeksuje.

Tutaj na zielono zaznaczyłem komunikat o pomyślnym uruchomieniu potoku, na czerwono komunikat o błędzie, a na żółto komunikat o próbie kontaktu wyszukiwanie elastyczne: 9200.
Dzieje się tak dzięki temu, że w pliku logstash.conf zawartym w obrazie znajduje się sprawdzenie dostępności elasticsearch. W końcu logstash zakłada, że ​​działa jako część stosu Elk, a my go rozdzieliliśmy.

Możesz pracować, ale nie jest to wygodne.

Rozwiązaniem jest wyłączenie tego sprawdzania za pomocą zmiennej środowiskowej XPACK_MONITORING_ENABLED.

Wprowadźmy zmiany w pliku docker-compose.yml i uruchommy go ponownie:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Teraz wszystko jest w porządku. Pojemnik jest gotowy do eksperymentów.

Możemy ponownie wpisać w sąsiedniej konsoli:

echo '13123123123123123123123213123213' | nc localhost 5046

I zobaczyć:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Pracuj w ramach jednego kanału

Więc zaczęliśmy. Teraz możesz poświęcić trochę czasu na bezpośrednie skonfigurowanie logstash. Nie dotykajmy na razie pliku pipelines.yml, zobaczmy, co możemy uzyskać pracując z jednym kanałem.

Muszę powiedzieć, że ogólna zasada pracy z plikiem konfiguracyjnym kanału jest dobrze opisana w oficjalnej instrukcji, tutaj tutaj
Jeśli chcesz czytać po rosyjsku, skorzystaliśmy z tego artykuł(ale składnia zapytania jest tam stara, musisz wziąć to pod uwagę).

Przejdźmy po kolei od sekcji Input. Widzieliśmy już prace nad tcp. Co jeszcze może być tutaj ciekawego?

Testuj wiadomości za pomocą pulsu

Jest taka ciekawa możliwość generowania automatycznych komunikatów testowych.
Aby to zrobić, musisz dołączyć wtyczkę heartbean do sekcji wejściowej.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Włączamy, zaczynamy odbierać raz na minutę

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Chcemy otrzymywać częściej, musimy dodać parametr interwału.
W ten sposób co 10 sekund będziemy otrzymywać wiadomość.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Pobieranie danych z pliku

Postanowiliśmy również przyjrzeć się trybowi plików. Jeśli działa dobrze z plikiem, możliwe, że żaden agent nie jest wymagany, przynajmniej do użytku lokalnego.

Zgodnie z opisem sposób działania powinien być podobny do tail -f, tj. odczytuje znaki nowej linii lub opcjonalnie odczytuje cały plik.

Więc co chcemy uzyskać:

  1. Chcemy otrzymywać wiersze, które są dołączane do jednego pliku dziennika.
  2. Chcemy otrzymywać dane, które są zapisywane w kilku plikach dziennika, mając jednocześnie możliwość oddzielenia tego, co zostało odebrane, od którego miejsca.
  3. Chcemy mieć pewność, że po ponownym uruchomieniu logstash nie otrzyma ponownie tych danych.
  4. Chcemy sprawdzić, czy jeśli logstash jest wyłączony, a dane są nadal zapisywane do plików, to po uruchomieniu otrzymamy te dane.

Aby przeprowadzić eksperyment, dodajmy jeszcze jedną linię do pliku docker-compose.yml, otwierając katalog, w którym umieściliśmy pliki.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

I zmień sekcję input w habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Zaczynamy:

docker-compose up

Aby utworzyć i zapisać pliki dziennika, użyjemy polecenia:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Tak, to działa!

Jednocześnie widzimy, że automatycznie dodaliśmy pole ścieżki. Dzięki temu w przyszłości będziemy mogli filtrować według niego rekordy.

Spróbujmy jeszcze raz:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

A teraz do innego pliku:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Świetnie! Plik został pobrany, ścieżka została podana poprawnie, wszystko jest w porządku.

Zatrzymaj logstash i uruchom ponownie. Poczekajmy. Cisza. Te. Nie otrzymujemy ponownie tych zapisów.

A teraz najbardziej śmiały eksperyment.

Umieszczamy logstash i wykonujemy:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Uruchom ponownie logstash i zobacz:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Brawo! Wszystko odebrane.

Ale należy ostrzec o następujących kwestiach. Jeśli kontener logstash zostanie usunięty (docker stop logstash_one_channel && docker rm logstash_one_channel), nic nie zostanie pobrane. Pozycja pliku, do którego został odczytany, była przechowywana w kontenerze. Jeśli zaczniesz od zera, zaakceptuje tylko nowe linie.

Odczytywanie istniejących plików

Powiedzmy, że uruchamiamy logstash po raz pierwszy, ale mamy już logi i chcielibyśmy je przetworzyć.
Jeśli uruchomimy logstash z sekcją wejściową, której użyliśmy powyżej, nic nie otrzymamy. Logstash przetwarza tylko znaki nowej linii.

Aby wyciągnąć linie z istniejących plików, dodaj dodatkową linię do sekcji wejściowej:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Co więcej, istnieje niuans, dotyczy to tylko nowych plików, których logstash jeszcze nie widział. Dla tych samych plików, które były już w polu widzenia logstash, zapamiętał już ich rozmiar i teraz będzie w nich przyjmował tylko nowe rekordy.

Zatrzymajmy się na tym, studiując sekcję wprowadzania. Możliwości jest znacznie więcej, ale na razie wystarczy na dalsze eksperymenty.

Routing i transformacja danych

Spróbujmy rozwiązać następujący problem, powiedzmy, że mamy komunikaty z jednego kanału, niektóre z nich mają charakter informacyjny, a niektóre to komunikaty o błędach. Różnią się tagiem. Niektóre to INFO, inne to BŁĄD.

Musimy ich rozdzielić przy wyjściu. Te. Piszemy komunikaty informacyjne w jednym kanale, a komunikaty o błędach w innym.

Aby to zrobić, przejdź z sekcji wejściowej do filtra i wyjścia.

Korzystając z sekcji filter, przeanalizujemy przychodzącą wiadomość, pobierając z niej hash (pary klucz-wartość), z którym możemy już pracować, tj. analizować zgodnie z warunkami. A w sekcji wyjściowej wybierzemy wiadomości i wyślemy każdą do własnego kanału.

Analizowanie wiadomości za pomocą groka

Aby parsować ciągi tekstowe i uzyskać z nich zestaw pól, w sekcji filtrów znajduje się specjalna wtyczka - grok.

Nie stawiając sobie za cel podania tutaj jego szczegółowego opisu (w tym celu odsyłam do oficjalna dokumentacja), podam mój prosty przykład.

Aby to zrobić, musisz zdecydować o formacie linii wejściowych. Mam je tak:

1 komunikat INFORMACYJNY1
2 Komunikat BŁĘDU 2

Te. Najpierw identyfikator, potem INFO/BŁĄD, potem jakieś słowo bez spacji.
Nie trudne, ale wystarczające do zrozumienia zasady działania.

Tak więc w sekcji filtrów we wtyczce grok musimy zdefiniować wzorzec parsowania naszych łańcuchów.

będzie wyglądać tak:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Zasadniczo jest to wyrażenie regularne. Wykorzystuje się gotowe wzorce, takie jak INT, LOGLEVEL, WORD. Ich opis, a także inne wzory, można obejrzeć tutaj. tutaj

Teraz, przechodząc przez ten filtr, nasz ciąg zmieni się w hash trzech pól: id_wiadomości, typ_wiadomości, tekst_wiadomości.

Zostaną wyświetlone w sekcji wyjściowej.

Routing komunikatów w sekcji wyjściowej za pomocą polecenia if

W sekcji wyjściowej, jak pamiętamy, zamierzaliśmy podzielić wiadomości na dwa strumienie. Niektóre - które są iNFO, wyprowadzimy na konsolę, a z błędami wyprowadzimy do pliku.

Jak możemy udostępniać te wiadomości? Stan problemu już sugeruje rozwiązanie – w końcu mamy już dedykowane pole message_type, które może przyjąć tylko dwie wartości INFO i ERROR. To na nim dokonamy wyboru za pomocą instrukcji if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

Opis pracy z polami i operatorami znajduje się w tym rozdziale oficjalny podręcznik.

A teraz o samej konkluzji.

Wyjście konsoli, tutaj wszystko jest jasne - stdout {}

Ale wyjście do pliku - pamiętajmy, że uruchamiamy to wszystko z kontenera i aby plik, w którym zapisujemy wynik był dostępny z zewnątrz, musimy otworzyć ten katalog w docker-compose.yml.

Razem:

Sekcja wyjściowa naszego pliku wygląda następująco:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

Dodaj jeszcze jeden wolumin do pliku docker-compose.yml w celu uzyskania danych wyjściowych:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Zaczynamy, próbujemy, widzimy podział na dwa nurty.

Źródło: www.habr.com

Dodaj komentarz