Είμαστε φίλοι με το ELK και το Exchange. Μέρος 2

Είμαστε φίλοι με το ELK και το Exchange. Μέρος 2

Συνεχίζω την ιστορία μου για το πώς να κάνω φίλους Exchange και ELK (αρχή εδώ). Επιτρέψτε μου να σας υπενθυμίσω ότι αυτός ο συνδυασμός είναι ικανός να επεξεργαστεί έναν πολύ μεγάλο αριθμό κορμών χωρίς δισταγμό. Αυτή τη φορά θα μιλήσουμε για τον τρόπο λειτουργίας του Exchange με στοιχεία Logstash και Kibana.

Το Logstash στη στοίβα ELK χρησιμοποιείται για την έξυπνη επεξεργασία των κορμών και την προετοιμασία τους για τοποθέτηση σε Elastic με τη μορφή εγγράφων, βάσει των οποίων είναι βολικό να δημιουργηθούν διάφορες απεικονίσεις στο Kibana.

Εγκατάσταση

Αποτελείται από δύο στάδια:

  • Εγκατάσταση και διαμόρφωση του πακέτου OpenJDK.
  • Εγκατάσταση και διαμόρφωση του πακέτου Logstash.

Εγκατάσταση και διαμόρφωση του πακέτου OpenJDK

Το πακέτο OpenJDK πρέπει να ληφθεί και να αποσυμπιεστεί σε έναν συγκεκριμένο κατάλογο. Στη συνέχεια, η διαδρομή προς αυτόν τον κατάλογο πρέπει να εισαχθεί στις μεταβλητές $env:Path και $env:JAVA_HOME του λειτουργικού συστήματος Windows:

Είμαστε φίλοι με το ELK και το Exchange. Μέρος 2

Είμαστε φίλοι με το ELK και το Exchange. Μέρος 2

Ας ελέγξουμε την έκδοση Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Εγκατάσταση και διαμόρφωση του πακέτου Logstash

Κατεβάστε το αρχείο αρχειοθέτησης με τη διανομή Logstash ως εκ τούτου,. Το αρχείο πρέπει να αποσυσκευαστεί στη ρίζα του δίσκου. Αποσυσκευάστε σε φάκελο C:Program Files Δεν αξίζει τον κόπο, το Logstash θα αρνηθεί να ξεκινήσει κανονικά. Στη συνέχεια, πρέπει να εισέλθετε στο αρχείο jvm.options διορθώσεις που είναι υπεύθυνες για την κατανομή της μνήμης RAM για τη διαδικασία Java. Συνιστώ να καθορίσετε τη μισή μνήμη RAM του διακομιστή. Εάν διαθέτει 16 GB μνήμης RAM, τότε τα προεπιλεγμένα πλήκτρα είναι:

-Xms1g
-Xmx1g

πρέπει να αντικατασταθεί με:

-Xms8g
-Xmx8g

Επιπλέον, καλό είναι να σχολιάσετε τη γραμμή -XX:+UseConcMarkSweepGC. Περισσότερα για αυτό εδώ. Το επόμενο βήμα είναι να δημιουργήσετε μια προεπιλεγμένη διαμόρφωση στο αρχείο logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Με αυτήν τη διαμόρφωση, το Logstash διαβάζει δεδομένα από την κονσόλα, τα περνά μέσα από ένα κενό φίλτρο και τα εξάγει πίσω στην κονσόλα. Η χρήση αυτής της διαμόρφωσης θα ελέγξει τη λειτουργικότητα του Logstash. Για να το κάνετε αυτό, ας το εκτελέσουμε σε διαδραστική λειτουργία:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Το Logstash ξεκίνησε με επιτυχία στη θύρα 9600.

Το τελικό βήμα εγκατάστασης: εκκινήστε το Logstash ως υπηρεσία Windows. Αυτό μπορεί να γίνει, για παράδειγμα, χρησιμοποιώντας το πακέτο NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

ανοχή σε σφάλματα

Η ασφάλεια των αρχείων καταγραφής κατά τη μεταφορά από τον διακομιστή πηγής διασφαλίζεται από τον μηχανισμό Persistent Queues.

Πώς λειτουργεί

Η διάταξη των ουρών κατά την επεξεργασία του αρχείου καταγραφής είναι: είσοδος → ουρά → φίλτρο + έξοδος.

Το πρόσθετο εισόδου λαμβάνει δεδομένα από μια πηγή καταγραφής, τα γράφει σε μια ουρά και στέλνει επιβεβαίωση ότι τα δεδομένα έχουν ληφθεί στην πηγή.

Τα μηνύματα από την ουρά επεξεργάζονται από το Logstash, περνούν από το φίλτρο και το πρόσθετο εξόδου. Όταν λαμβάνει επιβεβαίωση από την έξοδο ότι το αρχείο καταγραφής έχει σταλεί, το Logstash αφαιρεί το επεξεργασμένο αρχείο καταγραφής από την ουρά. Εάν το Logstash σταματήσει, όλα τα μη επεξεργασμένα μηνύματα και τα μηνύματα για τα οποία δεν έχει ληφθεί επιβεβαίωση παραμένουν στην ουρά και το Logstash θα συνεχίσει να τα επεξεργάζεται την επόμενη φορά που θα ξεκινήσει.

προσαρμογή

Ρυθμίζεται με πλήκτρα στο αρχείο C:Logstashconfiglogstash.yml:

  • queue.type: (πιθανές τιμές - persisted и memory (default)).
  • path.queue: (διαδρομή προς το φάκελο με αρχεία ουράς, τα οποία είναι αποθηκευμένα στο C:Logstashqueue από προεπιλογή).
  • queue.page_capacity: (μέγιστο μέγεθος σελίδας ουράς, η προεπιλεγμένη τιμή είναι 64mb).
  • queue.drain: (true/false - ενεργοποιεί/απενεργοποιεί τη διακοπή της επεξεργασίας ουράς πριν τον τερματισμό του Logstash. Δεν συνιστώ να το ενεργοποιήσετε, γιατί αυτό θα επηρεάσει άμεσα την ταχύτητα του τερματισμού του διακομιστή).
  • queue.max_events: (μέγιστος αριθμός συμβάντων στην ουρά, η προεπιλογή είναι 0 (απεριόριστο)).
  • queue.max_bytes: (μέγιστο μέγεθος ουράς σε byte, προεπιλογή - 1024mb (1gb)).

Εάν έχει ρυθμιστεί queue.max_events и queue.max_bytes, τότε τα μηνύματα παύουν να γίνονται δεκτά στην ουρά όταν επιτευχθεί η τιμή οποιασδήποτε από αυτές τις ρυθμίσεις. Μάθετε περισσότερα για τις Μόνιμες Ουρές εδώ.

Ένα παράδειγμα του τμήματος του logstash.yml που είναι υπεύθυνο για τη ρύθμιση της ουράς:

queue.type: persisted
queue.max_bytes: 10gb

προσαρμογή

Η διαμόρφωση Logstash αποτελείται συνήθως από τρία μέρη, υπεύθυνα για διαφορετικές φάσεις επεξεργασίας των εισερχόμενων αρχείων καταγραφής: λήψη (τμήμα εισόδου), ανάλυση (τμήμα φίλτρου) και αποστολή στο Elastic (τμήμα εξόδου). Παρακάτω θα ρίξουμε μια πιο προσεκτική ματιά σε καθένα από αυτά.

Εισαγωγή

Λαμβάνουμε την εισερχόμενη ροή με ακατέργαστα αρχεία καταγραφής από πράκτορες filebeat. Είναι αυτό το πρόσθετο που υποδεικνύουμε στην ενότητα εισαγωγής:

input {
  beats {
    port => 5044
  }
}

Μετά από αυτήν τη διαμόρφωση, το Logstash ξεκινά να ακούει τη θύρα 5044 και κατά τη λήψη αρχείων καταγραφής τα επεξεργάζεται σύμφωνα με τις ρυθμίσεις της ενότητας φίλτρου. Εάν είναι απαραίτητο, μπορείτε να τυλίξετε το κανάλι για τη λήψη αρχείων καταγραφής από το αρχείο bit σε SSL. Διαβάστε περισσότερα σχετικά με τις ρυθμίσεις της προσθήκης beats εδώ.

Φίλτρα

Όλα τα αρχεία καταγραφής κειμένου που είναι ενδιαφέροντα για επεξεργασία που δημιουργεί το Exchange είναι σε μορφή csv με τα πεδία που περιγράφονται στο ίδιο το αρχείο καταγραφής. Για την ανάλυση των εγγραφών csv, το Logstash μας προσφέρει τρεις προσθήκες: διαμελίζω, csv και grok. Το πρώτο είναι το πιο γρήγορη, αλλά αντιμετωπίζει την ανάλυση μόνο των απλούστερων αρχείων καταγραφής.
Για παράδειγμα, θα χωρίσει την ακόλουθη εγγραφή στα δύο (λόγω της παρουσίας κόμματος μέσα στο πεδίο), γι' αυτό και το αρχείο καταγραφής θα αναλυθεί λανθασμένα:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Μπορεί να χρησιμοποιηθεί κατά την ανάλυση αρχείων καταγραφής, για παράδειγμα, IIS. Σε αυτήν την περίπτωση, το τμήμα φίλτρου μπορεί να μοιάζει με αυτό:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Η διαμόρφωση Logstash σάς επιτρέπει να χρησιμοποιήσετε δηλώσεις υπό όρους, επομένως μπορούμε να στείλουμε μόνο αρχεία καταγραφής που έχουν επισημανθεί με την ετικέτα filebeat στο πρόσθετο dissect IIS. Μέσα στο πρόσθετο αντιστοιχίζουμε τις τιμές των πεδίων με τα ονόματά τους, διαγράψτε το αρχικό πεδίο message, το οποίο περιείχε μια καταχώρηση από το αρχείο καταγραφής και μπορούμε να προσθέσουμε ένα προσαρμοσμένο πεδίο που, για παράδειγμα, θα περιέχει το όνομα της εφαρμογής από την οποία συλλέγουμε αρχεία καταγραφής.

Στην περίπτωση των αρχείων καταγραφής παρακολούθησης, είναι καλύτερο να χρησιμοποιήσετε το πρόσθετο csv· μπορεί να επεξεργαστεί σωστά πολύπλοκα πεδία:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Μέσα στο πρόσθετο αντιστοιχίζουμε τις τιμές των πεδίων με τα ονόματά τους, διαγράψτε το αρχικό πεδίο message (και επίσης πεδία tenant-id и schema-version), που περιείχε μια καταχώρηση από το αρχείο καταγραφής και μπορούμε να προσθέσουμε ένα προσαρμοσμένο πεδίο, το οποίο, για παράδειγμα, θα περιέχει το όνομα της εφαρμογής από την οποία συλλέγουμε αρχεία καταγραφής.

Κατά την έξοδο από το στάδιο φιλτραρίσματος, θα λάβουμε έγγραφα σε μια πρώτη προσέγγιση, έτοιμα για οπτικοποίηση στο Kibana. Θα μας λείψουν τα εξής:

  • Τα αριθμητικά πεδία θα αναγνωρίζονται ως κείμενο, γεγονός που αποτρέπει τις λειτουργίες σε αυτά. Δηλαδή τα χωράφια time-taken Αρχείο καταγραφής IIS, καθώς και πεδία recipient-count и total-bites Παρακολούθηση καταγραφής.
  • Η τυπική χρονική σήμανση εγγράφου θα περιέχει την ώρα επεξεργασίας του αρχείου καταγραφής και όχι την ώρα που γράφτηκε στην πλευρά του διακομιστή.
  • Πεδίο recipient-address θα μοιάζει με ένα εργοτάξιο, που δεν επιτρέπει την ανάλυση για την καταμέτρηση των παραληπτών των γραμμάτων.

Ήρθε η ώρα να προσθέσετε λίγη μαγεία στη διαδικασία επεξεργασίας των κορμών.

Μετατροπή αριθμητικών πεδίων

Το πρόσθετο dissect έχει μια επιλογή convert_datatype, το οποίο μπορεί να χρησιμοποιηθεί για τη μετατροπή ενός πεδίου κειμένου σε ψηφιακή μορφή. Για παράδειγμα, όπως αυτό:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Αξίζει να θυμάστε ότι αυτή η μέθοδος είναι κατάλληλη μόνο εάν το πεδίο θα περιέχει σίγουρα μια συμβολοσειρά. Η επιλογή δεν επεξεργάζεται τιμές Null από πεδία και δημιουργεί μια εξαίρεση.

Για τα αρχεία καταγραφής παρακολούθησης, είναι προτιμότερο να μην χρησιμοποιείτε παρόμοια μέθοδο μετατροπής, καθώς τα πεδία recipient-count и total-bites μπορεί να είναι κενό. Για να μετατρέψετε αυτά τα πεδία είναι προτιμότερο να χρησιμοποιήσετε ένα πρόσθετο αλλάσσω:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Διαχωρισμός recipient_address σε μεμονωμένους παραλήπτες

Αυτό το πρόβλημα μπορεί επίσης να λυθεί χρησιμοποιώντας την προσθήκη μετάλλαξης:

mutate {
  split => ["recipient_address", ";"]
}

Αλλαγή της χρονικής σφραγίδας

Στην περίπτωση των αρχείων καταγραφής παρακολούθησης, το πρόβλημα λύνεται πολύ εύκολα από το πρόσθετο Ραντεβού , που θα σας βοηθήσει να γράψετε στο πεδίο timestamp ημερομηνία και ώρα στην απαιτούμενη μορφή από το πεδίο date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

Στην περίπτωση των αρχείων καταγραφής των υπηρεσιών IIS, θα χρειαστεί να συνδυάσουμε δεδομένα πεδίου date и time χρησιμοποιώντας την προσθήκη μετάλλαξης, καταχωρήστε τη ζώνη ώρας που χρειαζόμαστε και τοποθετήστε αυτήν τη χρονική σήμανση timestamp χρησιμοποιώντας την προσθήκη ημερομηνίας:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Παραγωγή

Το τμήμα εξόδου χρησιμοποιείται για την αποστολή επεξεργασμένων αρχείων καταγραφής στον δέκτη καταγραφής. Σε περίπτωση απευθείας αποστολής στο Elastic χρησιμοποιείται plugin ελαστική αναζήτηση, το οποίο καθορίζει τη διεύθυνση διακομιστή και το πρότυπο ονόματος ευρετηρίου για την αποστολή του παραγόμενου εγγράφου:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Τελική διαμόρφωση

Η τελική διαμόρφωση θα μοιάζει με αυτό:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Χρήσιμοι σύνδεσμοι:

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο