
Continuo minha história sobre como fazer amigos Exchange e ELK (começando ). Deixe-me lembrá-lo de que esta combinação é capaz de processar um grande número de logs sem hesitação. Desta vez falaremos sobre como fazer o Exchange funcionar com componentes Logstash e Kibana.
O Logstash na pilha ELK é usado para processar logs de forma inteligente e prepará-los para colocação no Elastic na forma de documentos, com base nos quais é conveniente construir várias visualizações no Kibana.
Instalação
Consiste em duas etapas:
- Instalando e configurando o pacote OpenJDK.
- Instalando e configurando o pacote Logstash.
Instalando e configurando o pacote OpenJDK
O pacote OpenJDK deve ser baixado e descompactado em um diretório específico. Em seguida, o caminho para esse diretório deve ser adicionado às variáveis de ambiente $env:Path e $env:JAVA_HOME do sistema operacional. Windows:


Vamos verificar a versão do Java:
PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)
Instalando e configurando o pacote Logstash
Baixe o arquivo com a distribuição Logstash . O arquivo deve ser descompactado na raiz do disco. Descompacte para a pasta C:Program Files Não vale a pena, o Logstash se recusará a iniciar normalmente. Então você precisa entrar no arquivo jvm.options correções responsáveis pela alocação de RAM para o processo Java. Recomendo especificar metade da RAM do servidor. Se tiver 16 GB de RAM integrado, as chaves padrão são:
-Xms1g
-Xmx1g
deve ser substituído por:
-Xms8g
-Xmx8g
Além disso, é aconselhável comentar a linha -XX:+UseConcMarkSweepGC. mais sobre isso . A próxima etapa é criar uma configuração padrão no arquivo logstash.conf:
input {
stdin{}
}
filter {
}
output {
stdout {
codec => "rubydebug"
}
}
Com esta configuração, o Logstash lê os dados do console, passa-os por um filtro vazio e os envia de volta para o console. Usar esta configuração testará a funcionalidade do Logstash. Para fazer isso, vamos executá-lo em modo interativo:
PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Logstash iniciado com sucesso na porta 9600.
A etapa final da instalação: executar o Logstash como um serviço. WindowsIsso pode ser feito, por exemplo, usando o pacote :
PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!
tolerância ao erro
A segurança dos logs quando transferidos do servidor de origem é garantida pelo mecanismo Filas Persistentes.
Como funciona
O layout das filas durante o processamento do log é: entrada → fila → filtro + saída.
O plugin de entrada recebe dados de uma fonte de log, grava-os em uma fila e envia a confirmação de que os dados foram recebidos para a fonte.
As mensagens da fila são processadas pelo Logstash, passadas pelo filtro e pelo plugin de saída. Ao receber a confirmação da saída de que o log foi enviado, o Logstash remove o log processado da fila. Se o Logstash parar, todas as mensagens não processadas e as mensagens para as quais nenhuma confirmação foi recebida permanecerão na fila e o Logstash continuará a processá-las na próxima vez que for iniciado.
Fixação
Ajustável por chaves no arquivo C:Logstashconfiglogstash.yml:
queue.type: (valores possíveis -persistedиmemory (default)).path.queue: (caminho para a pasta com os arquivos da fila, que são armazenados em C:Logstashqueue por padrão).queue.page_capacity: (tamanho máximo da página da fila, o valor padrão é 64 MB).queue.drain: (verdadeiro/falso - ativa/desativa a interrupção do processamento da fila antes de desligar o Logstash. Não recomendo habilitá-lo, pois isso afetará diretamente a velocidade de desligamento do servidor).queue.max_events: (número máximo de eventos na fila, o padrão é 0 (ilimitado)).queue.max_bytes: (tamanho máximo da fila em bytes, padrão - 1024 MB (1 GB)).
Se configurado queue.max_events и queue.max_bytes, as mensagens deixarão de ser aceitas na fila quando o valor de qualquer uma dessas configurações for atingido. Saiba mais sobre filas persistentes .
Um exemplo da parte do logstash.yml responsável por configurar a fila:
queue.type: persisted
queue.max_bytes: 10gb
Fixação
A configuração do Logstash geralmente consiste em três partes, responsáveis por diferentes fases de processamento dos logs de entrada: recebimento (seção de entrada), análise (seção de filtro) e envio ao Elastic (seção de saída). A seguir veremos mais de perto cada um deles.
Entrada
Recebemos o fluxo de entrada com logs brutos dos agentes filebeat. É este plugin que indicamos na seção de entrada:
input {
beats {
port => 5044
}
}
Após esta configuração, o Logstash passa a escutar a porta 5044, e ao receber os logs, os processa de acordo com as configurações da seção de filtros. Se necessário, você pode encapsular o canal para receber logs do filebit em SSL. Leia mais sobre as configurações do plugin Beats .
Filtrar
Todos os logs de texto de interesse para processamento gerados pelo Exchange estão no formato csv com os campos descritos no próprio arquivo de log. Para analisar registros csv, o Logstash nos oferece três plug-ins: , csv e grok. O primeiro é o mais , mas consegue analisar apenas os logs mais simples.
Por exemplo, irá dividir o seguinte registro em dois (devido à presença de uma vírgula dentro do campo), razão pela qual o log será analisado incorretamente:
…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…
Pode ser usado ao analisar logs, por exemplo, IIS. Nesse caso, a seção de filtro pode ter esta aparência:
filter {
if "IIS" in [tags] {
dissect {
mapping => {
"message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
}
remove_field => ["message"]
add_field => { "application" => "exchange" }
}
}
}
A configuração do Logstash permite que você use , portanto, só podemos enviar logs que foram marcados com a tag filebeat para o plugin dissect IIS. Dentro do plugin combinamos os valores dos campos com seus nomes, excluímos o campo original message, que continha uma entrada do log, e podemos adicionar um campo personalizado que conterá, por exemplo, o nome do aplicativo do qual coletamos os logs.
No caso de logs de rastreamento, é melhor usar o plugin csv, pois ele pode processar corretamente campos complexos:
filter {
if "Tracking" in [tags] {
csv {
columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
remove_field => ["message", "tenant-id", "schema-version"]
add_field => { "application" => "exchange" }
}
}
Dentro do plugin combinamos os valores dos campos com seus nomes, excluímos o campo original message (e também campos tenant-id и schema-version), que continha uma entrada do log, e podemos adicionar um campo personalizado, que conterá, por exemplo, o nome do aplicativo do qual coletamos os logs.
Ao sair da etapa de filtragem, receberemos os documentos em primeira aproximação, prontos para visualização no Kibana. Sentiremos falta do seguinte:
- Os campos numéricos serão reconhecidos como texto, o que impede operações neles. Ou seja, os campos
time-takenLog do IIS, bem como camposrecipient-countиtotal-bitesRastreamento de registros. - O carimbo de data/hora padrão do documento conterá a hora em que o log foi processado, não a hora em que foi gravado no servidor.
- Campo
recipient-addressparecerá um canteiro de obras, o que não permite a análise para contar os destinatários das cartas.
É hora de adicionar um pouco de mágica ao processo de processamento de log.
Convertendo campos numéricos
O plugin dissecar tem uma opção convert_datatype, que pode ser usado para converter um campo de texto em formato digital. Por exemplo, assim:
dissect {
…
convert_datatype => { "time-taken" => "int" }
…
}
Vale lembrar que este método só é adequado se o campo contiver definitivamente uma string. A opção não processa valores nulos dos campos e lança uma exceção.
Para logs de rastreamento, é melhor não usar um método de conversão semelhante, pois os campos recipient-count и total-bites pode estar vazio. Para converter esses campos é melhor usar um plugin :
mutate {
convert => [ "total-bytes", "integer" ]
convert => [ "recipient-count", "integer" ]
}
Dividindo destinatário_address em destinatários individuais
Este problema também pode ser resolvido usando o plugin mutate:
mutate {
split => ["recipient_address", ";"]
}
Alterando o carimbo de data/hora
No caso de logs de rastreamento, o problema é facilmente resolvido pelo plugin , que o ajudará a escrever no campo timestamp data e hora no formato exigido no campo date-time:
date {
match => [ "date-time", "ISO8601" ]
timezone => "Europe/Moscow"
remove_field => [ "date-time" ]
}
No caso de logs do IIS, precisaremos combinar os dados do campo date и time usando o plugin mutate, registre o fuso horário que precisamos e coloque esse carimbo de hora em timestamp usando o plugin de data:
mutate {
add_field => { "data-time" => "%{date} %{time}" }
remove_field => [ "date", "time" ]
}
date {
match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
timezone => "UTC"
remove_field => [ "data-time" ]
}
saída
A seção de saída é usada para enviar logs processados ao receptor de log. No caso de enviar diretamente para a Elastic, é utilizado um plugin , que especifica o endereço do servidor e o modelo de nome de índice para enviar o documento gerado:
output {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
manage_template => false
index => "Exchange-%{+YYYY.MM.dd}"
}
}
Configuração final
A configuração final ficará assim:
input {
beats {
port => 5044
}
}
filter {
if "IIS" in [tags] {
dissect {
mapping => {
"message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
}
remove_field => ["message"]
add_field => { "application" => "exchange" }
convert_datatype => { "time-taken" => "int" }
}
mutate {
add_field => { "data-time" => "%{date} %{time}" }
remove_field => [ "date", "time" ]
}
date {
match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
timezone => "UTC"
remove_field => [ "data-time" ]
}
}
if "Tracking" in [tags] {
csv {
columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
remove_field => ["message", "tenant-id", "schema-version"]
add_field => { "application" => "exchange" }
}
mutate {
convert => [ "total-bytes", "integer" ]
convert => [ "recipient-count", "integer" ]
split => ["recipient_address", ";"]
}
date {
match => [ "date-time", "ISO8601" ]
timezone => "Europe/Moscow"
remove_field => [ "date-time" ]
}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
manage_template => false
index => "Exchange-%{+YYYY.MM.dd}"
}
}
Links úteis:
Fonte: habr.com
