ELK的實際應用。 設置日誌存儲

介紹

在部署另一個系統時,我們面臨著處理大量各種日誌的需求。 選擇 ELK 作為樂器。 本文將談談我們建立這個堆棧的經驗。

我們沒有設定一個目標來描述它的所有能力,但我們希望專注於解決實際問題。 這是因為在足夠多的文檔和現成的圖像的情況下,存在很多陷阱,至少我們發現了它們。

我們通過 docker-compose 部署了堆棧。 此外,我們有一個編寫良好的 docker-compose.yml,它使我們能夠毫無問題地提升堆棧。 在我們看來,勝利已經近在咫尺,現在我們將稍微改變一下以適應我們的需要,僅此而已。

不幸的是,嘗試調整系統以接收和處理來自我們應用程序的日誌的嘗試並未立即成功。 因此,我們決定值得分別研究每個組件,然後返回到它們的連接。

因此,讓我們從 logstash 開始。

環境、部署、在容器中運行Logstash

對於部署,我們使用 docker-compose,這裡描述的實驗是在 MacOS 和 Ubuntu 18.0.4 上進行的。

我們在原始 docker-compose.yml 中的 logstash 圖像是 docker.elastic.co/logstash/logstash:6.3.2

我們將用它來做實驗。

為了運行 logstash,我們編寫了一個單獨的 docker-compose.yml。 當然,可以從命令行啟動鏡像,但畢竟我們解決了一個特定的任務,docker-compose 中的所有內容都為我們啟動。

配置文件簡介

從描述中可以看出,logstash可以作為一個通道運行,在這種情況下,它需要傳輸 *.conf 文件或多個通道,在這種情況下,它需要傳輸 pipelines.yml 文件,依次, 將引用每個頻道的文件 .conf。
我們走的是第二條路。 在我們看來,它似乎更加通用和可擴展。 因此,我們創建了 pipelines.yml,並創建了一個 pipelines 目錄,我們將在其中放置每個通道的 .conf 文件。

容器內還有另一個配置文件 - logstash.yml。 我們不碰它,我們按原樣使用它。

所以我們的目錄結構是:

ELK的實際應用。 設置日誌存儲

目前,我們假設這是 5046 端口上的 tcp 來接收輸入數據,我們將使用 stdout 進行輸出。

這是第一次運行的簡單配置。 因為最初的任務是發射。

所以我們有這個 docker-compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

我們在這裡看到了什麼?

  1. 網絡和捲取自原始的 docker-compose.yml(啟動整個堆棧的文件),我認為它們不會對此處的整體情況產生太大影響。
  2. 我們從 docker.elastic.co/logstash/logstash:6.3.2 鏡像中創建一個服務(services)logstash,並將其命名為 logstash_one_channel。
  3. 我們將容器內的端口 5046 轉發到同一個內部端口。
  4. 我們將 ./config/pipelines.yml 管道配置文件映射到容器內的 /usr/share/logstash/config/pipelines.yml 文件,logstash 將在其中獲取它並將其設為只讀,以防萬一。
  5. 我們將擁有管道配置文件的 ./config/pipelines 目錄映射到 /usr/share/logstash/config/pipelines 目錄,並將其設為只讀。

ELK的實際應用。 設置日誌存儲

管道.yml 文件

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

它使用 HABR 標識符及其配置文件的路徑描述了一個通道。

最後是文件“./config/pipelines/habr_pipeline.conf”

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

我們暫時不進入它的描述,我們嘗試運行:

docker-compose up

我們看到了什麼?

容器已經啟動。 我們可以檢查它的工作:

echo '13123123123123123123123213123213' | nc localhost 5046

我們在容器控制台中看到響應:

ELK的實際應用。 設置日誌存儲

但與此同時,我們也看到:

logstash_one_channel | [2019-04-29T11:28:59,790][錯誤][logstash.licensechecker.licensereader] 無法從許可證服務器檢索許可證信息 {:message=>"Elasticsearch 無法訪問:[http://elasticsearch:9200/][Manticore ::ResolutionFailure]elasticsearch", ...

logstash_one_channel | [2019-04-29T11:28:59,894][INFO][logstash.pipeline] 管道成功啟動 {:pipeline_id=>".monitoring-logstash", :thread=>"# »}

logstash_one_channel | [2019-04-29T11:28:59,988][INFO][logstash.agent] 管道運行 {:count=>2, :running_pipelines=>[:HABR, :".monitoring-logstash"], :non_running_pipelines=>[ ]}
logstash_one_channel | [2019-04-29T11:29:00,015][錯誤][logstash.inputs.metrics] X-Pack 安裝在 Logstash 上,但沒有安裝在 Elasticsearch 上。 請在 Elasticsearch 上安裝 X-Pack 以使用監控功能。 其他功能可能可用。
logstash_one_channel | [2019-04-29T11:29:00,526][INFO][logstash.agent] 成功啟動 Logstash API 端點 {:port=>9600}
logstash_one_channel | [2019-04-29T11:29:04,478][INFO][logstash.outputs.elasticsearch] 運行運行狀況檢查以查看 Elasticsearch 連接是否正常運行 {:healthcheck_url=>http://elasticsearch:9200/, :path=> “/”}
logstash_one_channel | [2019-04-29T11:29:04,487][WARN][logstash.outputs.elasticsearch] 試圖恢復與死 ES 實例的連接,但出現錯誤。 {: 網址=>"彈性搜索:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch 無法訪問: [http://elasticsearch:9200/][Manticore::ResolutionFailure]彈性搜索"}
logstash_one_channel | [2019-04-29T11:29:04,704][INFO][logstash.licensechecker.licensereader] 運行運行狀況檢查以查看 Elasticsearch 連接是否正常運行 {:healthcheck_url=>http://elasticsearch:9200/, :path=> “/”}
logstash_one_channel | [2019-04-29T11:29:04,710][WARN][logstash.licensechecker.licensereader] 試圖恢復與死 ES 實例的連接,但出現錯誤。 {: 網址=>"彈性搜索:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch 無法訪問: [http://elasticsearch:9200/][Manticore::ResolutionFailure]彈性搜索"}

而我們的日誌一直在往上爬。

在這裡,我以綠色突出顯示管道成功啟動的消息,以紅色突出顯示錯誤消息,以黃色突出顯示有關嘗試聯繫的消息 彈性搜索:9200。
發生這種情況是因為在圖像中包含的 logstash.conf 中檢查了 elasticsearch 的可用性。 畢竟,logstash 假定它作為 Elk 堆棧的一部分工作,而我們將其分離。

你可以工作,但不方便。

解決方案是通過 XPACK_MONITORING_ENABLED 環境變量禁用此檢查。

讓我們對 docker-compose.yml 進行更改並再次運行它:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

現在,一切都很好。 容器已準備好進行實驗。

我們可以在相鄰的控制台中再次輸入:

echo '13123123123123123123123213123213' | nc localhost 5046

看看:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

在一個頻道內工作

所以我們開始了。 現在您實際上可以花時間直接配置 logstash。 我們暫時不要碰 pipelines.yml 文件,讓我們看看使用一個通道可以得到什麼。

我必須說,使用通道配置文件的一般原則在官方手冊中有很好的描述,這裡 這裡
如果您想用俄語閱讀,那麼我們使用了這個 文章(但是那裡的查詢語法很舊,您需要考慮到這一點)。

讓我們從輸入部分開始依次進行。 我們已經看到了 tcp 上的工作。 這裡還有什麼有趣的地方?

使用心跳測試消息

生成自動測試消息的可能性非常有趣。
為此,您需要在輸入部分包含 heartbean 插件。

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

我們打開它,我們開始每分鐘接收一次

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

我們想要更頻繁地接收,我們需要添加間隔參數。
這就是我們每 10 秒收到一條消息的方式。

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

從文件中獲取數據

我們還決定查看文件模式。 如果它可以很好地處理文件,那麼可能不需要代理,至少對於本地使用而言是這樣。

根據描述,操作方式應該類似於tail -f,即讀取換行符或可選地讀取整個文件。

那麼我們想要得到的是:

  1. 我們希望接收附加到一個日誌文件的行。
  2. 我們希望接收寫入多個日誌文件的數據,同時能夠區分從何處接收到的數據。
  3. 我們要確保當 logstash 重新啟動時,它不會再次收到此數據。
  4. 我們要檢查是否禁用了 logstash,並且數據繼續寫入文件,然後當我們運行它時,我們將收到這些數據。

為了進行實驗,讓我們在 docker-compose.yml 中再添加一行,打開我們放置文件的目錄。

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

並更改 habr_pipeline.conf 中的輸入部分

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

我們開始:

docker-compose up

要創建和寫入日誌文件,我們將使用以下命令:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

是的,它有效!

同時看到我們已經自動添加了路徑字段。 所以在未來,我們將能夠通過它來過濾記錄。

讓我們再試一次:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

現在到另一個文件:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

偉大的! 文件已拾取,路徑指定正確,一切正常。

停止 logstash 並重新啟動。 我們等等吧。 安靜。 那些。 我們不會再收到這些記錄。

現在是最大膽的實驗。

我們放入logstash並執行:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

再次運行 logstash 可以看到:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

萬歲! 一切都恢復了。

但是,有必要警告以下內容。 如果刪除了 logstash 容器 (docker stop logstash_one_channel && docker rm logstash_one_channel),則不會拾取任何內容。 讀取文件的位置存儲在容器內。 如果您從頭開始,那麼它將只接受新行。

讀取現有文件

假設我們是第一次運行 logstash,但我們已經有了日誌並且我們想處理它們。
如果我們使用上面使用的輸入部分運行 logstash,我們將不會得到任何東西。 logstash 只會處理換行符。

為了從現有文件中提取行,在輸入部分添加一行:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

此外,還有一個細微差別,這只會影響 logstash 尚未看到的新文件。 對於已經在 logstash 視野中的相同文件,它已經記住了它們的大小,現在將只在其中記錄新記錄。

讓我們通過研究輸入部分來停止這一點。 還有更多的選擇,但就目前而言,我們有足夠的空間進行進一步的實驗。

路由和數據轉換

讓我們嘗試解決以下問題,假設我們有來自一個頻道的消息,其中一些是信息性的,一些是錯誤消息。 它們的標籤不同。 有些是 INFO,有些是 ERROR。

我們需要在出口處將它們分開。 那些。 我們在一個通道中寫入信息性消息,在另一個通道中寫入錯誤消息。

為此,從輸入部分轉到過濾和輸出。

使用過濾器部分,我們將解析傳入的消息,從中獲取哈希(鍵值對),我們已經可以使用它,即根據條件解析。 在輸出部分,我們將選擇消息並將每條消息發送到自己的頻道。

使用 grok 解析消息

為了解析文本字符串並從中獲取一組字段,過濾器部分有一個特殊的插件——grok。

沒有給自己設定目標,在這裡給出詳細描述(為此我指的是 官方文檔), 我將舉一個簡單的例子。

為此,您需要決定輸入行的格式。 我有這樣的:

1 信息消息 1
2 錯誤消息 2

那些。 首先是標識符,然後是 INFO/ERROR,然後是一些沒有空格的單詞。
不難,但足以理解操作原理。

因此,在過濾器部分,在 grok 插件中,我們需要定義一個模式來解析我們的字符串。

它看起來像這樣:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

基本上,它是一個正則表達式。 使用現成的模式,如 INT、LOGLEVEL、WORD。 可以在此處查看它們的描述以及其他模式。 這裡

現在,通過這個過濾器,我們的字符串將變成三個字段的散列:message_id、message_type、message_text。

它們將顯示在輸出部分。

使用 if 命令在輸出部分路由消息

在輸出部分,正如我們所記得的,我們打算將消息分成兩個流。 有些是 iNFO,我們將輸出到控制台,如果有錯誤,我們將輸出到文件。

我們如何分享這些信息? 問題的情況已經提示了一個解決方案——畢竟我們已經有了專門的 message_type 字段,它只能取兩個值 INFO 和 ERROR。 正是在它上面,我們將使用 if 語句做出選擇。

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

可以在本節中找到使用字段和運算符的描述 官方手冊.

現在,關於結論本身。

控制台輸出,這裡一切都很清楚 - stdout {}

但是文件的輸出——請記住,我們正在從容器中運行所有這些,為了讓我們寫入結果的文件可以從外部訪問,我們需要在 docker-compose.yml 中打開這個目錄。

合計:

我們文件的輸出部分如下所示:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

在 docker-compose.yml 中再添加一卷用於輸出:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

我們開始,我們嘗試,我們看到分為兩個流。

來源: www.habr.com

添加評論