2. Stiva elastică: analiza jurnalelor de securitate. Logstash

2. Stiva elastică: analiza jurnalelor de securitate. Logstash

În trecut articol ne-am intalnit stiva ELK, din ce produse software constă. Și prima sarcină cu care se confruntă un inginer atunci când lucrează cu stiva ELK este trimiterea de jurnalele pentru stocare în elasticsearch pentru analiză ulterioară. Cu toate acestea, acesta este doar un serviciu pe buze, elasticsearch stochează jurnalele sub formă de documente cu anumite câmpuri și valori, ceea ce înseamnă că inginerul trebuie să folosească diverse instrumente pentru a analiza mesajul care este trimis de la sistemele finale. Acest lucru se poate face în mai multe moduri - scrieți singur un program care va adăuga documente la baza de date folosind API-ul sau utilizați soluții gata făcute. În acest curs vom lua în considerare soluția Logstash, care face parte din stiva ELK. Vom analiza cum putem trimite jurnalele de la sistemele terminale către Logstash, apoi vom configura un fișier de configurare pentru a analiza și redirecționa către baza de date Elasticsearch. Pentru a face acest lucru, luăm jurnalele din firewall-ul Check Point ca sistem de intrare.

Cursul nu acoperă instalarea stivei ELK, deoarece există un număr mare de articole pe acest subiect; vom lua în considerare componenta de configurare.

Să întocmim un plan de acțiune pentru configurarea Logstash:

  1. Verificarea faptului că elasticsearch va accepta jurnalele (verificarea funcționalității și deschiderii portului).
  2. Luăm în considerare modul în care putem trimite evenimente către Logstash, alegem o metodă și o implementăm.
  3. Configuram Input in fisierul de configurare Logstash.
  4. Configuram Ieșirea în fișierul de configurare Logstash în modul de depanare pentru a înțelege cum arată mesajul de jurnal.
  5. Configurarea filtrului.
  6. Configurarea ieșirii corecte în ElasticSearch.
  7. Se lansează Logstash.
  8. Verificarea jurnalelor din Kibana.

Să ne uităm la fiecare punct mai detaliat:

Verificarea faptului că elasticsearch va accepta jurnalele

Pentru a face acest lucru, puteți utiliza comanda curl pentru a verifica accesul la Elasticsearch din sistemul pe care este implementat Logstash. Dacă aveți autentificare configurată, atunci transferăm și utilizatorul/parola prin curl, specificând portul 9200 dacă nu l-ați schimbat. Dacă primești un răspuns similar cu cel de mai jos, atunci totul este în ordine.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

Dacă răspunsul nu este primit, atunci pot exista mai multe tipuri de erori: procesul elasticsearch nu rulează, este specificat portul greșit sau portul este blocat de un firewall pe serverul pe care este instalat elasticsearch.

Să vedem cum puteți trimite jurnalele către Logstash dintr-un firewall punct de control

De pe serverul de management Check Point puteți trimite jurnale către Logstash prin syslog folosind utilitarul log_exporter, puteți citi mai multe despre el aici articol, aici vom lăsa doar comanda care creează fluxul:

cp_log_export adăugați nume check_point_syslog target-server < > port-țintă 5555 protocol tcp format generic modul citire semi-unificat

< > - adresa serverului pe care ruleaza Logstash, target-port 5555 - port catre care vom trimite log-uri, trimiterea log-urilor prin tcp poate incarca serverul, deci in unele cazuri este mai corect sa folosesti udp.

Configurarea INPUT în fișierul de configurare Logstash

2. Stiva elastică: analiza jurnalelor de securitate. Logstash

În mod implicit, fișierul de configurare se află în directorul /etc/logstash/conf.d/. Fișierul de configurare este format din 3 părți semnificative: INPUT, FILTER, OUTPUT. ÎN INTRARE indicăm de unde va prelua sistemul de jurnalele, în FILTRU analizați jurnalul - configurați modul de împărțire a mesajului în câmpuri și valori, în REZULTATE configuram fluxul de iesire - unde vor fi trimise jurnalele analizate.

Mai întâi, să configuram INPUT, să luăm în considerare unele dintre tipurile care pot fi - fișier, tcp și exe.

TCP:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

mod => "server"
Indică faptul că Logstash acceptă conexiuni.

port => 5555
gazdă => „10.10.1.205”
Acceptăm conexiuni prin adresa IP 10.10.1.205 (Logstash), portul 5555 - portul trebuie să fie permis de politica de firewall.

tip => "punct de control"
Marcăm documentul, foarte convenabil dacă aveți mai multe conexiuni de intrare. Ulterior, pentru fiecare conexiune puteți scrie propriul filtru folosind constructul logic if.

Fişier:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

Descrierea setărilor:
cale => "/var/log/openvas_report/*"
Indicăm directorul în care trebuie citite fișierele.

tip => "openvas"
Tip de eveniment.

start_position => "început"
Când schimbați un fișier, acesta citește întregul fișier; dacă setați „sfârșit”, sistemul așteaptă să apară înregistrări noi la sfârșitul fișierului.

Executar:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

Folosind această intrare, o comandă shell (numai!) este lansată și ieșirea sa este transformată într-un mesaj de jurnal.

comandă => "ls -alh"
Comanda a cărei ieșire ne interesează.

interval => 30
Interval de invocare a comenzii în secunde.

Pentru a primi jurnalele de la firewall, înregistrăm un filtru tcp sau udp, în funcție de modul în care sunt trimise jurnalele către Logstash.

Configuram Ieșirea în fișierul de configurare Logstash în modul de depanare pentru a înțelege cum arată mesajul de jurnal

După ce am configurat INPUT, trebuie să înțelegem cum va arăta mesajul de jurnal și ce metode trebuie utilizate pentru a configura filtrul de jurnal (parser).

Pentru a face acest lucru, vom folosi un filtru care scoate rezultatul la stdout pentru a vizualiza mesajul original; fișierul de configurare complet în acest moment va arăta astfel:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

Rulați comanda pentru a verifica:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Vedem rezultatul, poza se poate face clic:

2. Stiva elastică: analiza jurnalelor de securitate. Logstash

Dacă îl copiați, va arăta astfel:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

Privind aceste mesaje, înțelegem că jurnalele arată astfel: câmp = valoare sau cheie = valoare, ceea ce înseamnă că un filtru numit kv este potrivit. Pentru a alege filtrul potrivit pentru fiecare caz concret, ar fi o idee bună să vă familiarizați cu ele în documentația tehnică, sau să întrebați un prieten.

Configurarea filtrului

La ultima etapă am selectat kv, configurația acestui filtru este prezentată mai jos:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

Selectăm simbolul prin care vom împărți câmpul și valoarea - „=”. Dacă avem intrări identice în jurnal, salvăm o singură instanță în baza de date, altfel veți ajunge cu o matrice de valori identice, adică dacă avem mesajul „foo = some foo=some” scriem doar foo = unele.

Configurarea ieșirii corecte în ElasticSearch

Odată ce Filtrul este configurat, puteți încărca jurnalele în baza de date elastic căutare:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Dacă documentul este semnat cu tipul punct de control, salvăm evenimentul în baza de date elasticsearch, care acceptă implicit conexiuni pe 10.10.1.200 pe portul 9200. Fiecare document este salvat într-un index specific, în acest caz salvăm în indexul „checkpoint-” + data oră curentă. Fiecare index poate avea un set specific de câmpuri sau este creat automat când un câmp nou apare într-un mesaj; setările câmpului și tipul lor pot fi vizualizate în mapări.

Dacă aveți autentificare configurată (ne vom uita mai târziu), trebuie specificate acreditările pentru scrierea la un anumit index, în acest exemplu este „tssolution” cu parola „cool”. Puteți diferenția drepturile utilizatorului de a scrie jurnalele numai la un anumit index și nu mai mult.

Lansați Logstash.

Fișierul de configurare Logstash:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Verificăm fișierul de configurare pentru corectitudine:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Stiva elastică: analiza jurnalelor de securitate. Logstash

Începeți procesul Logstash:
sudo systemctl începe logstash

Verificăm dacă procesul a început:
sudo systemctl status logstash

2. Stiva elastică: analiza jurnalelor de securitate. Logstash

Să verificăm dacă priza este deschisă:
netstat -nat |grep 5555

2. Stiva elastică: analiza jurnalelor de securitate. Logstash

Verificarea jurnalelor din Kibana.

După ce totul rulează, mergeți la Kibana - Discover, asigurați-vă că totul este configurat corect, poza se poate face clic!

2. Stiva elastică: analiza jurnalelor de securitate. Logstash

Toate jurnalele sunt la locul lor și putem vedea toate câmpurile și valorile lor!

Concluzie

Ne-am uitat la cum să scriem un fișier de configurare Logstash și, ca rezultat, am obținut un parser al tuturor câmpurilor și valorilor. Acum putem lucra cu căutarea și trasarea pentru anumite câmpuri. În continuare, în curs, ne vom uita la vizualizarea în Kibana și vom crea un tablou de bord simplu. Este de menționat că fișierul de configurare Logstash trebuie actualizat constant în anumite situații, de exemplu, când dorim să înlocuim valoarea unui câmp dintr-un număr într-un cuvânt. În articolele următoare vom face acest lucru în mod constant.

Așa că rămâneți pe fazăTelegramă, Facebook, VK, TS Solution Blog), Yandex Zen.

Sursa: www.habr.com

Adauga un comentariu