Somos amigos de ELK y Exchange. Parte 2

Somos amigos de ELK y Exchange. Parte 2

Continúo mi historia sobre cómo hacer amigos Exchange y ELK (comenzando aquí). Permítanme recordarles que esta combinación es capaz de procesar una gran cantidad de registros sin dudarlo. Esta vez hablaremos sobre cómo hacer que Exchange funcione con los componentes Logstash y Kibana.

Logstash en la pila ELK se utiliza para procesar registros de manera inteligente y prepararlos para su colocación en Elastic en forma de documentos, a partir de los cuales es conveniente crear varias visualizaciones en Kibana.

Instalación

Consta de dos etapas:

  • Instalación y configuración del paquete OpenJDK.
  • Instalación y configuración del paquete Logstash.

Instalación y configuración del paquete OpenJDK

El paquete OpenJDK debe descargarse y descomprimirse en un directorio específico. Luego se debe ingresar la ruta a este directorio en las variables $env:Path y $env:JAVA_HOME del sistema operativo Windows:

Somos amigos de ELK y Exchange. Parte 2

Somos amigos de ELK y Exchange. Parte 2

Comprobemos la versión de Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Instalación y configuración del paquete Logstash

Descargue el archivo con la distribución Logstash por lo tanto. El archivo debe descomprimirse en la raíz del disco. Desempaquetar en carpeta C:Program Files No vale la pena, Logstash se negará a iniciarse normalmente. Entonces necesitas ingresar al archivo. jvm.options Correcciones responsables de asignar RAM para el proceso Java. Recomiendo especificar la mitad de la RAM del servidor. Si tiene 16 GB de RAM a bordo, las claves predeterminadas son:

-Xms1g
-Xmx1g

debe ser reemplazado por:

-Xms8g
-Xmx8g

Además, es recomendable comentar la línea. -XX:+UseConcMarkSweepGC. Más sobre eso aquí. El siguiente paso es crear una configuración predeterminada en el archivo logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Con esta configuración, Logstash lee datos de la consola, los pasa a través de un filtro vacío y los devuelve a la consola. El uso de esta configuración probará la funcionalidad de Logstash. Para hacer esto, ejecútelo en modo interactivo:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash se lanzó correctamente en el puerto 9600.

El paso final de instalación: inicie Logstash como un servicio de Windows. Esto se puede hacer, por ejemplo, usando el paquete NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

Tolerancia a fallos

La seguridad de los registros cuando se transfieren desde el servidor de origen está garantizada por el mecanismo de colas persistentes.

Cómo funciona

El diseño de las colas durante el procesamiento de registros es: entrada → cola → filtro + salida.

El complemento de entrada recibe datos de una fuente de registro, los escribe en una cola y envía confirmación de que los datos se han recibido a la fuente.

Logstash procesa los mensajes de la cola, los pasa a través del filtro y el complemento de salida. Al recibir la confirmación de la salida de que se ha enviado el registro, Logstash elimina el registro procesado de la cola. Si Logstash se detiene, todos los mensajes no procesados ​​y los mensajes para los que no se ha recibido confirmación permanecen en la cola y Logstash continuará procesándolos la próxima vez que se inicie.

Ajuste

Ajustable mediante llaves en la lima. C:Logstashconfiglogstash.yml:

  • queue.type: (valores posibles - persisted и memory (default)).
  • path.queue: (ruta a la carpeta con los archivos de la cola, que se almacenan en C:Logstashqueue de forma predeterminada).
  • queue.page_capacity: (tamaño máximo de página en cola, el valor predeterminado es 64 MB).
  • queue.drain: (verdadero/falso: habilita/deshabilita la parada del procesamiento de la cola antes de cerrar Logstash. No recomiendo habilitarlo, porque esto afectará directamente la velocidad de apagado del servidor).
  • queue.max_events: (número máximo de eventos en la cola, el valor predeterminado es 0 (ilimitado)).
  • queue.max_bytes: (tamaño máximo de cola en bytes, predeterminado: 1024 MB (1 GB)).

Si está configurado queue.max_events и queue.max_bytes, los mensajes dejan de aceptarse en la cola cuando se alcanza el valor de cualquiera de estas configuraciones. Obtenga más información sobre las colas persistentes aquí.

Un ejemplo de la parte de logstash.yml responsable de configurar la cola:

queue.type: persisted
queue.max_bytes: 10gb

Ajuste

La configuración de Logstash generalmente consta de tres partes, responsables de diferentes fases del procesamiento de registros entrantes: recepción (sección de entrada), análisis (sección de filtro) y envío a Elastic (sección de salida). A continuación veremos más de cerca cada uno de ellos.

Entrada

Recibimos la transmisión entrante con registros sin procesar de los agentes de filebeat. Es este complemento el que indicamos en el apartado de entrada:

input {
  beats {
    port => 5044
  }
}

Después de esta configuración, Logstash comienza a escuchar el puerto 5044 y, cuando recibe registros, los procesa de acuerdo con la configuración de la sección de filtro. Si es necesario, puede encapsular el canal para recibir registros de filebit en SSL. Lea más sobre la configuración del complemento Beats aquí.

Filtrar

Todos los registros de texto que son interesantes para el procesamiento que genera Exchange están en formato csv con los campos descritos en el propio archivo de registro. Para analizar registros csv, Logstash nos ofrece tres complementos: disecar, csv y grok. El primero es el más rápido, pero solo se encarga de analizar los registros más simples.
Por ejemplo, dividirá el siguiente registro en dos (debido a la presencia de una coma dentro del campo), razón por la cual el registro se analizará incorrectamente:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Se puede utilizar al analizar registros, por ejemplo, IIS. En este caso, la sección de filtro podría verse así:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

La configuración de Logstash le permite utilizar declaraciones condicionales, por lo que solo podemos enviar registros que fueron etiquetados con la etiqueta filebeat al complemento de disección IIS. Dentro del complemento relacionamos los valores de los campos con sus nombres, eliminamos el campo original message, que contenía una entrada del registro, y podemos agregar un campo personalizado que contendrá, por ejemplo, el nombre de la aplicación de la que recopilamos registros.

En el caso de los registros de seguimiento, es mejor utilizar el complemento csv; puede procesar correctamente campos complejos:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Dentro del complemento relacionamos los valores de los campos con sus nombres, eliminamos el campo original message (y también campos tenant-id и schema-version), que contenía una entrada del registro, y podemos agregar un campo personalizado que contendrá, por ejemplo, el nombre de la aplicación de la que recopilamos registros.

A la salida de la etapa de filtrado, recibiremos los documentos en una primera aproximación, listos para su visualización en Kibana. Nos faltará lo siguiente:

  • Los campos numéricos serán reconocidos como texto, lo que impide realizar operaciones sobre ellos. Es decir, los campos time-taken Registro de IIS, así como campos recipient-count и total-bites Seguimiento de registros.
  • La marca de tiempo del documento estándar contendrá la hora en que se procesó el registro, no la hora en que se escribió en el lado del servidor.
  • Campo recipient-address se parecerá a una sola obra en construcción, lo que no permite realizar análisis para contar los destinatarios de las cartas.

Es hora de agregar un poco de magia al proceso de procesamiento de registros.

Convertir campos numéricos

El complemento de disección tiene una opción. convert_datatype, que se puede utilizar para convertir un campo de texto a un formato digital. Por ejemplo, así:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Vale la pena recordar que este método solo es adecuado si el campo contendrá necesariamente una cadena. La opción no procesa valores nulos de los campos y genera una excepción.

Para los registros de seguimiento, es mejor no utilizar un método de conversión similar, ya que los campos recipient-count и total-bites puede estar vacío. Para convertir estos campos es mejor utilizar un complemento. mutar:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Dividir dirección_destinatario en destinatarios individuales

Este problema también se puede resolver usando el complemento mutate:

mutate {
  split => ["recipient_address", ";"]
}

Cambiando la marca de tiempo

En el caso de los registros de seguimiento, el problema se resuelve muy fácilmente con el complemento. datos, que te ayudará a escribir en el campo timestamp fecha y hora en el formato requerido del campo date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

En el caso de los registros de IIS, necesitaremos combinar datos de campo. date и time usando el complemento mutate, registre la zona horaria que necesitamos y coloque esta marca de tiempo en timestamp usando el complemento de fecha:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Salida

La sección de salida se utiliza para enviar registros procesados ​​al receptor de registros. En caso de enviar directamente a Elastic se utiliza un plugin elasticsearch, que especifica la dirección del servidor y la plantilla de nombre de índice para enviar el documento generado:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Configuración final

La configuración final se verá así:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Enlaces de interés:

Fuente: habr.com

Añadir un comentario