在過去
本課程不涉及 ELK 堆疊的安裝,因為有大量關於此主題的文章;我們將考慮配置組件。
讓我們來制定一個 Logstash 配置的行動計畫:
- 檢查elasticsearch是否會接受日誌(檢查連接埠的功能和開放性)。
- 我們考慮如何將事件傳送到 Logstash、選擇方法並實現它。
- 我們在Logstash設定檔中配置Input。
- 我們在調試模式下在 Logstash 設定檔中配置輸出,以便了解日誌訊息是什麼樣的。
- 設定過濾器。
- 在 ElasticSearch 中設定正確的輸出。
- Logstash 啟動。
- 檢查 Kibana 中的日誌。
讓我們更詳細地看看每一點:
檢查elasticsearch是否接受日誌
為此,您可以使用curl 指令檢查部署Logstash 的系統對Elasticsearch 的存取。 如果您設定了身份驗證,那麼我們也會透過curl 傳輸使用者/密碼,並指定連接埠9200(如果您未更改)。 如果您收到類似於以下內容的回复,則一切正常。
[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
"name" : "elastic-1",
"cluster_name" : "project",
"cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
"version" : {
"number" : "7.4.1",
"build_flavor" : "default",
"build_type" : "rpm",
"build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
"build_date" : "2019-10-22T17:16:35.176724Z",
"build_snapshot" : false,
"lucene_version" : "8.2.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$
如果沒有收到回應,則可能有幾種類型的錯誤:elasticsearch 進程未運行、指定了錯誤的連接埠或該連接埠被安裝elasticsearch 的伺服器上的防火牆阻止。
讓我們看看如何從檢查點防火牆將日誌傳送到 Logstash
從 Check Point 管理伺服器,您可以使用 log_exporter 實用程式透過 syslog 將日誌傳送到 Logstash,您可以在此處閱讀更多相關信息
cp_log_export 新增名稱 check_point_syslog 目標伺服器 < > 目標連接埠 5555 協定 TCP 格式通用讀取模式半統一
< > - Logstash 運行的伺服器位址,目標端口 5555 - 我們將發送日誌的端口,透過 tcp 發送日誌可以載入服務器,因此在某些情況下使用 udp 更正確。
在 Logstash 設定檔中設定 INPUT
預設情況下,設定檔位於 /etc/logstash/conf.d/ 目錄中。 設定檔由 3 個有意義的部分組成:INPUT、FILTER、OUTPUT。 在 輸入 我們指出系統將從哪裡獲取日誌,在 過濾器 解析日誌 - 設定如何將訊息劃分為欄位和值,在 輸出 我們配置輸出流 - 解析後的日誌將發送到其中。
首先,讓我們設定 INPUT,考慮一些可以的類型 - 檔案、tcp 和 exe。
TCP:
input {
tcp {
port => 5555
host => “10.10.1.205”
type => "checkpoint"
mode => "server"
}
}
模式=>“伺服器”
指示 Logstash 正在接受連線。
連接埠 => 5555
主機=>“10.10.1.205”
我們接受透過 IP 位址 10.10.1.205 (Logstash)、連接埠 5555 的連線 - 防火牆策略必須允許該連接埠。
輸入=>“檢查點”
我們標記文檔,如果您有多個傳入連接,這非常方便。 隨後,對於每個連接,您可以使用邏輯 if 構造編寫自己的過濾器。
文件:
input {
file {
path => "/var/log/openvas_report/*"
type => "openvas"
start_position => "beginning"
}
}
設定說明:
路徑=>“/var/log/openvas_report/*”
我們指出需要讀取檔案的目錄。
型態=>“openvas”
事件類型。
開始位置=>“開始”
當更改文件時,它會讀取整個文件;如果設定“end”,系統會等待新記錄出現在文件末尾。
執行:
input {
exec {
command => "ls -alh"
interval => 30
}
}
使用此輸入,將啟動(僅!)shell 命令,並將其輸出轉換為日誌訊息。
命令=>“ls -alh”
我們對其輸出感興趣的命令。
間隔 => 30
指令呼叫間隔(以秒為單位)。
為了接收來自防火牆的日誌,我們註冊一個過濾器 TCP 或 UDP,取決於日誌傳送到 Logstash 的方式。
我們在調試模式下在Logstash設定檔中配置Output,以便了解日誌訊息是什麼樣的
配置INPUT之後,我們需要了解日誌訊息會是什麼樣子,以及需要使用什麼方法來設定日誌過濾器(解析器)。
為此,我們將使用一個過濾器將結果輸出到 stdout,以便查看原始訊息;目前完整的設定檔將如下所示:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}
output
{
if [type] == "checkpoint"
{
stdout { codec=> json }
}
}
運行命令檢查:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
我們看到結果了,圖片可以點擊:
如果你複製它,它將看起來像這樣:
action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,
查看這些訊息,我們了解到日誌看起來像:field = value 或 key = value,這意味著名為 kv 的過濾器是合適的。 為了為每種特定情況選擇正確的過濾器,最好在技術文件中熟悉它們,或詢問朋友。
設定過濾器
在最後階段我們選擇了kv,該過濾器的配置如下所示:
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
我們選擇用於劃分欄位和值的符號 - “=”。 如果日誌中有相同的條目,我們只在資料庫中保存一個實例,否則最終會得到一組相同值,也就是說,如果我們有訊息“foo = some foo=some”,我們只寫入 foo =一些。
在 ElasticSearch 中設定正確的輸出
配置Filter後,就可以將日誌上傳到資料庫 彈性搜索:
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
如果文件使用檢查點類型進行簽名,我們會將事件儲存到elasticsearch 資料庫中,該資料庫預設接受連接埠10.10.1.200 上的9200 上的連線。 每個文件都儲存到特定的索引,在本例中我們儲存到索引「checkpoint-」+目前時間日期。 每個索引可以有一組特定的字段,或者當訊息中出現新字段時自動建立;可以在映射中查看字段設定及其類型。
如果您設定了身份驗證(我們稍後會介紹),則必須指定寫入特定索引的憑證,在本例中為“tssolution”,密碼為“cool”。 您可以區分使用者權限,使其僅將日誌寫入特定索引,而不再寫入其他索引。
啟動 Logstash。
Logstash設定檔:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
我們檢查設定檔的正確性:
/usr/share/logstash/bin//logstash -f checkpoint.conf
啟動Logstash進程:
sudo systemctl啟動logstash
我們檢查該進程是否已啟動:
sudo systemctl 狀態logstash
讓我們檢查一下套接字是否已啟動:
netstat -nat |grep 5555
檢查 Kibana 中的日誌。
一切運行完畢後,進入Kibana - Discover,確保一切配置正確,圖片可點擊!
所有日誌都已就位,我們可以看到所有欄位及其值!
結論
我們研究瞭如何編寫 Logstash 配置文件,結果我們得到了所有欄位和值的解析器。 現在我們可以對特定欄位進行搜尋和繪圖。 接下來,在課程中,我們將了解 Kibana 中的視覺化並建立一個簡單的儀表板。 值得一提的是,Logstash設定檔在某些情況下需要不斷更新,例如當我們想要將某個欄位的值從數字替換為單字時。 在後續的文章中我們將不斷地這樣做。
所以請繼續關注(
來源: www.habr.com