2. Elastic Stack:安全日誌分析。 日誌儲存

2. Elastic Stack:安全日誌分析。 日誌儲存

在過去 文章 我們見過面 ELK堆疊,它由哪些軟體產品組成。 工程師在使用 ELK 堆疊時面臨的第一個任務是發送日誌以儲存在 elasticsearch 中以供後續分析。 然而,這只是說說而已,elasticsearch 以具有某些欄位和值的文件形式儲存日誌,這意味著工程師必須使用各種工具來解析從端系統發送的訊息。 這可以透過多種方式完成 - 自己編寫一個程序,使用 API 將文件新增至資料庫,或使用現成的解決方案。 在本課程中,我們將考慮解決方案 Logstash,它是 ELK 堆疊的一部分。 我們將了解如何將日誌從端點系統傳送到 Logstash,然後設定一個設定檔來解析並重新導向到 Elasticsearch 資料庫。 為此,我們將來自 Check Point 防火牆的日誌作為傳入系統。

本課程不涉及 ELK 堆疊的安裝,因為有大量關於此主題的文章;我們將考慮配置組件。

讓我們來制定一個 Logstash 配置的行動計畫:

  1. 檢查elasticsearch是否會接受日誌(檢查連接埠的功能和開放性)。
  2. 我們考慮如何將事件傳送到 Logstash、選擇方法並實現它。
  3. 我們在Logstash設定檔中配置Input。
  4. 我們在調試模式下在 Logstash 設定檔中配置輸出,以便了解日誌訊息是什麼樣的。
  5. 設定過濾器。
  6. 在 ElasticSearch 中設定正確的輸出。
  7. Logstash 啟動。
  8. 檢查 Kibana 中的日誌。

讓我們更詳細地看看每一點:

檢查elasticsearch是否接受日誌

為此,您可以使用curl 指令檢查部署Logstash 的系統對Elasticsearch 的存取。 如果您設定了身份驗證,那麼我們也會透過curl 傳輸使用者/密碼,並指定連接埠9200(如果您未更改)。 如果您收到類似於以下內容的回复,則一切正常。

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

如果沒有收到回應,則可能有幾種類型的錯誤:elasticsearch 進程未運行、指定了錯誤的連接埠或該連接埠被安裝elasticsearch 的伺服器上的防火牆阻止。

讓我們看看如何從檢查點防火牆將日誌傳送到 Logstash

從 Check Point 管理伺服器,您可以使用 log_exporter 實用程式透過 syslog 將日誌傳送到 Logstash,您可以在此處閱讀更多相關信息 文章,這裡我們只留下創建流的命令:

cp_log_export 新增名稱 check_point_syslog 目標伺服器 < > 目標連接埠 5555 協定 TCP 格式通用讀取模式半統一

< > - Logstash 運行的伺服器位址,目標端口 5555 - 我們將發送日誌的端口,透過 tcp 發送日誌可以載入服務器,因此在某些情況下使用 udp 更正確。

在 Logstash 設定檔中設定 INPUT

2. Elastic Stack:安全日誌分析。 日誌儲存

預設情況下,設定檔位於 /etc/logstash/conf.d/ 目錄中。 設定檔由 3 個有意義的部分組成:INPUT、FILTER、OUTPUT。 在 輸入 我們指出系統將從哪裡獲取日誌,在 過濾器 解析日誌 - 設定如何將訊息劃分為欄位和值,在 輸出 我們配置輸出流 - 解析後的日誌將發送到其中。

首先,讓我們設定 INPUT,考慮一些可以的類型 - 檔案、tcp 和 exe。

TCP:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

模式=>“伺服器”
指示 Logstash 正在接受連線。

連接埠 => 5555
主機=>“10.10.1.205”
我們接受透過 IP 位址 10.10.1.205 (Logstash)、連接埠 5555 的連線 - 防火牆策略必須允許該連接埠。

輸入=>“檢查點”
我們標記文檔,如果您有多個傳入連接,這非常方便。 隨後,對於每個連接,您可以使用邏輯 if 構造編寫自己的過濾器。

文件:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

設定說明:
路徑=>“/var/log/openvas_report/*”
我們指出需要讀取檔案的目錄。

型態=>“openvas”
事件類型。

開始位置=>“開始”
當更改文件時,它會讀取整個文件;如果設定“end”,系統會等待新記錄出現在文件末尾。

執行:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

使用此輸入,將啟動(僅!)shell 命令,並將其輸出轉換為日誌訊息。

命令=>“ls -alh”
我們對其輸出感興趣的命令。

間隔 => 30
指令呼叫間隔(以秒為單位)。

為了接收來自防火牆的日誌,我們註冊一個過濾器 TCPUDP,取決於日誌傳送到 Logstash 的方式。

我們在調試模式下在Logstash設定檔中配置Output,以便了解日誌訊息是什麼樣的

配置INPUT之後,我們需要了解日誌訊息會是什麼樣子,以及需要使用什麼方法來設定日誌過濾器(解析器)。

為此,我們將使用一個過濾器將結果輸出到 stdout,以便查看原始訊息;目前完整的設定檔將如下所示:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

運行命令檢查:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
我們看到結果了,圖片可以點擊:

2. Elastic Stack:安全日誌分析。 日誌儲存

如果你複製它,它將看起來像這樣:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

查看這些訊息,我們了解到日誌看起來像:field = value 或 key = value,這意味著名為 kv 的過濾器是合適的。 為了為每種特定情況選擇正確的過濾器,最好在技術文件中熟悉它們,或詢問朋友。

設定過濾器

在最後階段我們選擇了kv,該過濾器的配置如下所示:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

我們選擇用於劃分欄位和值的符號 - “=”。 如果日誌中有相同的條目,我們只在資料庫中保存一個實例,否則最終會得到一組相同值,也就是說,如果我們有訊息“foo = some foo=some”,我們只寫入 foo =一些。

在 ElasticSearch 中設定正確的輸出

配置Filter後,就可以將日誌上傳到資料庫 彈性搜索:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

如果文件使用檢查點類型進行簽名,我們會將事件儲存到elasticsearch 資料庫中,該資料庫預設接受連接埠10.10.1.200 上的9200 上的連線。 每個文件都儲存到特定的索引,在本例中我們儲存到索引「checkpoint-」+目前時間日期。 每個索引可以有一組特定的字段,或者當訊息中出現新字段時自動建立;可以在映射中查看字段設定及其類型。

如果您設定了身份驗證(我們稍後會介紹),則必須指定寫入特定索引的憑證,在本例中為“tssolution”,密碼為“cool”。 您可以區分使用者權限,使其僅將日誌寫入特定索引,而不再寫入其他索引。

啟動 Logstash。

Logstash設定檔:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

我們檢查設定檔的正確性:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Elastic Stack:安全日誌分析。 日誌儲存

啟動Logstash進程:
sudo systemctl啟動logstash

我們檢查該進程是否已啟動:
sudo systemctl 狀態logstash

2. Elastic Stack:安全日誌分析。 日誌儲存

讓我們檢查一下套接字是否已啟動:
netstat -nat |grep 5555

2. Elastic Stack:安全日誌分析。 日誌儲存

檢查 Kibana 中的日誌。

一切運行完畢後,進入Kibana - Discover,確保一切配置正確,圖片可點擊!

2. Elastic Stack:安全日誌分析。 日誌儲存

所有日誌都已就位,我們可以看到所有欄位及其值!

結論

我們研究瞭如何編寫 Logstash 配置文件,結果我們得到了所有欄位和值的解析器。 現在我們可以對特定欄位進行搜尋和繪圖。 接下來,在課程中,我們將了解 Kibana 中的視覺化並建立一個簡單的儀表板。 值得一提的是,Logstash設定檔在某些情況下需要不斷更新,例如當我們想要將某個欄位的值從數字替換為單字時。 在後續的文章中我們將不斷地這樣做。

所以請繼續關注(Telegram, Facebook, VK, TS 解決方案博客), Yandex Zen.

來源: www.habr.com

添加評論