我們是 ELK 和 Exchange 的朋友。 第2部分

我們是 ELK 和 Exchange 的朋友。 第2部分

我繼續講如何交友Exchange和ELK(開頭 這裡)。 讓我提醒您,這種組合能夠毫不猶豫地處理大量日誌。 這次我們將討論如何讓 Exchange 與 Logstash 和 Kibana 元件一起運作。

ELK堆疊中的Logstash用於智慧處理日誌並準備以文件的形式放置在Elastic中,在此基礎上可以方便地在Kibana中建立各種視覺化。

安裝

由兩個階段組成:

  • 安裝和設定 OpenJDK 套件。
  • 安裝和設定 Logstash 軟體包。

安裝和設定 OpenJDK 套件

必須下載 OpenJDK 套件並將其解壓縮到特定目錄中。 然後必須將此目錄的路徑輸入到 Windows 作業系統的 $env:Path 和 $env:JAVA_HOME 變數中:

我們是 ELK 和 Exchange 的朋友。 第2部分

我們是 ELK 和 Exchange 的朋友。 第2部分

讓我們檢查一下 Java 版本:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

安裝與設定 Logstash 套件

使用 Logstash 發行版下載存檔文件 。 存檔必須解壓縮到磁碟根目錄。 解壓縮到資料夾 C:Program Files 不值得,Logstash 將拒絕正常啟動。 然後你需要輸入到文件中 jvm.options 負責為 Java 進程分配 RAM 的修復。 我建議指定伺服器 RAM 的一半。 如果板上有 16 GB RAM,則預設鍵為:

-Xms1g
-Xmx1g

必須替換為:

-Xms8g
-Xmx8g

另外,建議註解掉該行 -XX:+UseConcMarkSweepGC。 有關此的更多信息 這裡。 下一步是在logstash.conf 檔案中建立預設配置:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

透過此配置,Logstash 從控制台讀取數據,將其通過空過濾器,然後將其輸出回控制台。 使用此配置將測試 Logstash 的功能。 為此,讓我們以交互模式運行它:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash 在連接埠 9600 上成功啟動。

最後的安裝步驟:將 Logstash 作為 Windows 服務啟動。 例如,可以使用套件來完成此操作 國家安全管理體系:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

容錯

持久性佇列機制保證了來源伺服器傳輸日誌的安全性。

它是如何工作的

日誌處理過程中佇列的佈局是:輸入→佇列→過濾器+輸出。

輸入插件從日誌來源接收數據,將其寫入佇列,並向來源發送資料已收到的確認。

來自佇列的消息由 Logstash 處理,並透過過濾器和輸出插件。 當從輸出中收到日誌已傳送的確認時,Logstash 將從佇列中刪除已處理的日誌。 如果Logstash停止,所有未處理的訊息和未收到確認的訊息都會保留在佇列中,Logstash將在下次啟動時繼續處理它們。

調整

可透過文件中的鍵進行調整 C:Logstashconfiglogstash.yml:

  • queue.type:(可能的值- persisted и memory (default)).
  • path.queue: (佇列檔案所在資料夾的路徑,預設儲存在 C:Logstashqueue 中)。
  • queue.page_capacity:(最大隊列頁面大小,預設值為 64mb)。
  • queue.drain:(true/false - 啟用/停用在關閉Logstash之前停止佇列處理。我不建議啟用它,因為這會直接影響伺服器關閉的速度)。
  • queue.max_events:(佇列中的最大事件數,預設為 0(無限制))。
  • queue.max_bytes:(最大佇列大小以位元組為單位,預設 - 1024mb (1gb))。

如果配置了 queue.max_events и queue.max_bytes,那麼當達到任何這些設定的值時,訊息將停止被接受到佇列中。 了解有關持久隊列的更多信息 這裡.

Logstash.yml 中負責設定佇列的部分的範例:

queue.type: persisted
queue.max_bytes: 10gb

調整

Logstash 配置通常由三個部分組成,負責處理傳入日誌的不同階段:接收(輸入部分)、解析(過濾部分)和發送到 Elastic(輸出部分)。 下面我們將仔細研究它們。

輸入

我們從 filebeat 代理程式接收帶有原始日誌的傳入流。 我們在輸入部分指出的就是這個插件:

input {
  beats {
    port => 5044
  }
}

完成此配置後,Logstash開始監聽5044端口,當接收到日誌時,根據filter部分的設定進行處理。 如有必要,您可以將用於從 filebit 接收日誌的通道包裝在 SSL 中。 閱讀有關 Beats 插件設定的更多信息 這裡.

篩選

Exchange 產生的所有需要​​處理的文字日誌均採用 csv 格式,其欄位在日誌檔案本身中進行了描述。 為了解析 csv 記錄,Logstash 為我們提供了三個外掛程式: 解剖、csv 和 grok。 第一個是最 ,但只處理解析最簡單的日誌。
例如,它會將以下記錄拆分為兩條(由於欄位中存在逗號),這就是日誌將被錯誤解析的原因:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

解析日誌時可以使用,例如IIS。 在這種情況下,過濾器部分可能如下所示:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Logstash 配置允許您使用 條件語句,所以我們只能將帶有 filebeat 標籤的日誌傳送到 dissect 插件 IIS。 在插件內部我們將字段值與其名稱相匹配,刪除原始字段 message,其中包含日誌中的條目,我們可以新增一個自訂字段,例如,其中包含我們從中收集日誌的應用程式的名稱。

在追蹤日誌的情況下,最好使用csv插件;它可以正確處理複雜的欄位:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

在插件內部我們將字段值與其名稱相匹配,刪除原始字段 message (還有字段 tenant-id и schema-version),其中包含日誌中的條目,我們可以添加一個自訂字段,例如,其中包含我們從中收集日誌的應用程式的名稱。

在過濾階段的出口,我們將收到第一個近似值的文檔,準備在 Kibana 中可視化。 我們將缺少以下內容:

  • 數字欄位將被識別為文本,這會阻止對其進行操作。 即,字段 time-taken IIS日誌以及字段 recipient-count и total-bites 日誌追蹤。
  • 標準文檔時間戳將包含處理日誌的時間,而不是在伺服器端寫入日誌的時間。
  • 領域 recipient-address 看起來就像一個建築工地,無法進行分析以計算信件收件人的數量。

是時候為日誌處理過程添加一點魔法了。

轉換數字字段

解剖插件有一個選項 convert_datatype,可用於將文字欄位轉換為數字格式。 例如,像這樣:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

值得記住的是,此方法僅適用於欄位肯定包含字串的情況。 此選項不處理欄位中的 Null 值並引發異常。

對於追蹤日誌,最好不要使用類似的轉換方法,因為字段 recipient-count и total-bites 可能為空。 要轉換這些字段,最好使用插件 變異:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

將recipient_address拆分為單獨的收件者

這個問題也可以用 mutate 外掛來解決:

mutate {
  split => ["recipient_address", ";"]
}

更改時間戳

在追蹤日誌的情況下,這個問題很容易透過外掛程式解決 日期,這將幫助您在現場寫作 timestamp 現場所需格式的日期和時間 date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

對於 IIS 日誌,我們需要合併現場數據 date и time 使用 mutate 插件,註冊我們需要的時區並將此時間戳記在 timestamp 使用日期外掛:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

產量

輸出部分用於將處理後的日誌傳送到日誌接收器。 如果直接傳送到 Elastic,則使用插件 彈性搜索,指定發送產生文件的伺服器位址和索引名稱範本:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

最終配置

最終配置將如下所示:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

相關鏈接:

來源: www.habr.com

添加評論