Somos amigos de ELK e Exchange. Parte 2

Somos amigos de ELK e Exchange. Parte 2

Sigo a miña historia sobre como facer amigos Exchange e ELK (principio aquí). Permíteme lembrarche que esta combinación é capaz de procesar un número moi grande de rexistros sen dúbida. Nesta ocasión falaremos de como facer que Exchange funcione cos compoñentes de Logstash e Kibana.

Logstash na pila ELK úsase para procesar de xeito intelixente os rexistros e preparalos para a súa colocación en Elastic en forma de documentos, en función dos cales é conveniente construír varias visualizacións en Kibana.

Instalación

Consta de dúas etapas:

  • Instalación e configuración do paquete OpenJDK.
  • Instalación e configuración do paquete Logstash.

Instalación e configuración do paquete OpenJDK

O paquete OpenJDK débese descargar e desempaquetar nun directorio específico. A continuación, debe introducirse o camiño a este directorio nas variables $env:Path e $env:JAVA_HOME do sistema operativo Windows:

Somos amigos de ELK e Exchange. Parte 2

Somos amigos de ELK e Exchange. Parte 2

Comprobamos a versión de Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Instalación e configuración do paquete Logstash

Descarga o ficheiro de arquivo coa distribución Logstash por iso. O arquivo debe estar desempaquetado na raíz do disco. Descomprimir no cartafol C:Program Files Non paga a pena, Logstash rexeitará comezar normalmente. A continuación, cómpre entrar no ficheiro jvm.options correccións responsables de asignar RAM para o proceso Java. Recomendo especificar a metade da memoria RAM do servidor. Se ten 16 GB de RAM a bordo, as teclas predeterminadas son:

-Xms1g
-Xmx1g

debe substituírse por:

-Xms8g
-Xmx8g

Ademais, é recomendable comentar a liña -XX:+UseConcMarkSweepGC. Máis sobre isto aquí. O seguinte paso é crear unha configuración predeterminada no ficheiro logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Con esta configuración, Logstash le os datos da consola, pásaos a través dun filtro baleiro e envíaos de novo á consola. Usando esta configuración probarase a funcionalidade de Logstash. Para facelo, executémolo en modo interactivo:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash lanzouse con éxito no porto 9600.

O paso final da instalación: inicie Logstash como un servizo de Windows. Isto pódese facer, por exemplo, usando o paquete NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

tolerancia a fallos

A seguridade dos rexistros cando se transfiren desde o servidor de orixe está garantida polo mecanismo de colas persistentes.

Como funciona

A disposición das filas durante o procesamento do rexistro é: entrada → cola → filtro + saída.

O complemento de entrada recibe datos dunha fonte de rexistro, escríbeos nunha cola e envía a confirmación de que os datos foron recibidos á fonte.

As mensaxes da cola son procesadas por Logstash, pasadas polo filtro e polo complemento de saída. Ao recibir a confirmación da saída de que o rexistro foi enviado, Logstash elimina o rexistro procesado da cola. Se Logstash se detén, todas as mensaxes e mensaxes sen procesar das que non se recibiu ningunha confirmación permanecen na cola e Logstash seguirá procesándoas a próxima vez que se inicie.

axuste

Axustable mediante teclas no ficheiro C:Logstashconfiglogstash.yml:

  • queue.type: (valores posibles - persisted и memory (default)).
  • path.queue: (camiño ao cartafol cos ficheiros da cola, que se almacenan en C:Logstashqueue por defecto).
  • queue.page_capacity: (tamaño máximo da páxina da cola, o valor predeterminado é 64 MB).
  • queue.drain: (verdadeiro/falso: activa/desactiva a detención do procesamento da fila antes de apagar Logstash. Non recomendo activalo, porque isto afectará directamente á velocidade de apagado do servidor).
  • queue.max_events: (número máximo de eventos na cola, o valor predeterminado é 0 (ilimitado)).
  • queue.max_bytes: (tamaño máximo da cola en bytes, por defecto - 1024 MB (1 gb)).

Se está configurado queue.max_events и queue.max_bytes, entón as mensaxes deixan de ser aceptadas na cola cando se alcanza o valor de calquera destas opcións. Máis información sobre as colas persistentes aquí.

Un exemplo da parte de logstash.yml responsable de configurar a cola:

queue.type: persisted
queue.max_bytes: 10gb

axuste

A configuración de Logstash normalmente consta de tres partes, responsables das diferentes fases de procesamento dos rexistros entrantes: recepción (sección de entrada), análise (sección de filtro) e envío a Elastic (sección de saída). A continuación analizaremos cada un deles con máis detalle.

Entrada

Recibimos o fluxo entrante con rexistros en bruto dos axentes filebeat. É este complemento o que indicamos na sección de entrada:

input {
  beats {
    port => 5044
  }
}

Despois desta configuración, Logstash comeza a escoitar o porto 5044 e, ao recibir rexistros, procesaos segundo a configuración da sección de filtros. Se é necesario, pode envolver a canle para recibir rexistros de filebit en SSL. Lea máis sobre a configuración do complemento beats aquí.

filtro

Todos os rexistros de texto que xera Exchange que sexan interesantes para o seu procesamento están en formato csv cos campos descritos no propio ficheiro de rexistro. Para analizar rexistros csv, Logstash ofrécenos tres complementos: diseccionar, csv e grok. O primeiro é o máis rápido, pero trata de analizar só os rexistros máis sinxelos.
Por exemplo, dividirá o seguinte rexistro en dous (debido á presenza dunha coma dentro do campo), polo que o rexistro será analizado incorrectamente:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Pódese usar ao analizar rexistros, por exemplo, IIS. Neste caso, a sección de filtro pode verse así:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

A configuración de Logstash permítelle usar enunciados condicionais, polo que só podemos enviar rexistros que foron etiquetados coa etiqueta filebeat ao complemento dissect IIS. Dentro do complemento relacionamos os valores dos campos cos seus nomes, eliminamos o campo orixinal message, que contiña unha entrada do rexistro, e podemos engadir un campo personalizado que conterá, por exemplo, o nome da aplicación da que recollemos os rexistros.

No caso dos rexistros de seguimento, é mellor usar o complemento csv; pode procesar correctamente campos complexos:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Dentro do complemento relacionamos os valores dos campos cos seus nomes, eliminamos o campo orixinal message (e tamén campos tenant-id и schema-version), que contiña unha entrada do rexistro, e podemos engadir un campo personalizado, que conterá, por exemplo, o nome da aplicación da que recollemos os rexistros.

Á saída da fase de filtrado, recibiremos documentos nunha primeira aproximación, listos para a súa visualización en Kibana. Botaremos en falta o seguinte:

  • Os campos numéricos recoñeceranse como texto, o que impide operacións neles. É dicir, os campos time-taken Rexistro de IIS, así como campos recipient-count и total-bites Seguimento de rexistros.
  • A marca de tempo estándar do documento conterá a hora en que se procesou o rexistro, non a hora en que se escribiu no servidor.
  • Campo recipient-address parecerá unha obra de construción, que non permite a análise para contar os destinatarios das cartas.

É hora de engadir un pouco de maxia ao proceso de procesamento do rexistro.

Conversión de campos numéricos

O complemento de disección ten unha opción convert_datatype, que se pode usar para converter un campo de texto a un formato dixital. Por exemplo, así:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Paga a pena lembrar que este método só é adecuado se o campo definitivamente conterá unha cadea. A opción non procesa valores nulos dos campos e xera unha excepción.

Para rastrexar rexistros, é mellor non usar un método de conversión similar, xa que os campos recipient-count и total-bites pode estar baleiro. Para converter estes campos é mellor usar un complemento mutar:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Dividir destinatario_enderezo en destinatarios individuais

Este problema tamén se pode resolver usando o complemento de mutación:

mutate {
  split => ["recipient_address", ";"]
}

Cambiando a marca de tempo

No caso dos rexistros de seguimento, o problema resólvese moi facilmente polo complemento data, que che axudará a escribir no campo timestamp data e hora no formato requirido no campo date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

No caso dos rexistros de IIS, necesitaremos combinar os datos de campo date и time usando o complemento de mutación, rexistre a zona horaria que necesitamos e coloque esta marca de tempo timestamp usando o complemento de data:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Saída

A sección de saída úsase para enviar rexistros procesados ​​ao receptor de rexistros. No caso de enviar directamente a Elastic, utilízase un complemento busca elástica, que especifica o enderezo do servidor e o modelo de nome de índice para enviar o documento xerado:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Configuración final

A configuración final será así:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Ligazóns útiles:

Fonte: www.habr.com

Engadir un comentario