Aplicación práctica de ELK. Configurando logstash

Introdución

Mentres implementamos outro sistema, enfrontámonos á necesidade de procesar un gran número de rexistros diversos. ELK foi elixido como instrumento. Neste artigo falarase da nosa experiencia na configuración desta pila.

Non nos marcamos como obxectivo describir todas as súas capacidades, pero queremos concentrarnos en resolver problemas prácticos. Isto débese a que cunha cantidade suficientemente grande de documentación e imaxes preparadas, hai moitas trampas, polo menos as atopamos.

Implementamos a pila mediante docker-compose. Ademais, tiñamos un docker-compose.yml ben escrito que nos permitía elevar a pila sen case ningún problema. E parecíanos que a vitoria xa estaba preto, agora torcerémola un pouco para axustarse ás nosas necesidades e xa está.

Desafortunadamente, un intento de axustar o sistema para recibir e procesar rexistros da nosa aplicación non tivo éxito de entrada. Polo tanto, decidimos que paga a pena estudar cada compoñente por separado e despois volver ás súas conexións.

Entón, imos comezar con logstash.

Ambiente, implantación, execución de Logstash nun contedor

Para a implantación, usamos docker-compose, os experimentos aquí descritos realizáronse en MacOS e Ubuntu 18.0.4.

A imaxe de logstash que tiñamos no noso docker-compose.yml orixinal é docker.elastic.co/logstash/logstash:6.3.2

Usarémolo para experimentos.

Para executar logstash, escribimos un docker-compose.yml separado. Por suposto, era posible lanzar a imaxe desde a liña de comandos, pero despois de todo, resolvemos unha tarefa específica, na que se lanza todo desde docker-compose.

Brevemente sobre os ficheiros de configuración

Como se desprende da descrición, logstash pódese executar como para unha canle, neste caso, ten que transferir o ficheiro *.conf ou para varias canles, nese caso debe transferir o ficheiro pipelines.yml, que á súa vez , referirase aos ficheiros .conf para cada canle.
Collemos o segundo camiño. Pareceunos máis versátil e escalable. Por iso, creamos pipelines.yml, e creamos un directorio pipelines no que poñeremos ficheiros .conf para cada canle.

Dentro do contedor hai outro ficheiro de configuración: logstash.yml. Non o tocamos, usámolo como está.

Entón, a nosa estrutura de directorios é:

Aplicación práctica de ELK. Configurando logstash

Polo momento, supoñemos que este é tcp no porto 5046 para recibir datos de entrada, e usaremos stdout para a saída.

Aquí tes unha configuración tan sinxela para a primeira execución. Porque a tarefa inicial é lanzar.

Polo tanto, temos este docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Que vemos aquí?

  1. As redes e os volumes foron tomados do docker-compose.yml orixinal (aquel no que se lanza toda a pila) e creo que non afectan moito a imaxe xeral aquí.
  2. Creamos un servizo (servizos) logstash, a partir da imaxe docker.elastic.co/logstash/logstash:6.3.2 e dámoslle o nome logstash_one_channel.
  3. Estamos a reenviar o porto 5046 dentro do contedor, ao mesmo porto interno.
  4. Asignamos o noso ficheiro de configuración de canalización ./config/pipelines.yml ao ficheiro /usr/share/logstash/config/pipelines.yml dentro do contedor, onde logstash o recollerá e o fará de só lectura, por se acaso.
  5. Mapeamos o directorio ./config/pipelines, onde temos os ficheiros de configuración da canalización, ao directorio /usr/share/logstash/config/pipelines e tamén o convertemos en só lectura.

Aplicación práctica de ELK. Configurando logstash

ficheiro piping.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Describe unha canle co identificador HABR e a ruta ao seu ficheiro de configuración.

E finalmente o ficheiro "./config/pipelines/habr_pipeline.conf"

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Non entraremos na súa descrición polo momento, tentamos executar:

docker-compose up

Que vemos?

O contedor comezou. Podemos comprobar o seu traballo:

echo '13123123123123123123123213123213' | nc localhost 5046

E vemos a resposta na consola do contedor:

Aplicación práctica de ELK. Configurando logstash

Pero ao mesmo tempo, tamén vemos:

logstash_one_channel | [2019-04-29T11:28:59,790][ERRO][logstash.licensechecker.licensereader] Non se puido recuperar a información da licenza do servidor de licenzas {:message=>"Elasticsearch inalcanzable: [http://elasticsearch:9200/][Manticore ::ResolutionFailure]elasticsearch",...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] A canalización iniciouse correctamente {:pipeline_id=>".monitoring-logstash", :thread=>"# »}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] Canalizacións en execución {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][ERRO][logstash.inputs.metrics ] X-Pack está instalado en Logstash pero non en Elasticsearch. Instala X-Pack en Elasticsearch para usar a función de monitorización. Outras funcións poden estar dispoñibles.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent ] O punto final da API de Logstash iniciouse correctamente {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Execución da comprobación de estado para ver se unha conexión de Elasticsearch funciona {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN ][logstash.outputs.elasticsearch] Tentouse resucitar a conexión coa instancia ES morta, pero produciuse un erro. {:url =>"busca elástica:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch inalcanzable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Execución da comprobación de estado para ver se unha conexión Elasticsearch funciona {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][WARN ][logstash.licensechecker.licensereader] Tentouse resucitar a conexión coa instancia ES morta, pero produciuse un erro. {:url =>"busca elástica:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch inalcanzable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}

E o noso rexistro arrastra todo o tempo.

Aquí destaquei en verde a mensaxe de que a canalización comezou correctamente, en vermello a mensaxe de erro e en amarelo a mensaxe sobre tentar contactar busca elástica: 9200.
Isto ocorre debido ao feito de que no logstash.conf incluído na imaxe, hai unha comprobación da dispoñibilidade de elasticsearch. Despois de todo, logstash asume que funciona como parte da pila Elk, e separámola.

Podes traballar, pero non é conveniente.

A solución é desactivar esta comprobación mediante a variable de ambiente XPACK_MONITORING_ENABLED.

Imos facer un cambio en docker-compose.yml e executalo de novo:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Agora, todo está ben. O recipiente está listo para os experimentos.

Podemos escribir de novo na consola adxacente:

echo '13123123123123123123123213123213' | nc localhost 5046

E mira:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Traballa nunha canle

Entón comezamos. Agora podes dedicarte o tempo a configurar logstash directamente. Non toquemos o ficheiro pipelines.yml polo momento, vexamos o que podemos conseguir traballando cunha canle.

Debo dicir que o principio xeral de traballar co ficheiro de configuración da canle está ben descrito no manual oficial, aquí aquí
Se queres ler en ruso, usamos este artigo(pero a sintaxe da consulta é antiga alí, cómpre telo en conta).

Imos secuencialmente desde a sección de entrada. Xa vimos o traballo en tcp. Que máis pode ser interesante aquí?

Proba as mensaxes usando o latido do corazón

Hai unha posibilidade tan interesante de xerar mensaxes de proba automáticas.
Para iso, cómpre incluír o complemento heartbean na sección de entrada.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Acendemos, comezamos a recibir unha vez por minuto

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Queremos recibir máis veces, necesitamos engadir o parámetro intervalo.
Así é como recibiremos unha mensaxe cada 10 segundos.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Obtención de datos dun ficheiro

Tamén decidimos mirar o modo ficheiro. Se funciona ben co ficheiro, entón é posible que non se requira ningún axente, ben, polo menos para uso local.

Segundo a descrición, o modo de operación debe ser similar ao tail -f, é dicir. le as novas liñas ou, opcionalmente, le todo o ficheiro.

Entón o que queremos conseguir:

  1. Queremos recibir liñas que se engaden a un ficheiro de rexistro.
  2. Queremos recibir datos que se escriben en varios ficheiros de rexistro, ao tempo que podemos separar o que se recibiu de onde.
  3. Queremos asegurarnos de que cando logstash se reinicie, non volverá recibir estes datos.
  4. Queremos comprobar que se o logstash está desactivado e se seguen escribindo datos nos ficheiros, cando o executemos, recibiremos estes datos.

Para levar a cabo o experimento, imos engadir unha liña máis a docker-compose.yml, abrindo o directorio onde poñemos os ficheiros.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

E cambia a sección de entrada en habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Comezamos:

docker-compose up

Para crear e escribir ficheiros de rexistro, usaremos o comando:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Si, funciona!

Ao mesmo tempo, vemos que engadimos automaticamente o campo de ruta. Polo tanto, no futuro, poderemos filtrar os rexistros por el.

Imos tentalo de novo:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

E agora a outro ficheiro:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Genial! O ficheiro foi recollido, o camiño especificouse correctamente, todo está ben.

Detén o logstash e reinicia. Agardemos. Silencio. Eses. Non volvemos recibir estes rexistros.

E agora o experimento máis atrevido.

Poñemos logstash e executamos:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Executa logstash de novo e mira:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Hurra! Todo colleu.

Pero, é necesario advertir sobre o seguinte. Se se elimina o contenedor logstash (docker stop logstash_one_channel && docker rm logstash_one_channel), non se recollerá nada. A posición do ficheiro ata o que foi lido gardouse dentro do contedor. Se comezas de cero, só aceptará liñas novas.

Lectura de ficheiros existentes

Digamos que estamos a executar logstash por primeira vez, pero xa temos rexistros e gustaríanos procesalos.
Se executamos logstash coa sección de entrada que usamos anteriormente, non obteremos nada. Logstash só procesará as novas liñas.

Para extraer liñas de ficheiros existentes, engade unha liña adicional á sección de entrada:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Ademais, hai un matiz, isto só afecta aos novos ficheiros que Logstash aínda non viu. Para os mesmos ficheiros que xa estaban no campo de visión de logstash, xa lembrou o seu tamaño e agora só levará rexistros novos neles.

Detémonos nisto estudando a sección de entrada. Hai moitas máis opcións, pero polo de agora, temos suficiente para máis experimentos.

Enrutamento e transformación de datos

Intentemos resolver o seguinte problema, digamos que temos mensaxes dunha canle, algunhas delas son informativas e outras son mensaxes de erro. Diferéncianse na etiqueta. Algúns son INFO, outros son ERRO.

Temos que separalos na saída. Eses. Escribimos mensaxes informativas nunha canle e mensaxes de erro noutra.

Para iso, vaia desde a sección de entrada para filtrar e saír.

Usando a sección de filtros, analizaremos a mensaxe entrante, obtendo dela un hash (pares clave-valor), co que xa podemos traballar, é dicir. analizar segundo as condicións. E na sección de saída, seleccionaremos mensaxes e enviaremos cada unha á súa propia canle.

Analizando unha mensaxe con grok

Para analizar cadeas de texto e obter un conxunto de campos a partir delas, hai un complemento especial na sección de filtros: grok.

Sen que me propoña dar aquí unha descrición detallada (para isto refírome a documentación oficial), vou poñer o meu exemplo sinxelo.

Para iso, cómpre decidir o formato das liñas de entrada. Téñoos así:

1 mensaxe de información 1
2 Mensaxe de ERRO2

Eses. Primeiro o identificador, despois INFO/ERRO e despois algunha palabra sen espazos.
Non é difícil, pero o suficiente para comprender o principio de traballo.

Entón, na sección de filtros, no complemento grok, necesitamos definir un patrón para analizar as nosas cadeas.

Será así:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Basicamente, é unha expresión regular. Utilízanse patróns preparados, como INT, LOGLEVEL, WORD. A súa descrición, así como outros patróns, pódense ver aquí. aquí

Agora, ao pasar por este filtro, a nosa cadea converterase nun hash de tres campos: message_id, message_type, message_text.

Mostraranse na sección de saída.

Enrutamento de mensaxes na sección de saída co comando if

Na sección de saída, como lembramos, íamos dividir as mensaxes en dous fluxos. Algúns, que son iNFO, sairemos á consola e, con erros, sairemos a un ficheiro.

Como podemos compartir estas mensaxes? A condición do problema xa suxire unha solución; despois de todo, xa temos un campo de tipo message_type dedicado, que só pode tomar dous valores INFO e ERROR. É sobre o que faremos unha elección usando a declaración if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

Nesta sección pódese atopar a descrición do traballo con campos e operadores manual oficial.

Agora, sobre a propia conclusión.

Saída da consola, todo está claro aquí - stdout {}

Pero a saída ao ficheiro: lembre que estamos executando todo isto desde o contedor e para que o ficheiro no que escribimos o resultado estea dispoñible desde fóra, necesitamos abrir este directorio en docker-compose.yml.

Total:

A sección de saída do noso ficheiro ten o seguinte aspecto:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

Engade un volume máis a docker-compose.yml para a saída:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Comezamos, intentamos, vemos a división en dúas correntes.

Fonte: www.habr.com

Engadir un comentario