2. Elastic stack: аналіз security логаў. Logstash

2. Elastic stack: аналіз security логаў. Logstash

У мінулым артыкуле мы пазнаёміліся са стэкам ELK, з якіх праграмных прадуктаў ён складаецца. І першая задача з якой сутыкаецца інжынер пры працы з ELK стэкам гэта адпраўленне логаў для захоўвання ў elasticsearch для наступнага аналізу. Аднак, гэта проста толькі на словах, elasticsearch захоўвае логі ў выглядзе дакументаў з пэўнымі палямі і значэннямі, а значыць інжынер павінен выкарыстоўваючы розныя інструменты распарсіць паведамленне, якое адпраўляецца з канчатковых сістэм. Зрабіць гэта можна некалькімі спосабамі - самому напісаць праграму, якая па API будзе дадаваць дакументы ў базу або выкарыстоўваць ужо гатовыя рашэнні. У рамках дадзенага курса мы будзем разглядаць рашэнне Logstash, Якое з'яўляецца часткай ELK stack. Мы паглядзім як можна адправіць логі з канчатковых сістэм у Logstash, а затым будзем наладжваць канфігурацыйны файл для парсінгу і перанакіраванні ў базу дадзеных Elasticsearch. Для гэтага ў якасці ўваходнай сістэмы бярэм логі з міжсеткавага экрана Check Point.

У рамках курса не разглядаецца ўстаноўка ELK stack, так як існуе вялікая колькасць артыкулаў на гэтую тэму, будзем разглядаць канфігурацыйны складнік.

Складзем план дзеянняў па канфігурацыі Logstash:

  1. Праверка што elasticsearch будзе прымаць логі (праверка працаздольнасці і адкрытасці порта).
  2. Разглядаем якім чынам мы можам адправіць падзеі ў Logstash, выбіраемы спосаб, рэалізуемы.
  3. Наладжваем Input у канфігурацыйным файле Logstash.
  4. Наладжваем Output у канфігурацыйным файле Logstash у рэжыме дэбага, для таго каб зразумець як выглядае лог паведамленне.
  5. Наладжваем Filter.
  6. Наладжваем карэктны Output у ElasticSearch.
  7. Запускам Logstash.
  8. Правяраем логі ў Kibana.

Разгледзім больш дэталёва кожны пункт:

Праверка што elasticsearch будзе прымаць логі

Для гэтага можна праверыць камандай curl доступ у Elasticsearch з сістэмы, на якой разгорнуты Logstash. Калі ў вас наладжана аўтэнтыфікацыя, то таксама праз curl перадаем карыстач/пароль, паказваем порт 9200 калі вы яго не змянялі. Калі прыходзіць адказ накшталт як паказана ніжэй, значыць усё ў парадку.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

Калі адказ не прыйшоў, то можа быць некалькі выглядаў памылак: не працуе працэс elasticsearch, паказаны не той порт, альбо порт заблакаваны фаервалам на серверы, дзе стаіць elasticsearch.

Разглядаем як можна адправіць логі ў Logstash з міжсеткавага экрана check point

З Check Point management server можна адправіць логі ў Logstash праз syslog, выкарыстаючы ўтыліту log_exporter, больш падрабязна можна азнаёміцца ​​з ёй па дадзенай. артыкуле, тут пакінем толькі каманду якая стварае паток:

cp_log_export add name check_point_syslog target-server < > target-port 5555 protocol tcp format generic read-mode semi-unified

< > - адрас сервера на якім круціцца Logstash, target-port 5555 - порт на які будзем адпраўляць логі, адпраўленне логаў па tcp можа нагрузіць сервер, таму ў некаторых выпадках больш правільна ставіць udp.

Наладжваем INPUT у канфігурацыйным файле Logstash

2. Elastic stack: аналіз security логаў. Logstash

Па змаўчанні канфігурацыйны файл знаходзіцца ў дырэкторыі /etc/logstash/conf.d/. Канфігурацыйны файл складаецца з 3 асэнсаваных частак: INPUT, FILTER, OUTPUT. У УВАХОД мы паказваем адкуль сістэма будзе браць логі, у Фільтр парсім лог - наладжваем як падзяліць паведамленне на палі і значэнні, у ВЫХАД наладжваем які выходзіць струмень - куды распарсенные логі будуць адпраўляцца.

Спачатку наладзім INPUT, разгледзім некаторыя тыпы якія могуць быць - file, tcp і exe.

TCP:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

mode => "server"
Кажа аб тым што Logstash прымае злучэнні.

port => 5555
host => "10.10.1.205"
Прымаем злучэнні па IP адрасе 10.10.1.205 (Logstash), порт 5555 – порт павінен быць дазволены палітыкай фаервала.

type => "checkpoint"
Маркіруем дакумент, вельмі зручна ў выпадку, калі ў вас некалькі ўваходных злучэнняў. У наступным для кожнага злучэння можна напісаць свой фільтр з дапамогай лагічнай канструкцыі if.

Файл:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

Апісанне налад:
path => "/var/log/openvas_report/*"
Указваем дырэкторыю, файлы ў якой неабходна лічыць.

type => «openvas»
Тып падзеі.

start_position => «beginning»
Пры змене файла счытвае файл цалкам, калі выставіць "end" то сістэма чакае з'яўленні новых запісаў у канцы файла.

Выканаўца:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

Па дадзеным инпуту запускаецца (толькі!) shell каманда і яе выснова абарочваецца ў лог паведамленне.

command => "ls -alh"
Каманда, вывад якой нас цікавіць.

interval => 30
Інтэрвал выкліку каманды ў секундах.

Для таго каб прымаць логі з міжсеткавага экрана прапісваем фільтр ткп або udp, у залежнасці ад таго як адпраўляюцца логі ў Logstash.

Наладжваем Output у канфігурацыйным файле Logstash у рэжыме дэбага, для таго каб зразумець як выглядае лог паведамленне

Пасля таго як мы сканфігуравалі INPUT, неабходна зразумець як будзе выглядаць лог паведамленне, якія метады неабходна выкарыстоўваць для налады фільтра (парсера) логаў.

Для гэтага будзем выкарыстоўваць фільтр які выдае вынік у stdout для таго каб прагледзець зыходнае паведамленне, цалкам канфігурацыйны файл на бягучы момант будзе выглядаць так:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

Запускам каманду для праверкі:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Бачым вынік, карцінка клікабельная:

2. Elastic stack: аналіз security логаў. Logstash

Калі скапіяваць гэта будзе выглядаць так:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

Гледзячы на ​​дадзеныя паведамленні, разумеем што логі маюць выгляд: поле = значэнне ці key = value, а значыць падыходзіць фільтр, які завецца kv. Для таго каб правільна абраць фільтр да кожнага пэўнага выпадку было б нядрэнна з імі азнаёміцца ​​ў тэхнічнай дакументацыі, альбо спытаць сябра.

Наладжваем Filter

На мінулым этапе абралі kv, далей прадстаўлена канфігурацыя гэтага фільтра:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

Выбіраемы знак па якім будзем дзяліць поле і значэнне - "=". Калі ў нас ёсць у логу аднолькавыя запісы, захоўваем у базе толькі адзін асобнік, інакш у вас атрымаецца масіў з аднолькавых значэнняў, гэта значыць, калі маем паведамленне “foo = some foo=some” запісваем толькі foo = some.

Наладжваем карэктны Output у ElasticSearch

Пасля таго як настроены Filter можна выгружаць логі ў базу эластычны пошук:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Калі дакумент падпісаны тыпам checkpoint, захоўваем падзею ў базу elasticsearch, якая прымае злучэнні на 10.10.1.200 на порт 9200 па дэфолце. Кожны дакумент захоўваецца ў пэўны індэкс, у дадзеным выпадку захоўваем у індэкс "checkpoint-" + бягучая часовая дата. Кожны азначнік можа валодаць вызначаным наборам палёў, альбо ствараецца аўтаматычна пры з'яўленні новага поля ў паведамленні, налады палёў і іх тып можна паглядзець у mappings.

Калі ў вас наладжана аўтэнтыфікацыя (разгледзім пазней), абавязкова павінны быць указаны крэдыты для запісу ў пэўны індэкс, у дадзеным прыкладзе гэта "tssolution" з паролем "cool". Можна размежаваць правы карыстачоў для запісу логаў толькі ў вызначаны азначнік і ніякія больш.

Запускаем Logstash.

Канфігурацыйны файл Logstash:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

Правяраем канфігурацыйны файл на правільнасць складання:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Elastic stack: аналіз security логаў. Logstash

Запускаем працэс Logstash:
Судо systemctl пачаць logstash

Правяраем што працэс запусціўся:
sudo systemctl status logstash

2. Elastic stack: аналіз security логаў. Logstash

Правяраем ці падняўся сокет:
netstat -nat |grep 5555

2. Elastic stack: аналіз security логаў. Logstash

Правяраем логі ў Kibana.

Пасля таго як усё запушчана, заходзім у Kibana – Discover, пераконваемся што ўсё наладжана правільна, малюнак кликабельна!

2. Elastic stack: аналіз security логаў. Logstash

Усе логі на месцы і мы можам бачыць усе палі і іх значэння!

Заключэнне

Мы разгледзелі як пісаць канфігурацыйны файл Logstash, у выніку атрымалі парсер усіх палёў і значэнняў. Цяпер мы можам працаваць з пошукам і складаннем графікаў па пэўных палях. Далей у курсе мы разгледзім візуалізацыю ў Kibana, створым просценькі дашбоард. Варта згадаць што канфігурацыйны файл Logstash патрабуецца стала дапісваць у вызначаных сітуацыях, напрыклад, калі жадаем замяніць значэнне поля з лічбы на слова. У наступных артыкулах мы будзем увесь час гэта рабіць.

Так што сочыце за абнаўленнямі (Тэлеграма, Facebook, VK, TS Solution Blog), Яндэкс.Дзэн.

Крыніца: habr.com

Дадаць каментар