Aplicação prática do ELK. Configurando logstash

Introdução

Ao implantar outro sistema, enfrentamos a necessidade de processar um grande número de vários logs. ELK foi escolhido como o instrumento. Este artigo falará sobre nossa experiência na configuração dessa pilha.

Não estabelecemos uma meta para descrever todas as suas capacidades, mas queremos nos concentrar na resolução de problemas práticos. Isso se deve ao fato de que com uma quantidade suficientemente grande de documentação e imagens prontas, existem muitas armadilhas, pelo menos as encontramos.

Implantamos a pilha por meio do docker-compose. Além disso, tínhamos um docker-compose.yml bem escrito que nos permitia aumentar a pilha quase sem problemas. E parecia que a vitória já estava próxima, agora vamos torcer um pouco para atender às nossas necessidades e pronto.

Infelizmente, uma tentativa de ajustar o sistema para receber e processar logs de nosso aplicativo não foi bem-sucedida logo de cara. Portanto, decidimos que vale a pena estudar cada componente separadamente e depois retornar às suas conexões.

Então, vamos começar com o logstash.

Ambiente, implantação, execução do Logstash em um contêiner

Para implantação, utilizamos o docker-compose, os experimentos descritos aqui foram realizados em MacOS e Ubuntu 18.0.4.

A imagem logstash que tínhamos em nosso docker-compose.yml original é docker.elastic.co/logstash/logstash:6.3.2

Vamos usá-lo para experimentos.

Para executar o logstash, escrevemos um docker-compose.yml separado. Claro, foi possível lançar a imagem da linha de comando, mas afinal resolvemos uma tarefa específica, onde tudo do docker-compose é lançado para nós.

Resumidamente sobre arquivos de configuração

Conforme segue a descrição, o logstash pode ser executado como para um canal, neste caso, ele precisa transferir o arquivo *.conf ou para vários canais, caso em que precisa transferir o arquivo pipelines.yml, que, por sua vez, , fará referência aos arquivos .conf para cada canal.
Pegamos o segundo caminho. Pareceu-nos mais versátil e escalável. Portanto, criamos pipelines.yml e fizemos um diretório de pipelines no qual colocaremos arquivos .conf para cada canal.

Dentro do contêiner há outro arquivo de configuração - logstash.yml. Não o tocamos, usamos como está.

Portanto, nossa estrutura de diretórios é:

Aplicação prática do ELK. Configurando logstash

Por enquanto, assumimos que é tcp na porta 5046 para receber dados de entrada e usaremos stdout para saída.

Aqui está uma configuração tão simples para a primeira execução. Porque a tarefa inicial é lançar.

Portanto, temos este docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

O que vemos aqui?

  1. Redes e volumes foram retirados do docker-compose.yml original (aquele onde toda a pilha é iniciada) e acho que eles não afetam muito a imagem geral aqui.
  2. Criamos um logstash de serviço (serviços) a partir da imagem docker.elastic.co/logstash/logstash:6.3.2 e damos a ele o nome logstash_one_channel.
  3. Estamos encaminhando a porta 5046 dentro do container, para a mesma porta interna.
  4. Mapeamos nosso arquivo de configuração de canal ./config/pipelines.yml para o arquivo /usr/share/logstash/config/pipelines.yml dentro do contêiner, onde o logstash o selecionará e o tornará somente leitura, apenas por precaução.
  5. Mapeamos o diretório ./config/pipelines, onde temos os arquivos de configuração do pipe, para o diretório /usr/share/logstash/config/pipelines e também o tornamos somente leitura.

Aplicação prática do ELK. Configurando logstash

arquivo piping.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

Ele descreve um canal com o identificador HABR e o caminho para seu arquivo de configuração.

E finalmente o arquivo "./config/pipelines/habr_pipeline.conf"

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

Não entraremos em sua descrição por enquanto, tentamos executar:

docker-compose up

O que vemos?

O contêiner foi iniciado. Podemos verificar seu funcionamento:

echo '13123123123123123123123213123213' | nc localhost 5046

E vemos a resposta no console do contêiner:

Aplicação prática do ELK. Configurando logstash

Mas, ao mesmo tempo, também vemos:

logstash_one_channel | [2019-04-29T11:28:59,790][ERROR][logstash.licensechecker.licensereader] Não é possível recuperar informações de licença do servidor de licenças {:message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore ::ResolutionFailure]elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline] Pipeline iniciado com sucesso {:pipeline_id=>".monitoring-logstash", :thread=>"# »}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO ][logstash.agent] Pipelines em execução {:count=>2, :running_pipelines=>[:HABR, :.monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][ERROR][logstash.inputs.metrics] O X-Pack está instalado no Logstash, mas não no Elasticsearch. Instale o X-Pack no Elasticsearch para usar o recurso de monitoramento. Outros recursos podem estar disponíveis.
logstash_one_channel | [2019-04-29T11:29:00,526][INFO ][logstash.agent] Logstash API endpoint {:port=>9600} iniciado com sucesso
logstash_one_channel | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] Executando verificação de integridade para ver se uma conexão Elasticsearch está funcionando {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN ][logstash.outputs.elasticsearch] Tentativa de ressuscitar a conexão com a instância ES morta, mas ocorreu um erro. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] Executando verificação de integridade para ver se uma conexão Elasticsearch está funcionando {:healthcheck_url=>http://elasticsearch:9200/, :path=> "/"}
logstash_one_channel | [2019-04-29T11:29:04,710][WARN ][logstash.licensechecker.licensereader] Tentativa de ressuscitar a conexão com a instância ES morta, mas ocorreu um erro. {:url=>"elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch"}

E nosso tronco rasteja o tempo todo.

Aqui destaquei em verde a mensagem de que o pipeline iniciou com sucesso, em vermelho a mensagem de erro e em amarelo a mensagem sobre tentativa de contato elasticsearch: 9200.
Isso acontece devido ao fato de que no logstash.conf incluído na imagem, há uma verificação da disponibilidade do elasticsearch. Afinal, o logstash assume que funciona como parte da pilha Elk e nós o separamos.

Você pode trabalhar, mas não é conveniente.

A solução é desabilitar essa verificação por meio da variável de ambiente XPACK_MONITORING_ENABLED.

Vamos fazer uma alteração em docker-compose.yml e executá-lo novamente:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

Agora, está tudo bem. O contêiner está pronto para experimentos.

Podemos digitar novamente no console adjacente:

echo '13123123123123123123123213123213' | nc localhost 5046

E veja:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

Trabalhe em um canal

Então, começamos. Agora você pode reservar um tempo para configurar o logstash diretamente. Não vamos tocar no arquivo pipelines.yml por enquanto, vamos ver o que podemos obter trabalhando com um canal.

Devo dizer que o princípio geral de trabalhar com o arquivo de configuração do canal está bem descrito no manual oficial, aqui aqui
Se você quiser ler em russo, usamos este um artigo(mas a sintaxe da consulta é antiga aí, você precisa levar isso em conta).

Vamos sequencialmente da seção de entrada. Já vimos o trabalho no tcp. O que mais pode ser interessante aqui?

Testar mensagens usando heartbeat

Existe uma possibilidade tão interessante de gerar mensagens de teste automáticas.
Para fazer isso, você precisa incluir o plug-in heartbean na seção de entrada.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

Ligamos, começamos a receber uma vez por minuto

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

Queremos receber com mais frequência, precisamos adicionar o parâmetro de intervalo.
É assim que receberemos uma mensagem a cada 10 segundos.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

Obtendo dados de um arquivo

Também decidimos examinar o modo de arquivo. Se funcionar bem com o arquivo, é possível que nenhum agente seja necessário, bem, pelo menos para uso local.

De acordo com a descrição, o modo de operação deve ser semelhante a tail -f, ou seja, lê novas linhas ou, opcionalmente, lê o arquivo inteiro.

Então o que queremos obter:

  1. Queremos receber linhas anexadas a um arquivo de log.
  2. Queremos receber dados que são gravados em vários arquivos de log, podendo separar o que foi recebido de onde.
  3. Queremos ter certeza de que, quando o logstash for reiniciado, ele não receberá esses dados novamente.
  4. Queremos verificar se o logstash está desativado e os dados continuam sendo gravados nos arquivos, quando o executarmos, receberemos esses dados.

Para realizar o experimento, vamos adicionar mais uma linha ao docker-compose.yml, abrindo o diretório onde colocamos os arquivos.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

E altere a seção de entrada em habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

Nós começamos:

docker-compose up

Para criar e gravar arquivos de log, usaremos o comando:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

Sim, funciona!

Ao mesmo tempo, vemos que adicionamos automaticamente o campo de caminho. Portanto, no futuro, poderemos filtrar os registros por ele.

Vamos tentar de novo:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

E agora para outro arquivo:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

Ótimo! O arquivo foi obtido, o caminho foi especificado corretamente, está tudo bem.

Pare o logstash e reinicie. Vamos esperar. Silêncio. Aqueles. Não recebemos esses registros novamente.

E agora o experimento mais ousado.

Colocamos logstash e executamos:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

Execute logstash novamente e veja:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

Viva! Tudo levantado.

Mas, é necessário alertar sobre o seguinte. Se o contêiner logstash for removido (docker stop logstash_one_channel && docker rm logstash_one_channel), nada será coletado. A posição do arquivo até o qual foi lido foi armazenada dentro do contêiner. Se você começar do zero, ele aceitará apenas novas linhas.

Lendo arquivos existentes

Digamos que estamos executando logstash pela primeira vez, mas já temos logs e gostaríamos de processá-los.
Se executarmos logstash com a seção de entrada que usamos acima, não obteremos nada. Somente novas linhas serão processadas pelo logstash.

Para extrair linhas de arquivos existentes, adicione uma linha adicional à seção de entrada:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

Além disso, há uma nuance, isso afeta apenas os novos arquivos que o logstash ainda não viu. Para os mesmos arquivos que já estavam no campo de visão do logstash, ele já lembrou o tamanho deles e agora vai levar apenas novos registros neles.

Vamos parar com isso estudando a seção de entrada. Existem muito mais opções, mas, por enquanto, temos o suficiente para outros experimentos.

Roteamento e transformação de dados

Vamos tentar resolver o seguinte problema, digamos que temos mensagens de um canal, algumas delas são informativas e outras são mensagens de erro. Eles diferem na etiqueta. Alguns são INFO, outros são ERRO.

Precisamos separá-los na saída. Aqueles. Escrevemos mensagens informativas em um canal e mensagens de erro em outro.

Para fazer isso, vá da seção de entrada para filtro e saída.

Usando a seção de filtro, analisaremos a mensagem recebida, obtendo dela um hash (pares chave-valor), com o qual já podemos trabalhar, ou seja, analisar de acordo com as condições. E na seção de saída, selecionaremos as mensagens e enviaremos cada uma para seu próprio canal.

Analisando uma mensagem com grok

Para analisar strings de texto e obter um conjunto de campos delas, existe um plug-in especial na seção de filtros - grok.

Sem me propor aqui a descrevê-lo detalhadamente (para isso me refiro a documentação oficial), darei meu exemplo simples.

Para fazer isso, você precisa decidir sobre o formato das linhas de entrada. Eu os tenho assim:

1 mensagem de INFORMAÇÃO1
2 mensagem de ERRO2

Aqueles. Identificador primeiro, depois INFO/ERROR, depois alguma palavra sem espaços.
Não é difícil, mas o suficiente para entender o princípio de funcionamento.

Então, na seção de filtro, no plug-in grok, precisamos definir um padrão para analisar nossas strings.

Isso parecerá assim:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

Basicamente, é uma expressão regular. Padrões prontos são usados, como INT, LOGLEVEL, WORD. Sua descrição, bem como outros padrões, podem ser vistos aqui. aqui

Agora, passando por este filtro, nossa string se transformará em um hash de três campos: message_id, message_type, message_text.

Eles serão exibidos na seção de saída.

Roteando mensagens na seção de saída com o comando if

Na seção de saída, como lembramos, íamos dividir as mensagens em dois fluxos. Alguns - que são iNFO, enviaremos para o console e, com erros, enviaremos para um arquivo.

Como podemos compartilhar essas mensagens? A condição do problema já sugere uma solução - afinal, já temos um campo message_type dedicado, que pode assumir apenas dois valores INFO e ERROR. É nele que faremos uma escolha usando a instrução if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

A descrição do trabalho com campos e operadores pode ser encontrada nesta seção manual oficial.

Agora, sobre a conclusão em si.

Saída do console, tudo está claro aqui - stdout {}

Mas a saída para o arquivo - lembre-se de que estamos executando tudo isso do contêiner e para que o arquivo no qual escrevemos o resultado seja acessível de fora, precisamos abrir esse diretório em docker-compose.yml.

Total:

A seção de saída do nosso arquivo se parece com isso:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

Adicione mais um volume ao docker-compose.yml para saída:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

Começamos, tentamos, vemos a divisão em dois fluxos.

Fonte: habr.com

Adicionar um comentário