Praktična uporaba ELK. Nastavitev logstash

Predstavitev

Pri uvajanju drugega sistema smo se soočili s potrebo po obdelavi velikega števila različnih dnevnikov. Kot orodje je bil izbran ELK. Ta članek bo obravnaval naše izkušnje pri postavljanju tega sklada.

Ne postavljamo si cilja opisati vseh njegovih zmožnosti, ampak se želimo osredotočiti posebej na reševanje praktičnih problemov. To je posledica dejstva, da kljub precejšnji količini dokumentacije in že pripravljenih slik obstaja kar nekaj pasti, vsaj mi smo jih našli.

Sklad smo razmestili prek docker-compose. Poleg tega smo imeli dobro napisan docker-compose.yml, ki nam je omogočal dvig sklada skoraj brez težav. In zdelo se nam je, da je zmaga že blizu, zdaj jo bomo malo priredili po svoje in to je to.

Na žalost poskus konfiguracije sistema za sprejemanje in obdelavo dnevnikov iz naše aplikacije ni bil takoj uspešen. Zato smo se odločili, da je vredno proučiti vsako komponento posebej in se nato vrniti k njihovim povezavam.

Tako smo začeli z logstashom.

Okolje, uvedba, izvajanje Logstash v vsebniku

Za uvajanje uporabljamo docker-compose; tukaj opisani poskusi so bili izvedeni na MacOS in Ubuntu 18.0.4.

Slika logstash, ki je bila registrirana v našem izvirnem docker-compose.yml, je docker.elastic.co/logstash/logstash:6.3.2

Uporabili ga bomo za poskuse.

Napisali smo ločen docker-compose.yml za zagon logstash. Seveda je bilo možno zagnati sliko iz ukazne vrstice, vendar smo reševali specifično težavo, kjer vse poganjamo iz docker-compose.

Na kratko o konfiguracijskih datotekah

Kot izhaja iz opisa, se lahko logstash izvaja za en kanal, v tem primeru mora posredovati datoteko *.conf, ali za več kanalov, v tem primeru mora posredovati datoteko pipelines.yml, ki nato , bo povezal do datotek .conf za vsak kanal.
Ubrali smo drugo pot. Zdelo se nam je bolj univerzalno in razširljivo. Zato smo ustvarili pipelines.yml in naredili imenik pipelines, v katerega bomo dali datoteke .conf za vsak kanal.

Znotraj vsebnika je še ena konfiguracijska datoteka - logstash.yml. Ne dotikamo se ga, uporabljamo ga takšnega, kot je.

Torej, naša struktura imenika:

Praktična uporaba ELK. Nastavitev logstash

Za prejemanje vhodnih podatkov za zdaj predvidevamo, da je to tcp na vratih 5046, za izhod pa bomo uporabili stdout.

Tukaj je preprosta konfiguracija za prvi zagon. Ker je začetna naloga zagon.

Torej, imamo ta docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Kaj vidimo tukaj?

  1. Omrežja in nosilci so bili vzeti iz izvirnega docker-compose.yml (tistega, kjer se zažene celoten sklad) in mislim, da tukaj ne vplivajo veliko na splošno sliko.
  2. Eno storitev logstash ustvarimo iz slike docker.elastic.co/logstash/logstash:6.3.2 in jo poimenujemo logstash_one_channel.
  3. Vrata 5046 posredujemo znotraj vsebnika, na ista notranja vrata.
  4. Konfiguracijsko datoteko cevi ./config/pipelines.yml preslikamo v datoteko /usr/share/logstash/config/pipelines.yml znotraj vsebnika, kjer jo bo logstash pobral in za vsak slučaj naredil samo za branje.
  5. Imenik ./config/pipelines, kjer imamo datoteke z nastavitvami kanala, preslikamo v imenik /usr/share/logstash/config/pipelines in ga tudi naredimo samo za branje.

Praktična uporaba ELK. Nastavitev logstash

Datoteka Pipelines.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Tu sta opisana en kanal z identifikatorjem HABR in pot do njegove konfiguracijske datoteke.

In končno datoteka “./config/pipelines/habr_pipeline.conf”

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Za zdaj se ne spuščajmo v njegov opis, poskusimo ga zagnati:

docker-compose up

Kaj vidimo?

Posoda se je začela. Njegovo delovanje lahko preverimo:

echo '13123123123123123123123213123213' | nc localhost 5046

In vidimo odgovor v konzoli vsebnika:

Praktična uporaba ELK. Nastavitev logstash

Toda hkrati vidimo tudi:

logstash_one_channel | [2019-04-29T11:28:59,790][NAPAKA][logstash.licensechecker.licensereader] Ni mogoče pridobiti informacij o licenci iz licenčnega strežnika {:message=>»Elasticsearch nedosegljiv: [http://elasticsearch:9200/][Manticore ::ResolutionFailure] elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] Cevovod se je uspešno zagnal {:pipeline_id=>".monitoring-logstash", :thread=>"# "}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Cevovodi tečejo {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][NAPAKA][logstash.inputs.metrics] X-Pack je nameščen na Logstash, ne pa tudi na Elasticsearch. Če želite uporabljati funkcijo spremljanja, namestite X-Pack na Elasticsearch. Morda so na voljo tudi druge funkcije.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent] Uspešno zagnana končna točka Logstash API {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Izvajanje preverjanja stanja, da se preveri, ali povezava Elasticsearch deluje {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][OPOZORILO]][logstash.outputs.elasticsearch] Poskušal sem oživiti povezavo z mrtvim primerkom ES, vendar je prišlo do napake. {:url=>“elastično iskanje:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch nedosegljiv: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elastično iskanje"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Izvajanje preverjanja stanja, da se preveri, ali povezava Elasticsearch deluje {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][OPOZORILO ][logstash.licensechecker.licensereader] Poskušal sem oživiti povezavo z mrtvim primerkom ES, vendar je prišlo do napake. {:url=>“elastično iskanje:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch nedosegljiv: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elastično iskanje"}

In naše poleno se ves čas leze navzgor.

Tukaj sem z zeleno označil sporočilo, da se je cevovod uspešno zagnal, z rdečo sporočilo o napaki in z rumeno sporočilo o poskusu vzpostavitve stika elastično iskanje: 9200.
To se zgodi, ker logstash.conf, vključen v sliko, vsebuje preverjanje razpoložljivosti elasticsearch. Navsezadnje logstash predvideva, da deluje kot del sklada Elk, vendar smo ga ločili.

Delati je mogoče, ni pa priročno.

Rešitev je, da onemogočite to preverjanje prek spremenljivke okolja XPACK_MONITORING_ENABLED.

Spremenimo docker-compose.yml in ga znova zaženimo:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Zdaj je vse v redu. Posoda je pripravljena za poskuse.

V naslednji konzoli lahko znova vnesemo:

echo '13123123123123123123123213123213' | nc localhost 5046

In glej:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Delo znotraj enega kanala

Tako smo začeli. Zdaj si lahko dejansko vzamete čas in konfigurirate sam logstash. Zaenkrat se ne dotikajmo datoteke pipelines.yml, poglejmo, kaj lahko dosežemo z delom z enim kanalom.

Moram reči, da je splošno načelo dela s konfiguracijsko datoteko kanala dobro opisano v uradnem priročniku, tukaj tukaj
Če želite brati v ruščini, smo uporabili to Članek(vendar je sintaksa poizvedbe stara, to moramo upoštevati).

Pojdimo zaporedno od razdelka Vnos. Delo na TCP smo že videli. Kaj bi lahko bilo tu še zanimivega?

Preizkusite sporočila z uporabo srčnega utripa

Obstaja tako zanimiva priložnost za ustvarjanje samodejnih testnih sporočil.
Če želite to narediti, morate v razdelku za vnos omogočiti vtičnik heartbean.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Vklopite, začnite prejemati enkrat na minuto

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Če želimo prejemati pogosteje, moramo dodati parameter interval.
Tako bomo prejeli sporočilo vsakih 10 sekund.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Pridobivanje podatkov iz datoteke

Odločili smo se tudi pogledati način datoteke. Če z datoteko deluje dobro, potem morda noben agent ni potreben, vsaj za lokalno uporabo.

Po opisu naj bi bil način delovanja podoben tail -f, tj. prebere nove vrstice ali, kot možnost, prebere celotno datoteko.

Torej, kaj želimo dobiti:

  1. Želimo prejeti vrstice, ki so dodane eni dnevniški datoteki.
  2. Želimo prejemati podatke, ki so zapisani v več dnevniških datotek, pri tem pa lahko ločimo, od kje je prejeto.
  3. Prepričati se želimo, da ob ponovnem zagonu logstash teh podatkov ne prejme več.
  4. Preveriti želimo, ali če je logstash izklopljen in se podatki še naprej zapisujejo v datoteke, bomo te podatke prejeli, ko ga zaženemo.

Za izvedbo poskusa dodamo še eno vrstico v docker-compose.yml in odpremo imenik, v katerega smo shranili datoteke.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

In spremenite razdelek za vnos v habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Začnimo:

docker-compose up

Za ustvarjanje in pisanje dnevniških datotek bomo uporabili ukaz:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Ja, deluje!

Hkrati vidimo, da smo samodejno dodali polje poti. To pomeni, da bomo v prihodnosti lahko filtrirali zapise po njem.

Poskusimo znova:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

In zdaj k drugi datoteki:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Super! Datoteka je bila pobrana, pot je bila pravilno določena, vse je v redu.

Ustavite logstash in začnite znova. Počakajmo. Tišina. Tisti. Te evidence ne prejemamo več.

In zdaj najbolj drzen poskus.

Namestite logstash in izvedite:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Ponovno zaženite logstash in poglejte:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Hura! Vse je bilo pobrano.

Moramo pa vas opozoriti na naslednje. Če se vsebnik logstash izbriše (docker stop logstash_one_channel && docker rm logstash_one_channel), se ne pobere nič. Položaj datoteke, do katere je bila prebrana, je bil shranjen znotraj vsebnika. Če ga zaženete od začetka, bo sprejel le nove vrstice.

Branje obstoječih datotek

Recimo, da logstash lansiramo prvič, vendar že imamo dnevnike in bi jih radi obdelali.
Če zaženemo logstash z vnosnim razdelkom, ki smo ga uporabili zgoraj, ne bomo dobili ničesar. Logstash bo obdelal samo nove vrstice.

Če želite, da se vrstice iz obstoječih datotek povlečejo navzgor, dodajte dodatno vrstico v razdelek za vnos:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Poleg tega obstaja odtenek: to vpliva samo na nove datoteke, ki jih logstash še ni videl. Za iste datoteke, ki so že bile v vidnem polju logstash-a, si je ta že zapomnil njihovo velikost in bo zdaj sprejemal samo nove vnose vanje.

Ustavimo se tukaj in preučimo razdelek za vnos. Možnosti je še veliko, a to nam je zaenkrat dovolj za nadaljnje eksperimente.

Usmerjanje in pretvorba podatkov

Poskusimo rešiti naslednjo težavo, recimo, da imamo sporočila iz enega kanala, nekatera so informativna, druga pa sporočila o napakah. Razlikujejo se po oznaki. Nekateri so INFO, drugi so NAPAKA.

Na izhodu jih moramo ločiti. Tisti. V en kanal pišemo informativna sporočila, v drugega pa sporočila o napakah.

Če želite to narediti, se premaknite iz vhodnega odseka na filter in izhod.

Z odsekom filtra bomo razčlenili dohodno sporočilo in iz njega pridobili zgoščeno vrednost (pare ključ-vrednost), s katerimi že lahko delamo, tj. razstavite glede na pogoje. In v izhodnem delu bomo izbrali sporočila in poslali vsakega na svoj kanal.

Razčlenitev sporočila z grok

Za razčlenitev besedilnih nizov in pridobivanje nabora polj iz njih je v razdelku filtrov poseben vtičnik - grok.

Ne da bi si zadal cilj, da ga tukaj podrobno opišem (za to se sklicujem na uradna dokumentacija), dal bom svoj preprost primer.

Če želite to narediti, se morate odločiti za obliko vhodnih nizov. Jaz jih imam takole:

1 INFO sporočilo1
2 Sporočilo o NAPAKI2

Tisti. Najprej je identifikator, nato INFO/NAPAKA, nato nekaj besed brez presledkov.
Ni težko, vendar je dovolj, da razumete načelo delovanja.

Torej, v razdelku filtrov vtičnika grok moramo definirati vzorec za razčlenjevanje naših nizov.

Videti bo takole:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

V bistvu je to regularni izraz. Uporabljajo se že pripravljeni vzorci, kot so INT, LOGLEVEL, WORD. Njihov opis, kot tudi druge vzorce, najdete tukaj tukaj

Zdaj, ko gre skozi ta filter, se bo naš niz spremenil v zgoščeno vrednost treh polj: message_id, message_type, message_text.

Prikazani bodo v izhodnem delu.

Usmerjanje sporočil v izhodni del z ukazom if

V izhodnem delu, kot se spomnimo, smo nameravali sporočila razdeliti v dva toka. Nekateri - ki so iNFO, bodo izpisani v konzolo, z napakami pa v datoteko.

Kako ločimo ta sporočila? Pogoj problema že nakazuje rešitev - navsezadnje že imamo namensko polje message_type, ki lahko sprejme samo dve vrednosti: INFO in ERROR. Na podlagi tega se bomo odločili s stavkom if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

Opis dela s polji in operatorji najdete v tem razdelku uradni priročnik.

Zdaj pa o samem zaključku.

Izhod konzole, tukaj je vse jasno - stdout {}

Toda izhod v datoteko - ne pozabite, da vse to izvajamo iz vsebnika in da je datoteka, v katero zapišemo rezultat, dostopna od zunaj, moramo odpreti ta imenik v docker-compose.yml.

Skupaj:

Izhodni del naše datoteke je videti takole:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

V docker-compose.yml dodamo še en nosilec za izpis:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Zaženemo ga, poskusimo in vidimo delitev na dva toka.

Vir: www.habr.com

Dodaj komentar