2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

V preteklosti članek sva se srečala sklad ELK, iz katerih programskih izdelkov je sestavljen. In prva naloga, s katero se sooči inženir pri delu s skladom ELK, je pošiljanje dnevnikov za shranjevanje v elasticsearch za poznejšo analizo. Vendar je to le beseda, elasticsearch shranjuje dnevnike v obliki dokumentov z določenimi polji in vrednostmi, kar pomeni, da mora inženir uporabiti različna orodja za razčlenitev sporočila, ki je poslano iz končnih sistemov. To je mogoče storiti na več načinov - sami napišite program, ki bo dodal dokumente v bazo podatkov s pomočjo API-ja, ali uporabite že pripravljene rešitve. V tem tečaju bomo obravnavali rešitev Logstash, ki je del sklada ELK. Ogledali si bomo, kako lahko pošiljamo dnevnike iz sistemov končnih točk v Logstash, nato pa bomo nastavili konfiguracijsko datoteko za razčlenitev in preusmeritev v bazo podatkov Elasticsearch. Za to vzamemo dnevnike iz požarnega zidu Check Point kot vhodni sistem.

Tečaj ne zajema namestitve sklada ELK, ker je na to temo ogromno člankov, upoštevali bomo konfiguracijsko komponento.

Naredimo akcijski načrt za konfiguracijo Logstash:

  1. Preverjanje, ali elasticsearch sprejema dnevnike (preverjanje funkcionalnosti in odprtosti vrat).
  2. Razmislimo, kako lahko pošljemo dogodke v Logstash, izberemo metodo in jo implementiramo.
  3. Vhod konfiguriramo v konfiguracijski datoteki Logstash.
  4. Izhod konfiguriramo v konfiguracijski datoteki Logstash v načinu za odpravljanje napak, da razumemo, kako izgleda sporočilo dnevnika.
  5. Nastavitev filtra.
  6. Nastavitev pravilnega izhoda v ElasticSearch.
  7. Zažene se Logstash.
  8. Preverjanje dnevnikov v Kibani.

Oglejmo si vsako točko podrobneje:

Preverjanje, ali elasticsearch sprejema dnevnike

Če želite to narediti, lahko z ukazom curl preverite dostop do Elasticsearch iz sistema, v katerem je nameščen Logstash. Če imate konfigurirano preverjanje pristnosti, prenesemo tudi uporabnika/geslo prek curl, pri čemer navedemo vrata 9200, če jih niste spremenili. Če prejmete odgovor, podoben spodnjemu, potem je vse v redu.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

Če odgovor ni prejet, lahko pride do več vrst napak: proces elasticsearch se ne izvaja, podana so napačna vrata ali pa vrata blokira požarni zid na strežniku, kjer je nameščen elasticsearch.

Poglejmo, kako lahko pošljete dnevnike v Logstash iz požarnega zidu kontrolne točke

Iz strežnika za upravljanje Check Point lahko pošiljate dnevnike v Logstash prek syslog z uporabo pripomočka log_exporter, več o tem lahko preberete tukaj članek, tukaj bomo pustili le ukaz, ki ustvari tok:

cp_log_export dodaj ime check_point_syslog ciljni strežnik < > ciljna vrata 5555 protokol tcp format generični bralni način pol poenoten

< > - naslov strežnika, na katerem teče Logstash, target-port 5555 - vrata, kamor bomo pošiljali dnevnike, pošiljanje dnevnikov preko tcp lahko obremeni strežnik, zato je v nekaterih primerih pravilneje uporabiti udp.

Nastavitev INPUT v konfiguracijski datoteki Logstash

2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

Privzeto se konfiguracijska datoteka nahaja v imeniku /etc/logstash/conf.d/. Konfiguracijska datoteka je sestavljena iz 3 pomembnih delov: INPUT, FILTER, OUTPUT. IN INPUT navedemo, od kod bo sistem jemal dnevnike, v FILTER razčleniti dnevnik - nastaviti, kako razdeliti sporočilo na polja in vrednosti, v IZHOD konfiguriramo izhodni tok - kam bodo poslani razčlenjeni dnevniki.

Najprej konfigurirajmo INPUT, razmislimo o nekaterih vrstah, ki so lahko - datoteka, tcp in exe.

TCP:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

način => "strežnik"
Označuje, da Logstash sprejema povezave.

vrata => 5555
gostitelj => “10.10.1.205”
Sprejemamo povezave prek naslova IP 10.10.1.205 (Logstash), vrata 5555 - vrata morajo biti dovoljena s pravilnikom požarnega zidu.

tip => "kontrolna točka"
Označimo dokument, kar je zelo priročno, če imate več dohodnih povezav. Nato lahko za vsako povezavo napišete svoj filter z uporabo logične konstrukcije if.

Datoteka:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

Opis nastavitev:
pot => "/var/log/openvas_report/*"
Označimo imenik, v katerem je treba prebrati datoteke.

tip => "openvas"
Vrsta dogodka.

start_position => "začetek"
Pri spreminjanju datoteke prebere celotno datoteko, če nastavite »konec«, sistem počaka, da se na koncu datoteke prikažejo novi zapisi.

Izvajalec:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

Z uporabo tega vnosa se zažene (samo!) lupinski ukaz in njegov izhod se spremeni v sporočilo dnevnika.

ukaz => "ls -alh"
Ukaz, katerega rezultat nas zanima.

interval => 30
Interval priklica ukaza v sekundah.

Za prejemanje dnevnikov iz požarnega zidu registriramo filter tcp ali udp, odvisno od tega, kako so dnevniki poslani v Logstash.

Izhod konfiguriramo v konfiguracijski datoteki Logstash v načinu za odpravljanje napak, da bi razumeli, kako izgleda sporočilo dnevnika

Ko smo konfigurirali INPUT, moramo razumeti, kako bo izgledalo sporočilo dnevnika in katere metode je treba uporabiti za konfiguracijo filtra dnevnika (razčlenjevalnik).

Za to bomo uporabili filter, ki izpiše rezultat v stdout, da si ogledamo izvirno sporočilo; celotna konfiguracijska datoteka bo trenutno videti takole:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

Zaženite ukaz za preverjanje:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Vidimo rezultat, sliko lahko kliknemo:

2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

Če ga kopirate, bo videti takole:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

Če pogledamo ta sporočila, ugotovimo, da so dnevniki videti takole: polje = vrednost ali ključ = vrednost, kar pomeni, da je filter, imenovan kv, primeren. Da bi izbrali pravi filter za vsak konkreten primer, bi bilo dobro, da se z njimi seznanite v tehnični dokumentaciji ali vprašate prijatelja.

Nastavitev filtra

Na zadnji stopnji smo izbrali kv, konfiguracija tega filtra je predstavljena spodaj:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

Izberemo simbol, s katerim bomo razdelili polje in vrednost - “=”. Če imamo v dnevniku enake vnose, shranimo samo en primerek v bazo podatkov, sicer boste na koncu imeli niz enakih vrednosti, to pomeni, da če imamo sporočilo "foo = nekaj foo=some", napišemo samo foo = nekaj.

Nastavitev pravilnega izhoda v ElasticSearch

Ko je filter konfiguriran, lahko naložite dnevnike v bazo podatkov elastično iskanje:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Če je dokument podpisan s tipom kontrolne točke, shranimo dogodek v bazo podatkov elasticsearch, ki privzeto sprejema povezave na 10.10.1.200 na vratih 9200. Vsak dokument se shrani v določen indeks, v tem primeru shranimo v indeks “kontrolna točka-” + trenutni čas datum. Vsak indeks ima lahko določen nabor polj ali pa se ustvari samodejno, ko se v sporočilu pojavi novo polje; nastavitve polj in njihovo vrsto si lahko ogledate v preslikavah.

Če imate konfigurirano preverjanje pristnosti (ogledali si ga bomo kasneje), je treba podati poverilnice za pisanje v določen indeks, v tem primeru je to »tssolution« z geslom »cool«. Pravice uporabnikov lahko ločite za pisanje dnevnikov samo v določen indeks in nič več.

Zaženite Logstash.

Konfiguracijska datoteka Logstash:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Preverimo pravilnost konfiguracijske datoteke:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

Zaženite postopek Logstash:
sudo sistemctl start logstash

Preverimo, ali se je postopek začel:
sudo systemctl status logstash

2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

Preverimo, ali je vtičnica odprta:
netstat -nat |grep 5555

2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

Preverjanje dnevnikov v Kibani.

Ko se vse zažene, pojdite na Kibana - Discover, preverite, ali je vse pravilno konfigurirano, sliko je mogoče klikniti!

2. Elastični sklad: analiza varnostnih dnevnikov. Logstash

Vsi dnevniki so na svojem mestu in lahko vidimo vsa polja in njihove vrednosti!

Zaključek

Pogledali smo, kako napisati konfiguracijsko datoteko Logstash, in kot rezultat smo dobili razčlenjevalnik vseh polj in vrednosti. Zdaj lahko delamo z iskanjem in risanjem za določena polja. Naslednji tečaj si bomo ogledali vizualizacijo v Kibani in ustvarili preprosto nadzorno ploščo. Omeniti velja, da je treba konfiguracijsko datoteko Logstash v določenih situacijah nenehno posodabljati, na primer, ko želimo vrednost polja zamenjati iz številke v besedo. V naslednjih člankih bomo to počeli nenehno.

Zato ostanite z nami (Telegram, Facebook , VK, Spletni dnevnik rešitev TS), Yandex Zen.

Vir: www.habr.com

Dodaj komentar