Wij zijn bevriend met ELK en Exchange. Deel 2

Wij zijn bevriend met ELK en Exchange. Deel 2

Ik vervolg mijn verhaal over hoe je vrienden kunt maken Exchange en ELK (begin hier). Ik wil u eraan herinneren dat deze combinatie in staat is om zonder aarzeling een zeer groot aantal logs te verwerken. Deze keer zullen we het hebben over hoe je Exchange kunt laten werken met Logstash- en Kibana-componenten.

Logstash in de ELK-stack wordt gebruikt om logs op een intelligente manier te verwerken en voor te bereiden voor plaatsing in Elastic in de vorm van documenten, op basis waarvan het handig is om verschillende visualisaties in Kibana te bouwen.

installatie

Bestaat uit twee fasen:

  • Het OpenJDK-pakket installeren en configureren.
  • Het Logstash-pakket installeren en configureren.

Het OpenJDK-pakket installeren en configureren

Het OpenJDK-pakket moet worden gedownload en uitgepakt in een specifieke map. Vervolgens moet het pad naar deze map worden ingevoerd in de variabelen $env:Path en $env:JAVA_HOME van het Windows-besturingssysteem:

Wij zijn bevriend met ELK en Exchange. Deel 2

Wij zijn bevriend met ELK en Exchange. Deel 2

Laten we de Java-versie controleren:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Het Logstash-pakket installeren en configureren

Download het archiefbestand met de Logstash-distributie vandaar. Het archief moet worden uitgepakt naar de hoofdmap van de schijf. Uitpakken naar map C:Program Files Het is het niet waard, Logstash zal weigeren normaal te starten. Dan moet u het bestand invoeren jvm.options fixes die verantwoordelijk zijn voor het toewijzen van RAM voor het Java-proces. Ik raad aan om de helft van het RAM-geheugen van de server op te geven. Als er 16 GB RAM aan boord is, zijn de standaardsleutels:

-Xms1g
-Xmx1g

moet vervangen worden door:

-Xms8g
-Xmx8g

Daarnaast is het raadzaam om de regel van commentaar te voorzien -XX:+UseConcMarkSweepGC. Meer erover hier. De volgende stap is het maken van een standaardconfiguratie in het bestand logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Met deze configuratie leest Logstash gegevens van de console, passeert deze door een leeg filter en voert deze terug naar de console. Als u deze configuratie gebruikt, wordt de functionaliteit van Logstash getest. Om dit te doen, laten we het in de interactieve modus uitvoeren:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash is succesvol gelanceerd op poort 9600.

De laatste installatiestap: start Logstash als een Windows-service. Dit kan bijvoorbeeld met behulp van het pakket NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

fout tolerantie

De veiligheid van logboeken bij overdracht vanaf de bronserver wordt verzekerd door het Persistent Queues-mechanisme.

Hoe het werkt

De indeling van wachtrijen tijdens logverwerking is: invoer β†’ wachtrij β†’ filter + uitvoer.

De invoerplug-in ontvangt gegevens van een logbron, schrijft deze naar een wachtrij en verzendt een bevestiging dat de gegevens zijn ontvangen naar de bron.

Berichten uit de wachtrij worden verwerkt door Logstash, doorgegeven door het filter en de uitvoerplug-in. Wanneer u uit de uitvoer een bevestiging ontvangt dat het logboek is verzonden, verwijdert Logstash het verwerkte logboek uit de wachtrij. Als Logstash stopt, blijven alle onverwerkte berichten en berichten waarvoor geen bevestiging is ontvangen in de wachtrij staan ​​en zal Logstash deze de volgende keer dat deze wordt gestart, blijven verwerken.

afstelling

Instelbaar via toetsen in de vijl C:Logstashconfiglogstash.yml:

  • queue.type: (mogelijke waarden - persisted ΠΈ memory (default)).
  • path.queue: (pad naar de map met wachtrijbestanden, die standaard worden opgeslagen in C:Logstashqueue).
  • queue.page_capacity: (maximale wachtrijpaginagrootte, standaardwaarde is 64 MB).
  • queue.drain: (true/false - schakelt het stoppen van wachtrijverwerking in/uit voordat Logstash wordt afgesloten. Ik raad niet aan dit in te schakelen, omdat dit rechtstreeks van invloed is op de snelheid van het afsluiten van de server).
  • queue.max_events: (maximaal aantal gebeurtenissen in de wachtrij, standaard is 0 (onbeperkt)).
  • queue.max_bytes: (maximale wachtrijgrootte in bytes, standaard - 1024 MB (1 GB)).

Indien geconfigureerd queue.max_events ΠΈ queue.max_bytes, worden berichten niet meer in de wachtrij geaccepteerd wanneer de waarde van een van deze instellingen wordt bereikt. Meer informatie over permanente wachtrijen hier.

Een voorbeeld van het deel van logstash.yml dat verantwoordelijk is voor het instellen van de wachtrij:

queue.type: persisted
queue.max_bytes: 10gb

afstelling

De Logstash-configuratie bestaat doorgaans uit drie delen, die verantwoordelijk zijn voor verschillende fasen van het verwerken van inkomende logboeken: ontvangen (invoersectie), parseren (filtersectie) en verzenden naar Elastic (uitvoersectie). Hieronder zullen we elk van hen nader bekijken.

Invoer

We ontvangen de binnenkomende stream met onbewerkte logbestanden van filebeat-agents. Het is deze plug-in die we aangeven in de invoersectie:

input {
  beats {
    port => 5044
  }
}

Na deze configuratie begint Logstash te luisteren naar poort 5044 en verwerkt deze bij het ontvangen van logs volgens de instellingen van de filtersectie. Indien nodig kunt u het kanaal voor het ontvangen van logbestanden van filebit in SSL inpakken. Lees meer over de instellingen van de beats-plug-in hier.

FILTER

Alle tekstlogs die interessant zijn voor verwerking die Exchange genereert zijn in csv-formaat met de velden beschreven in het logbestand zelf. Voor het parseren van csv-records biedt Logstash ons drie plug-ins: ontleden, csv en grok. De eerste is de meest snel, maar kan alleen de eenvoudigste logboeken parseren.
Het zal bijvoorbeeld de volgende record in tweeΓ«n splitsen (vanwege de aanwezigheid van een komma in het veld), wat de reden is dat het logboek onjuist wordt geparseerd:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Het kan worden gebruikt bij het parseren van logboeken, bijvoorbeeld IIS. In dit geval kan het filtergedeelte er als volgt uitzien:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Met de Logstash-configuratie kunt u gebruiken Voorwaardelijke stellingen, dus we kunnen alleen logs die zijn getagd met de filebeat-tag naar de dissect-plug-in sturen IIS. Binnen de plug-in matchen we de veldwaarden met hun namen, verwijderen we het originele veld message, die een vermelding uit het logboek bevatte, en we kunnen een aangepast veld toevoegen dat bijvoorbeeld de naam bevat van de applicatie waarvan we logboeken verzamelen.

In het geval van trackinglogs kunt u beter de csv-plug-in gebruiken; deze kan complexe velden correct verwerken:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Binnen de plug-in matchen we de veldwaarden met hun namen, verwijderen we het originele veld message (en ook velden tenant-id ΠΈ schema-version), dat een invoer uit het logboek bevatte, en we kunnen een aangepast veld toevoegen, dat bijvoorbeeld de naam bevat van de applicatie waarvan we logboeken verzamelen.

Bij het verlaten van de filterfase ontvangen we documenten in een eerste benadering, klaar voor visualisatie in Kibana. Wij zullen het volgende missen:

  • Numerieke velden worden herkend als tekst, waardoor er geen bewerkingen op kunnen worden uitgevoerd. Namelijk de velden time-taken IIS-logboek, evenals velden recipient-count ΠΈ total-bites Logboekregistratie.
  • De standaardtijdstempel van het document bevat de tijd waarop het logboek is verwerkt, niet de tijd waarop het op de server is geschreven.
  • Veld recipient-address zal eruit zien als één bouwplaats, die geen analyse mogelijk maakt om de ontvangers van brieven te tellen.

Het is tijd om een ​​beetje magie toe te voegen aan het logverwerkingsproces.

Numerieke velden converteren

De dissect-plug-in heeft een optie convert_datatype, waarmee u een tekstveld naar een digitaal formaat kunt converteren. Bijvoorbeeld zoals dit:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Het is de moeite waard eraan te denken dat deze methode alleen geschikt is als het veld zeker een string bevat. De optie verwerkt geen Null-waarden uit velden en genereert een uitzondering.

Voor trackinglogs is het beter om geen vergelijkbare conversiemethode te gebruiken, aangezien de velden recipient-count ΠΈ total-bites mag leeg zijn. Om deze velden te converteren kun je beter een plugin gebruiken muteren:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Ontvanger_adres opsplitsen in individuele ontvangers

Dit probleem kan ook worden opgelost met behulp van de mute-plug-in:

mutate {
  split => ["recipient_address", ";"]
}

Het tijdstempel wijzigen

In het geval van trackinglogs wordt het probleem heel eenvoudig opgelost door de plug-in gegevens, waarmee u in het veld kunt schrijven timestamp datum en tijd in het gewenste formaat uit het veld date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

In het geval van IIS-logboeken moeten we veldgegevens combineren date ΠΈ time gebruik de mute-plug-in, registreer de tijdzone die we nodig hebben en plaats deze tijdstempel timestamp met behulp van de datumplug-in:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

uitgang

De uitvoersectie wordt gebruikt om verwerkte logs naar de logontvanger te sturen. Bij rechtstreekse verzending naar Elastic wordt gebruik gemaakt van een plugin elasticsearch, dat het serveradres en de indexnaamsjabloon specificeert voor het verzenden van het gegenereerde document:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Definitieve configuratie

De uiteindelijke configuratie zal er als volgt uitzien:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Nuttige links:

Bron: www.habr.com

Voeg een reactie