Biz ELK və Exchange ilə dostuq. 2-ci hissə

Biz ELK və Exchange ilə dostuq. 2-ci hissə

Exchange və ELK ilə necə dostluq etmək haqqında hekayəmi davam etdirirəm (başlanğıc burada). Nəzərinizə çatdırım ki, bu birləşmə tərəddüd etmədən çox sayda logları emal etməyə qadirdir. Bu dəfə biz Exchange-in Logstash və Kibana komponentləri ilə necə işləməsi haqqında danışacağıq.

ELK yığınındakı Logstash jurnalları ağıllı şəkildə emal etmək və sənədlər şəklində Elastic-də yerləşdirməyə hazırlamaq üçün istifadə olunur ki, bunun əsasında Kibana-da müxtəlif vizuallaşdırmalar qurmaq rahatdır.

Quraşdırma

İki mərhələdən ibarətdir:

  • OpenJDK paketinin quraşdırılması və konfiqurasiyası.
  • Logstash paketinin quraşdırılması və konfiqurasiyası.

OpenJDK paketinin quraşdırılması və konfiqurasiyası

OpenJDK paketi endirilməli və xüsusi qovluğa açılmalıdır. Sonra bu qovluğa gedən yol Windows əməliyyat sisteminin $env:Path və $env:JAVA_HOME dəyişənlərinə daxil edilməlidir:

Biz ELK və Exchange ilə dostuq. 2-ci hissə

Biz ELK və Exchange ilə dostuq. 2-ci hissə

Java versiyasını yoxlayaq:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Logstash paketinin quraşdırılması və konfiqurasiyası

Logstash paylanması ilə arxiv faylını endirin buradan. Arxiv diskin kökünə açılmalıdır. Qovluğa çıxarın C:Program Files Buna dəyməz, Logstash normal başlamaqdan imtina edəcək. Sonra fayla daxil olmalısınız jvm.options Java prosesi üçün RAM ayırmaqdan məsul olan düzəlişlər. Mən serverin operativ yaddaşının yarısını təyin etməyi məsləhət görürəm. Bortda 16 GB RAM varsa, defolt düymələr bunlardır:

-Xms1g
-Xmx1g

ilə əvəz edilməlidir:

-Xms8g
-Xmx8g

Bundan əlavə, xətti şərh etmək məsləhətdir -XX:+UseConcMarkSweepGC. Bu barədə ətraflı burada. Növbəti addım logstash.conf faylında standart konfiqurasiya yaratmaqdır:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Bu konfiqurasiya ilə Logstash məlumatı konsoldan oxuyur, boş filtrdən keçir və onu yenidən konsola çıxarır. Bu konfiqurasiyadan istifadə Logstash-ın funksionallığını sınayacaq. Bunun üçün onu interaktiv rejimdə işə salaq:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash 9600 portunda uğurla işə salındı.

Son quraşdırma addımı: Logstash-ı Windows xidməti olaraq işə salın. Bu, məsələn, paketdən istifadə etməklə edilə bilər NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

xətaya dözümlülük

Mənbə serverindən ötürülən logların təhlükəsizliyi Davamlı Növbələr mexanizmi ilə təmin edilir.

Nasıl çalışır

Jurnalın işlənməsi zamanı növbələrin düzülüşü belədir: giriş → növbə → filtr + çıxış.

Daxiletmə plagini log mənbəyindən məlumatları qəbul edir, onları növbəyə yazır və məlumatın mənbəyə qəbul edilməsinin təsdiqini göndərir.

Növbədən gələn mesajlar Logstash tərəfindən işlənir, filtrdən və çıxış plaginindən keçirilir. Çıxışdan jurnalın göndərilməsinə dair təsdiq aldıqda, Logstash işlənmiş jurnalı növbədən çıxarır. Logstash dayanarsa, bütün işlənməmiş mesajlar və təsdiqi alınmayan mesajlar növbədə qalır və Logstash növbəti dəfə işə başlayanda onları emal etməyə davam edəcək.

nizamlama

Fayldakı düymələrlə tənzimlənir C:Logstashconfiglogstash.yml:

  • queue.type: (mümkün dəyərlər - persisted и memory (default)).
  • path.queue: (defolt olaraq C:Logstashqueue-də saxlanılan növbə faylları olan qovluğa gedən yol).
  • queue.page_capacity: (maksimum növbə səhifə ölçüsü, standart dəyər 64mb-dir).
  • queue.drain: (doğru/yanlış - Logstash-ı bağlamazdan əvvəl növbə emalını dayandırmağa imkan verir/deaktiv edir. Mən onu aktivləşdirməyi məsləhət görmürəm, çünki bu, serverin bağlanma sürətinə birbaşa təsir edəcək).
  • queue.max_events: (növbədəki hadisələrin maksimum sayı, standart 0 (məhdudiyyətsiz)).
  • queue.max_bytes: (baytla maksimum növbə ölçüsü, standart - 1024mb (1gb)).

Əgər konfiqurasiya olunubsa queue.max_events и queue.max_bytes, sonra bu parametrlərdən hər hansı birinin dəyərinə çatdıqda mesajların növbəyə qəbulu dayandırılır. Davamlı növbələr haqqında ətraflı məlumat əldə edin burada.

Növbənin qurulmasına cavabdeh olan logstash.yml hissəsinə nümunə:

queue.type: persisted
queue.max_bytes: 10gb

nizamlama

Logstash konfiqurasiyası adətən daxil olan jurnalların işlənməsinin müxtəlif mərhələlərindən məsul olan üç hissədən ibarətdir: qəbul (giriş bölməsi), təhlil (filtr bölməsi) və Elastikə göndərmə (çıxış bölməsi). Aşağıda onların hər birinə daha yaxından nəzər salacağıq.

Input

Biz filebeat agentlərindən xam qeydlərlə daxil olan axını alırıq. Giriş bölməsində göstərdiyimiz bu plagindir:

input {
  beats {
    port => 5044
  }
}

Bu konfiqurasiyadan sonra Logstash 5044 portunu dinləməyə başlayır və logları qəbul edərkən onları filtr bölməsinin parametrlərinə uyğun olaraq emal edir. Lazım gələrsə, SSL-də filebit-dən qeydləri qəbul etmək üçün kanalı bağlaya bilərsiniz. Beats plagin parametrləri haqqında ətraflı oxuyun burada.

Süzgəc

Exchange-in yaratdığı emal üçün maraqlı olan bütün mətn qeydləri log faylının özündə təsvir olunan sahələrlə csv formatındadır. Csv qeydlərini təhlil etmək üçün Logstash bizə üç plagin təklif edir: disekti, csv və grok. Birincisi ən çox sürətlə, lakin yalnız ən sadə logların təhlili ilə məşğul olur.
Məsələn, o, aşağıdakı qeydi ikiyə böləcək (sahə daxilində vergül olduğuna görə), buna görə də jurnal səhv təhlil ediləcək:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Qeydləri təhlil edərkən istifadə edilə bilər, məsələn, IIS. Bu halda filtr bölməsi belə görünə bilər:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Logstash konfiqurasiyası istifadə etməyə imkan verir şərti ifadələr, buna görə də biz yalnız filebeat teqi ilə işarələnmiş qeydləri dissect plagininə göndərə bilərik IIS. Pluginin içərisində sahə dəyərlərini adları ilə uyğunlaşdırırıq, orijinal sahəni silin message, jurnaldan bir girişi ehtiva edir və biz, məsələn, qeydləri topladığımız tətbiqin adını ehtiva edən fərdi sahə əlavə edə bilərik.

İzləmə qeydləri vəziyyətində csv plaginindən istifadə etmək daha yaxşıdır, o, mürəkkəb sahələri düzgün emal edə bilər:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Pluginin içərisində sahə dəyərlərini adları ilə uyğunlaşdırırıq, orijinal sahəni silin message (həmçinin sahələr tenant-id и schema-version), jurnaldan bir girişi ehtiva edən və biz, məsələn, qeydləri topladığımız tətbiqin adını ehtiva edən fərdi sahə əlavə edə bilərik.

Filtrləmə mərhələsindən çıxışda biz sənədləri Kibanada vizuallaşdırmaya hazır olan ilk yaxınlaşmada alacağıq. Aşağıdakıları əldən verəcəyik:

  • Rəqəmsal sahələr mətn kimi tanınacaq ki, bu da onlarda əməliyyatların qarşısını alır. Daha doğrusu, sahələr time-taken IIS jurnalı, eləcə də sahələr recipient-count и total-bites Giriş İzləmə.
  • Standart sənəd vaxt damğası server tərəfində yazıldığı vaxtı deyil, jurnalın işləndiyi vaxtı ehtiva edir.
  • Sahə recipient-address məktubları qəbul edənləri saymaq üçün təhlil etməyə imkan verməyən bir tikinti sahəsinə bənzəyəcək.

Günlük emal prosesinə bir az sehr əlavə etməyin vaxtı gəldi.

Rəqəmsal sahələrin çevrilməsi

Dissect plagininin bir seçimi var convert_datatype, mətn sahəsini rəqəmsal formata çevirmək üçün istifadə edilə bilər. Məsələn, bu kimi:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Yadda saxlamaq lazımdır ki, bu üsul yalnız sahə mütləq bir sətirdən ibarət olduqda uyğundur. Seçim sahələrdən Null dəyərləri emal etmir və istisna atır.

Günlükləri izləmək üçün sahələr olduğu üçün oxşar çevirmə metodundan istifadə etməmək daha yaxşıdır recipient-count и total-bites boş ola bilər. Bu sahələri çevirmək üçün plaqindən istifadə etmək daha yaxşıdır mutasiya:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Alıcı_ünvanının fərdi alıcılara bölünməsi

Bu problem mutate plaginindən istifadə etməklə də həll edilə bilər:

mutate {
  split => ["recipient_address", ";"]
}

Vaxt damğasının dəyişdirilməsi

İzləmə qeydləri vəziyyətində, problem plagin tərəfindən çox asanlıqla həll olunur tarix, bu sahədə yazmağa kömək edəcək timestamp sahədən tələb olunan formatda tarix və saat date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

IIS qeydləri vəziyyətində, sahə məlumatlarını birləşdirməliyik date и time mutate plaginindən istifadə edərək, bizə lazım olan saat qurşağını qeyd edin və bu vaxt möhürünü daxil edin timestamp tarix plaginindən istifadə edərək:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Buraxılış

Çıxış bölməsi işlənmiş jurnalları jurnal qəbuledicisinə göndərmək üçün istifadə olunur. Birbaşa Elastic-ə göndərildikdə, bir plagin istifadə olunur elastik tədqiqat, yaradılan sənədin göndərilməsi üçün server ünvanını və indeks adı şablonunu təyin edir:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Son konfiqurasiya

Son konfiqurasiya belə görünəcək:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Faydalı linklər:

Mənbə: www.habr.com

Добавить комментарий