نحن أصدقاء مع ELK و Exchange. الجزء 2

نحن أصدقاء مع ELK و Exchange. الجزء 2

أواصل قصتي حول كيفية تكوين صداقات تبادل وELK (البداية هنا). اسمحوا لي أن أذكرك أن هذه المجموعة قادرة على معالجة عدد كبير جدًا من السجلات دون تردد. سنتحدث هذه المرة عن كيفية جعل Exchange يعمل مع مكونات Logstash وKibana.

يتم استخدام Logstash في مكدس ELK لمعالجة السجلات بذكاء وإعدادها لوضعها في Elastic في شكل مستندات، والتي على أساسها يكون من الملائم إنشاء تصورات مختلفة في Kibana.

تركيب

يتكون من مرحلتين:

  • تثبيت وتكوين حزمة OpenJDK.
  • تثبيت وتكوين حزمة Logstash.

تثبيت وتكوين حزمة OpenJDK

يجب تنزيل حزمة OpenJDK وتفريغها في دليل محدد. ثم يجب إدخال المسار إلى هذا الدليل في متغيرات $env:Path و$env:JAVA_HOME لنظام التشغيل Windows:

نحن أصدقاء مع ELK و Exchange. الجزء 2

نحن أصدقاء مع ELK و Exchange. الجزء 2

دعونا نتحقق من إصدار Java:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

تثبيت وتكوين حزمة Logstash

قم بتنزيل ملف الأرشيف باستخدام توزيع Logstash من هنا. يجب تفكيك الأرشيف إلى جذر القرص. فك إلى المجلد C:Program Files لا يستحق الأمر ذلك، سوف يرفض Logstash البدء بشكل طبيعي. ثم تحتاج إلى الدخول في الملف jvm.options الإصلاحات المسؤولة عن تخصيص ذاكرة الوصول العشوائي لعملية Java. أوصي بتحديد نصف ذاكرة الوصول العشوائي للخادم. إذا كان لديه 16 جيجابايت من ذاكرة الوصول العشوائي (RAM)، فإن المفاتيح الافتراضية هي:

-Xms1g
-Xmx1g

يجب استبداله بـ:

-Xms8g
-Xmx8g

بالإضافة إلى ذلك، من المستحسن التعليق خارج السطر -XX:+UseConcMarkSweepGC. المزيد حول هذا الموضوع هنا. الخطوة التالية هي إنشاء تكوين افتراضي في ملف logstash.conf:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

باستخدام هذا التكوين، يقرأ Logstash البيانات من وحدة التحكم، ويمررها عبر مرشح فارغ، ويخرجها مرة أخرى إلى وحدة التحكم. سيؤدي استخدام هذا التكوين إلى اختبار وظيفة Logstash. للقيام بذلك، دعونا تشغيله في الوضع التفاعلي:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

تم إطلاق Logstash بنجاح على المنفذ 9600.

خطوة التثبيت النهائية: تشغيل Logstash كخدمة Windows. ويمكن القيام بذلك، على سبيل المثال، باستخدام الحزمة إن إس إم:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

التسامح مع الخطأ

يتم ضمان سلامة السجلات عند نقلها من الخادم المصدر من خلال آلية قوائم الانتظار المستمرة.

كيف يعمل

تخطيط قوائم الانتظار أثناء معالجة السجل هو: الإدخال → قائمة الانتظار → المرشح + الإخراج.

يتلقى مكون الإدخال الإضافي البيانات من مصدر السجل، ويكتبها في قائمة الانتظار، ويرسل تأكيدًا باستلام البيانات إلى المصدر.

تتم معالجة الرسائل من قائمة الانتظار بواسطة Logstash، وتمريرها عبر عامل التصفية والمكون الإضافي للإخراج. عند تلقي تأكيد من الإخراج بأن السجل قد تم إرساله، يقوم Logstash بإزالة السجل المعالج من قائمة الانتظار. إذا توقف Logstash، فستظل جميع الرسائل التي لم تتم معالجتها والرسائل التي لم يتم تلقي تأكيد لها في قائمة الانتظار، وسيستمر Logstash في معالجتها في المرة التالية التي يبدأ فيها.

تعديل

قابل للتعديل عن طريق المفاتيح الموجودة في الملف C:Logstashconfiglogstash.yml:

  • queue.type: (القيم المحتملة - persisted и memory (default)).
  • path.queue: (المسار إلى المجلد الذي يحتوي على ملفات قائمة الانتظار، والتي يتم تخزينها في C:Logstashqueue افتراضيًا).
  • queue.page_capacity: (الحد الأقصى لحجم صفحة قائمة الانتظار، القيمة الافتراضية هي 64 ميجابايت).
  • queue.drain: (صواب/خطأ - تمكين/تعطيل إيقاف معالجة قائمة الانتظار قبل إيقاف تشغيل Logstash. لا أوصي بتمكينه، لأن هذا سيؤثر بشكل مباشر على سرعة إيقاف تشغيل الخادم).
  • queue.max_events: (الحد الأقصى لعدد الأحداث في قائمة الانتظار، الافتراضي هو 0 (غير محدود)).
  • queue.max_bytes: (الحد الأقصى لحجم قائمة الانتظار بالبايت، الافتراضي - 1024 ميجابايت (1 جيجابايت)).

إذا تم تكوينه queue.max_events и queue.max_bytes، ثم يتوقف قبول الرسائل في قائمة الانتظار عند الوصول إلى قيمة أي من هذه الإعدادات. تعرف على المزيد حول قوائم الانتظار المستمرة هنا.

مثال على جزء logstash.yml المسؤول عن إعداد قائمة الانتظار:

queue.type: persisted
queue.max_bytes: 10gb

تعديل

يتكون تكوين Logstash عادةً من ثلاثة أجزاء مسؤولة عن مراحل مختلفة من معالجة السجلات الواردة: الاستلام (قسم الإدخال)، والتحليل (قسم التصفية)، والإرسال إلى Elastic (قسم الإخراج). أدناه سوف نلقي نظرة فاحصة على كل واحد منهم.

إدخال

نتلقى الدفق الوارد مع السجلات الأولية من وكلاء filebeat. هذا هو البرنامج المساعد الذي نشير إليه في قسم الإدخال:

input {
  beats {
    port => 5044
  }
}

بعد هذا التكوين، يبدأ Logstash في الاستماع إلى المنفذ 5044، وعند استلام السجلات، يقوم بمعالجتها وفقًا لإعدادات قسم التصفية. إذا لزم الأمر، يمكنك تغليف القناة لتلقي السجلات من filebit في SSL. اقرأ المزيد عن إعدادات البرنامج المساعد Beats هنا.

الفرز

كافة السجلات النصية المثيرة للاهتمام للمعالجة التي ينشئها Exchange تكون بتنسيق CSV مع الحقول الموضحة في ملف السجل نفسه. لتحليل سجلات CSV، يقدم لنا Logstash ثلاثة مكونات إضافية: تشريحو CSV و جروك. الأول هو الأكثر بسرعة، ولكنه يتعامل مع تحليل أبسط السجلات فقط.
على سبيل المثال، سيتم تقسيم السجل التالي إلى قسمين (بسبب وجود فاصلة داخل الحقل)، ولهذا السبب سيتم تحليل السجل بشكل غير صحيح:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

ويمكن استخدامه عند تحليل السجلات، على سبيل المثال، IIS. في هذه الحالة، قد يبدو قسم التصفية كما يلي:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

يتيح لك تكوين Logstash استخدام عبارات شرطية، لذلك يمكننا فقط إرسال السجلات التي تم وضع علامة عليها بعلامة filebeat إلى المكون الإضافي للتشريح IIS. داخل البرنامج المساعد نقوم بمطابقة قيم الحقول بأسمائها، وحذف الحقل الأصلي message، والذي يحتوي على إدخال من السجل، ويمكننا إضافة حقل مخصص يحتوي، على سبيل المثال، على اسم التطبيق الذي نجمع السجلات منه.

في حالة سجلات التتبع، من الأفضل استخدام البرنامج الإضافي CSV، حيث يمكنه معالجة الحقول المعقدة بشكل صحيح:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

داخل البرنامج المساعد نقوم بمطابقة قيم الحقول بأسمائها، وحذف الحقل الأصلي message (وكذلك الحقول tenant-id и schema-version)، والذي يحتوي على إدخال من السجل، ويمكننا إضافة حقل مخصص، والذي سيحتوي، على سبيل المثال، على اسم التطبيق الذي نجمع السجلات منه.

عند الخروج من مرحلة التصفية، سنتلقى المستندات بشكل تقريبي أولي، جاهزة للتصور في كيبانا. سنفتقد ما يلي:

  • سيتم التعرف على الحقول الرقمية كنص، مما يمنع العمليات عليها. وهي الحقول time-taken سجل IIS، وكذلك الحقول recipient-count и total-bites تتبع السجل.
  • سيحتوي الطابع الزمني القياسي للمستند على الوقت الذي تمت فيه معالجة السجل، وليس الوقت الذي تمت كتابته فيه على جانب الخادم.
  • حقل recipient-address سيبدو وكأنه موقع بناء واحد، وهو ما لا يسمح بتحليل عدد مستلمي الرسائل.

حان الوقت لإضافة القليل من السحر إلى عملية معالجة السجل.

تحويل الحقول الرقمية

البرنامج المساعد تشريح لديه خيار convert_datatype، والتي يمكن استخدامها لتحويل حقل النص إلى تنسيق رقمي. على سبيل المثال، مثل هذا:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

تجدر الإشارة إلى أن هذه الطريقة مناسبة فقط إذا كان الحقل سيحتوي بالتأكيد على سلسلة. لا يقوم الخيار بمعالجة القيم الخالية من الحقول ويطرح استثناءً.

بالنسبة لسجلات التتبع، من الأفضل عدم استخدام طريقة تحويل مماثلة، لأن الحقول recipient-count и total-bites قد تكون فارغة. لتحويل هذه الحقول فمن الأفضل استخدام البرنامج المساعد تنضج:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

تقسيم عنوان المستلم إلى مستلمين فرديين

يمكن أيضًا حل هذه المشكلة باستخدام البرنامج المساعد mutate:

mutate {
  split => ["recipient_address", ";"]
}

تغيير الطابع الزمني

في حالة سجلات التتبع، يتم حل المشكلة بسهولة شديدة بواسطة البرنامج الإضافي تاريخوالتي سوف تساعدك على الكتابة في هذا المجال timestamp التاريخ والوقت بالتنسيق المطلوب من الحقل date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

في حالة سجلات IIS، سنحتاج إلى دمج البيانات الميدانية date и time باستخدام البرنامج المساعد mutate، قم بتسجيل المنطقة الزمنية التي نحتاجها ووضع هذا الطابع الزمني فيها timestamp باستخدام البرنامج المساعد التاريخ:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

الناتج

يتم استخدام قسم الإخراج لإرسال السجلات المعالجة إلى مستلم السجل. في حالة الإرسال مباشرة إلى Elastic، يتم استخدام مكون إضافي elasticsearch، الذي يحدد عنوان الخادم ونموذج اسم الفهرس لإرسال المستند الذي تم إنشاؤه:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

التكوين النهائي

سيبدو التكوين النهائي كما يلي:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

روابط مفيدة:

المصدر: www.habr.com

إضافة تعليق