Somos amigos do ELK e do Exchange. Parte 2

Somos amigos do ELK e do Exchange. Parte 2

Continuo minha história sobre como fazer amigos Exchange e ELK (começando aqui). Deixe-me lembrá-lo de que esta combinação é capaz de processar um grande número de logs sem hesitação. Desta vez falaremos sobre como fazer o Exchange funcionar com componentes Logstash e Kibana.

O Logstash na pilha ELK é usado para processar logs de forma inteligente e prepará-los para colocação no Elastic na forma de documentos, com base nos quais é conveniente construir várias visualizações no Kibana.

Instalação

Consiste em duas etapas:

  • Instalando e configurando o pacote OpenJDK.
  • Instalando e configurando o pacote Logstash.

Instalando e configurando o pacote OpenJDK

O pacote OpenJDK deve ser baixado e descompactado em um diretório específico. Em seguida, o caminho para este diretório deve ser inserido nas variáveis ​​$env:Path e $env:JAVA_HOME do sistema operacional Windows:

Somos amigos do ELK e do Exchange. Parte 2

Somos amigos do ELK e do Exchange. Parte 2

Vamos verificar a versão do Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Instalando e configurando o pacote Logstash

Baixe o arquivo com a distribuição Logstash por isso. O arquivo deve ser descompactado na raiz do disco. Descompacte para a pasta C:Program Files Não vale a pena, o Logstash se recusará a iniciar normalmente. Então você precisa entrar no arquivo jvm.options correções responsáveis ​​pela alocação de RAM para o processo Java. Recomendo especificar metade da RAM do servidor. Se tiver 16 GB de RAM integrado, as chaves padrão são:

-Xms1g
-Xmx1g

deve ser substituído por:

-Xms8g
-Xmx8g

Além disso, é aconselhável comentar a linha -XX:+UseConcMarkSweepGC. mais sobre isso aqui. A próxima etapa é criar uma configuração padrão no arquivo logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Com esta configuração, o Logstash lê os dados do console, passa-os por um filtro vazio e os envia de volta para o console. Usar esta configuração testará a funcionalidade do Logstash. Para fazer isso, vamos executá-lo em modo interativo:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash iniciado com sucesso na porta 9600.

A etapa final da instalação: inicie o Logstash como um serviço do Windows. Isso pode ser feito, por exemplo, usando o pacote NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

tolerância ao erro

A segurança dos logs quando transferidos do servidor de origem é garantida pelo mecanismo Filas Persistentes.

Como funciona

O layout das filas durante o processamento do log é: entrada → fila → filtro + saída.

O plugin de entrada recebe dados de uma fonte de log, grava-os em uma fila e envia a confirmação de que os dados foram recebidos para a fonte.

As mensagens da fila são processadas pelo Logstash, passadas pelo filtro e pelo plugin de saída. Ao receber a confirmação da saída de que o log foi enviado, o Logstash remove o log processado da fila. Se o Logstash parar, todas as mensagens não processadas e as mensagens para as quais nenhuma confirmação foi recebida permanecerão na fila e o Logstash continuará a processá-las na próxima vez que for iniciado.

Fixação

Ajustável por chaves no arquivo C:Logstashconfiglogstash.yml:

  • queue.type: (valores possíveis - persisted и memory (default)).
  • path.queue: (caminho para a pasta com os arquivos da fila, que são armazenados em C:Logstashqueue por padrão).
  • queue.page_capacity: (tamanho máximo da página da fila, o valor padrão é 64 MB).
  • queue.drain: (verdadeiro/falso - ativa/desativa a interrupção do processamento da fila antes de desligar o Logstash. Não recomendo habilitá-lo, pois isso afetará diretamente a velocidade de desligamento do servidor).
  • queue.max_events: (número máximo de eventos na fila, o padrão é 0 (ilimitado)).
  • queue.max_bytes: (tamanho máximo da fila em bytes, padrão - 1024 MB (1 GB)).

Se configurado queue.max_events и queue.max_bytes, as mensagens deixarão de ser aceitas na fila quando o valor de qualquer uma dessas configurações for atingido. Saiba mais sobre filas persistentes aqui.

Um exemplo da parte do logstash.yml responsável por configurar a fila:

queue.type: persisted
queue.max_bytes: 10gb

Fixação

A configuração do Logstash geralmente consiste em três partes, responsáveis ​​por diferentes fases de processamento dos logs de entrada: recebimento (seção de entrada), análise (seção de filtro) e envio ao Elastic (seção de saída). A seguir veremos mais de perto cada um deles.

Entrada

Recebemos o fluxo de entrada com logs brutos dos agentes filebeat. É este plugin que indicamos na seção de entrada:

input {
  beats {
    port => 5044
  }
}

Após esta configuração, o Logstash passa a escutar a porta 5044, e ao receber os logs, os processa de acordo com as configurações da seção de filtros. Se necessário, você pode encapsular o canal para receber logs do filebit em SSL. Leia mais sobre as configurações do plugin Beats aqui.

filtros

Todos os logs de texto de interesse para processamento gerados pelo Exchange estão no formato csv com os campos descritos no próprio arquivo de log. Para analisar registros csv, o Logstash nos oferece três plug-ins: dissecar, csv e grok. O primeiro é o mais rápido, mas consegue analisar apenas os logs mais simples.
Por exemplo, irá dividir o seguinte registro em dois (devido à presença de uma vírgula dentro do campo), razão pela qual o log será analisado incorretamente:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Pode ser usado ao analisar logs, por exemplo, IIS. Nesse caso, a seção de filtro pode ter esta aparência:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

A configuração do Logstash permite que você use declarações condicionais, portanto, só podemos enviar logs que foram marcados com a tag filebeat para o plugin dissect IIS. Dentro do plugin combinamos os valores dos campos com seus nomes, excluímos o campo original message, que continha uma entrada do log, e podemos adicionar um campo personalizado que conterá, por exemplo, o nome do aplicativo do qual coletamos os logs.

No caso de logs de rastreamento, é melhor usar o plugin csv, pois ele pode processar corretamente campos complexos:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Dentro do plugin combinamos os valores dos campos com seus nomes, excluímos o campo original message (e também campos tenant-id и schema-version), que continha uma entrada do log, e podemos adicionar um campo personalizado, que conterá, por exemplo, o nome do aplicativo do qual coletamos os logs.

Ao sair da etapa de filtragem, receberemos os documentos em primeira aproximação, prontos para visualização no Kibana. Sentiremos falta do seguinte:

  • Os campos numéricos serão reconhecidos como texto, o que impede operações neles. Ou seja, os campos time-taken Log do IIS, bem como campos recipient-count и total-bites Rastreamento de registros.
  • O carimbo de data/hora padrão do documento conterá a hora em que o log foi processado, não a hora em que foi gravado no servidor.
  • Campo recipient-address parecerá um canteiro de obras, o que não permite a análise para contar os destinatários das cartas.

É hora de adicionar um pouco de mágica ao processo de processamento de log.

Convertendo campos numéricos

O plugin dissecar tem uma opção convert_datatype, que pode ser usado para converter um campo de texto em formato digital. Por exemplo, assim:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Vale lembrar que este método só é adequado se o campo contiver definitivamente uma string. A opção não processa valores nulos dos campos e lança uma exceção.

Para logs de rastreamento, é melhor não usar um método de conversão semelhante, pois os campos recipient-count и total-bites pode estar vazio. Para converter esses campos é melhor usar um plugin mudado:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Dividindo destinatário_address em destinatários individuais

Este problema também pode ser resolvido usando o plugin mutate:

mutate {
  split => ["recipient_address", ";"]
}

Alterando o carimbo de data/hora

No caso de logs de rastreamento, o problema é facilmente resolvido pelo plugin dados, que o ajudará a escrever no campo timestamp data e hora no formato exigido no campo date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

No caso de logs do IIS, precisaremos combinar os dados do campo date и time usando o plugin mutate, registre o fuso horário que precisamos e coloque esse carimbo de hora em timestamp usando o plugin de data:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

saída

A seção de saída é usada para enviar logs processados ​​ao receptor de log. No caso de enviar diretamente para a Elastic, é utilizado um plugin elasticsearch, que especifica o endereço do servidor e o modelo de nome de índice para enviar o documento gerado:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Configuração final

A configuração final ficará assim:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Links úteis:

Fonte: habr.com

Adicionar um comentário