2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

In der Vergangenheit Artikel wir trafen uns ELK-Stapel, aus welchen Softwareprodukten es besteht. Und die erste Aufgabe eines Ingenieurs bei der Arbeit mit dem ELK-Stack besteht darin, Protokolle zur Speicherung in Elasticsearch zur anschließenden Analyse zu senden. Dies ist jedoch nur ein Lippenbekenntnis, Elasticsearch speichert Protokolle in Form von Dokumenten mit bestimmten Feldern und Werten, was bedeutet, dass der Ingenieur verschiedene Tools verwenden muss, um die Nachricht zu analysieren, die von den Endsystemen gesendet wird. Dies kann auf verschiedene Arten erfolgen: Schreiben Sie selbst ein Programm, das mithilfe der API Dokumente zur Datenbank hinzufügt, oder verwenden Sie vorgefertigte Lösungen. In diesem Kurs werden wir die Lösung betrachten Logstasch, das Teil des ELK-Stacks ist. Wir schauen uns an, wie wir Protokolle von Endpunktsystemen an Logstash senden können, und richten dann eine Konfigurationsdatei zum Parsen und Weiterleiten an die Elasticsearch-Datenbank ein. Dazu nehmen wir Protokolle der Check Point-Firewall als eingehendes System.

Der Kurs behandelt nicht die Installation des ELK-Stacks, da es zu diesem Thema eine große Anzahl von Artikeln gibt; wir werden uns mit der Konfigurationskomponente befassen.

Lassen Sie uns einen Aktionsplan für die Logstash-Konfiguration erstellen:

  1. Überprüfen, ob Elasticsearch Protokolle akzeptiert (Überprüfung der Funktionalität und Offenheit des Ports).
  2. Wir überlegen, wie wir Ereignisse an Logstash senden können, wählen eine Methode aus und implementieren sie.
  3. Wir konfigurieren Input in der Logstash-Konfigurationsdatei.
  4. Wir konfigurieren Output in der Logstash-Konfigurationsdatei im Debug-Modus, um zu verstehen, wie die Protokollmeldung aussieht.
  5. Filter einrichten.
  6. Einrichten der richtigen Ausgabe in ElasticSearch.
  7. Logstash wird gestartet.
  8. Überprüfung der Protokolle in Kibana.

Schauen wir uns jeden Punkt genauer an:

Überprüfen, ob Elasticsearch Protokolle akzeptiert

Dazu können Sie den Curl-Befehl verwenden, um den Zugriff auf Elasticsearch von dem System aus zu überprüfen, auf dem Logstash bereitgestellt wird. Wenn Sie die Authentifizierung konfiguriert haben, übertragen wir den Benutzer/das Passwort auch per Curl unter Angabe des Ports 9200, sofern Sie ihn nicht geändert haben. Wenn Sie eine Antwort wie unten erhalten, ist alles in Ordnung.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

Wenn die Antwort nicht empfangen wird, kann es zu mehreren Fehlern kommen: Der Elasticsearch-Prozess läuft nicht, der falsche Port ist angegeben oder der Port ist durch eine Firewall auf dem Server blockiert, auf dem Elasticsearch installiert ist.

Sehen wir uns an, wie Sie Protokolle von einer Checkpoint-Firewall an Logstash senden können

Vom Check Point-Verwaltungsserver aus können Sie mithilfe des Dienstprogramms log_exporter Protokolle über Syslog an Logstash senden. Weitere Informationen dazu finden Sie hier Artikel, hier belassen wir nur den Befehl, der den Stream erstellt:

cp_log_export Name hinzufügen check_point_syslog Zielserver < > Zielport 5555 Protokoll TCP-Format generisch Lesemodus halbvereinheitlicht

< > – Adresse des Servers, auf dem Logstash läuft, Zielport 5555 – Port, an den wir Protokolle senden. Das Senden von Protokollen über TCP kann den Server belasten, daher ist es in einigen Fällen richtiger, udp zu verwenden.

Einrichten von INPUT in der Logstash-Konfigurationsdatei

2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

Standardmäßig befindet sich die Konfigurationsdatei im Verzeichnis /etc/logstash/conf.d/. Die Konfigurationsdatei besteht aus 3 sinnvollen Teilen: INPUT, FILTER, OUTPUT. IN SPEISUNG Wir geben an, woher das System Protokolle nimmt FILTER Analysieren Sie das Protokoll – legen Sie fest, wie die Nachricht in Felder und Werte unterteilt werden soll AUSGABE Wir konfigurieren den Ausgabestream – wohin die analysierten Protokolle gesendet werden.

Lassen Sie uns zunächst INPUT konfigurieren und einige der möglichen Typen berücksichtigen: Datei, TCP und Exe.

TCP:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

Modus => "Server"
Zeigt an, dass Logstash Verbindungen akzeptiert.

Port => 5555
host => „10.10.1.205“
Wir akzeptieren Verbindungen über die IP-Adresse 10.10.1.205 (Logstash), Port 5555 – der Port muss durch die Firewall-Richtlinie zugelassen sein.

Typ => "Checkpoint"
Wir markieren das Dokument, was sehr praktisch ist, wenn Sie mehrere eingehende Verbindungen haben. Anschließend können Sie für jede Verbindung Ihren eigenen Filter mit dem logischen if-Konstrukt schreiben.

Datei:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

Beschreibung der Einstellungen:
path => "/var/log/openvas_report/*"
Wir geben das Verzeichnis an, in dem die Dateien gelesen werden müssen.

Typ => „openvas“
Ereignistyp.

start_position => "Anfang"
Beim Ändern einer Datei wird die gesamte Datei gelesen; wenn Sie „Ende“ einstellen, wartet das System darauf, dass neue Datensätze am Ende der Datei erscheinen.

Geschäftsführer:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

Mit dieser Eingabe wird ein (einziger!) Shell-Befehl gestartet und dessen Ausgabe in eine Protokollmeldung umgewandelt.

Befehl => „ls -alh“
Der Befehl, dessen Ausgabe uns interessiert.

Intervall => 30
Befehlsaufrufintervall in Sekunden.

Um Protokolle von der Firewall zu erhalten, registrieren wir einen Filter TCP oder UDP, abhängig davon, wie die Protokolle an Logstash gesendet werden.

Wir konfigurieren Output in der Logstash-Konfigurationsdatei im Debug-Modus, um zu verstehen, wie die Protokollmeldung aussieht

Nachdem wir INPUT konfiguriert haben, müssen wir verstehen, wie die Protokollnachricht aussehen wird und welche Methoden zum Konfigurieren des Protokollfilters (Parsers) verwendet werden müssen.

Dazu verwenden wir einen Filter, der das Ergebnis auf stdout ausgibt, um die ursprüngliche Nachricht anzuzeigen; die vollständige Konfigurationsdatei sieht im Moment so aus:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

Führen Sie den Befehl aus, um Folgendes zu überprüfen:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Wir sehen das Ergebnis, das Bild ist anklickbar:

2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

Wenn Sie es kopieren, sieht es so aus:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

Wenn wir uns diese Nachrichten ansehen, verstehen wir, dass die Protokolle wie folgt aussehen: Feld = Wert oder Schlüssel = Wert, was bedeutet, dass ein Filter namens kv geeignet ist. Um für den jeweiligen Fall den richtigen Filter auszuwählen, empfiehlt es sich, sich in der technischen Dokumentation damit vertraut zu machen oder einen Freund zu fragen.

Filter einrichten

Im letzten Schritt haben wir kv ausgewählt. Die Konfiguration dieses Filters wird unten dargestellt:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

Wir wählen das Symbol aus, durch das wir das Feld und den Wert teilen – „=“. Wenn wir identische Einträge im Protokoll haben, speichern wir nur eine Instanz in der Datenbank, sonst erhalten Sie am Ende ein Array identischer Werte, d. h. wenn wir die Meldung „foo = some foo=some“ haben, schreiben wir nur foo = einige.

Einrichten der richtigen Ausgabe in ElasticSearch

Sobald der Filter konfiguriert ist, können Sie Protokolle in die Datenbank hochladen elasticsearch:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Wenn das Dokument mit dem Prüfpunkttyp signiert ist, speichern wir das Ereignis in der Elasticsearch-Datenbank, die standardmäßig Verbindungen am 10.10.1.200 auf Port 9200 akzeptiert. Jedes Dokument wird in einem bestimmten Index gespeichert, in diesem Fall speichern wir im Index „Checkpoint-“ + aktuelles Uhrzeitdatum. Jeder Index kann einen bestimmten Satz von Feldern haben oder wird automatisch erstellt, wenn ein neues Feld in einer Nachricht erscheint; Feldeinstellungen und ihr Typ können in Zuordnungen angezeigt werden.

Wenn Sie die Authentifizierung konfiguriert haben (wir schauen uns das später an), müssen die Anmeldeinformationen zum Schreiben in einen bestimmten Index angegeben werden, in diesem Beispiel „tssolution“ mit dem Passwort „cool“. Sie können Benutzerrechte differenzieren, um Protokolle nur für einen bestimmten Index und nicht mehr zu schreiben.

Starten Sie Logstash.

Logstash-Konfigurationsdatei:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Wir prüfen die Konfigurationsdatei auf Richtigkeit:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

Starten Sie den Logstash-Prozess:
sudo systemctl startet logstash

Wir prüfen, ob der Prozess gestartet wurde:
sudo systemctl status logstash

2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

Überprüfen wir, ob der Socket aktiv ist:
netstat -nat |grep 5555

2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

Überprüfung der Protokolle in Kibana.

Nachdem alles läuft, gehen Sie zu Kibana – Entdecken, stellen Sie sicher, dass alles richtig konfiguriert ist und das Bild anklickbar ist!

2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash

Alle Protokolle sind vorhanden und wir können alle Felder und ihre Werte sehen!

Abschluss

Wir haben uns angeschaut, wie man eine Logstash-Konfigurationsdatei schreibt, und als Ergebnis haben wir einen Parser aller Felder und Werte erhalten. Jetzt können wir mit der Suche und dem Plotten für bestimmte Felder arbeiten. Als nächstes werden wir uns im Kurs mit der Visualisierung in Kibana befassen und ein einfaches Dashboard erstellen. Es ist erwähnenswert, dass die Logstash-Konfigurationsdatei in bestimmten Situationen ständig aktualisiert werden muss, beispielsweise wenn wir den Wert eines Feldes von einer Zahl durch ein Wort ersetzen möchten. In den folgenden Artikeln werden wir dies ständig tun.

Also bleibt gespanntTelegram, Facebook, VK, TS-Lösungsblog), Yandex.Den.

Source: habr.com

Kommentar hinzufügen