2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

Në të fundit artikull ne u takuam pirg ELK, nga cilat produkte softuerësh përbëhet. Dhe detyra e parë me të cilën përballet një inxhinier kur punon me pirgun ELK është dërgimi i regjistrave për ruajtje në elasticsearch për analiza të mëvonshme. Megjithatë, ky është vetëm shërbim i buzëve, elasticsearch ruan regjistrat në formën e dokumenteve me fusha dhe vlera të caktuara, që do të thotë se inxhinieri duhet të përdorë mjete të ndryshme për të analizuar mesazhin që dërgohet nga sistemet fundore. Kjo mund të bëhet në disa mënyra - shkruani vetë një program që do të shtojë dokumente në bazën e të dhënave duke përdorur API, ose përdorni zgjidhje të gatshme. Në këtë kurs do të shqyrtojmë zgjidhjen logstash, e cila është pjesë e pirgut ELK. Ne do të shikojmë se si mund të dërgojmë regjistrat nga sistemet e pikës fundore në Logstash dhe më pas do të konfigurojmë një skedar konfigurimi për të analizuar dhe ridrejtuar në bazën e të dhënave Elasticsearch. Për ta bërë këtë, ne marrim regjistrat nga muri i zjarrit Check Point si sistemi hyrës.

Kursi nuk mbulon instalimin e pirgut ELK, pasi ka një numër të madh artikujsh mbi këtë temë; ne do të shqyrtojmë komponentin e konfigurimit.

Le të hartojmë një plan veprimi për konfigurimin e Logstash:

  1. Kontrollimi që elasticsearch do të pranojë regjistrat (duke kontrolluar funksionalitetin dhe hapjen e portit).
  2. Ne konsiderojmë se si mund të dërgojmë ngjarje në Logstash, të zgjedhim një metodë dhe ta zbatojmë atë.
  3. Ne konfigurojmë Inputin në skedarin e konfigurimit Logstash.
  4. Ne konfigurojmë Output-in në skedarin e konfigurimit Logstash në modalitetin e korrigjimit në mënyrë që të kuptojmë se si duket mesazhi i regjistrit.
  5. Konfigurimi i filtrit.
  6. Vendosja e daljes së saktë në ElasticSearch.
  7. Logstash nis.
  8. Kontrollimi i regjistrave në Kibana.

Le të shohim më në detaje çdo pikë:

Kontrollimi që elasticsearch do të pranojë regjistrat

Për ta bërë këtë, mund të përdorni komandën curl për të kontrolluar hyrjen në Elasticsearch nga sistemi në të cilin është vendosur Logstash. Nëse e keni konfiguruar vërtetimin, atëherë ne transferojmë edhe përdoruesin/fjalëkalimin nëpërmjet curl, duke specifikuar portin 9200 nëse nuk e keni ndryshuar. Nëse merrni një përgjigje të ngjashme me atë më poshtë, atëherë gjithçka është në rregull.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

Nëse përgjigja nuk merret, atëherë mund të ketë disa lloje gabimesh: procesi elasticsearch nuk po funksionon, porti i gabuar është specifikuar ose porti është bllokuar nga një mur zjarri në serverin ku është instaluar elasticsearch.

Le të shohim se si mund të dërgoni regjistra në Logstash nga një mur zjarri i pikës së kontrollit

Nga serveri i menaxhimit të Check Point mund të dërgoni regjistra në Logstash nëpërmjet syslog duke përdorur mjetin log_exporter, mund të lexoni më shumë rreth tij këtu artikull, këtu do të lëmë vetëm komandën që krijon rrymën:

cp_log_export shtoni emrin check_point_syslog target-server < > target-port 5555 protokolli i formatit tcp modaliteti i përgjithshëm i leximit gjysmë i unifikuar

< > - adresa e serverit në të cilin funksionon Logstash, target-port 5555 - porta në të cilën do të dërgojmë regjistrat, dërgimi i regjistrave përmes tcp mund të ngarkojë serverin, kështu që në disa raste është më korrekte të përdoret udp.

Konfigurimi i INPUT në skedarin e konfigurimit Logstash

2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

Si parazgjedhje, skedari i konfigurimit ndodhet në drejtorinë /etc/logstash/conf.d/. Skedari i konfigurimit përbëhet nga 3 pjesë domethënëse: INPUT, FILTER, OUTPUT. NË INPUT ne tregojmë se nga do të marrë regjistrat e sistemit, në Filtër analizoni regjistrin - vendosni se si të ndani mesazhin në fusha dhe vlera, në OUTPUT ne konfigurojmë rrjedhën e daljes - ku do të dërgohen regjistrat e analizuar.

Së pari, le të konfigurojmë INPUT, të shqyrtojmë disa nga llojet që mund të jenë - file, tcp dhe exe.

Tcp:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

mode => "server"
Tregon që Logstash po pranon lidhje.

port => 5555
host => "10.10.1.205"
Ne pranojmë lidhje nëpërmjet adresës IP 10.10.1.205 (Logstash), porti 5555 - porti duhet të lejohet nga politika e murit të zjarrit.

tip => "pikë kontrolli"
Ne e shënojmë dokumentin, shumë i përshtatshëm nëse keni disa lidhje hyrëse. Më pas, për çdo lidhje mund të shkruani filtrin tuaj duke përdorur konstruktin logjik if.

Dosja:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

Përshkrimi i cilësimeve:
rrugë => "/var/log/openvas_report/*"
Ne tregojmë drejtorinë në të cilën skedarët duhet të lexohen.

tip => "openvas"
Lloji i ngjarjes.

start_position => "fillimi"
Kur ndryshon një skedar, ai lexon të gjithë skedarin; nëse vendosni "fund", sistemi pret që të dhënat e reja të shfaqen në fund të skedarit.

Ekzekutiv:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

Duke përdorur këtë hyrje, lëshohet një komandë (vetëm!) shell dhe dalja e saj kthehet në një mesazh log.

komanda => "ls -alh"
Komanda për rezultatin e së cilës ne jemi të interesuar.

intervali => 30
Intervali i thirrjes së komandës në sekonda.

Për të marrë regjistrat nga muri i zjarrit, ne regjistrojmë një filtër tcp ose pudre, në varësi të mënyrës se si regjistrat dërgohen në Logstash.

Ne konfigurojmë Outputin në skedarin e konfigurimit Logstash në modalitetin e korrigjimit në mënyrë që të kuptojmë se si duket mesazhi i regjistrit

Pasi të kemi konfiguruar INPUT, duhet të kuptojmë se si do të duket mesazhi i regjistrit dhe cilat metoda duhet të përdoren për të konfiguruar filtrin e log-it (parserin).

Për ta bërë këtë, ne do të përdorim një filtër që nxjerr rezultatin në stdout për të parë mesazhin origjinal; skedari i plotë i konfigurimit për momentin do të duket kështu:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

Ekzekutoni komandën për të kontrolluar:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Ne shohim rezultatin, fotografia mund të klikohet:

2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

Nëse e kopjoni do të duket kështu:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

Duke parë këto mesazhe, kuptojmë se regjistrat duken si: fushë = vlerë ose çelës = vlerë, që do të thotë se një filtër i quajtur kv është i përshtatshëm. Për të zgjedhur filtrin e duhur për çdo rast specifik, do të ishte mirë të njiheni me to në dokumentacionin teknik ose të pyesni një mik.

Konfigurimi i filtrit

Në fazën e fundit që kemi zgjedhur kv, konfigurimi i këtij filtri është paraqitur më poshtë:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

Ne zgjedhim simbolin me të cilin do të ndajmë fushën dhe vlerën - "=". Nëse kemi hyrje identike në regjistër, ruajmë vetëm një shembull në bazën e të dhënave, përndryshe do të përfundoni me një grup vlerash identike, domethënë nëse kemi mesazhin "foo = disa foo = disa" shkruajmë vetëm foo = disa.

Vendosja e daljes së saktë në ElasticSearch

Pasi të konfigurohet Filtri, mund të ngarkoni regjistrat në bazën e të dhënave kërkesë elastike:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Nëse dokumenti është i nënshkruar me llojin e pikës së kontrollit, ne e ruajmë ngjarjen në bazën e të dhënave elasticsearch, e cila pranon lidhjet më 10.10.1.200 në portin 9200 si parazgjedhje. Çdo dokument ruhet në një indeks specifik, në këtë rast ruajmë në indeksin "pikë kontrolli-" + datën aktuale të kohës. Çdo indeks mund të ketë një grup specifik fushash, ose krijohet automatikisht kur një fushë e re shfaqet në një mesazh; cilësimet e fushës dhe lloji i tyre mund të shihen në harta.

Nëse keni konfiguruar vërtetimin (do ta shikojmë më vonë), duhet të specifikohen kredencialet për të shkruar në një indeks specifik, në këtë shembull është "tssolution" me fjalëkalimin "cool". Ju mund të dalloni të drejtat e përdoruesve për të shkruar regjistra vetëm në një indeks specifik dhe jo më shumë.

Nisni Logstash.

Skedari i konfigurimit Logstash:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Ne kontrollojmë skedarin e konfigurimit për korrektësi:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

Filloni procesin e Logstash:
sudo systemctl fillo logstash

Ne kontrollojmë që procesi ka filluar:
sudo systemctl statusi logstash

2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

Le të kontrollojmë nëse priza është ngritur:
netstat -nat |grep 5555

2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

Kontrollimi i regjistrave në Kibana.

Pasi gjithçka të funksionojë, shkoni te Kibana - Discover, sigurohuni që gjithçka të jetë konfiguruar saktë, fotografia mund të klikohet!

2. Stack elastik: analiza e regjistrave të sigurisë. Logstash

Të gjitha regjistrat janë në vend dhe ne mund të shohim të gjitha fushat dhe vlerat e tyre!

Përfundim

Ne shikuam se si të shkruanim një skedar konfigurimi Logstash, dhe si rezultat morëm një analizues të të gjitha fushave dhe vlerave. Tani mund të punojmë me kërkimin dhe hartimin e fushave specifike. Më pas në kurs do të shikojmë vizualizimin në Kibana dhe do të krijojmë një panel të thjeshtë. Vlen të përmendet se skedari i konfigurimit Logstash duhet të përditësohet vazhdimisht në situata të caktuara, për shembull, kur duam të zëvendësojmë vlerën e një fushe nga një numër në një fjalë. Në artikujt vijues do ta bëjmë këtë vazhdimisht.

Pra, qëndroni të sintonizuarTelegram, Facebook, VK, TS Zgjidhja Blog), Yandex Zen.

Burimi: www.habr.com

Shto një koment