Kami berteman dengan ELK dan Exchange. Bagian 2

Kami berteman dengan ELK dan Exchange. Bagian 2

Saya melanjutkan cerita saya tentang cara berteman Exchange dan ELK (awal di sini). Izinkan saya mengingatkan Anda bahwa kombinasi ini mampu memproses log dalam jumlah yang sangat besar tanpa ragu-ragu. Kali ini kita akan membahas tentang cara membuat Exchange berfungsi dengan komponen Logstash dan Kibana.

Logstash di tumpukan ELK digunakan untuk memproses log secara cerdas dan mempersiapkannya untuk ditempatkan di Elastic dalam bentuk dokumen, yang menjadi dasar untuk membuat berbagai visualisasi di Kibana.

Instalasi

Terdiri dari dua tahap:

  • Menginstal dan mengkonfigurasi paket OpenJDK.
  • Menginstal dan mengkonfigurasi paket Logstash.

Menginstal dan mengkonfigurasi paket OpenJDK

Paket OpenJDK harus diunduh dan dibongkar ke direktori tertentu. Kemudian path ke direktori ini harus dimasukkan ke dalam variabel $env:Path dan $env:JAVA_HOME sistem operasi Windows:

Kami berteman dengan ELK dan Exchange. Bagian 2

Kami berteman dengan ELK dan Exchange. Bagian 2

Mari kita periksa versi Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Menginstal dan mengkonfigurasi paket Logstash

Unduh file arsip dengan distribusi Logstash karenanya. Arsip harus dibongkar ke root disk. Buka paket ke folder C:Program Files Itu tidak sepadan, Logstash akan menolak untuk memulai secara normal. Maka Anda harus masuk ke dalam file jvm.options perbaikan yang bertanggung jawab untuk mengalokasikan RAM untuk proses Java. Saya sarankan menentukan setengah dari RAM server. Jika memiliki RAM 16 GB, maka kunci defaultnya adalah:

-Xms1g
-Xmx1g

harus diganti dengan:

-Xms8g
-Xmx8g

Selain itu, disarankan untuk mengomentari baris tersebut -XX:+UseConcMarkSweepGC. Lebih lanjut tentang itu di sini. Langkah selanjutnya adalah membuat konfigurasi default pada file logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Dengan konfigurasi ini, Logstash membaca data dari konsol, meneruskannya melalui filter kosong, dan mengeluarkannya kembali ke konsol. Menggunakan konfigurasi ini akan menguji fungsionalitas Logstash. Untuk melakukan ini, mari kita jalankan dalam mode interaktif:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash berhasil diluncurkan pada port 9600.

Langkah instalasi terakhir: luncurkan Logstash sebagai layanan Windows. Hal ini dapat dilakukan, misalnya dengan menggunakan paket NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

toleransi kesalahan

Keamanan log saat ditransfer dari server sumber dijamin oleh mekanisme Antrian Persisten.

Cara kerjanya

Tata letak antrian selama pemrosesan log adalah: input β†’ antrian β†’ filter + output.

Plugin input menerima data dari sumber log, menulisnya ke antrian, dan mengirimkan konfirmasi bahwa data telah diterima ke sumber.

Pesan dari antrian diproses oleh Logstash, melewati filter dan plugin keluaran. Saat menerima konfirmasi dari output bahwa log telah dikirim, Logstash menghapus log yang diproses dari antrian. Jika Logstash berhenti, semua pesan yang belum diproses dan pesan yang belum menerima konfirmasi akan tetap berada dalam antrean, dan Logstash akan terus memprosesnya saat Logstash dimulai lagi.

pengaturan

Dapat disesuaikan dengan tombol di file C:Logstashconfiglogstash.yml:

  • queue.type: (nilai yang mungkin - persisted ΠΈ memory (default)).
  • path.queue: (jalur ke folder dengan file antrian, yang disimpan di C:Logstashqueue secara default).
  • queue.page_capacity: (ukuran halaman antrian maksimum, nilai default adalah 64mb).
  • queue.drain: (benar/salah - mengaktifkan/menonaktifkan penghentian pemrosesan antrian sebelum mematikan Logstash. Saya tidak menyarankan untuk mengaktifkannya, karena ini akan secara langsung mempengaruhi kecepatan penutupan server).
  • queue.max_events: (jumlah maksimum kejadian dalam antrian, defaultnya adalah 0 (tidak terbatas)).
  • queue.max_bytes: (ukuran antrian maksimum dalam byte, default - 1024mb (1gb)).

Jika dikonfigurasi queue.max_events ΠΈ queue.max_bytes, lalu pesan berhenti diterima ke dalam antrean ketika nilai salah satu pengaturan ini tercapai. Pelajari lebih lanjut tentang Antrean Persisten di sini.

Contoh bagian logstash.yml yang bertanggung jawab untuk menyiapkan antrian:

queue.type: persisted
queue.max_bytes: 10gb

pengaturan

Konfigurasi Logstash biasanya terdiri dari tiga bagian, yang bertanggung jawab atas berbagai fase pemrosesan log masuk: penerimaan (bagian masukan), penguraian (bagian filter), dan pengiriman ke Elastic (bagian keluaran). Di bawah ini kita akan melihat lebih dekat masing-masingnya.

Memasukkan

Kami menerima aliran masuk dengan log mentah dari agen filebeat. Plugin inilah yang kami tunjukkan di bagian input:

input {
  beats {
    port => 5044
  }
}

Setelah konfigurasi ini, Logstash mulai mendengarkan port 5044, dan ketika menerima log, memprosesnya sesuai dengan pengaturan bagian filter. Jika perlu, Anda dapat membungkus saluran untuk menerima log dari filebit di SSL. Baca selengkapnya tentang pengaturan plugin beats di sini.

Filter

Semua log teks yang menarik untuk diproses yang dihasilkan Exchange berada dalam format csv dengan bidang yang dijelaskan dalam file log itu sendiri. Untuk mengurai data csv, Logstash menawarkan tiga plugin: membedah, csv dan grok. Yang pertama adalah yang paling banyak cepat, tetapi hanya mampu menguraikan log yang paling sederhana.
Misalnya, ini akan membagi catatan berikut menjadi dua (karena adanya koma di dalam bidang), itulah sebabnya log akan diurai secara tidak benar:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Ini dapat digunakan saat mengurai log, misalnya IIS. Dalam hal ini, bagian filter mungkin terlihat seperti ini:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Konfigurasi Logstash memungkinkan Anda untuk menggunakan pernyataan bersyarat, jadi kami hanya dapat mengirim log yang diberi tag filebeat ke plugin dissect IIS. Di dalam plugin kami mencocokkan nilai bidang dengan namanya, menghapus bidang asli message, yang berisi entri dari log, dan kita dapat menambahkan bidang khusus yang, misalnya, berisi nama aplikasi tempat kami mengumpulkan log.

Dalam hal log pelacakan, lebih baik menggunakan plugin csv; plugin ini dapat memproses bidang kompleks dengan benar:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Di dalam plugin kami mencocokkan nilai bidang dengan namanya, menghapus bidang asli message (dan juga bidang tenant-id ΠΈ schema-version), yang berisi entri dari log, dan kita dapat menambahkan bidang khusus, yang, misalnya, berisi nama aplikasi tempat kami mengumpulkan log.

Di pintu keluar dari tahap pemfilteran, kita akan menerima dokumen perkiraan pertama, siap untuk divisualisasikan di Kibana. Kami akan melewatkan hal-hal berikut:

  • Bidang numerik akan dikenali sebagai teks, sehingga mencegah pengoperasian pada bidang tersebut. Yakni, bidang time-taken Log IIS, serta bidang recipient-count ΠΈ total-bites Pelacakan Log.
  • Stempel waktu dokumen standar akan berisi waktu pemrosesan log, bukan waktu penulisan di sisi server.
  • Lapangan recipient-address akan terlihat seperti satu lokasi konstruksi, yang tidak memungkinkan analisis menghitung penerima surat.

Saatnya menambahkan sedikit keajaiban pada proses pemrosesan log.

Mengonversi bidang numerik

Plugin membedah memiliki opsi convert_datatype, yang dapat digunakan untuk mengonversi bidang teks ke format digital. Misalnya seperti ini:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Perlu diingat bahwa metode ini hanya cocok jika bidang tersebut pasti berisi string. Opsi ini tidak memproses nilai Null dari bidang dan memunculkan pengecualian.

Untuk melacak log, lebih baik tidak menggunakan metode konversi serupa, karena bidangnya recipient-count ΠΈ total-bites mungkin kosong. Untuk mengonversi bidang ini lebih baik menggunakan plugin mengubah:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Memisahkan receiver_address menjadi masing-masing penerima

Masalah ini juga dapat diselesaikan dengan menggunakan plugin mutate:

mutate {
  split => ["recipient_address", ";"]
}

Mengubah stempel waktu

Dalam hal pelacakan log, masalahnya sangat mudah diselesaikan dengan plugin tanggal, yang akan membantu Anda menulis di lapangan timestamp tanggal dan waktu dalam format yang diperlukan dari lapangan date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

Dalam kasus log IIS, kita perlu menggabungkan data lapangan date ΠΈ time menggunakan plugin mutate, daftarkan zona waktu yang kita butuhkan dan tempatkan stempel waktu ini timestamp menggunakan plugin tanggal:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Keluaran

Bagian output digunakan untuk mengirim log yang diproses ke penerima log. Jika mengirim langsung ke Elastic, sebuah plugin digunakan pencarian elastis, yang menentukan alamat server dan templat nama indeks untuk mengirim dokumen yang dihasilkan:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Konfigurasi akhir

Konfigurasi akhir akan terlihat seperti ini:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Tautan yang bermanfaat:

Sumber: www.habr.com

Tambah komentar