تطبيق عملي لـ ELK. إعداد logstash

مقدمة

أثناء نشر نظام آخر ، واجهنا الحاجة إلى معالجة عدد كبير من السجلات المختلفة. تم اختيار ELK كأداة. ستتحدث هذه المقالة عن تجربتنا في إنشاء هذا المكدس.

نحن لا نضع هدفًا لوصف كل قدراته ، لكننا نريد التركيز على حل المشكلات العملية. هذا يرجع إلى حقيقة أنه مع وجود قدر كبير من الوثائق والصور الجاهزة ، هناك الكثير من المزالق ، على الأقل وجدناها.

لقد نشرنا المكدس من خلال تكوين عامل ميناء. علاوة على ذلك ، كان لدينا docker-compose.yml مكتوب جيدًا والذي سمح لنا برفع المكدس دون أي مشاكل تقريبًا. وبدا لنا أن النصر كان قريبًا بالفعل ، والآن سنقوم بتحريفه قليلاً ليناسب احتياجاتنا وهذا كل شيء.

لسوء الحظ ، لم تنجح محاولة ضبط النظام لتلقي السجلات من تطبيقنا ومعالجتها على الفور. لذلك ، قررنا أن الأمر يستحق دراسة كل مكون على حدة ، ثم العودة إلى وصلاتهم.

لذلك دعونا نبدأ مع logstash.

البيئة والنشر وتشغيل Logstash في حاوية

للنشر ، نستخدم docker-compose ، تم إجراء التجارب الموضحة هنا على MacOS و Ubuntu 18.0.4.

صورة logstash التي كانت لدينا في docker-compose.yml الأصلي هي docker.elastic.co/logstash/logstash:6.3.2

سوف نستخدمه للتجارب.

لتشغيل logstash ، كتبنا docker-compose.yml منفصل. بالطبع ، كان من الممكن تشغيل الصورة من سطر الأوامر ، ولكن بعد كل شيء ، قمنا بحل مهمة محددة ، حيث يتم تشغيل كل شيء من docker-compose بالنسبة لنا.

باختصار حول ملفات التكوين

كما يلي من الوصف ، يمكن تشغيل logstash كقناة واحدة ، في هذه الحالة ، يحتاج إلى نقل ملف * .conf أو لعدة قنوات ، وفي هذه الحالة يحتاج إلى نقل ملف pipelines.yml ، والذي بدوره ، سيشير إلى ملفات .conf لكل قناة.
اتخذنا المسار الثاني. بدا لنا أكثر تنوعًا وقابلية للتطوير. لذلك ، أنشأنا pipelines.yml ، وأنشأنا دليل خطوط الأنابيب الذي سنضع فيه ملفات .conf لكل قناة.

يوجد داخل الحاوية ملف تكوين آخر - logstash.yml. نحن لا نلمسها ، بل نستخدمها كما هي.

إذن هيكل الدليل لدينا هو:

تطبيق عملي لـ ELK. إعداد logstash

في الوقت الحالي ، نفترض أن هذا هو tcp على المنفذ 5046 لتلقي بيانات الإدخال ، وسنستخدم stdout للإخراج.

هنا مثل هذا التكوين البسيط للتشغيل الأول. لأن المهمة الأولية هي الإطلاق.

لذلك لدينا هذا عامل البناء compose.yml

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      	- elk
    ports:
      	- 5046:5046
    volumes:
      	- ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
	- ./config/pipelines:/usr/share/logstash/config/pipelines:ro

ما الذي نراه هنا؟

  1. تم أخذ الشبكات والأحجام من docker-compose.yml الأصلي (الذي يتم فيه تشغيل المكدس بالكامل) وأعتقد أنها لا تؤثر بشكل كبير على الصورة العامة هنا.
  2. نقوم بإنشاء خدمة واحدة (خدمات) logstash ، من صورة docker.elastic.co/logstash/logstash:6.3.2 ونطلق عليها اسم logstash_one_channel.
  3. نقوم بإعادة توجيه المنفذ 5046 داخل الحاوية إلى نفس المنفذ الداخلي.
  4. نقوم بتعيين ملف تكوين أنبوب ./config/pipelines.yml الخاص بنا إلى ملف /usr/share/logstash/config/pipelines.yml داخل الحاوية ، حيث سيقوم logstash باستلامه وجعله للقراءة فقط ، فقط في حالة.
  5. نقوم بتعيين دليل ./config/pipelines ، حيث لدينا ملفات تكوين الأنابيب ، إلى دليل / usr / share / logstash / config / pipelines ونجعله أيضًا للقراءة فقط.

تطبيق عملي لـ ELK. إعداد logstash

ملف piping.yml

- pipeline.id: HABR
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "./config/pipelines/habr_pipeline.conf"

يصف قناة واحدة مع معرف HABR والمسار إلى ملف التكوين الخاص بها.

وأخيرًا ملف "./config/pipelines/habr_pipeline.conf"

input {
  tcp {
    port => "5046"
   }
  }
filter {
  mutate {
    add_field => [ "habra_field", "Hello Habr" ]
    }
  }
output {
  stdout {
      
    }
  }

لن ندخل في وصفه في الوقت الحالي ، نحاول تشغيل:

docker-compose up

ماذا نرى؟

بدأت الحاوية. يمكننا التحقق من عملها:

echo '13123123123123123123123213123213' | nc localhost 5046

ونرى الاستجابة في وحدة تحكم الحاوية:

تطبيق عملي لـ ELK. إعداد logstash

لكن في الوقت نفسه ، نرى أيضًا:

logstash_one_channel | [2019-04-29T11: 28: 59,790،9200] [خطأ] [logstash.licensechecker.licensereader] تعذر استرداد معلومات الترخيص من خادم الترخيص {: message => "Elasticsearch Unreachable: [http: // elasticsearch: XNUMX /] [Manticore :: ResolutionFailure] elasticsearch "، ...

logstash_one_channel | [2019-04-29T11: 28: 59,894،0] [INFO] [logstash.pipeline] بدأ خط الأنابيب بنجاح {: pipeline_id => ". Monitoring-logstash"،: thread => "# »}

logstash_one_channel | [2019-04-29T11: 28: 59,988،2] [INFO] [logstash.agent] خطوط الأنابيب قيد التشغيل {: count => XNUMX،: running_pipelines => [: HABR،: ". Monitoring-logstash"]،: non_running_pipelines => [ ]}
logstash_one_channel | [2019-04-29T11: 29: 00,015،XNUMX] [خطأ] [logstash.inputs.metrics] تم تثبيت X-Pack على Logstash ولكن ليس على Elasticsearch. الرجاء تثبيت X-Pack على Elasticsearch لاستخدام ميزة المراقبة. قد تتوفر ميزات أخرى.
logstash_one_channel | [2019-04-29T11: 29: 00,526،9600] [INFO] [logstash.agent] تم بدء تشغيل نقطة نهاية Logstash API بنجاح {: port => XNUMX}
logstash_one_channel | [2019-04-29T11: 29: 04,478،9200] [INFO] [logstash.outputs.elasticsearch] تشغيل التحقق من الصحة لمعرفة ما إذا كان اتصال Elasticsearch يعمل {: healthcheck_url => http: // elasticsearch: XNUMX /،: path => "/"}
logstash_one_channel | [2019-04-29T11: 29: 04,487،XNUMX] [WARN] [logstash.outputs.elasticsearch] تمت محاولة إحياء الاتصال بمثيل ES ميت ، ولكن حدث خطأ. {: url => "elasticsearch: 9200 / "،: error_type => LogStash :: Outputs :: ElasticSearch :: HttpClient :: Pool :: HostUnreachableError،: error =>" Elasticsearch غير قابلة للوصول: [http: // elasticsearch: 9200 /] [Manticore :: ResolutionFailure] elasticsearch "}
logstash_one_channel | [2019-04-29T11: 29: 04,704،9200] [INFO] [logstash.licensechecker.licensereader] تشغيل فحص الصحة لمعرفة ما إذا كان اتصال Elasticsearch يعمل {: healthcheck_url => http: // elasticsearch: XNUMX /،: path => "/"}
logstash_one_channel | [2019-04-29T11: 29: 04,710،XNUMX] [WARN] [logstash.licensechecker.licensereader] تمت محاولة إحياء الاتصال بمثيل ES الميت ، ولكن حدث خطأ. {: url => "elasticsearch: 9200 / "،: error_type => LogStash :: Outputs :: ElasticSearch :: HttpClient :: Pool :: HostUnreachableError،: error =>" Elasticsearch غير قابلة للوصول: [http: // elasticsearch: 9200 /] [Manticore :: ResolutionFailure] elasticsearch "}

وسجلنا يزحف طوال الوقت.

لقد أبرزت هنا باللون الأخضر الرسالة التي تفيد بأن خط الأنابيب بدأ بنجاح ، باللون الأحمر رسالة الخطأ وبالأصفر رسالة حول محاولة الاتصال elasticsearch: 9200.
يحدث هذا بسبب حقيقة أنه في logstash.conf المضمن في الصورة ، هناك فحص لتوافر البحث المرن. بعد كل شيء ، يفترض logstash أنه يعمل كجزء من Elk stack ، وقمنا بفصله.

يمكنك العمل ، لكنها ليست مريحة.

الحل هو تعطيل هذا الاختيار عبر متغير البيئة XPACK_MONITORING_ENABLED.

دعنا نجري تغييرًا على docker-compose.yml ونقوم بتشغيله مرة أخرى:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro

الآن ، كل شيء على ما يرام. الحاوية جاهزة للتجارب.

يمكننا الكتابة مرة أخرى في وحدة التحكم المجاورة:

echo '13123123123123123123123213123213' | nc localhost 5046

ونرى:

logstash_one_channel | {
logstash_one_channel |         "message" => "13123123123123123123123213123213",
logstash_one_channel |      "@timestamp" => 2019-04-29T11:43:44.582Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |            "host" => "gateway",
logstash_one_channel |            "port" => 49418
logstash_one_channel | }

العمل ضمن قناة واحدة

لذلك بدأنا. الآن يمكنك بالفعل أن تأخذ الوقت الكافي لتكوين logstash مباشرة. دعونا لا نتطرق إلى ملف pipelines.yml في الوقت الحالي ، دعنا نرى ما يمكننا الحصول عليه من خلال العمل مع قناة واحدة.

يجب أن أقول إن المبدأ العام للعمل مع ملف تكوين القناة موصوف جيدًا في الدليل الرسمي هنا هنا
إذا كنت تريد القراءة باللغة الروسية ، فقد استخدمنا هذا مقالة - سلعة(لكن بناء جملة الاستعلام قديم هناك ، يجب أن تأخذ ذلك في الحسبان).

دعنا ننتقل بالتتابع من قسم الإدخال. لقد رأينا بالفعل العمل على برنامج التعاون الفني. ما الأشياء الأخرى التي يمكن أن تكون مثيرة للاهتمام هنا؟

اختبر الرسائل باستخدام نبضات القلب

هناك إمكانية مثيرة للاهتمام لإنشاء رسائل اختبار تلقائية.
للقيام بذلك ، تحتاج إلى تضمين المكون الإضافي heartbean في قسم الإدخال.

input {
  heartbeat {
    message => "HeartBeat!"
   }
  } 

نقوم بتشغيله ، نبدأ في الاستلام مرة واحدة في الدقيقة

logstash_one_channel | {
logstash_one_channel |      "@timestamp" => 2019-04-29T13:52:04.567Z,
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "HeartBeat!",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "host" => "a0667e5c57ec"
logstash_one_channel | }

نريد أن نتلقى في كثير من الأحيان ، نحتاج إلى إضافة معلمة الفاصل الزمني.
هذه هي الطريقة التي سنتلقى بها رسالة كل 10 ثوانٍ.

input {
  heartbeat {
    message => "HeartBeat!"
    interval => 10
   }
  }

الحصول على البيانات من ملف

قررنا أيضًا النظر في وضع الملف. إذا كان يعمل بشكل جيد مع الملف ، فمن الممكن ألا تكون هناك حاجة إلى وكيل ، على الأقل للاستخدام المحلي.

وفقًا للوصف ، يجب أن يكون وضع التشغيل مشابهًا لـ tail -f ، أي يقرأ الأسطر الجديدة أو يقرأ الملف بأكمله اختياريًا.

إذن ما نريد الحصول عليه:

  1. نريد استلام الأسطر الملحقة بملف سجل واحد.
  2. نريد تلقي البيانات المكتوبة في عدة ملفات سجلات ، مع القدرة على فصل ما تم استلامه من مكانه.
  3. نريد التأكد من أنه عند إعادة تشغيل logstash ، لن يتلقى هذه البيانات مرة أخرى.
  4. نريد التحقق من أنه إذا تم تعطيل logstash ، واستمرت كتابة البيانات في الملفات ، فعند تشغيلها ، سوف نتلقى هذه البيانات.

لإجراء التجربة ، دعنا نضيف سطرًا آخر إلى docker-compose.yml ، ونفتح الدليل حيث نضع الملفات.

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input

وقم بتغيير قسم الإدخال في habr_pipeline.conf

input {
  file {
    path => "/usr/share/logstash/input/*.log"
   }
  }

نحن بدأنا:

docker-compose up

لإنشاء ملفات السجل وكتابتها ، سنستخدم الأمر:


echo '1' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:53.876Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

نعم ، إنه يعمل!

في الوقت نفسه ، نرى أننا قمنا تلقائيًا بإضافة حقل المسار. لذلك في المستقبل ، سنتمكن من تصفية السجلات وفقًا لها.

لنجرب مجددا:

echo '2' >> logs/number1.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:28:59.906Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "2",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log"
logstash_one_channel | }

والآن إلى ملف آخر:

 echo '1' >> logs/number2.log

{
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:29:26.061Z,
logstash_one_channel |        "@version" => "1",
logstash_one_channel |         "message" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log"
logstash_one_channel | }

عظيم! تم التقاط الملف ، تم تحديد المسار بشكل صحيح ، كل شيء على ما يرام.

وقف logstash وإعادة التشغيل. دعنا ننتظر. الصمت. أولئك. نحن لا نتلقى هذه السجلات مرة أخرى.

والآن التجربة الأكثر جرأة.

نضع logstash وننفذ:

echo '3' >> logs/number2.log
echo '4' >> logs/number1.log

قم بتشغيل logstash مرة أخرى وانظر:

logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "3",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number2.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.589Z
logstash_one_channel | }
logstash_one_channel | {
logstash_one_channel |            "host" => "ac2d4e3ef70f",
logstash_one_channel |     "habra_field" => "Hello Habr",
logstash_one_channel |         "message" => "4",
logstash_one_channel |        "@version" => "1",
logstash_one_channel |            "path" => "/usr/share/logstash/input/number1.log",
logstash_one_channel |      "@timestamp" => 2019-04-29T14:48:50.856Z
logstash_one_channel | }

الصيحة! التقط كل شيء.

لكن من الضروري التحذير مما يلي. إذا تمت إزالة حاوية logstash (توقف عامل الإرساء logstash_one_channel && docker rm logstash_one_channel) ، فلن يتم التقاط أي شيء. تم تخزين موضع الملف الذي تمت قراءته من أجله داخل الحاوية. إذا بدأت من الصفر ، فلن تقبل إلا سطورًا جديدة.

قراءة الملفات الموجودة

لنفترض أننا نقوم بتشغيل logstash لأول مرة ، ولكن لدينا بالفعل سجلات ونود معالجتها.
إذا قمنا بتشغيل logstash مع قسم الإدخال الذي استخدمناه أعلاه ، فلن نحصل على أي شيء. سيتم معالجة الأسطر الجديدة فقط بواسطة logstash.

لسحب الأسطر من الملفات الموجودة ، أضف سطرًا إضافيًا إلى قسم الإدخال:

input {
  file {
    start_position => "beginning"
    path => "/usr/share/logstash/input/*.log"
   }
  }

علاوة على ذلك ، هناك فارق بسيط ، وهذا يؤثر فقط على الملفات الجديدة التي لم يرها logstash بعد. بالنسبة للملفات نفسها التي كانت موجودة بالفعل في مجال عرض logstash ، فقد تذكرت بالفعل حجمها وستأخذ الآن سجلات جديدة فقط فيها.

دعنا نتوقف عن هذا من خلال دراسة قسم الإدخال. هناك العديد من الخيارات ، ولكن في الوقت الحالي ، لدينا ما يكفي لإجراء مزيد من التجارب.

التوجيه وتحويل البيانات

دعنا نحاول حل المشكلة التالية ، لنفترض أن لدينا رسائل من قناة واحدة ، بعضها إعلامي وبعضها رسائل خطأ. تختلف في العلامة. بعضها INFO والبعض الآخر خطأ.

نحن بحاجة إلى فصلهم عند الخروج. أولئك. نكتب رسائل إعلامية في قناة ، ونكتب رسائل خطأ في قناة أخرى.

للقيام بذلك ، انتقل من قسم الإدخال للتصفية والإخراج.

باستخدام قسم التصفية ، سنقوم بتحليل الرسالة الواردة ، والحصول على تجزئة (أزواج قيمة مفتاح) منها ، والتي يمكننا العمل بها بالفعل ، أي تحليل حسب الشروط. وفي قسم الإخراج ، سنختار الرسائل ونرسل كل واحدة إلى قناتها الخاصة.

اعراب رسالة مع grok

من أجل تحليل السلاسل النصية والحصول على مجموعة من الحقول منها ، هناك مكون إضافي خاص في قسم التصفية - grok.

دون أن أضع نفسي هدفاً لإعطاء وصف مفصل له هنا (لهذا أشير إليه الوثائق الرسمية) ، سأقدم مثال بسيط.

للقيام بذلك ، تحتاج إلى تحديد تنسيق خطوط الإدخال. لدي مثل هذا:

1 رسالة معلومات 1
2 رسالة خطأ 2

أولئك. المعرف أولاً ، ثم INFO / ERROR ، ثم كلمة بدون مسافات.
ليست صعبة ، لكنها كافية لفهم مبدأ العملية.

لذلك ، في قسم التصفية ، في البرنامج المساعد grok ، نحتاج إلى تحديد نمط لتحليل سلاسلنا.

سيبدو مثل هذا:

filter {
  grok {
    match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] }
   }
  } 

في الأساس ، إنه تعبير عادي. يتم استخدام الأنماط الجاهزة ، مثل INT و LOGLEVEL و WORD. يمكن الاطلاع على وصفهم ، بالإضافة إلى الأنماط الأخرى ، هنا. هنا

الآن ، بالمرور عبر هذا الفلتر ، ستتحول السلسلة الخاصة بنا إلى تجزئة من ثلاثة حقول: message_id و message_type و message_text.

سيتم عرضها في قسم الإخراج.

توجيه الرسائل في قسم الإخراج باستخدام الأمر if

في قسم الإخراج ، كما نتذكر ، كنا سنقسم الرسائل إلى مجموعتين. بعضها - وهو iNFO ، سنخرج إلى وحدة التحكم ، ومع وجود أخطاء ، سنخرج إلى ملف.

كيف يمكننا مشاركة هذه الرسائل؟ تشير حالة المشكلة بالفعل إلى حل - بعد كل شيء ، لدينا بالفعل حقل نوع message_type مخصص ، والذي يمكن أن يأخذ قيمتين فقط INFO و ERROR. وعليه سنتخذ خيارًا باستخدام عبارة if.

if [message_type] == "ERROR" {
        # Здесь выводим в файл
       } else
     {
      # Здесь выводим в stdout
    }

يمكن العثور على وصف العمل مع الحقول والمشغلين في هذا القسم دليل رسمي.

الآن ، حول الاستنتاج نفسه.

إخراج وحدة التحكم ، كل شيء واضح هنا - stdout {}

لكن الإخراج إلى الملف - تذكر أننا نقوم بتشغيل كل هذا من الحاوية ومن أجل الوصول إلى الملف الذي نكتب فيه النتيجة من الخارج ، نحتاج إلى فتح هذا الدليل في docker-compose.yml.

المجموع:

يبدو قسم الإخراج في ملفنا كالتالي:


output {
  if [message_type] == "ERROR" {
    file {
          path => "/usr/share/logstash/output/test.log"
          codec => line { format => "custom format: %{message}"}
         }
    } else
     {stdout {
             }
     }
  }

أضف وحدة تخزين أخرى إلى docker-compose.yml للإخراج:

version: '3'

networks:
  elk:

volumes:
  elasticsearch:
    driver: local

services:

  logstash:
    container_name: logstash_one_channel
    image: docker.elastic.co/logstash/logstash:6.3.2
    networks:
      - elk
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 5046:5046
   volumes:
      - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      - ./config/pipelines:/usr/share/logstash/config/pipelines:ro
      - ./logs:/usr/share/logstash/input
      - ./output:/usr/share/logstash/output

نبدأ ، نحاول ، نرى التقسيم إلى دفقين.

المصدر: www.habr.com

إضافة تعليق