Nous sommes amis avec ELK et Exchange. Partie 2

Nous sommes amis avec ELK et Exchange. Partie 2

Je continue mon histoire sur la façon de se faire des amis Exchange et ELK (début ici). Je vous rappelle que cette combinaison est capable de traiter un très grand nombre de logs sans hésitation. Cette fois, nous verrons comment faire fonctionner Exchange avec les composants Logstash et Kibana.

Logstash dans la pile ELK est utilisé pour traiter intelligemment les journaux et les préparer à être placés dans Elastic sous forme de documents, sur la base desquels il est pratique de créer diverses visualisations dans Kibana.

Installation

Se compose de deux étapes :

  • Installation et configuration du package OpenJDK.
  • Installation et configuration du package Logstash.

Installation et configuration du package OpenJDK

Le package OpenJDK doit être téléchargé et décompressé dans un répertoire spécifique. Ensuite, le chemin de ce répertoire doit être renseigné dans les variables $env:Path et $env:JAVA_HOME du système d'exploitation Windows :

Nous sommes amis avec ELK et Exchange. Partie 2

Nous sommes amis avec ELK et Exchange. Partie 2

Vérifions la version Java :

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Installation et configuration du package Logstash

Téléchargez le fichier d'archive avec la distribution Logstash par conséquent,. L'archive doit être décompressée à la racine du disque. Décompresser dans un dossier C:Program Files Cela n'en vaut pas la peine, Logstash refusera de démarrer normalement. Ensuite, vous devez entrer dans le fichier jvm.options correctifs responsables de l'allocation de RAM pour le processus Java. Je recommande de spécifier la moitié de la RAM du serveur. S'il dispose de 16 Go de RAM à bord, les clés par défaut sont :

-Xms1g
-Xmx1g

doit être remplacé par :

-Xms8g
-Xmx8g

De plus, il est conseillé de commenter la ligne -XX:+UseConcMarkSweepGC. En savoir plus ici. L'étape suivante consiste à créer une configuration par défaut dans le fichier logstash.conf :

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Avec cette configuration, Logstash lit les données de la console, les transmet à travers un filtre vide et les renvoie à la console. L'utilisation de cette configuration permettra de tester la fonctionnalité de Logstash. Pour ce faire, exécutons-le en mode interactif :

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash a été lancé avec succès sur le port 9600.

La dernière étape d'installation : lancez Logstash en tant que service Windows. Cela peut être fait, par exemple, en utilisant le package NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

tolérance aux pannes

La sécurité des logs lors du transfert depuis le serveur source est assurée par le mécanisme de files d'attente persistantes.

Comment ça marche

La disposition des files d'attente pendant le traitement des journaux est la suivante : entrée → file d'attente → filtre + sortie.

Le plugin d'entrée reçoit les données d'une source de journal, les écrit dans une file d'attente et envoie la confirmation que les données ont été reçues à la source.

Les messages de la file d'attente sont traités par Logstash, passés via le filtre et le plugin de sortie. Lorsqu'il reçoit la confirmation de la sortie que le journal a été envoyé, Logstash supprime le journal traité de la file d'attente. Si Logstash s'arrête, tous les messages non traités et les messages pour lesquels aucune confirmation n'a été reçue restent dans la file d'attente et Logstash continuera à les traiter au prochain démarrage.

réglage

Ajustable par clés dans le dossier C:Logstashconfiglogstash.yml:

  • queue.type: (valeurs possibles - persisted и memory (default)).
  • path.queue: (chemin d'accès au dossier contenant les fichiers de file d'attente, qui sont stockés par défaut dans C:Logstashqueue).
  • queue.page_capacity: (taille maximale des pages de file d'attente, la valeur par défaut est 64 Mo).
  • queue.drain: (vrai/faux - active/désactive l'arrêt du traitement de la file d'attente avant d'arrêter Logstash. Je ne recommande pas de l'activer, car cela affectera directement la vitesse d'arrêt du serveur).
  • queue.max_events: (nombre maximum d'événements dans la file d'attente, la valeur par défaut est 0 (illimité)).
  • queue.max_bytes: (taille maximale de la file d'attente en octets, par défaut - 1024 Mo (1 Go)).

Si configuré queue.max_events и queue.max_bytes, les messages cessent d'être acceptés dans la file d'attente lorsque la valeur de l'un de ces paramètres est atteinte. En savoir plus sur les files d'attente persistantes ici.

Un exemple de la partie de logstash.yml responsable de la configuration de la file d'attente :

queue.type: persisted
queue.max_bytes: 10gb

réglage

La configuration de Logstash se compose généralement de trois parties, responsables des différentes phases de traitement des journaux entrants : réception (section d'entrée), analyse (section de filtre) et envoi à Elastic (section de sortie). Ci-dessous, nous examinerons chacun d’eux de plus près.

Entrée

Nous recevons le flux entrant avec les journaux bruts des agents filebeat. C'est ce plugin que nous indiquons dans la section de saisie :

input {
  beats {
    port => 5044
  }
}

Après cette configuration, Logstash commence à écouter le port 5044 et, lors de la réception des journaux, les traite selon les paramètres de la section de filtrage. Si nécessaire, vous pouvez envelopper le canal de réception des journaux de filebit dans SSL. En savoir plus sur les paramètres du plugin Beats ici.

Filtre

Tous les journaux texte intéressants pour le traitement générés par Exchange sont au format csv avec les champs décrits dans le fichier journal lui-même. Pour analyser les enregistrements csv, Logstash nous propose trois plugins : disséquer, csv et grok. Le premier est le plus rapide, mais ne gère que l'analyse des journaux les plus simples.
Par exemple, l'enregistrement suivant sera divisé en deux (en raison de la présence d'une virgule à l'intérieur du champ), raison pour laquelle le journal ne sera pas analysé correctement :

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Il peut être utilisé lors de l'analyse des journaux, par exemple IIS. Dans ce cas, la section filtre pourrait ressembler à ceci :

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

La configuration de Logstash vous permet d'utiliser expressions conditionnelles, nous ne pouvons donc envoyer que les journaux qui ont été marqués avec la balise filebeat au plugin dissect IIS. Dans le plugin, nous faisons correspondre les valeurs des champs avec leurs noms, supprimons le champ d'origine message, qui contenait une entrée du journal, et nous pouvons ajouter un champ personnalisé qui contiendra, par exemple, le nom de l'application à partir de laquelle nous collectons les journaux.

Dans le cas des logs de suivi, il est préférable d'utiliser le plugin csv, il permet de traiter correctement des champs complexes :

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Dans le plugin, nous faisons correspondre les valeurs des champs avec leurs noms, supprimons le champ d'origine message (et aussi des champs tenant-id и schema-version), qui contenait une entrée du journal, et nous pouvons ajouter un champ personnalisé, qui contiendra par exemple le nom de l'application à partir de laquelle nous collectons les journaux.

A la sortie de l'étape de filtrage, nous recevrons des documents en première approximation, prêts à être visualisés dans Kibana. Il nous manquera les éléments suivants :

  • Les champs numériques seront reconnus comme du texte, ce qui empêche toute opération sur ceux-ci. A savoir les champs time-taken Journal IIS, ainsi que les champs recipient-count и total-bites Suivi des journaux.
  • L'horodatage standard du document contiendra l'heure à laquelle le journal a été traité, et non l'heure à laquelle il a été écrit côté serveur.
  • Champ recipient-address ressemblera à un seul chantier de construction, ce qui ne permet pas d'analyser le décompte des destinataires des lettres.

Il est temps d'ajouter un peu de magie au processus de traitement des journaux.

Conversion de champs numériques

Le plugin de dissection a une option convert_datatype, qui peut être utilisé pour convertir un champ de texte en format numérique. Par exemple, comme ceci :

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Il convient de rappeler que cette méthode ne convient que si le champ contient définitivement une chaîne. L'option ne traite pas les valeurs Null des champs et lève une exception.

Pour les journaux de suivi, il est préférable de ne pas utiliser une méthode de conversion similaire, car les champs recipient-count и total-bites peut être vide. Pour convertir ces champs il est préférable d'utiliser un plugin subir une mutation:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Diviser l'adresse_du_récipiendaire en destinataires individuels

Ce problème peut également être résolu en utilisant le plugin mutate :

mutate {
  split => ["recipient_address", ";"]
}

Changer l'horodatage

Dans le cas des logs de suivi, le problème est très facilement résolu par le plugin données, ce qui vous aidera à écrire sur le terrain timestamp date et heure au format requis à partir du champ date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

Dans le cas des logs IIS, nous devrons combiner les données de terrain date и time à l'aide du plugin mutate, enregistrez le fuseau horaire dont nous avons besoin et placez cet horodatage dans timestamp en utilisant le plugin de date :

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Sortie

La section de sortie est utilisée pour envoyer les journaux traités au récepteur de journaux. En cas d'envoi directement à Elastic, un plugin est utilisé elasticsearch, qui spécifie l'adresse du serveur et le modèle de nom d'index pour l'envoi du document généré :

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Configuration finale

La configuration finale ressemblera à ceci :

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Liens utiles:

Source: habr.com

Ajouter un commentaire