Aplicación práctica de ELK. Configuración de logstash

introducción

Al implementar otro sistema, nos enfrentamos a la necesidad de procesar una gran cantidad de registros diferentes. ELK fue elegido como instrumento. Este artículo hablará sobre nuestra experiencia en la configuración de esta pila.

No establecemos una meta para describir todas sus capacidades, pero queremos concentrarnos en resolver problemas prácticos. Esto se debe al hecho de que con una cantidad suficientemente grande de documentación e imágenes listas para usar, hay muchas trampas, al menos las encontramos.

Implementamos la pila a través de docker-compose. Además, teníamos un docker-compose.yml bien escrito que nos permitía subir la pila casi sin problemas. Y nos parecía que la victoria ya estaba cerca, ahora le damos un pequeño giro a nuestras necesidades y listo.

Desafortunadamente, un intento de ajustar el sistema para recibir y procesar registros de nuestra aplicación no tuvo éxito desde el principio. Por lo tanto, decidimos que vale la pena estudiar cada componente por separado y luego volver a sus conexiones.

Entonces, comencemos con logstash.

Entorno, implementación, ejecución de Logstash en un contenedor

Para la implementación, usamos docker-compose, los experimentos descritos aquí se llevaron a cabo en MacOS y Ubuntu 18.0.4.

La imagen de logstash que teníamos en nuestro docker-compose.yml original es docker.elastic.co/logstash/logstash:6.3.2

Lo usaremos para experimentos.

Para ejecutar logstash, escribimos un docker-compose.yml separado. Por supuesto, era posible lanzar la imagen desde la línea de comandos, pero después de todo, resolvimos una tarea específica, donde todo, desde docker-compose, se lanza para nosotros.

Brevemente sobre los archivos de configuración

Como sigue de la descripción, logstash se puede ejecutar como para un canal, en este caso, necesita transferir el archivo *.conf o para varios canales, en cuyo caso necesita transferir el archivo pipelines.yml, que, a su vez, , se referirá a los archivos .conf para cada canal.
Tomamos el segundo camino. Nos pareció más versátil y escalable. Por lo tanto, creamos pipelines.yml e hicimos un directorio de pipelines en el que colocaremos archivos .conf para cada canal.

Dentro del contenedor hay otro archivo de configuración: logstash.yml. No lo tocamos, lo usamos tal cual.

Así que nuestra estructura de directorios es:

Aplicación práctica de ELK. Configuración de logstash

Por el momento, asumimos que esto es tcp en el puerto 5046 para recibir datos de entrada y usaremos stdout para la salida.

Aquí hay una configuración tan simple para la primera ejecución. Porque la tarea inicial es lanzar.

Entonces tenemos este docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

¿Qué vemos aquí?

  1. Las redes y los volúmenes se tomaron del docker-compose.yml original (aquel en el que se inicia la pila completa) y creo que no afectan en gran medida la imagen general aquí.
  2. Creamos un logstash de servicio (servicios), a partir de la imagen docker.elastic.co/logstash/logstash:6.3.2 y le damos el nombre logstash_one_channel.
  3. Estamos reenviando el puerto 5046 dentro del contenedor, al mismo puerto interno.
  4. Asignamos nuestro archivo de configuración de tubería ./config/pipelines.yml al archivo /usr/share/logstash/config/pipelines.yml dentro del contenedor, donde logstash lo recogerá y lo convertirá en solo lectura, por si acaso.
  5. Asignamos el directorio ./config/pipelines, donde tenemos los archivos de configuración de tuberías, al directorio /usr/share/logstash/config/pipelines y también lo hacemos de solo lectura.

Aplicación práctica de ELK. Configuración de logstash

archivo piping.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Describe un canal con el identificador HABR y la ruta a su archivo de configuración.

Y finalmente el archivo "./config/pipelines/habr_pipeline.conf"

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

No entraremos en su descripción por ahora, intentamos ejecutar:

docker-compose up

¿Qué vemos?

El contenedor ha comenzado. Podemos comprobar su trabajo:

echo '13123123123123123123123213123213' | nc localhost 5046

Y vemos la respuesta en la consola del contenedor:

Aplicación práctica de ELK. Configuración de logstash

Pero al mismo tiempo, también vemos:

logstash_un_canal | [2019-04-29T11:28:59,790][ERROR][logstash.licensechecker.licensereader] No se puede recuperar la información de la licencia del servidor de licencias {:message=>"Elasticsearch inalcanzable: [http://elasticsearch:9200/][Manticore ::ResolutionFailure]elasticsearch", ...

logstash_un_canal | [2019-04-29T11:28:59,894][INFO][logstash.pipeline] La canalización se inició correctamente {:pipeline_id=>".monitoring-logstash", :thread=>"# »}

logstash_un_canal | [2019-04-29T11:28:59,988][INFO][logstash.agent] Canalizaciones en ejecución {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_un_canal | [2019-04-29T11:29:00,015][ERROR][logstash.inputs.metrics] X-Pack está instalado en Logstash pero no en Elasticsearch. Instale X-Pack en Elasticsearch para usar la función de monitoreo. Otras características pueden estar disponibles.
logstash_un_canal | [2019-04-29T11: 29: 00,526] [INFO] [logstash.agent] Se inició correctamente el extremo de la API de Logstash {: puerto => 9600}
logstash_un_canal | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Ejecutando una verificación de estado para ver si una conexión de Elasticsearch está funcionando {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_un_canal | [2019-04-29T11:29:04,487][WARN][logstash.outputs.elasticsearch] Se intentó resucitar la conexión a la instancia de ES inactiva, pero se produjo un error. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch inalcanzable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] búsqueda elástica"}
logstash_un_canal | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Ejecutando verificación de estado para ver si una conexión de Elasticsearch está funcionando {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_un_canal | [2019-04-29T11:29:04,710][WARN][logstash.licensechecker.licensereader] Se intentó resucitar la conexión a la instancia de ES inactiva, pero se obtuvo un error. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch inalcanzable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] búsqueda elástica"}

Y nuestro registro se arrastra todo el tiempo.

Aquí resalté en verde el mensaje de que la canalización se inició correctamente, en rojo el mensaje de error y en amarillo el mensaje sobre intentar contactar elasticsearch: 9200.
Esto sucede debido a que en el logstash.conf incluido en la imagen, se verifica la disponibilidad de elasticsearch. Después de todo, logstash asume que funciona como parte de la pila Elk y lo separamos.

Puedes trabajar, pero no es conveniente.

La solución es deshabilitar esta verificación a través de la variable de entorno XPACK_MONITORING_ENABLED.

Hagamos un cambio en docker-compose.yml y ejecútelo de nuevo:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Ahora, todo está bien. El contenedor está listo para experimentos.

Podemos teclear de nuevo en la consola adyacente:

echo '13123123123123123123123213123213' | nc localhost 5046

Y ver:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Trabajar dentro de un canal

Así que empezamos. Ahora puede tomarse el tiempo para configurar logstash directamente. No toquemos el archivo pipelines.yml por ahora, veamos qué podemos obtener trabajando con un canal.

Debo decir que el principio general de trabajar con el archivo de configuración del canal está bien descrito en el manual oficial, aquí aquí
Si quieres leer en ruso, usamos este un articulo(pero la sintaxis de consulta es antigua allí, debe tener esto en cuenta).

Vayamos secuencialmente desde la sección Entrada. Ya hemos visto el trabajo en tcp. ¿Qué más puede ser interesante aquí?

Mensajes de prueba usando el latido del corazón

Existe una posibilidad tan interesante de generar mensajes de prueba automáticos.
Para hacer esto, debe incluir el complemento heartbean en la sección de entrada.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Lo encendemos, comenzamos a recibir una vez por minuto

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Queremos recibir más a menudo, necesitamos agregar el parámetro de intervalo.
Así recibiremos un mensaje cada 10 segundos.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Obtener datos de un archivo

También decidimos mirar el modo de archivo. Si funciona bien con el archivo, entonces es posible que no se requiera ningún agente, bueno, al menos para uso local.

De acuerdo con la descripción, el modo de operación debe ser similar a tail -f, es decir, lee saltos de línea u, opcionalmente, lee todo el archivo.

Entonces lo que queremos obtener:

  1. Queremos recibir líneas que se anexan a un archivo de registro.
  2. Queremos recibir datos que se escriben en varios archivos de registro, al tiempo que podemos separar lo que se recibió de dónde.
  3. Queremos asegurarnos de que cuando logstash se reinicie, no volverá a recibir estos datos.
  4. Queremos verificar que si logstash está deshabilitado y los datos continúan escribiéndose en los archivos, cuando lo ejecutemos, recibiremos estos datos.

Para realizar el experimento, agreguemos una línea más a docker-compose.yml, abriendo el directorio donde colocamos los archivos.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

Y cambie la sección de entrada en habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Empezamos:

docker-compose up

Para crear y escribir archivos de registro, usaremos el comando:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

¡Sí, funciona!

Al mismo tiempo, vemos que hemos agregado automáticamente el campo de ruta. Entonces, en el futuro, podremos filtrar registros por él.

Intentemoslo de nuevo:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Y ahora a otro archivo:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

¡Excelente! Se recogió el archivo, la ruta se especificó correctamente, todo está bien.

Detenga logstash y reinicie. Esperemos. Silencio. Aquellos. No recibimos estos registros de nuevo.

Y ahora el experimento más atrevido.

Ponemos logstash y ejecutamos:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Ejecute logstash nuevamente y vea:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

¡Hurra! Todo recogido.

Pero, es necesario advertir sobre lo siguiente. Si se elimina el contenedor de logstash (docker stop logstash_one_channel && docker rm logstash_one_channel), no se recogerá nada. La posición del archivo hasta el cual se leía se almacenaba dentro del contenedor. Si comienza desde cero, solo aceptará nuevas líneas.

Lectura de archivos existentes

Digamos que estamos ejecutando logstash por primera vez, pero ya tenemos registros y nos gustaría procesarlos.
Si ejecutamos logstash con la sección de entrada que usamos anteriormente, no obtendremos nada. Logstash solo procesará las nuevas líneas.

Para extraer líneas de archivos existentes, agregue una línea adicional a la sección de entrada:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Además, hay un matiz, esto solo afecta a los archivos nuevos que Logstash aún no ha visto. Para los mismos archivos que ya estaban en el campo de visión de logstash, ya ha recordado su tamaño y ahora solo tomará nuevos registros en ellos.

Detengámonos en esto estudiando la sección de entrada. Hay muchas más opciones, pero por ahora, tenemos suficientes para más experimentos.

Enrutamiento y transformación de datos

Intentemos resolver el siguiente problema, digamos que tenemos mensajes de un canal, algunos de ellos son informativos y otros son mensajes de error. Se diferencian en la etiqueta. Algunos son INFO, otros son ERROR.

Tenemos que separarlos a la salida. Aquellos. Escribimos mensajes informativos en un canal y mensajes de error en otro.

Para hacer esto, vaya de la sección de entrada a filtro y salida.

Usando la sección de filtro, analizaremos el mensaje entrante, obteniendo un hash (pares clave-valor) de él, con el que ya podemos trabajar, es decir. analizar según las condiciones. Y en el apartado de salida, seleccionaremos mensajes y enviaremos cada uno a su propio canal.

Analizando un mensaje con grok

Para analizar cadenas de texto y obtener un conjunto de campos de ellas, hay un complemento especial en la sección de filtro: grok.

Sin ponerme el objetivo de dar aquí una descripción detallada de la misma (para ello me refiero a documentación oficial), daré mi ejemplo simple.

Para hacer esto, debe decidir el formato de las líneas de entrada. yo los tengo asi:

1 mensaje INFO1
2 mensaje de ERROR2

Aquellos. Identificador primero, luego INFO/ERROR, luego alguna palabra sin espacios.
No es difícil, pero lo suficiente como para comprender el principio de funcionamiento.

Entonces, en la sección de filtro, en el complemento grok, necesitamos definir un patrón para analizar nuestras cadenas.

Se verá así:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Básicamente, es una expresión regular. Se utilizan patrones preparados, como INT, LOGLEVEL, WORD. Su descripción, así como otros patrones, se pueden ver aquí. aquí

Ahora, al pasar por este filtro, nuestra cadena se convertirá en un hash de tres campos: id_mensaje, tipo_mensaje, texto_mensaje.

Se mostrarán en la sección de salida.

Enrutamiento de mensajes en la sección de salida con el comando if

En la sección de salida, como recordamos, íbamos a dividir los mensajes en dos flujos. Algunos, que son iNFO, los enviaremos a la consola y, con errores, los enviaremos a un archivo.

¿Cómo podemos compartir estos mensajes? La condición del problema ya sugiere una solución; después de todo, ya tenemos un campo de tipo_mensaje dedicado, que puede tomar solo dos valores INFO y ERROR. Es en él que haremos una elección usando la declaración if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

La descripción del trabajo con campos y operadores se puede encontrar en esta sección manual oficial.

Ahora, sobre la conclusión en sí.

Salida de la consola, todo está claro aquí - stdout {}

Pero la salida al archivo: recuerde que estamos ejecutando todo esto desde el contenedor y para que el archivo en el que escribimos el resultado sea accesible desde el exterior, debemos abrir este directorio en docker-compose.yml.

Total:

La sección de salida de nuestro archivo se ve así:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

Agregue un volumen más a docker-compose.yml para la salida:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Empezamos, probamos, vemos la división en dos corrientes.

Fuente: habr.com

Añadir un comentario