مرحبا بالجميع!
المهمة هي كما يلي - هناك تدفق، معروض في الصورة أعلاه، والذي يجب نشره على خوادم N مع
يعد NiFi Site to Site (S2S) طريقة آمنة وسهلة التكوين لنقل البيانات بين مثيلات NiFi. كيف يعمل S2S، انظر
في الحالات التي نتحدث فيها عن نقل البيانات باستخدام S2S، يُطلق على أحد المثيلات اسم العميل، وهو الخادم الثاني. يرسل العميل البيانات ويستقبلها الخادم. طريقتان لتكوين نقل البيانات بينهما:
- دفع. من مثيل العميل، يتم إرسال البيانات باستخدام مجموعة العمليات البعيدة (RPG). في مثيل الخادم، يتم تلقي البيانات باستخدام منفذ الإدخال
- سحب. يتلقى الخادم البيانات باستخدام آر بي جي، ويرسلها العميل باستخدام منفذ الإخراج.
يتم تخزين التدفق الخاص بالطرح في سجل Apache.
يعد Apache NiFi Registry مشروعًا فرعيًا لـ Apache NiFi يوفر أداة لتخزين التدفق والتحكم في الإصدار. نوع من الجهاز الهضمي. يمكن العثور على معلومات حول تثبيت السجل وتكوينه والعمل معه في
في البداية، عندما يكون N رقمًا صغيرًا، يتم تسليم التدفق وتحديثه يدويًا في وقت مقبول.
ولكن مع نمو N، تصبح المشاكل أكثر عددًا:
- يستغرق المزيد من الوقت لتحديث التدفق. تحتاج إلى تسجيل الدخول إلى جميع الخوادم
- تحدث أخطاء في تحديث القالب. هنا قاموا بتحديثه، ولكن هنا نسوا
- الأخطاء البشرية عند تنفيذ عدد كبير من العمليات المماثلة
كل هذا يقودنا إلى حقيقة أننا بحاجة إلى أتمتة العملية. لقد جربت الطرق التالية لحل هذه المشكلة:
- استخدم MiNiFi بدلاً من NiFi
- نيفي كلي
- NiPyAPI
باستخدام مينيفاي
سيساعد مشروع فرعي آخر في حل هذه المشكلة - خادم MiNiFi C2. يهدف هذا المنتج إلى أن يكون النقطة المركزية في بنية طرح التكوين. كيفية تكوين البيئة - موصوفة في
الخيار الموضح في المقال أعلاه فعال وليس صعب التنفيذ، لكن يجب ألا ننسى ما يلي:
- Minifi لا يحتوي على كافة المعالجات من nifi
- إصدارات معالج Minifi تتخلف عن إصدارات معالج NiFi.
في وقت كتابة هذا التقرير، كان أحدث إصدار من NiFi هو 1.9.2. أحدث إصدار لمعالج MiNiFi هو 1.7.0. يمكن إضافة المعالجات إلى MiNiFi، ولكن بسبب اختلافات الإصدار بين معالجات NiFi وMiNiFi، قد لا يعمل هذا.
نيفي كلي
اذا حكمنا من خلال
قم بتشغيل الأداة المساعدة
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
لكي نتمكن من تحميل التدفق المطلوب من السجل، نحتاج إلى معرفة معرفات الجرافة (معرف الجرافة) والتدفق نفسه (معرف التدفق). يمكن الحصول على هذه البيانات إما من خلال cli أو من واجهة ويب تسجيل NiFi. في واجهة الويب يبدو كما يلي:
باستخدام CLI يتم ذلك:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
نبدأ في استيراد مجموعة العمليات من التسجيل:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
النقطة المهمة هي أنه يمكن تحديد أي مثيل nifi باعتباره المضيف الذي ننتقل إليه مجموعة العمليات.
تمت إضافة مجموعة العمليات مع المعالجات المتوقفة، ويجب أن تبدأ
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
عظيم، بدأت المعالجات. ومع ذلك، وفقًا لشروط المهمة، نحتاج إلى مثيلات NiFi لإرسال البيانات إلى مثيلات أخرى. لنفترض أنك اخترت طريقة الدفع لنقل البيانات إلى الخادم. من أجل تنظيم نقل البيانات، تحتاج إلى تمكين نقل البيانات على مجموعة العمليات البعيدة (RPG) المضافة، والتي تم تضمينها بالفعل في التدفق الخاص بنا.
في الوثائق الموجودة في CLI والمصادر الأخرى، لم أجد طريقة لتمكين نقل البيانات. إذا كنت تعرف كيفية القيام بذلك، يرجى الكتابة في التعليقات.
بما أن لدينا باش ونحن على استعداد للذهاب إلى النهاية، فسنجد طريقة للخروج! يمكنك استخدام NiFi API لحل هذه المشكلة. دعونا نستخدم الطريقة التالية، خذ المعرف من الأمثلة أعلاه (في حالتنا هو 7f522a13-016e-1000-e504-d5b15587f2f3). وصف طرق NiFi API
في الجسم تحتاج إلى تمرير JSON، مثل هذا:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
المعلمات التي يجب ملؤها حتى تعمل:
حالة - حالة نقل البيانات. متاح: TRANSMITTING لتمكين نقل البيانات، وSTOPPED للتعطيل
الإصدار - نسخة المعالج
سيكون الإصدار الافتراضي هو 0 عند إنشائه، ولكن يمكن الحصول على هذه المعلمات باستخدام الطريقة
بالنسبة لمحبي نصوص bash، قد تبدو هذه الطريقة مناسبة، لكنها صعبة بعض الشيء بالنسبة لي - نصوص bash ليست المفضلة لدي. الطريقة التالية أكثر إثارة للاهتمام وملاءمة في رأيي.
NiPyAPI
NiPyAPI هي مكتبة Python للتفاعل مع مثيلات NiFi.
البرنامج النصي الخاص بنا لطرح التكوين هو برنامج بلغة Python. دعنا ننتقل إلى الترميز.
قمنا بإعداد التكوينات لمزيد من العمل. سنحتاج إلى المعلمات التالية:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
بعد ذلك سأقوم بإدراج أسماء طرق هذه المكتبة الموضحة
قم بتوصيل السجل بمثيل nifi باستخدام
nipyapi.versioning.create_registry_client
في هذه الخطوة، يمكنك أيضًا إضافة التحقق من إضافة السجل بالفعل إلى المثيل؛ ولهذا يمكنك استخدام الطريقة
nipyapi.versioning.list_registry_clients
نجد الدلو لمزيد من البحث عن التدفق في السلة
nipyapi.versioning.get_registry_bucket
باستخدام الدلو الموجود، نبحث عن التدفق
nipyapi.versioning.get_flow_in_bucket
بعد ذلك، من المهم أن نفهم ما إذا كانت مجموعة العمليات هذه قد تمت إضافتها بالفعل. يتم وضع مجموعة العمليات وفقًا للإحداثيات وقد ينشأ موقف عندما يتم تركيب مكون ثانٍ فوق مكون آخر. لقد تأكدت من أن هذا يمكن أن يحدث :) للحصول على جميع مجموعات العمليات المضافة، نستخدم هذه الطريقة
nipyapi.canvas.list_all_process_groups
يمكننا إجراء المزيد من البحث، على سبيل المثال، بالاسم.
لن أصف عملية تحديث القالب، سأقول فقط أنه إذا تمت إضافة المعالجات في الإصدار الجديد من القالب، فلا توجد مشاكل مع وجود الرسائل في قوائم الانتظار. ولكن إذا تمت إزالة المعالجات، فقد تنشأ مشاكل (لا يسمح لك Nifi بإزالة المعالج إذا تراكمت قائمة انتظار الرسائل أمامه). إذا كنت مهتمًا بكيفية حل هذه المشكلة، فيرجى الكتابة لي وسنناقش هذه المشكلة. جهات الاتصال في نهاية المقال. دعنا ننتقل إلى خطوة إضافة مجموعة العمليات.
عند تصحيح البرنامج النصي، واجهت خصوصية تتمثل في عدم سحب أحدث إصدار من التدفق دائمًا، لذا أوصي بالتحقق من هذا الإصدار أولاً:
nipyapi.versioning.get_latest_flow_ver
نشر مجموعة العمليات:
nipyapi.versioning.deploy_flow_version
نبدأ المعالجات:
nipyapi.canvas.schedule_process_group
في الكتلة الخاصة بـ CLI، تمت كتابة أنه لا يتم تمكين نقل البيانات تلقائيًا في مجموعة العمليات عن بعد؟ عند تنفيذ البرنامج النصي، واجهت هذه المشكلة أيضًا. في ذلك الوقت، لم أتمكن من بدء نقل البيانات باستخدام واجهة برمجة التطبيقات (API) وقررت الكتابة إلى مطور مكتبة NiPyAPI وطلب النصيحة/المساعدة. استجاب لي المطور، وناقشنا المشكلة وكتب أنه يحتاج إلى وقت "للتحقق من شيء ما". وبعد ذلك بيومين، تصل رسالة تحتوي على دالة مكتوبة بلغة بايثون تحل مشكلة الإطلاق الخاصة بي!!! في ذلك الوقت، كان إصدار NiPyAPI هو 0.13.3، وبالطبع لم يكن هناك شيء من هذا القبيل. ولكن في الإصدار 0.14.0، الذي تم إصداره مؤخرًا، تم تضمين هذه الوظيفة بالفعل في المكتبة. يقابل،
nipyapi.canvas.set_remote_process_group_transmission
لذلك، باستخدام مكتبة NiPyAPI، قمنا بتوصيل السجل، ونشر التدفق، وحتى بدأنا المعالجات ونقل البيانات. ثم يمكنك تمشيط الكود وإضافة جميع أنواع الشيكات والتسجيل، وهذا كل شيء. لكن هذه قصة مختلفة تمامًا.
من بين خيارات الأتمتة التي فكرت فيها، بدا لي أن الخيار الأخير هو الأكثر كفاءة. أولاً، لا يزال هذا كود بايثون، حيث يمكنك تضمين كود البرنامج المساعد فيه والاستفادة من جميع مزايا لغة البرمجة. ثانيا، يتطور مشروع NiPyAPI بنشاط وفي حالة وجود مشاكل يمكنك الكتابة إلى المطور. ثالثا، لا تزال NiPyAPI أداة أكثر مرونة للتفاعل مع NiFi في حل المشكلات المعقدة. على سبيل المثال، في تحديد ما إذا كانت قوائم انتظار الرسائل فارغة الآن في التدفق وما إذا كان من الممكن تحديث مجموعة العمليات.
هذا كل شئ. لقد وصفت 3 طرق لأتمتة تسليم التدفق في NiFi، والمزالق التي قد يواجهها المطور، وقدمت كود العمل لأتمتة التسليم. إذا كنت مهتمًا بهذا الموضوع مثلي -
المصدر: www.habr.com