ما با ELK و Exchange دوست هستیم. قسمت 2

ما با ELK و Exchange دوست هستیم. قسمت 2

من داستان خود را در مورد چگونگی ایجاد دوستان Exchange و ELK ادامه می دهم (شروع اینجا). یادآوری می کنم که این ترکیب قادر است تعداد بسیار زیادی لاگ را بدون تردید پردازش کند. این بار در مورد چگونگی کارکرد Exchange با اجزای Logstash و Kibana صحبت خواهیم کرد.

Logstash در پشته ELK برای پردازش هوشمند سیاههها و آماده سازی آنها برای قرار دادن در Elastic در قالب اسناد استفاده می شود که بر اساس آن ساخت تجسم های مختلف در Kibana راحت است.

نصب

شامل دو مرحله است:

  • نصب و پیکربندی بسته OpenJDK.
  • نصب و پیکربندی بسته Logstash.

نصب و پیکربندی بسته OpenJDK

بسته OpenJDK باید دانلود و در یک دایرکتوری خاص باز شود. سپس مسیر این دایرکتوری باید در متغیرهای $env:Path و $env:JAVA_HOME سیستم عامل ویندوز وارد شود:

ما با ELK و Exchange دوست هستیم. قسمت 2

ما با ELK و Exchange دوست هستیم. قسمت 2

بیایید نسخه جاوا را بررسی کنیم:

PS C:> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)

نصب و پیکربندی بسته Logstash

فایل آرشیو را با توزیع Logstash دانلود کنید از این رو. آرشیو باید در ریشه دیسک باز شود. بسته بندی را در پوشه باز کنید C:Program Files ارزشش را ندارد، Logstash از شروع عادی خودداری می کند. سپس باید وارد فایل شوید jvm.options رفع مسئول تخصیص RAM برای فرآیند جاوا. توصیه می کنم نیمی از رم سرور را مشخص کنید. اگر دارای 16 گیگابایت رم روی برد باشد، کلیدهای پیش فرض عبارتند از:

-Xms1g
-Xmx1g

باید با:

-Xms8g
-Xmx8g

علاوه بر این، توصیه می شود که در این خط نظر دهید -XX:+UseConcMarkSweepGC. بیشتر در این مورد اینجا. مرحله بعدی ایجاد یک پیکربندی پیش فرض در فایل logstash.conf است:

input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}

با این پیکربندی، Logstash داده‌ها را از کنسول می‌خواند، آن‌ها را از یک فیلتر خالی عبور می‌دهد و به کنسول بازمی‌گرداند. استفاده از این پیکربندی عملکرد Logstash را آزمایش می کند. برای انجام این کار، اجازه دهید آن را در حالت تعاملی اجرا کنیم:

PS C:...bin> .logstash.bat -f .logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Logstash با موفقیت در پورت 9600 راه اندازی شد.

مرحله نهایی نصب: راه اندازی Logstash به عنوان یک سرویس ویندوز. این را می توان به عنوان مثال با استفاده از بسته انجام داد NSSM:

PS C:...bin> .nssm.exe install logstash
Service "logstash" installed successfully!

تحمل خطا

ایمنی گزارش‌ها هنگام انتقال از سرور منبع توسط مکانیزم صف‌های ماندگار تضمین می‌شود.

چگونه کار می کند

طرح صف ها در طول پردازش گزارش به این صورت است: ورودی → صف → فیلتر + خروجی.

افزونه ورودی داده‌ها را از یک منبع گزارش دریافت می‌کند، آن‌ها را در یک صف می‌نویسد و تأییدیه دریافت داده‌ها را به منبع ارسال می‌کند.

پیام‌های صف توسط Logstash پردازش می‌شوند، از فیلتر و افزونه خروجی عبور می‌کنند. هنگام دریافت تأییدیه از خروجی مبنی بر ارسال گزارش، Logstash گزارش پردازش شده را از صف حذف می کند. اگر Logstash متوقف شود، همه پیام‌ها و پیام‌های پردازش‌نشده که هیچ تأییدی برای آن‌ها دریافت نشده است در صف باقی می‌مانند و Logstash دفعه بعد که شروع می‌شود به پردازش آنها ادامه می‌دهد.

تنظیم

با کلیدهای موجود در فایل قابل تنظیم است C:Logstashconfiglogstash.yml:

  • queue.type: (مقادیر ممکن - persisted и memory (default)).
  • path.queue: (مسیر پوشه با فایل های صف که به طور پیش فرض در C:Logstashqueue ذخیره می شوند).
  • queue.page_capacity: (حداکثر اندازه صفحه صف، مقدار پیش فرض 64 مگابایت است).
  • queue.drain: (درست/نادرست - توقف پردازش صف را قبل از خاموش کردن Logstash فعال/غیرفعال می کند. من فعال کردن آن را توصیه نمی کنم، زیرا این کار مستقیماً بر سرعت خاموش شدن سرور تأثیر می گذارد).
  • queue.max_events: (حداکثر تعداد رویدادها در صف، پیش فرض 0 (نامحدود) است).
  • queue.max_bytes: (حداکثر اندازه صف بر حسب بایت، پیش فرض - 1024 مگابایت (1 گیگابایت)).

اگر پیکربندی شده باشد queue.max_events и queue.max_bytes، پس از رسیدن به مقدار هر یک از این تنظیمات، پیام ها در صف پذیرفته نمی شوند. درباره صف های ماندگار بیشتر بدانید اینجا.

نمونه ای از بخشی از logstash.yml که مسئول تنظیم صف است:

queue.type: persisted
queue.max_bytes: 10gb

تنظیم

پیکربندی Logstash معمولاً از سه بخش تشکیل شده است که مسئول مراحل مختلف پردازش لاگ های ورودی است: دریافت (بخش ورودی)، تجزیه (بخش فیلتر) و ارسال به Elastic (بخش خروجی). در زیر نگاهی دقیق تر به هر یک از آنها خواهیم داشت.

ورودی

ما جریان ورودی را با لاگ های خام از عوامل filebeat دریافت می کنیم. این افزونه است که در قسمت ورودی نشان می دهیم:

input {
  beats {
    port => 5044
  }
}

پس از این تنظیمات، Logstash شروع به گوش دادن به پورت 5044 می کند و هنگام دریافت گزارش ها، آنها را مطابق تنظیمات قسمت فیلتر پردازش می کند. در صورت لزوم، می‌توانید کانال دریافت گزارش‌ها از فایل بیت را در SSL بپیچید. درباره تنظیمات افزونه beats بیشتر بخوانید اینجا.

فیلتر

تمام گزارش‌های متنی که Exchange تولید می‌کند و برای پردازش جالب هستند، در قالب csv با فیلدهای توضیح داده شده در خود فایل گزارش هستند. برای تجزیه رکوردهای csv، Logstash سه افزونه را به ما ارائه می دهد: کالبد شکافی، csv و grok. اولی بیشترین است سریع، اما با تجزیه تنها ساده ترین گزارش ها مقابله می کند.
به عنوان مثال، رکورد زیر را به دو قسمت تقسیم می کند (به دلیل وجود کاما در داخل فیلد)، به همین دلیل است که گزارش به اشتباه تجزیه می شود:

…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…

می توان از آن هنگام تجزیه لاگ ها، به عنوان مثال، IIS استفاده کرد. در این مورد، بخش فیلتر ممکن است به شکل زیر باشد:

filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 

پیکربندی Logstash به شما امکان استفاده از آن را می دهد عبارات مشروط، بنابراین ما فقط می توانیم گزارش هایی را که با تگ filebeat تگ شده اند به افزونه dissect ارسال کنیم. IIS. در داخل افزونه مقادیر فیلدها را با نام آنها مطابقت می دهیم، فیلد اصلی را حذف می کنیم message، که حاوی یک ورودی از گزارش بود، و می‌توانیم یک فیلد سفارشی اضافه کنیم که مثلاً حاوی نام برنامه‌ای باشد که از آن گزارش‌ها را جمع‌آوری می‌کنیم.

در مورد ردیابی لاگ ها، بهتر است از افزونه csv استفاده کنید؛ این افزونه می تواند فیلدهای پیچیده را به درستی پردازش کند:

filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}

در داخل افزونه، مقادیر فیلدها را با نام آنها مطابقت می دهیم، فیلد اصلی را حذف می کنیم message (و همچنین فیلدها tenant-id и schema-version) که حاوی یک ورودی از گزارش است، و ما می توانیم یک فیلد سفارشی اضافه کنیم، که به عنوان مثال، حاوی نام برنامه ای است که از آن گزارش ها را جمع آوری می کنیم.

در هنگام خروج از مرحله فیلترینگ، اسناد را به صورت تقریبی اول آماده برای تجسم در کیبانا دریافت خواهیم کرد. موارد زیر را از دست خواهیم داد:

  • فیلدهای عددی به عنوان متن شناسایی می شوند که از انجام عملیات روی آنها جلوگیری می کند. یعنی زمینه ها time-taken گزارش IIS و همچنین فیلدها recipient-count и total-bites پیگیری ورود به سیستم.
  • مهر زمانی سند استاندارد شامل زمان پردازش گزارش است، نه زمانی که در سمت سرور نوشته شده است.
  • رشته recipient-address شبیه یک سایت ساخت و ساز است که امکان تجزیه و تحلیل برای شمارش گیرندگان نامه ها را نمی دهد.

وقت آن است که کمی جادو به فرآیند پردازش لاگ اضافه کنیم.

تبدیل فیلدهای عددی

افزونه dissect یک گزینه دارد convert_datatype، که می تواند برای تبدیل یک فیلد متنی به فرمت دیجیتال استفاده شود. به عنوان مثال، مانند این:

dissect {
  …
  convert_datatype => { "time-taken" => "int" }
  …
}

شایان ذکر است که این روش تنها در صورتی مناسب است که فیلد قطعاً دارای یک رشته باشد. این گزینه مقادیر Null را از فیلدها پردازش نمی کند و یک استثنا ایجاد می کند.

برای ردیابی لاگ ها، بهتر است از روش تبدیل مشابه استفاده نکنید، زیرا فیلدها recipient-count и total-bites ممکن است خالی باشد برای تبدیل این فیلدها بهتر است از افزونه استفاده کنید جهش:

mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}

تقسیم recipient_address به گیرندگان فردی

این مشکل را می توان با استفاده از افزونه mutate نیز حل کرد:

mutate {
  split => ["recipient_address", ";"]
}

تغییر مهر زمانی

در مورد ردیابی لاگ ها، مشکل به راحتی توسط افزونه حل می شود تاریخ، که به شما در نوشتن در زمینه کمک می کند timestamp تاریخ و زمان در قالب مورد نیاز از فیلد date-time:

date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}

در مورد گزارش‌های IIS، باید داده‌های فیلد را ترکیب کنیم date и time با استفاده از افزونه mutate، منطقه زمانی مورد نیاز خود را ثبت کرده و این مهر زمانی را در آن قرار دهید timestamp با استفاده از افزونه تاریخ:

mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}

تولید

بخش خروجی برای ارسال گزارش های پردازش شده به گیرنده گزارش استفاده می شود. در صورت ارسال مستقیم به الاستیک از افزونه استفاده می شود الاستیک، که آدرس سرور و الگوی نام فهرست را برای ارسال سند تولید شده مشخص می کند:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

پیکربندی نهایی

پیکربندی نهایی به صورت زیر خواهد بود:

input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}

پیوندهای مفید:

منبع: www.habr.com

اضافه کردن نظر