موږ د ELK او تبادلې سره ملګري یو. برخه 2

موږ د ELK او تبادلې سره ملګري یو. برخه 2

زه خپلې کیسې ته دوام ورکوم چې څنګه د ملګرو تبادله او ELK (پیل دلته). اجازه راکړئ تاسو ته یادونه وکړم چې دا ترکیب پرته له کوم ځنډ پرته د خورا لوی شمیر لاګونو پروسس کولو وړ دی. دا ځل موږ به په دې اړه وغږیږو چې څنګه د Logstash او Kibana اجزاوو سره د ایکسچینج کار کول ترلاسه کړو.

په ELK سټیک کې لوګسټاش په هوښیارۍ سره د لاګونو پروسس کولو لپاره کارول کیږي او د اسنادو په شکل کې په لچکدار کې ځای په ځای کولو لپاره چمتو کوي ، د دې پراساس دا په کبانا کې د مختلف لیدونو رامینځته کولو لپاره مناسب دی.

د جوړولو

له دوو مرحلو څخه جوړ دی:

  • د OpenJDK بسته نصب او تنظیم کول.
  • د Logstash بسته نصب او تنظیم کول.

د OpenJDK بسته نصب او تنظیم کول

د OpenJDK کڅوړه باید ډاونلوډ شي او په ځانګړي لارښود کې پاک شي. بیا دې لارښود ته لاره باید د وینډوز عملیاتي سیسټم $env:Path او $env:JAVA_HOME متغیرونو کې داخل شي:

موږ د ELK او تبادلې سره ملګري یو. برخه 2

موږ د ELK او تبادلې سره ملګري یو. برخه 2

راځئ چې د جاوا نسخه وګورو:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

د Logstash بسته نصب او تنظیم کول

د لوګسټاش توزیع سره آرشیف فایل ډاونلوډ کړئ له دې ځایه. آرشیف باید د ډیسک ریښی ته خلاص شي. فولډر ته خلاص کړئ C:Program Files دا د دې ارزښت نلري، Logstash به په نورمال ډول پیل کولو څخه ډډه وکړي. بیا تاسو اړتیا لرئ فایل ته ننوځئ jvm.options د جاوا پروسې لپاره د RAM تخصیص لپاره مسؤل فکسونه. زه وړاندیز کوم چې د سرور نیم نیمه RAM مشخص کړئ. که دا په بورډ کې 16 GB رام ولري، نو بیا اصلي کیلي دي:

-Xms1g
-Xmx1g

باید بدل شي:

-Xms8g
-Xmx8g

سربیره پردې، دا مشوره ورکول کیږي چې په کرښه کې تبصره وکړئ -XX:+UseConcMarkSweepGC. په دې اړه نور معلومات دلته. بل ګام د logstash.conf فایل کې د ډیفالټ ترتیب رامینځته کول دي:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

د دې ترتیب سره، Logstash د کنسول څخه ډاټا لوستل کوي، دا د خالي فلټر له لارې تیریږي، او بیرته یې کنسول ته رسوي. د دې ترتیب کارول به د Logstash فعالیت ازموي. د دې کولو لپاره، راځئ چې دا په متقابل حالت کې پرمخ بوځو:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash په 9600 بندر کې په بریالیتوب سره پیل شو.

د نصب کولو وروستی مرحله: Logstash د وینډوز خدمت په توګه پیل کړئ. دا کیدی شي، د بیلګې په توګه، د کڅوړې په کارولو سره NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

د خطا زغم

د لاګونو خوندیتوب کله چې د سرچینې سرور څخه لیږدول کیږي د دوامدار قطار میکانیزم لخوا تضمین کیږي.

دا څنګه کار کوي؟

د لاګ پروسس کولو پرمهال د قطارونو ترتیب دا دی: داخل → قطار → فلټر + محصول.

د ان پټ پلگ ان د لاګ سرچینې څخه ډاټا ترلاسه کوي، دا په کتار کې لیکي، او تایید لیږي چې ډاټا سرچینې ته ترلاسه شوي.

د قطار څخه پیغامونه د Logstash لخوا پروسس کیږي، د فلټر او د محصول پلگ ان څخه تیریږي. کله چې د محصول څخه تایید ترلاسه شي چې لاګ لیږل شوی، Logstash پروسس شوی لاګ له قطار څخه لرې کوي. که Logstash ودریږي، ټول غیر پروسس شوي پیغامونه او پیغامونه چې هیڅ تصدیق یې نه دی ترلاسه کړی په کتار کې پاتې کیږي، او Logstash به د بل ځل پیل کولو پروسې ته دوام ورکړي.

تعدیلات

په فایل کې د کیلي لخوا د تنظیم وړ C:Logstashconfiglogstash.yml:

  • queue.type: (ممکن ارزښتونه - persisted и memory (default)).
  • path.queue: (د قطار فایلونو سره فولډر ته لاره، کوم چې په C:Logstashqueue ډیفالټ کې زیرمه شوي).
  • queue.page_capacity: (د قطار پاڼې اعظمي اندازه، اصلي ارزښت 64mb دی).
  • queue.drain: (ریښتیا / غلط - د Logstash بندولو دمخه د قطار پروسس کولو بندول فعال / غیر فعالوي. زه د فعالولو وړاندیز نه کوم، ځکه چې دا به په مستقیم ډول د سرور بندولو سرعت اغیزه وکړي).
  • queue.max_events: (په قطار کې د پیښو اعظمي شمیره، ډیفالټ 0 دی (لامحدود)).
  • queue.max_bytes: (په بایټونو کې د قطار اعظمي اندازه، ډیفالټ - 1024mb (1GB)).

که ترتیب شوی وي queue.max_events и queue.max_bytes، بیا پیغامونه په کتار کې د منلو مخه نیسي کله چې د دې ترتیباتو څخه کوم ارزښت ته ورسیږي. د دوامدار قطارونو په اړه نور معلومات ترلاسه کړئ دلته.

د logstash.yml برخې یوه بیلګه چې د قطار تنظیم کولو مسؤلیت لري:

queue.type: persisted
queue.max_bytes: 10gb

تعدیلات

د Logstash ترتیب معمولا له دریو برخو څخه جوړ دی، چې د راتلونکو لاګونو پروسس کولو مختلف پړاوونو لپاره مسؤل دي: ترلاسه کول (ان پټ برخه)، پارس کول (فلټر برخه) او لچک (د محصول برخه) ته لیږل. لاندې به موږ هر یو ته نږدې وګورو.

ننوتۍ

موږ د فایل بیټ اجنټانو څخه د خامو لاګونو سره راتلونکی جریان ترلاسه کوو. دا دا پلگ ان دی چې موږ یې د ننوتلو برخه کې اشاره کوو:

input {
  beats {
    port => 5044
  }
}

د دې ترتیب وروسته، Logstash د پورټ 5044 اوریدل پیل کوي، او کله چې لاګونه ترلاسه کوي، د فلټر برخې ترتیباتو سره سم پروسس کوي. که اړتیا وي، تاسو کولی شئ په SSL کې د فایل بیټ څخه د لاګونو ترلاسه کولو لپاره چینل وپلټئ. د بیټس پلگ ان تنظیماتو په اړه نور ولولئ دلته.

Filter

ټول متن لاګونه چې د پروسس کولو لپاره په زړه پوري دي چې ایکسچینج یې رامینځته کوي د csv فارمیټ کې دي پخپله د لاګ فایل کې بیان شوي ساحې سره. د csv ریکارډونو پارس کولو لپاره، Logstash موږ ته درې پلگ ان وړاندې کوي: تحلیل, csv او grok. لومړی تر ټولو ډیر دی быстрый، مګر یوازې د ساده لاګونو تحلیل کولو سره کاپي کوي.
د مثال په توګه، دا به لاندې ریکارډ په دوو برخو وویشي (په ساحه کې د کوما د شتون له امله)، له همدې امله لاګ به په غلطه توګه تجزیه شي:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

دا کارول کیدی شي کله چې لاګونه پارس کول ، د مثال په توګه ، IIS. په دې حالت کې، د فلټر برخه ممکن داسې ښکاري:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

د Logstash ترتیب تاسو ته اجازه درکوي چې وکاروئ مشروط بیانونه، نو موږ کولی شو یوازې هغه لاګونه واستوو چې د فایل بیټ ټګ سره د تحلیل پلگ ان ته لیږل شوي IIS. د پلگ ان دننه موږ د ساحې ارزښتونه د دوی له نومونو سره مل کوو، اصلي ساحه حذف کړئ message، کوم چې د لاګ څخه ننوتل شامل دي ، او موږ کولی شو یو دودیز ساحه اضافه کړو چې د مثال په توګه به د هغه غوښتنلیک نوم ولري چې له هغې څخه موږ لاګونه راټولوو.

د تعقیب لاګونو په حالت کې، دا غوره ده چې د csv پلگ ان وکاروئ؛ دا کولی شي په سمه توګه پیچلې ساحې پروسس کړي:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

د پلگ ان دننه موږ د ساحې ارزښتونه د دوی له نومونو سره مل کوو، اصلي ساحه حذف کړئ message (او همدارنګه ساحې tenant-id и schema-version)، کوم چې د لاګ څخه ننوتل شامل دي، او موږ کولی شو یو دودیز ساحه اضافه کړو، کوم چې به د مثال په توګه، د هغه غوښتنلیک نوم ولري چې له هغې څخه موږ لاګونه راټولوو.

د فلټر کولو مرحلې څخه د وتلو په وخت کې، موږ به په لومړي اټکل کې اسناد ترلاسه کړو، په کبانا کې د لید لپاره چمتو دي. موږ به لاندې له لاسه ورکړو:

  • شمیرې ساحې به د متن په توګه وپیژندل شي، کوم چې په دوی باندې د عملیاتو مخه نیسي. یعني، ساحې time-taken IIS log، او همدارنګه ساحې recipient-count и total-bites د ننوتلو تعقیب.
  • د معیاري سند مهال ویش به هغه وخت ولري چې لاګ پروسس شوی و ، نه هغه وخت چې د سرور اړخ کې لیکل شوی و.
  • ډګر recipient-address د یو ساختماني سایټ په څیر ښکاري، کوم چې د لیکونو ترلاسه کونکو شمیرلو لپاره تحلیل ته اجازه نه ورکوي.

دا وخت دی چې د لاګ پروسس کولو پروسې کې یو څه جادو اضافه کړئ.

د شمیرو ساحو بدلول

د تحلیل پلگ ان یو اختیار لري convert_datatype، کوم چې د متن ساحه ډیجیټل ب formatه ته د بدلولو لپاره کارول کیدی شي. د مثال په توګه، دا ډول:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

دا د یادولو وړ ده چې دا طریقه یوازې هغه وخت مناسبه ده چې ساحه به خامخا یو تار ولري. دا اختیار د ساحو څخه Null ارزښتونه نه پروسس کوي او یو استثنا یې غورځوي.

د تعقیب لاګونو لپاره، دا غوره ده چې د ورته بدلولو طریقه ونه کاروئ، ځکه چې ساحې recipient-count и total-bites کیدای شي خالي وي. د دې ساحو بدلولو لپاره دا غوره ده چې پلگ ان وکاروئ بدلول:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

په انفرادي ترلاسه کونکو کې د ترلاسه کونکي_ پته ویشل

دا ستونزه د میوټیټ پلگ ان په کارولو سره هم حل کیدی شي:

mutate {
  split => ["recipient_address", ";"]
}

د مهال ویش بدلول

د تعقیب لاګونو په حالت کې، ستونزه د پلگ ان لخوا په اسانۍ سره حل کیږي نېټه، کوم چې به تاسو سره په ساحه کې لیکلو کې مرسته وکړي timestamp نیټه او وخت د ساحې څخه په اړین شکل کې date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

د IIS لاګونو په حالت کې، موږ به د ساحې ډاټا سره یوځای کولو ته اړتیا ولرو date и time د میوټیټ پلگ ان په کارولو سره ، د وخت زون ثبت کړئ چې موږ ورته اړتیا لرو او دا وخت ټاپه دننه کړئ timestamp د نیټې پلگ ان په کارولو سره:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

Output

د محصول برخه د لاګ رسیدونکي ته د پروسس شوي لاګونو لیږلو لپاره کارول کیږي. په مستقیم ډول Elastic ته د لیږلو په صورت کې، یو پلگ ان کارول کیږي د، کوم چې د تولید شوي سند لیږلو لپاره د سرور پته او د شاخص نوم ټیمپلیټ مشخص کوي:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

وروستی ترتیب

وروستی ترتیب به داسې ښکاري:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

ګټور لینکونه:

سرچینه: www.habr.com

Add a comment