Em bi ELK û Exchange re heval in. Beş 2

Em bi ELK û Exchange re heval in. Beş 2

Ez çîroka xwe didomînim ka meriv çawa hevalan dike Exchange û ELK (destpêk vir). Bihêle ez ji we re bi bîr bînim ku ev tevlihevî bêyî dudilî dikare hejmareke pir mezin a têketinê bişopîne. Vê carê em ê biaxivin ka meriv çawa Exchange bi pêkhateyên Logstash û Kibana re dixebite.

Logstash di stûna ELK-ê de tê bikar anîn da ku bi aqilmendî têketinan bişopîne û wan ji bo danîna Elastic di forma belgeyan de amade bike, li ser bingeha wê hêsan e ku meriv dîmenên cihêreng li Kibana ava bike.

mîhengê

Ji du qonaxan pêk tê:

  • Sazkirin û mîhengkirina pakêta OpenJDK.
  • Sazkirin û mîhengkirina pakêta Logstash.

Sazkirin û mîhengkirina pakêta OpenJDK

Pêdivî ye ku pakêta OpenJDK di pelrêçek taybetî de were dakêşandin û pakkirin. Dûv re riya vê pelrêçê divê di nav guhêrbarên $env:Path û $env:JAVA_HOME yên pergala xebitandinê ya Windows-ê de bikeve:

Em bi ELK û Exchange re heval in. Beş 2

Em bi ELK û Exchange re heval in. Beş 2

Ka em guhertoya Java-yê kontrol bikin:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

Sazkirin û mîhengkirina pakêta Logstash

Bi belavkirina Logstash pelê arşîvê dakêşin ji vir. Pêdivî ye ku arşîv di koka dîskê de bê pakkirin. Ji peldankê re vekin C:Program Files Ne hêjayî wê ye, Logstash dê red bike ku bi gelemperî dest pê bike. Hingê hûn hewce ne ku têkevin pelê jvm.options tamîr dike ku berpirsiyarê veqetandina RAM-ê ji bo pêvajoya Java-yê ye. Ez pêşniyar dikim ku nîvê RAM-a serverê diyar bikin. Ger li ser wê 16 GB RAM heye, wê hingê bişkojkên xwerû ev in:

-Xms1g
-Xmx1g

divê bi şûna:

-Xms8g
-Xmx8g

Wekî din, tê pêşniyar kirin ku meriv rêzê şîrove bike -XX:+UseConcMarkSweepGC. More about this vir. Pêngava paşîn ev e ku meriv di pelê logstash.conf de mîhengek xwerû biafirîne:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

Bi vê veavakirinê, Logstash daneyan ji konsolê dixwîne, di parzûnek vala re derbas dike, û vedigere konsolê. Bikaranîna vê veavakirinê dê fonksiyona Logstash biceribîne. Ji bo vê yekê, em wê di moda înteraktîf de bimeşînin:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash bi serfirazî li porta 9600 dest pê kir.

Qonaxa sazkirinê ya paşîn: Logstash wekî karûbarek Windows-ê dest pê bike. Ev dikare were kirin, wek nimûne, bi karanîna pakêtê NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

tolerans xelet

Ewlehiya têketin dema ku ji servera çavkaniyê têne veguheztin ji hêla mekanîzmaya Rêzên Berdewam ve tê peyda kirin.

Çawa kar dike

Plansaziya rêzan di dema pêvajoya têketinê de ev e: ketin → rêz → parzûn + derketin.

Pêveka têketinê daneyan ji çavkaniyek têketinê distîne, wê li dorê dinivîse, û piştrastkirinê dişîne ku dane ji çavkaniyê re hatine wergirtin.

Peyamên ji rêzê ji hêla Logstash ve têne hilberandin, di parzûnê û pêveka derketinê re derbas dibin. Dema ku ji encamnameyê pejirandina ku têketin hatiye şandin werdigire, Logstash têketina pêvajoyî ji rêzê derdixe. Ger Logstash raweste, hemî peyam û peyamên nepêvajoyî yên ku ji bo wan piştrast nehatine di rêzê de dimînin, û Logstash dê gava din dest pê bike pêvajoya wan bidomîne.

ligorî

Ji hêla bişkojkên di pelê de têne rêve kirin C:Logstashconfiglogstash.yml:

  • queue.type: (nirxên gengaz - persisted и memory (default)).
  • path.queue: (Rêya peldanka bi pelên dorê, ku ji hêla xwerû ve di C: Logstashqueue de têne hilanîn).
  • queue.page_capacity: (mezintirîn mezinahiya rûpela dorê, nirxa xwerû 64mb e).
  • queue.drain: (rast/nerast - berî girtina Logstash-ê rawestana pêvajoya rêzê çalak/neçalak dike. Ez çalakkirina wê napejirînim, ji ber ku ew ê rasterast bandorê li leza girtina serverê bike).
  • queue.max_events: (Hejmara herî zêde ya bûyeran di dorê de, xwerû 0 e (bêsînor)).
  • queue.max_bytes: (Mezinahiya rêza herî zêde bi byte, xwerû - 1024mb (1gb)).

Ger mîheng kirin queue.max_events и queue.max_bytes, wê hingê dema ku nirxa yek ji van mîhengan bigihîje peyamên ku di rêzê de têne pejirandin rawestin. Zêdetir li ser Dorên Persistent hîn bibin vir.

Mînakek beşa logstash.yml ku berpirsiyarê sazkirina rêzê ye:

queue.type: persisted
queue.max_bytes: 10gb

ligorî

Veavakirina Logstash bi gelemperî ji sê beşan pêk tê, ku ji qonaxên cihêreng ên hilberandina têketinên hatinê berpirsiyar in: wergirtin (beşa têketinê), parskirin (beşa parzûnê) û şandina Elastic (beşa derketinê). Li jêr em ê ji nêz ve li her yek ji wan binêrin.

Beyan

Em herikîna hatinê bi têketinên xav ji ajanên filebeat distînin. Ev pêvek e ku em di beşa têketinê de destnîşan dikin:

input {
  beats {
    port => 5044
  }
}

Piştî vê veavakirinê, Logstash dest bi guhdarîkirina port 5044 dike, û dema ku têketin distîne, wan li gorî mîhengên beşa parzûnê pêvajoyê dike. Ger hewce be, hûn dikarin kanalê ji bo wergirtina têketinên ji pelê di SSL-ê de bipêçin. Di derbarê mîhengên pêveka beats de bêtir bixwînin vir.

Parzûn

Hemî têketinên nivîsê yên ku ji bo pêvajoyek ku Exchange çêdike balkêş in, di formata csv de bi qadên ku di pelê têketinê bixwe de têne diyar kirin de ne. Ji bo parkirina tomarên csv, Logstash sê pêvekan pêşkêşî me dike: dabeşkirin, csv û grok. Ya yekem herî zêde ye , lê bi parskirina tenê têketinên herî hêsan re mijûl dibe.
Mînakî, ew ê qeyda jêrîn bike du du (ji ber hebûna kommayek di hundurê zeviyê de), ji ber vê yekê têketin dê xelet were pars kirin:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

Ew dikare dema ku têketin pars dike, ji bo nimûne, IIS were bikar anîn. Di vê rewşê de, beşa parzûnê dibe ku bi vî rengî xuya bike:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

Veavakirina Logstash dihêle hûn bikar bînin daxuyaniyên şertî, ji ber vê yekê em dikarin tenê têketinên ku bi etîketa filebeat hatine nîşankirin ji pêveka veqetandinê re bişînin. IIS. Di hundurê pêvekê de em nirxên zeviyê bi navên wan re berhev dikin, qada orîjînal jêbirin message, ku têketinek ji têketinê vedihewand, û em dikarin qadek xwerû lê zêde bikin ku dê, mînakî, navê serîlêdana ku em têketin jê berhev dikin, hebe.

Di mijara têketinên şopandinê de, çêtir e ku meriv pêveka csv bikar bîne; ew dikare zeviyên tevlihev rast bişopîne:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

Di hundurê pêvekê de em nirxên zeviyê bi navên wan re berhev dikin, qada orîjînal jêbirin message (û her weha zeviyan tenant-id и schema-version), ya ku têketinek ji têketinê vedihewand, û em dikarin zeviyek xwerû lê zêde bikin, ku dê, mînakî, navê serîlêdana ku em têketin jê berhev dikin hebe.

Di derketina ji qonaxa fîlterkirinê de, em ê belgeyan di nêzîkbûnek yekem de bistînin, ku ji bo dîtinê li Kibana amade ne. Em ê tiştên jêrîn winda bikin:

  • Zeviyên hejmarî dê wekî nivîsê bêne nas kirin, ku rê li ber operasyonên li ser wan digire. Ango zeviyan time-taken Têketina IIS, û hem jî zeviyan recipient-count и total-bites Têketin Şopandina.
  • Demjimêra belgeya standard dê dema ku têketin hate hilanîn, ne dema ku ew li alîyê serverê hatî nivîsandin vedihewîne.
  • warê recipient-address dê mîna yek cîhek avahîsaziyê xuya bike, ku rê nade analîzan ku wergirên tîpan bijmêre.

Wext e ku meriv sêrbaziyek piçûk li pêvajoya pêvajoya têketinê zêde bike.

Veguherandina qadên hejmarî

Pêveka veqetandinê vebijarkek heye convert_datatype, ku dikare were bikar anîn da ku zeviyek nivîsê bi forma dîjîtal veguherîne. Mînakî, bi vî rengî:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

Hêjayî bibîrxistinê ye ku ev rêbaz tenê guncav e heke zevî bê guman rêzek hebe. Vebijêrk nirxên Null-ê ji qadan pêvajo nake û îstîsnayekê derdixe.

Ji bo şopandina têketin, çêtir e ku meriv rêbazek veguherînê ya wekhev bikar neynin, ji ber ku zeviyan recipient-count и total-bites dibe ku vala be. Ji bo veguhertina van qadan çêtir e ku meriv pêvekek bikar bîne mutate:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

Parçekirina recipient_address nav wergirên kesane

Ev pirsgirêk jî bi karanîna pêveka mutate dikare were çareser kirin:

mutate {
  split => ["recipient_address", ";"]
}

Guhertina demjimêrê

Di mijara têketinên şopandinê de, pirsgirêk ji hêla pêvekê ve pir bi hêsanî tê çareser kirin rojek, ku dê ji we re bibe alîkar ku hûn li zeviyê binivîsin timestamp roj û dem di forma pêwîst de ji zeviyê date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

Di mijara têketinên IIS de, em ê hewce ne ku daneyên zeviyê bi hev re bikin date и time bi karanîna pêveka guheztinê, devera demjimêra ku em hewce ne tomar bikin û vê mohra demê tê de bi cîh bikin timestamp bikaranîna pêveka tarîxê:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Karûabr

Beşa derketinê ji bo şandina têketinên pêvajoyî ji wergirê têketinê re tê bikar anîn. Di bûyera şandina rasterast ji Elastic re, pêvekek tê bikar anîn elasticsearch, ku ji bo şandina belgeya hatî çêkirin navnîşana serverê û şablona navê navnîşê diyar dike:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Veavakirina dawî

Veavakirina dawîn dê wiha xuya bike:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

Zencîreyên bikêr:

Source: www.habr.com

Add a comment