أتمتة تسليم التدفق في Apache NiFi

مرحبا بالجميع!

أتمتة تسليم التدفق في Apache NiFi

المهمة هي كما يلي - هناك تدفق، معروض في الصورة أعلاه، والذي يجب نشره على خوادم N مع اباتشي نيفي. اختبار التدفق - يتم إنشاء ملف وإرساله إلى مثيل NiFi آخر. يتم نقل البيانات باستخدام بروتوكول NiFi Site to Site.

يعد NiFi Site to Site (S2S) طريقة آمنة وسهلة التكوين لنقل البيانات بين مثيلات NiFi. كيف يعمل S2S، انظر توثيق ومن المهم ألا ننسى تكوين مثيل NiFi للسماح بـ S2S، راجع هنا.

في الحالات التي نتحدث فيها عن نقل البيانات باستخدام S2S، يُطلق على أحد المثيلات اسم العميل، وهو الخادم الثاني. يرسل العميل البيانات ويستقبلها الخادم. طريقتان لتكوين نقل البيانات بينهما:

  1. دفع. من مثيل العميل، يتم إرسال البيانات باستخدام مجموعة العمليات البعيدة (RPG). في مثيل الخادم، يتم تلقي البيانات باستخدام منفذ الإدخال
  2. سحب. يتلقى الخادم البيانات باستخدام آر بي جي، ويرسلها العميل باستخدام منفذ الإخراج.


يتم تخزين التدفق الخاص بالطرح في سجل Apache.

يعد Apache NiFi Registry مشروعًا فرعيًا لـ Apache NiFi يوفر أداة لتخزين التدفق والتحكم في الإصدار. نوع من الجهاز الهضمي. يمكن العثور على معلومات حول تثبيت السجل وتكوينه والعمل معه في الوثائق الرسمية. يتم دمج تدفق التخزين في مجموعة عمليات ويتم تخزينه بهذا النموذج في السجل. وسنعود إلى هذا لاحقًا في المقال.

في البداية، عندما يكون N رقمًا صغيرًا، يتم تسليم التدفق وتحديثه يدويًا في وقت مقبول.

ولكن مع نمو N، تصبح المشاكل أكثر عددًا:

  1. يستغرق المزيد من الوقت لتحديث التدفق. تحتاج إلى تسجيل الدخول إلى جميع الخوادم
  2. تحدث أخطاء في تحديث القالب. هنا قاموا بتحديثه، ولكن هنا نسوا
  3. الأخطاء البشرية عند تنفيذ عدد كبير من العمليات المماثلة

كل هذا يقودنا إلى حقيقة أننا بحاجة إلى أتمتة العملية. لقد جربت الطرق التالية لحل هذه المشكلة:

  1. استخدم MiNiFi بدلاً من NiFi
  2. نيفي كلي
  3. NiPyAPI

باستخدام مينيفاي

أباتشي مينيفاي - مشروع فرعي لـ Apache NiFi. MiNiFy هو وكيل مدمج يستخدم نفس المعالجات مثل NiFi، مما يسمح لك بإنشاء نفس التدفقات كما في NiFi. يتم تحقيق الطبيعة الخفيفة للوكيل، من بين أمور أخرى، من خلال حقيقة أن MiNiFy لا يحتوي على واجهة رسومية لتكوين التدفق. يعني عدم وجود واجهة رسومية في MiNiFy أنه من الضروري حل مشكلة توصيل التدفق إلى minifi. نظرًا لاستخدام MiNiFy بشكل نشط في IOT، فهناك العديد من المكونات ويجب أتمتة عملية توصيل التدفق إلى مثيلات minifi النهائية. مهمة مألوفة، أليس كذلك؟

سيساعد مشروع فرعي آخر في حل هذه المشكلة - خادم MiNiFi C2. يهدف هذا المنتج إلى أن يكون النقطة المركزية في بنية طرح التكوين. كيفية تكوين البيئة - موصوفة في هذا المقال هناك معلومات كافية عن حبري لحل المشكلة. يقوم MiNiFi، بالتعاون مع خادم C2، بتحديث تكوينه تلقائيًا. العيب الوحيد في هذا الأسلوب هو أنه يتعين عليك إنشاء قوالب على خادم C2؛ فالالتزام البسيط بالتسجيل ليس كافيًا.

الخيار الموضح في المقال أعلاه فعال وليس صعب التنفيذ، لكن يجب ألا ننسى ما يلي:

  1. Minifi لا يحتوي على كافة المعالجات من nifi
  2. إصدارات معالج Minifi تتخلف عن إصدارات معالج NiFi.

في وقت كتابة هذا التقرير، كان أحدث إصدار من NiFi هو 1.9.2. أحدث إصدار لمعالج MiNiFi هو 1.7.0. يمكن إضافة المعالجات إلى MiNiFi، ولكن بسبب اختلافات الإصدار بين معالجات NiFi وMiNiFi، قد لا يعمل هذا.

نيفي كلي

اذا حكمنا من خلال وصف أداة على الموقع الرسمي، وهي أداة لأتمتة التفاعل بين NiFI وNiFi Registry في مجال تسليم التدفق أو إدارة العمليات. للبدء، تحتاج إلى تنزيل هذه الأداة. من هنا.

قم بتشغيل الأداة المساعدة

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

لكي نتمكن من تحميل التدفق المطلوب من السجل، نحتاج إلى معرفة معرفات الجرافة (معرف الجرافة) والتدفق نفسه (معرف التدفق). يمكن الحصول على هذه البيانات إما من خلال cli أو من واجهة ويب تسجيل NiFi. في واجهة الويب يبدو كما يلي:

أتمتة تسليم التدفق في Apache NiFi

باستخدام CLI يتم ذلك:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

نبدأ في استيراد مجموعة العمليات من التسجيل:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

النقطة المهمة هي أنه يمكن تحديد أي مثيل nifi باعتباره المضيف الذي ننتقل إليه مجموعة العمليات.

تمت إضافة مجموعة العمليات مع المعالجات المتوقفة، ويجب أن تبدأ

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

عظيم، بدأت المعالجات. ومع ذلك، وفقًا لشروط المهمة، نحتاج إلى مثيلات NiFi لإرسال البيانات إلى مثيلات أخرى. لنفترض أنك اخترت طريقة الدفع لنقل البيانات إلى الخادم. من أجل تنظيم نقل البيانات، تحتاج إلى تمكين نقل البيانات على مجموعة العمليات البعيدة (RPG) المضافة، والتي تم تضمينها بالفعل في التدفق الخاص بنا.

أتمتة تسليم التدفق في Apache NiFi

في الوثائق الموجودة في CLI والمصادر الأخرى، لم أجد طريقة لتمكين نقل البيانات. إذا كنت تعرف كيفية القيام بذلك، يرجى الكتابة في التعليقات.

بما أن لدينا باش ونحن على استعداد للذهاب إلى النهاية، فسنجد طريقة للخروج! يمكنك استخدام NiFi API لحل هذه المشكلة. دعونا نستخدم الطريقة التالية، خذ المعرف من الأمثلة أعلاه (في حالتنا هو 7f522a13-016e-1000-e504-d5b15587f2f3). وصف طرق NiFi API هنا.

أتمتة تسليم التدفق في Apache NiFi
في الجسم تحتاج إلى تمرير JSON، مثل هذا:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

المعلمات التي يجب ملؤها حتى تعمل:
حالة - حالة نقل البيانات. متاح: TRANSMITTING لتمكين نقل البيانات، وSTOPPED للتعطيل
الإصدار - نسخة المعالج

سيكون الإصدار الافتراضي هو 0 عند إنشائه، ولكن يمكن الحصول على هذه المعلمات باستخدام الطريقة

أتمتة تسليم التدفق في Apache NiFi

بالنسبة لمحبي نصوص bash، قد تبدو هذه الطريقة مناسبة، لكنها صعبة بعض الشيء بالنسبة لي - نصوص bash ليست المفضلة لدي. الطريقة التالية أكثر إثارة للاهتمام وملاءمة في رأيي.

NiPyAPI

NiPyAPI هي مكتبة Python للتفاعل مع مثيلات NiFi. صفحة التوثيق يحتوي على المعلومات اللازمة للعمل مع المكتبة. تم وصف البداية السريعة في مشروع على جيثب.

البرنامج النصي الخاص بنا لطرح التكوين هو برنامج بلغة Python. دعنا ننتقل إلى الترميز.
قمنا بإعداد التكوينات لمزيد من العمل. سنحتاج إلى المعلمات التالية:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

بعد ذلك سأقوم بإدراج أسماء طرق هذه المكتبة الموضحة هنا.

قم بتوصيل السجل بمثيل nifi باستخدام

nipyapi.versioning.create_registry_client

في هذه الخطوة، يمكنك أيضًا إضافة التحقق من إضافة السجل بالفعل إلى المثيل؛ ولهذا يمكنك استخدام الطريقة

nipyapi.versioning.list_registry_clients

نجد الدلو لمزيد من البحث عن التدفق في السلة

nipyapi.versioning.get_registry_bucket

باستخدام الدلو الموجود، نبحث عن التدفق

nipyapi.versioning.get_flow_in_bucket

بعد ذلك، من المهم أن نفهم ما إذا كانت مجموعة العمليات هذه قد تمت إضافتها بالفعل. يتم وضع مجموعة العمليات وفقًا للإحداثيات وقد ينشأ موقف عندما يتم تركيب مكون ثانٍ فوق مكون آخر. لقد تأكدت من أن هذا يمكن أن يحدث :) للحصول على جميع مجموعات العمليات المضافة، نستخدم هذه الطريقة

nipyapi.canvas.list_all_process_groups

يمكننا إجراء المزيد من البحث، على سبيل المثال، بالاسم.

لن أصف عملية تحديث القالب، سأقول فقط أنه إذا تمت إضافة المعالجات في الإصدار الجديد من القالب، فلا توجد مشاكل مع وجود الرسائل في قوائم الانتظار. ولكن إذا تمت إزالة المعالجات، فقد تنشأ مشاكل (لا يسمح لك Nifi بإزالة المعالج إذا تراكمت قائمة انتظار الرسائل أمامه). إذا كنت مهتمًا بكيفية حل هذه المشكلة، فيرجى الكتابة لي وسنناقش هذه المشكلة. جهات الاتصال في نهاية المقال. دعنا ننتقل إلى خطوة إضافة مجموعة العمليات.

عند تصحيح البرنامج النصي، واجهت خصوصية تتمثل في عدم سحب أحدث إصدار من التدفق دائمًا، لذا أوصي بالتحقق من هذا الإصدار أولاً:

nipyapi.versioning.get_latest_flow_ver

نشر مجموعة العمليات:

nipyapi.versioning.deploy_flow_version

نبدأ المعالجات:

nipyapi.canvas.schedule_process_group

في الكتلة الخاصة بـ CLI، تمت كتابة أنه لا يتم تمكين نقل البيانات تلقائيًا في مجموعة العمليات عن بعد؟ عند تنفيذ البرنامج النصي، واجهت هذه المشكلة أيضًا. في ذلك الوقت، لم أتمكن من بدء نقل البيانات باستخدام واجهة برمجة التطبيقات (API) وقررت الكتابة إلى مطور مكتبة NiPyAPI وطلب النصيحة/المساعدة. استجاب لي المطور، وناقشنا المشكلة وكتب أنه يحتاج إلى وقت "للتحقق من شيء ما". وبعد ذلك بيومين، تصل رسالة تحتوي على دالة مكتوبة بلغة بايثون تحل مشكلة الإطلاق الخاصة بي!!! في ذلك الوقت، كان إصدار NiPyAPI هو 0.13.3، وبالطبع لم يكن هناك شيء من هذا القبيل. ولكن في الإصدار 0.14.0، الذي تم إصداره مؤخرًا، تم تضمين هذه الوظيفة بالفعل في المكتبة. يقابل،

nipyapi.canvas.set_remote_process_group_transmission

لذلك، باستخدام مكتبة NiPyAPI، قمنا بتوصيل السجل، ونشر التدفق، وحتى بدأنا المعالجات ونقل البيانات. ثم يمكنك تمشيط الكود وإضافة جميع أنواع الشيكات والتسجيل، وهذا كل شيء. لكن هذه قصة مختلفة تمامًا.

من بين خيارات الأتمتة التي فكرت فيها، بدا لي أن الخيار الأخير هو الأكثر كفاءة. أولاً، لا يزال هذا كود بايثون، حيث يمكنك تضمين كود البرنامج المساعد فيه والاستفادة من جميع مزايا لغة البرمجة. ثانيا، يتطور مشروع NiPyAPI بنشاط وفي حالة وجود مشاكل يمكنك الكتابة إلى المطور. ثالثا، لا تزال NiPyAPI أداة أكثر مرونة للتفاعل مع NiFi في حل المشكلات المعقدة. على سبيل المثال، في تحديد ما إذا كانت قوائم انتظار الرسائل فارغة الآن في التدفق وما إذا كان من الممكن تحديث مجموعة العمليات.

هذا كل شئ. لقد وصفت 3 طرق لأتمتة تسليم التدفق في NiFi، والمزالق التي قد يواجهها المطور، وقدمت كود العمل لأتمتة التسليم. إذا كنت مهتمًا بهذا الموضوع مثلي - بداية!

المصدر: www.habr.com

إضافة تعليق