Zbatimi praktik i ELK. Konfigurimi i logstash

Paraqitje

Gjatë vendosjes së një sistemi tjetër, ne u përballëm me nevojën për të përpunuar një numër të madh regjistrash të ndryshëm. Si mjet u zgjodh ELK. Ky artikull do të diskutojë përvojën tonë në konfigurimin e kësaj rafte.

Ne nuk vendosim një qëllim për të përshkruar të gjitha aftësitë e tij, por duam të përqendrohemi veçanërisht në zgjidhjen e problemeve praktike. Kjo për faktin se edhe pse ka një sasi mjaft të madhe dokumentacioni dhe imazhe të gatshme, ka mjaft gracka, të paktën ne i gjetëm.

Ne vendosëm pirgun nëpërmjet docker-compose. Për më tepër, ne kishim një docker-compose.yml të shkruar mirë, i cili na lejoi të ngrinim stackin pothuajse pa probleme. Dhe na dukej se fitorja ishte tashmë afër, tani do ta rregullojmë pak për t'iu përshtatur nevojave tona dhe kaq.

Fatkeqësisht, përpjekja për të konfiguruar sistemin për të marrë dhe përpunuar regjistrat nga aplikacioni ynë nuk ishte menjëherë e suksesshme. Prandaj, vendosëm që ia vlen të studiojmë secilin komponent veç e veç, dhe më pas të kthehemi te lidhjet e tyre.

Pra, filluam me logstash.

Mjedisi, vendosja, funksionimi i Logstash në një enë

Për vendosjen ne përdorim docker-compose; eksperimentet e përshkruara këtu u kryen në MacOS dhe Ubuntu 18.0.4.

Imazhi logstash që u regjistrua në docker-compose.yml tonë origjinal është docker.elastic.co/logstash/logstash:6.3.2

Ne do ta përdorim atë për eksperimente.

Ne kemi shkruar një docker-compose.yml të veçantë për të ekzekutuar logstash. Natyrisht, ishte e mundur të nisnim imazhin nga linja e komandës, por ne po zgjidhnim një problem specifik, ku drejtonim gjithçka nga docker-compose.

Shkurtimisht për skedarët e konfigurimit

Siç vijon nga përshkrimi, logstash mund të ekzekutohet ose për një kanal, në të cilin rast duhet të kalojë skedarin *.conf, ose për disa kanale, në këtë rast duhet të kalojë skedarin pipelines.yml, i cili nga ana tjetër , do të lidhet me skedarët .conf për çdo kanal.
Ne morëm rrugën e dytë. Na dukej më universale dhe më e shkallëzuar. Prandaj, ne krijuam pipelines.yml dhe krijuam një direktori pipelines në të cilën do të vendosim skedarë .conf për çdo kanal.

Brenda kontejnerit ka një skedar tjetër konfigurimi - logstash.yml. Nuk e prekim, e përdorim ashtu siç është.

Pra, struktura jonë e drejtorisë:

Zbatimi praktik i ELK. Konfigurimi i logstash

Për të marrë të dhëna hyrëse, tani për tani supozojmë se kjo është tcp në portin 5046, dhe për dalje do të përdorim stdout.

Këtu është një konfigurim i thjeshtë për nisjen e parë. Sepse detyra fillestare është nisja.

Pra, ne kemi këtë docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Çfarë shohim këtu?

  1. Rrjetet dhe vëllimet janë marrë nga origjinali docker-compose.yml (ai ku është lëshuar i gjithë grumbulli) dhe mendoj se ato nuk ndikojnë shumë në pamjen e përgjithshme këtu.
  2. Ne krijojmë një shërbim(e) logstash nga imazhi docker.elastic.co/logstash/logstash:6.3.2 dhe e emërtojmë atë logstash_one_channel.
  3. Ne e përcjellim portin 5046 brenda kontejnerit, në të njëjtin port të brendshëm.
  4. Ne hartojmë skedarin tonë të konfigurimit të tubave ./config/pipelines.yml në skedarin /usr/share/logstash/config/pipelines.yml brenda kontejnerit, ku logstash do ta marrë atë dhe do ta bëjë atë vetëm për lexim, për çdo rast.
  5. Ne hartojmë direktorinë ./config/pipelines, ku kemi skedarë me cilësimet e kanalit, në drejtorinë /usr/share/logstash/config/pipelines dhe gjithashtu e bëjmë atë vetëm për lexim.

Zbatimi praktik i ELK. Konfigurimi i logstash

Skedari Pipelines.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Një kanal me identifikuesin HABR dhe shtegu drejt skedarit të konfigurimit të tij përshkruhen këtu.

Dhe në fund skedari "./config/pipelines/habr_pipeline.conf"

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Le të mos hyjmë në përshkrimin e tij tani për tani, le të përpiqemi ta ekzekutojmë:

docker-compose up

Farë shohim?

Kontejneri ka filluar. Ne mund të kontrollojmë funksionimin e tij:

echo '13123123123123123123123213123213' | nc localhost 5046

Dhe ne e shohim përgjigjen në tastierën e kontejnerit:

Zbatimi praktik i ELK. Konfigurimi i logstash

Por në të njëjtën kohë, ne gjithashtu shohim:

logstash_one_channel | [2019-04-29T11:28:59,790][ERROR][logstash.licensechecker.licensereader] Nuk mund të merret informacioni i licencës nga serveri i licencës {:message=>“Elasticsearch i paarritshëm: [http://elasticsearch:9200/][Manticore ::ResolutionFailure] elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] Gazsjellësi filloi me sukses {:pipeline_id=>".monitoring-logstash", :thread=>"# "}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Tubacionet që funksionojnë {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][GABIM][logstash.inputs.metrics] X-Pack është i instaluar në Logstash, por jo në Elasticsearch. Ju lutemi instaloni X-Pack në Elasticsearch për të përdorur funksionin e monitorimit. Mund të jenë të disponueshme veçori të tjera.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent ] Filloi me sukses Logstash API pika përfundimtare {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Kryerja e kontrollit shëndetësor për të parë nëse një lidhje Elasticsearch po funksionon {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN ][logstash.outputs.elasticsearch] U përpoq të ringjallte lidhjen me shembullin e vdekur ES, por mori një gabim. {:url=>“kërkesë elastike:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch i paarritshëm: [http://elasticsearch:9200/:][Resoureail:] kërkimi elastik"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Kryerja e kontrollit shëndetësor për të parë nëse një lidhje Elasticsearch po funksionon {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][KUJDES ][logstash.licensechecker.licensereader] U përpoq të ringjallte lidhjen me shembullin e vdekur ES, por mori një gabim. {:url=>“kërkesë elastike:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch i paarritshëm: [http://elasticsearch:9200/:][Resoureail:] kërkimi elastik"}

Dhe regjistri ynë po zvarritet gjatë gjithë kohës.

Këtu kam theksuar me të gjelbër mesazhin që tubacioni është nisur me sukses, me të kuqe mesazhin e gabimit dhe me të verdhë mesazhin për një përpjekje për të kontaktuar kërkesë elastike: 9200.
Kjo ndodh sepse logstash.conf, i përfshirë në imazh, përmban një kontroll për disponueshmërinë e elasticsearch. Në fund të fundit, logstash supozon se funksionon si pjesë e pirgut Elk, por ne e ndamë atë.

Është e mundur të punosh, por nuk është i përshtatshëm.

Zgjidhja është çaktivizimi i këtij kontrolli nëpërmjet ndryshores së mjedisit XPACK_MONITORING_ENABLED.

Le të bëjmë një ndryshim në docker-compose.yml dhe ta ekzekutojmë përsëri:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Tani, gjithçka është në rregull. Enë është gati për eksperimente.

Mund të shkruajmë përsëri në tastierën tjetër:

echo '13123123123123123123123213123213' | nc localhost 5046

Dhe shikoni:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Puna brenda një kanali

Kështu që ne nisëm. Tani mund të merrni kohë për të konfiguruar vetë logstash. Le të mos prekim skedarin pipelines.yml tani për tani, le të shohim se çfarë mund të marrim duke punuar me një kanal.

Duhet të them që parimi i përgjithshëm i punës me skedarin e konfigurimit të kanalit përshkruhet mirë në manualin zyrtar, këtu këtu
Nëse dëshironi të lexoni në Rusisht, ne kemi përdorur këtë artikull(por sintaksa e pyetjes atje është e vjetër, duhet ta marrim parasysh këtë).

Le të shkojmë në mënyrë sekuenciale nga seksioni Input. Ne kemi parë tashmë punë në TCP. Çfarë tjetër mund të jetë interesante këtu?

Testoni mesazhet duke përdorur rrahjet e zemrës

Ekziston një mundësi kaq interesante për të gjeneruar mesazhe testimi automatike.
Për ta bërë këtë, duhet të aktivizoni shtojcën heartbean në seksionin e hyrjes.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Aktivizoni, filloni të merrni një herë në minutë

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Nëse duam të marrim më shpesh, duhet të shtojmë parametrin interval.
Kështu do të marrim një mesazh çdo 10 sekonda.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Marrja e të dhënave nga një skedar

Ne gjithashtu vendosëm të shikonim mënyrën e skedarit. Nëse funksionon mirë me skedarin, atëherë ndoshta nuk nevojitet asnjë agjent, të paktën për përdorim lokal.

Sipas përshkrimit, mënyra e funksionimit duhet të jetë e ngjashme me bisht -f, d.m.th. lexon rreshta të rinj ose, si opsion, lexon të gjithë skedarin.

Pra, çfarë duam të marrim:

  1. Ne duam të marrim rreshta që i janë bashkangjitur një skedari log.
  2. Ne duam të marrim të dhëna që shkruhen në disa skedarë log, duke qenë në gjendje të ndajmë atë që merret nga ku.
  3. Ne duam të sigurohemi që kur logstash të riniset, ai të mos i marrë më këto të dhëna.
  4. Ne duam të kontrollojmë që nëse logstash është i fikur dhe të dhënat vazhdojnë të shkruhen në skedarë, atëherë kur ta ekzekutojmë, do t'i marrim këto të dhëna.

Për të kryer eksperimentin, le të shtojmë një rresht tjetër në docker-compose.yml, duke hapur direktorinë në të cilën kemi vendosur skedarët.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

Dhe ndryshoni seksionin e hyrjes në habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Le të fillojmë:

docker-compose up

Për të krijuar dhe shkruar skedarë log ne do të përdorim komandën:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Po, funksionon!

Në të njëjtën kohë, shohim se kemi shtuar automatikisht fushën e rrugës. Kjo do të thotë që në të ardhmen, ne do të jemi në gjendje të filtrojmë të dhënat sipas tij.

Le ta provojme perseri:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Dhe tani në një skedar tjetër:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

E madhe! Skedari u mor, shtegu u specifikua saktë, gjithçka është në rregull.

Ndalo logstash dhe fillo përsëri. Le të presim. Heshtje. ato. Ne nuk i marrim më këto të dhëna.

Dhe tani eksperimenti më i guximshëm.

Instaloni logstash dhe ekzekutoni:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Drejtoni përsëri logstash dhe shikoni:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Hora! Gjithçka u mor.

Por ne duhet t'ju paralajmërojmë për sa vijon. Nëse kontejneri me logstash fshihet (docker stop logstash_one_channel && docker rm logstash_one_channel), atëherë asgjë nuk do të merret. Pozicioni i skedarit deri në të cilin u lexua ruhej brenda kontejnerit. Nëse e drejtoni nga e para, ai do të pranojë vetëm linja të reja.

Leximi i skedarëve ekzistues

Le të themi se po lëshojmë logstash për herë të parë, por tashmë kemi regjistra dhe do të donim t'i përpunonim ato.
Nëse ekzekutojmë logstash me seksionin e hyrjes që kemi përdorur më lart, nuk do të marrim asgjë. Vetëm linjat e reja do të përpunohen nga logstash.

Në mënyrë që rreshtat nga skedarët ekzistues të tërhiqen, duhet të shtoni një rresht shtesë në seksionin e hyrjes:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Për më tepër, ekziston një nuancë: kjo prek vetëm skedarët e rinj që logstash nuk i ka parë ende. Për të njëjtët skedarë që ishin tashmë në fushën e shikimit të logstash, ai tashmë ka mbajtur mend madhësinë e tyre dhe tani do të marrë vetëm hyrje të reja në to.

Le të ndalemi këtu dhe të studiojmë seksionin e hyrjes. Ka ende shumë opsione, por kjo është e mjaftueshme për ne për eksperimente të mëtejshme për momentin.

Rutimi dhe Transformimi i të Dhënave

Le të përpiqemi të zgjidhim problemin e mëposhtëm, le të themi se kemi mesazhe nga një kanal, disa prej tyre janë informuese dhe disa janë mesazhe gabimi. Ato ndryshojnë sipas etiketës. Disa janë INFO, të tjera janë ERROR.

Duhet t'i ndajmë në dalje. ato. Ne shkruajmë mesazhe informacioni në një kanal dhe mesazhe gabimi në një tjetër.

Për ta bërë këtë, lëvizni nga seksioni i hyrjes në filtrim dhe dalje.

Duke përdorur seksionin e filtrit, ne do të analizojmë mesazhin në hyrje, duke marrë një hash (çifte të vlerave kyçe) prej tij, me të cilat tashmë mund të punojmë, d.m.th. çmontohet sipas kushteve. Dhe në seksionin e daljes, ne do të zgjedhim mesazhe dhe do t'i dërgojmë secilën në kanalin e vet.

Analiza e një mesazhi me grok

Për të analizuar vargjet e tekstit dhe për të marrë një grup fushash prej tyre, ekziston një shtojcë e veçantë në seksionin e filtrit - grok.

Pa i vënë vetes synimin që të jap një përshkrim të hollësishëm të tij këtu (për këtë i referohem dokumentacion zyrtar), Unë do të jap shembullin tim të thjeshtë.

Për ta bërë këtë, duhet të vendosni për formatin e vargjeve hyrëse. Unë i kam si kjo:

1 mesazh INFO1
2 Mesazh GABIM2

ato. Identifikuesi vjen së pari, pastaj INFO/GABIM, pastaj ndonjë fjalë pa hapësira.
Nuk është e vështirë, por është e mjaftueshme për të kuptuar parimin e funksionimit.

Pra, në seksionin e filtrit të shtojcës grok, ne duhet të përcaktojmë një model për analizimin e vargjeve tona.

Do të duket kështu:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Në thelb është një shprehje e rregullt. Përdoren modele të gatshme, si INT, LOGLEVEL, WORD. Përshkrimi i tyre, si dhe modele të tjera, mund të gjenden këtu këtu

Tani, duke kaluar nëpër këtë filtër, vargu ynë do të kthehet në një hash prej tre fushash: mesazh_id, mesazh_lloji, mesazh_tekst.

Ato do të shfaqen në seksionin e daljes.

Drejtimi i mesazheve në seksionin e daljes duke përdorur komandën if

Në seksionin e daljes, siç e kujtojmë, ne do të ndanim mesazhet në dy rrjedha. Disa - të cilat janë iNFO, do të dalin në tastierë, dhe me gabime, ne do të nxjerrim në një skedar.

Si i ndajmë këto mesazhe? Gjendja e problemit tashmë sugjeron një zgjidhje - në fund të fundit, ne tashmë kemi një fushë të dedikuar mesazh_lloj, e cila mund të marrë vetëm dy vlera: INFO dhe ERROR. Është mbi këtë bazë që ne do të bëjmë një zgjedhje duke përdorur deklaratën if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

Një përshkrim i punës me fushat dhe operatorët mund të gjendet në këtë seksion manual zyrtar.

Tani, në lidhje me vetë përfundimin aktual.

Dalja e konsolës, gjithçka është e qartë këtu - stdout {}

Por dalja në një skedar - mbani mend se të gjitha këto po i ekzekutojmë nga një kontejner dhe në mënyrë që skedari në të cilin shkruajmë rezultatin të jetë i aksesueshëm nga jashtë, duhet ta hapim këtë direktori në docker-compose.yml.

Total:

Seksioni i daljes së skedarit tonë duket si ky:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

Në docker-compose.yml shtojmë një vëllim tjetër për dalje:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Ne e lëshojmë atë, e provojmë dhe shohim një ndarje në dy rrjedha.

Burimi: www.habr.com

Shto një koment