Chúng tôi là bạn của ELK và Exchange. Phần 2

Chúng tôi là bạn của ELK và Exchange. Phần 2

Mình tiếp tục câu chuyện về cách kết bạn Exchange và ELK (bắt đầu đây). Hãy để tôi nhắc bạn rằng sự kết hợp này có khả năng xử lý một số lượng lớn nhật ký mà không phải đắn đo. Lần này chúng ta sẽ nói về cách làm cho Exchange hoạt động với các thành phần Logstash và Kibana.

Logstash trong ngăn xếp ELK được sử dụng để xử lý nhật ký một cách thông minh và chuẩn bị chúng để đặt vào Elastic dưới dạng tài liệu, trên cơ sở đó sẽ thuận tiện cho việc xây dựng nhiều hình ảnh trực quan khác nhau trong Kibana.

Cài đặt

Bao gồm hai giai đoạn:

  • Cài đặt và cấu hình gói OpenJDK.
  • Cài đặt và cấu hình gói Logstash.

Cài đặt và định cấu hình gói OpenJDK

Gói OpenJDK phải được tải xuống và giải nén vào một thư mục cụ thể. Khi đó đường dẫn đến thư mục này phải được nhập vào biến $env:Path và $env:JAVA_HOME của hệ điều hành Windows:

Chúng tôi là bạn của ELK và Exchange. Phần 2

Chúng tôi là bạn của ELK và Exchange. Phần 2

Hãy kiểm tra phiên bản Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Cài đặt và cấu hình gói Logstash

Tải xuống tệp lưu trữ với bản phân phối Logstash do đó. Kho lưu trữ phải được giải nén vào thư mục gốc của đĩa. Giải nén vào thư mục C:Program Files Không đáng đâu, Logstash sẽ từ chối khởi động bình thường. Sau đó, bạn cần nhập vào tập tin jvm.options sửa lỗi chịu trách nhiệm phân bổ RAM cho quy trình Java. Tôi khuyên bạn nên chỉ định một nửa RAM của máy chủ. Nếu nó có RAM 16 GB thì các phím mặc định là:

-Xms1g
-Xmx1g

phải được thay thế bằng:

-Xms8g
-Xmx8g

Ngoài ra, nên bình luận thêm dòng -XX:+UseConcMarkSweepGC. Thêm về nó đây. Bước tiếp theo là tạo cấu hình mặc định trong tệp logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Với cấu hình này, Logstash đọc dữ liệu từ bảng điều khiển, chuyển dữ liệu qua bộ lọc trống và xuất dữ liệu trở lại bảng điều khiển. Sử dụng cấu hình này sẽ kiểm tra chức năng của Logstash. Để làm điều này, hãy chạy nó ở chế độ tương tác:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash đã khởi chạy thành công trên cổng 9600.

Bước cài đặt cuối cùng: khởi chạy Logstash dưới dạng dịch vụ Windows. Điều này có thể được thực hiện, ví dụ, bằng cách sử dụng gói NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

khả năng chịu lỗi

Sự an toàn của nhật ký khi được truyền từ máy chủ nguồn được đảm bảo bởi cơ chế Hàng đợi liên tục.

Làm thế nào nó hoạt động

Bố cục của hàng đợi trong quá trình xử lý nhật ký là: đầu vào → hàng đợi → bộ lọc + đầu ra.

Plugin đầu vào nhận dữ liệu từ nguồn nhật ký, ghi dữ liệu vào hàng đợi và gửi xác nhận rằng dữ liệu đã được nhận đến nguồn.

Tin nhắn từ hàng đợi được Logstash xử lý, chuyển qua bộ lọc và plugin đầu ra. Khi nhận được xác nhận từ đầu ra rằng nhật ký đã được gửi, Logstash sẽ xóa nhật ký đã xử lý khỏi hàng đợi. Nếu Logstash dừng, tất cả các tin nhắn chưa được xử lý và các tin nhắn chưa nhận được xác nhận vẫn còn trong hàng đợi và Logstash sẽ tiếp tục xử lý chúng vào lần khởi động tiếp theo.

điều chỉnh

Điều chỉnh bằng các phím trong tập tin C:Logstashconfiglogstash.yml:

  • queue.type: (giá trị có thể - persisted и memory (default)).
  • path.queue: (đường dẫn đến thư mục chứa các tệp hàng đợi, được lưu trữ trong C:Logstashqueue theo mặc định).
  • queue.page_capacity: (kích thước trang hàng đợi tối đa, giá trị mặc định là 64mb).
  • queue.drain: (đúng/sai - bật/tắt việc dừng xử lý hàng đợi trước khi tắt Logstash. Tôi khuyên bạn không nên bật tính năng này vì điều này sẽ ảnh hưởng trực tiếp đến tốc độ tắt máy chủ).
  • queue.max_events: (số lượng sự kiện tối đa trong hàng đợi, mặc định là 0 (không giới hạn)).
  • queue.max_bytes: (kích thước hàng đợi tối đa tính bằng byte, mặc định - 1024mb (1gb)).

Nếu được cấu hình queue.max_events и queue.max_bytes, thì các tin nhắn sẽ ngừng được chấp nhận vào hàng đợi khi đạt đến giá trị của bất kỳ cài đặt nào trong số này. Tìm hiểu thêm về Hàng đợi liên tục đây.

Một ví dụ về phần logstash.yml chịu trách nhiệm thiết lập hàng đợi:

queue.type: persisted
queue.max_bytes: 10gb

điều chỉnh

Cấu hình Logstash thường bao gồm ba phần, chịu trách nhiệm cho các giai đoạn xử lý nhật ký đến khác nhau: nhận (phần đầu vào), phân tích cú pháp (phần bộ lọc) và gửi tới Elastic (phần đầu ra). Dưới đây chúng ta sẽ xem xét kỹ hơn về từng người trong số họ.

Đầu vào

Chúng tôi nhận được luồng đến với nhật ký thô từ tác nhân filebeat. Đây là plugin mà chúng tôi chỉ ra trong phần đầu vào:

input {
  beats {
    port => 5044
  }
}

Sau cấu hình này, Logstash bắt đầu nghe cổng 5044 và khi nhận nhật ký, sẽ xử lý chúng theo cài đặt của phần bộ lọc. Nếu cần, bạn có thể bọc kênh để nhận nhật ký từ filebit trong SSL. Đọc thêm về cài đặt plugin beat đây.

Lọc

Tất cả nhật ký văn bản cần xử lý mà Exchange tạo ra đều ở định dạng csv với các trường được mô tả trong chính tệp nhật ký. Để phân tích bản ghi csv, Logstash cung cấp cho chúng tôi ba plugin: mổ xẻ, csv và Grok. Cái đầu tiên là nhiều nhất nhanh chóng, nhưng chỉ xử lý được việc phân tích cú pháp các nhật ký đơn giản nhất.
Ví dụ: nó sẽ chia bản ghi sau thành hai (do có dấu phẩy bên trong trường), đó là lý do tại sao nhật ký sẽ được phân tích cú pháp không chính xác:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Nó có thể được sử dụng khi phân tích nhật ký, ví dụ: IIS. Trong trường hợp này, phần bộ lọc có thể trông như thế này:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Cấu hình logstash cho phép bạn sử dụng câu điều kiện, vì vậy chúng tôi chỉ có thể gửi nhật ký được gắn thẻ filebeat tới plugin mổ xẻ IIS. Bên trong plugin, chúng tôi khớp các giá trị trường với tên của chúng, xóa trường gốc message, chứa một mục nhập từ nhật ký và chúng tôi có thể thêm trường tùy chỉnh, ví dụ: trường này sẽ chứa tên của ứng dụng mà từ đó chúng tôi thu thập nhật ký.

Trong trường hợp theo dõi nhật ký, tốt hơn nên sử dụng plugin csv, nó có thể xử lý chính xác các trường phức tạp:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Bên trong plugin, chúng tôi khớp các giá trị trường với tên của chúng, xóa trường gốc message (và cả các trường tenant-id и schema-version), chứa một mục nhập từ nhật ký và chúng tôi có thể thêm trường tùy chỉnh, ví dụ: trường này sẽ chứa tên của ứng dụng mà chúng tôi thu thập nhật ký.

Khi thoát khỏi giai đoạn lọc, chúng tôi sẽ nhận được tài liệu ở dạng gần đúng đầu tiên, sẵn sàng để trực quan hóa trong Kibana. Chúng ta sẽ thiếu những thứ sau:

  • Các trường số sẽ được nhận dạng dưới dạng văn bản, điều này ngăn cản các thao tác trên chúng. Cụ thể là các trường time-taken Nhật ký IIS, cũng như các trường recipient-count и total-bites Theo dõi nhật ký.
  • Dấu thời gian của tài liệu tiêu chuẩn sẽ chứa thời gian nhật ký được xử lý chứ không phải thời gian được ghi ở phía máy chủ.
  • Lĩnh vực recipient-address sẽ giống như một công trường xây dựng, không cho phép phân tích để đếm số người nhận thư.

Đã đến lúc thêm một chút phép thuật vào quá trình xử lý nhật ký.

Chuyển đổi trường số

Plugin mổ xẻ có một tùy chọn convert_datatype, có thể được sử dụng để chuyển đổi trường văn bản sang định dạng kỹ thuật số. Ví dụ như thế này:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Điều đáng ghi nhớ là phương pháp này chỉ phù hợp nếu trường chắc chắn chứa một chuỗi. Tùy chọn này không xử lý giá trị Null từ các trường và đưa ra một ngoại lệ.

Để theo dõi nhật ký, tốt hơn là không sử dụng phương pháp chuyển đổi tương tự, vì các trường recipient-count и total-bites có thể trống rỗng. Để chuyển đổi các trường này tốt hơn là sử dụng plugin đột biến:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Tách địa chỉ người nhận thành từng người nhận

Vấn đề này cũng có thể được giải quyết bằng cách sử dụng plugin mutate:

mutate {
  split => ["recipient_address", ";"]
}

Thay đổi dấu thời gian

Trong trường hợp theo dõi nhật ký, vấn đề được giải quyết rất dễ dàng bằng plugin ngày, điều này sẽ giúp bạn viết trong trường timestamp ngày và giờ ở định dạng được yêu cầu từ trường date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

Trong trường hợp nhật ký IIS, chúng ta sẽ cần kết hợp dữ liệu trường date и time bằng cách sử dụng plugin đột biến, đăng ký múi giờ chúng tôi cần và đặt dấu thời gian này vào timestamp sử dụng plugin ngày:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Đầu ra

Phần đầu ra được sử dụng để gửi nhật ký đã xử lý đến người nhận nhật ký. Trường hợp gửi trực tiếp tới Elastic thì sử dụng plugin nghiên cứu, chỉ định địa chỉ máy chủ và mẫu tên chỉ mục để gửi tài liệu được tạo:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Cấu hình cuối cùng

Cấu hình cuối cùng sẽ trông như thế này:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Liên kết hữu ích:

Nguồn: www.habr.com

Thêm một lời nhận xét