ELK的实际应用。 设置logstash

介绍

在部署另一个系统时,我们面临着处理大量不同日志的需要。 选择ELK作为工具。 本文将讨论我们设置此堆栈的经验。

我们不会设定一个目标来描述其所有功能,但我们希望专注于解决实际问题。 这是因为,尽管有相当多的文档和现成的图像,但陷阱也不少,至少我们发现了它们。

我们通过 docker-compose 部署了堆栈。 此外,我们有一个编写良好的 docker-compose.yml,它使我们几乎可以毫无问题地提升堆栈。 在我们看来,胜利已经很接近了,现在我们将对其进行一些调整以满足我们的需求,仅此而已。

不幸的是,配置系统以接收和处理来自我们的应用程序的日志的尝试并没有立即成功。 因此,我们认为值得单独研究每个组件,然后返回到它们的连接。

所以,我们从 Logstash 开始。

环境、部署、在容器中运行Logstash

对于部署,我们使用 docker-compose;此处描述的实验是在 MacOS 和 Ubuntu 18.0.4 上进行的。

在我们原来的 docker-compose.yml 中注册的logstash镜像是 docker.elastic.co/logstash/logstash:6.3.2

我们将用它进行实验。

我们编写了一个单独的 docker-compose.yml 来运行logstash。 当然,可以从命令行启动图像,但我们正在解决一个特定的问题,我们从 docker-compose 运行所有内容。

简单介绍一下配置文件

从描述来看,logstash 可以为一个通道运行,在这种情况下,它需要传递 *.conf 文件,也可以为多个通道运行,在这种情况下,它需要传递 pipelines.yml 文件,而该文件又,将链接到每个通道的 .conf 文件。
我们选择了第二条路。 对我们来说,它似乎更加通用和可扩展。 因此,我们创建了 pipelines.yml,并创建了一个 pipelines 目录,我们将在其中放置每个通道的 .conf 文件。

容器内部还有另一个配置文件 -logstash.yml。 我们不碰它,我们按原样使用它。

所以,我们的目录结构:

ELK的实际应用。 设置logstash

为了接收输入数据,现在我们假设这是端口 5046 上的 tcp,对于输出,我们将使用 stdout。

这是首次启动的简单配置。 因为最初的任务是发射。

所以,我们有这个 docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

我们在这里看到了什么?

  1. 网络和卷取自原始的 docker-compose.yml(启动整个堆栈的那个),我认为它们不会对这里的整体情况产生很大影响。
  2. 我们从 docker.elastic.co/logstash/logstash:6.3.2 镜像创建一个logstash服务,并将其命名为logstash_one_channel。
  3. 我们将容器内的端口 5046 转发到同一个内部端口。
  4. 我们将管道配置文件 ./config/pipelines.yml 映射到容器内的文件 /usr/share/logstash/config/pipelines.yml,logstash 将在其中拾取它并将其设置为只读,以防万一。
  5. 我们将 ./config/pipelines 目录(其中包含带有通道设置的文件)映射到 /usr/share/logstash/config/pipelines 目录,并使其只读。

ELK的实际应用。 设置logstash

Pipelines.yml 文件

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

此处描述了具有 HABR 标识符的通道及其配置文件的路径。

最后是文件“./config/pipelines/habr_pipeline.conf”

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

我们暂时不讨论它的描述,让我们尝试运行它:

docker-compose up

我们看到了什么?

容器已启动。 我们可以检查一下它的运行情况:

echo '13123123123123123123123213123213' | nc localhost 5046

我们在容器控制台中看到响应:

ELK的实际应用。 设置logstash

但与此同时,我们也看到:

Logstash_one_channel | | [2019-04-29T11:28:59,790][错误][logstash.licensechecker.licensereader]无法从许可证服务器检索许可证信息{:message =>“Elasticsearch无法访问:[http://elasticsearch:9200/][Manticore ::解决方案失败]elasticsearch”,...

Logstash_one_channel | | [2019-04-29T11:28:59,894][INFO ][logstash.pipeline ] 管道已成功启动 {:pipeline_id=>".monitoring-logstash", :thread=>"# ”}

Logstash_one_channel | | [2019-04-29T11:28:59,988][INFO ][logstash.agent ] 正在运行的管道 {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
Logstash_one_channel | | [2019-04-29T11:29:00,015][错误][logstash.inputs.metrics] X-Pack 安装在 Logstash 上,但未安装在 Elasticsearch 上。 请在 Elasticsearch 上安装 X-Pack 以使用监控功能。 可能还有其他功能可用。
Logstash_one_channel | | [2019-04-29T11:29:00,526][INFO][logstash.agent] 成功启动 Logstash API 端点 {:port=>9600}
Logstash_one_channel | | [2019-04-29T11:29:04,478][INFO ][logstash.outputs.elasticsearch] 运行运行状况检查以查看 Elasticsearch 连接是否正常工作 {:healthcheck_url=>http://elasticsearch:9200/, :path=> “/”}
Logstash_one_channel | | [2019-04-29T11:29:04,487][警告][logstash.outputs.elasticsearch] 尝试恢复与失效 ES 实例的连接,但出现错误。 {:网址=>“elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch 无法访问: [http://elasticsearch:9200/][Manticore::ResolutionFailure]弹性搜索”}
Logstash_one_channel | | [2019-04-29T11:29:04,704][INFO ][logstash.licensechecker.licensereader] 运行运行状况检查以查看 Elasticsearch 连接是否正常工作 {:healthcheck_url=>http://elasticsearch:9200/, :path=> “/”}
Logstash_one_channel | | [2019-04-29T11:29:04,710][警告][logstash.licensechecker.licensereader] 尝试恢复与失效 ES 实例的连接,但出现错误。 {:网址=>“elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch 无法访问: [http://elasticsearch:9200/][Manticore::ResolutionFailure]弹性搜索”}

我们的日志一直在爬升。

在这里,我用绿色突出显示管道已成功启动的消息,用红色突出显示错误消息,用黄色突出显示有关尝试联系的消息 elasticsearch:9200。
发生这种情况是因为镜像中包含的logstash.conf 包含对elasticsearch 可用性的检查。 毕竟,logstash 假设它作为 Elk 堆栈的一部分工作,但我们将其分开。

上班是可以的,但是不太方便。

解决方案是通过 XPACK_MONITORING_ENABLED 环境变量禁用此检查。

让我们对 docker-compose.yml 进行更改并再次运行它:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

现在,一切都很好。 容器已准备好进行实验。

我们可以在下一个控制台中再次输入:

echo '13123123123123123123123213123213' | nc localhost 5046

并看到:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

在一个通道内工作

所以我们推出了。 现在您实际上可以花时间配置logstash 本身。 我们暂时不要触及 pipelines.yml 文件,让我们看看通过使用一个通道可以得到什么。

我必须说,使用通道配置文件的一般原理在官方手册中有很好的描述,在这里 这里
如果你想阅读俄语,我们用了这个 一篇文章(但是查询语法很旧,我们需要考虑到这一点)。

让我们从输入部分开始依次进行。 我们已经看到了 TCP 上的工作。 这里还有什么有趣的地方?

使用心跳测试消息

有这样一个有趣的机会来生成自动测试消息。
为此,您需要在输入部分启用 heartbean 插件。

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

打开它,开始每分钟接收一次

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

如果我们想要更频繁地接收,我们需要添加间隔参数。
这就是我们每 10 秒收到一条消息的方式。

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

从文件中检索数据

我们还决定研究一下文件模式。 如果它可以很好地处理该文件,那么也许不需要代理,至少对于本地使用来说是这样。

根据描述,运行方式应该和tail -f类似,即读取新行,或者作为一个选项,读取整个文件。

那么我们想要得到的是:

  1. 我们希望接收附加到一个日志文件的行。
  2. 我们希望接收写入多个日志文件的数据,同时能够区分接收的内容和位置。
  3. 我们希望确保当logstash重新启动时,它不会再次收到此数据。
  4. 我们想要检查如果logstash被关闭,并且数据继续写入文件,那么当我们运行它时,我们将收到这些数据。

为了进行实验,我们在 docker-compose.yml 中添加另一行,打开我们放置文件的目录。

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

并更改 habr_pipeline.conf 中的输入部分

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

开始吧:

docker-compose up

要创建和写入日志文件,我们将使用以下命令:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

是的,它有效!

同时我们看到我们已经自动添加了path字段。 这意味着将来我们将能够通过它来过滤记录。

让我们再试一次:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

现在到另一个文件:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

伟大的! 文件已被拾取,路径已正确指定,一切都很好。

停止logstash并重新启动。 我们等等吧。 安静。 那些。 我们不会再收到这些记录。

现在是最大胆的实验。

安装logstash并执行:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

再次运行logstash,可以看到:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

万岁! 所有的东西都被捡起来了。

但我们必须警告您以下事项。 如果删除了logstash容器(docker stoplogstash_one_channel && docker rmlogstash_one_channel),则不会拾取任何内容。 文件读取到的位置存储在容器内。 如果您从头开始运行它,它只会接受新行。

读取现有文件

假设我们是第一次启动logstash,但我们已经有日志并且想要处理它们。
如果我们使用上面使用的输入部分运行logstash,我们将什么也得不到。 Logstash 仅处理新行。

为了从现有文件中提取行,您应该在输入部分添加一行:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

此外,还有一个细微差别:这只影响 Logstash 尚未见过的新文件。 对于已经在 Logstash 视野中的相同文件,它已经记住了它们的大小,现在只会在其中获取新条目。

我们就到此为止,研究一下输入部分。 还有很多选择,但这足以让我们暂时进行进一步的实验。

路由和数据转换

让我们尝试解决以下问题,假设我们有来自一个通道的消息,其中一些是信息性的,一些是错误消息。 它们因标签而异。 有些是信息,有些是错误。

我们需要在出口处将他们分开。 那些。 我们在一个通道中写入信息消息,在另一个通道中写入错误消息。

为此,请从输入部分移至过滤器和输出部分。

使用过滤器部分,我们将解析传入的消息,从中获取我们已经可以使用的哈希(键值对),即根据情况进行拆卸。 在输出部分,我们将选择消息并将每条消息发送到其自己的通道。

使用 grok 解析消息

为了解析文本字符串并从中获取一组字段,过滤器部分有一个特殊的插件 - grok。

没有给自己设定一个目标,在这里给出详细的描述(为此我参考 官方文档),我举一个简单的例子。

为此,您需要决定输入字符串的格式。 我有这样的:

1 条信息消息1
2 错误消息2

那些。 标识符首先出现,然后是 INFO/ERROR,然后是一些不带空格的单词。
并不难,但了解操作原理就足够了。

因此,在 grok 插件的过滤器部分中,我们必须定义一个用于解析字符串的模式。

它看起来像这样:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

本质上它是一个正则表达式。 使用现成的模式,例如 INT、LOGLEVEL、WORD。 它们的描述以及其他模式可以在这里找到 这里

现在,通过这个过滤器,我们的字符串将变成三个字段的哈希:message_id、message_type、message_text。

它们将显示在输出部分。

使用 if 命令将消息路由到输出部分

正如我们所记得的,在输出部分,我们将把消息分成两个流。 有些是 iNFO,将输出到控制台,如果出现错误,我们将输出到文件。

我们如何分离这些消息? 问题的情况已经提出了解决方案——毕竟我们已经有一个专用的message_type字段,它只能取两个值:INFO和ERROR。 正是在此基础上,我们才会使用if语句进行选择。

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

有关使用字段和运算符的说明可以在本节中找到 官方手册.

现在,关于实际结论本身。

控制台输出,这里一切都很清楚 - stdout {}

但是输出到文件 - 请记住,我们是从容器运行所有这些,并且为了使我们在其中写入结果的文件可以从外部访问,我们需要在 docker-compose.yml 中打开此目录。

合计:

我们文件的输出部分如下所示:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

在 docker-compose.yml 中,我们添加另一个卷用于输出:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

我们启动它,尝试一下,然后看到它分为两个流。

来源: habr.com

添加评论