اتوماسیون تحویل جریان در Apache NiFi

خوش آمدید!

اتوماسیون تحویل جریان در Apache NiFi

وظیفه به شرح زیر است - یک جریان نشان داده شده در تصویر بالا وجود دارد که باید به سرورهای N با آپاچی نی فای. تست جریان - یک فایل در حال تولید و ارسال به یک نمونه NiFi دیگر است. انتقال داده با استفاده از پروتکل NiFi Site به Site انجام می شود.

NiFi Site to Site (S2S) یک روش امن و بسیار قابل تنظیم برای انتقال داده ها بین نمونه های NiFi است. ببینید S2S چگونه کار می کند مستندات و مهم است که به یاد داشته باشید که نمونه NiFi خود را تنظیم کنید تا امکان دیدن S2S فراهم شود اینجا.

وقتی صحبت از انتقال داده با استفاده از S2S می شود، یک نمونه مشتری نامیده می شود، دومی سرور. مشتری داده ها را ارسال می کند، سرور آن را دریافت می کند. دو روش برای تنظیم انتقال داده بین آنها:

  1. فشار. داده ها از نمونه مشتری با استفاده از یک گروه پردازش از راه دور (RPG) ارسال می شود. در نمونه سرور، داده ها با استفاده از پورت ورودی دریافت می شوند
  2. کشیدن. سرور داده ها را با استفاده از RPG دریافت می کند، مشتری با استفاده از پورت خروجی ارسال می کند.


جریان برای نورد در رجیستری آپاچی ذخیره می شود.

Apache NiFi Registry یک پروژه فرعی از Apache NiFi است که ابزار ذخیره‌سازی جریان و نسخه‌سازی را فراهم می‌کند. نوعی GIT اطلاعات مربوط به نصب، پیکربندی و کار با رجیستری را می توان در این قسمت یافت اسناد رسمی. جریان برای ذخیره سازی در یک گروه فرآیند ترکیب شده و در رجیستری به این شکل ذخیره می شود. در ادامه مقاله به این موضوع باز خواهیم گشت.

در شروع، زمانی که N یک عدد کوچک است، جریان به صورت دستی در یک زمان معقول به روز می شود.

اما با رشد N، مشکلات بیشتری وجود دارد:

  1. زمان بیشتری برای به روز رسانی جریان طول می کشد. شما باید به همه سرورها بروید
  2. در به روز رسانی قالب ها خطاهایی وجود دارد. اینجا آپدیت کردند اما اینجا فراموش کردند
  3. خطای انسانی هنگام انجام تعداد زیادی عملیات مشابه

همه اینها ما را به این واقعیت می رساند که لازم است فرآیند را خودکار کنیم. من راه های زیر را برای حل این مشکل امتحان کرده ام:

  1. به جای NiFi از MiniFi استفاده کنید
  2. NiFi CLI
  3. NiPyAPI

استفاده از MiniFi

ApacheMiNify یک پروژه فرعی از Apache NiFi است. MiNiFy یک عامل فشرده است که از پردازنده های مشابه NiFi استفاده می کند و به شما امکان می دهد جریانی مشابه در NiFi ایجاد کنید. سبک بودن عامل، از جمله، به دلیل این واقعیت است که MiNiFy یک رابط گرافیکی برای پیکربندی جریان ندارد. عدم وجود رابط گرافیکی در MiniFy به این معنی است که حل مشکل تحویل جریان در مینی فای ضروری است. از آنجایی که MiniFy به طور فعال در IOT استفاده می شود، اجزای زیادی وجود دارد و فرآیند تحویل جریان به نمونه های نهایی مینی فای باید خودکار باشد. یک کار آشنا، درست است؟

یک پروژه فرعی دیگر، سرور MiniFi C2، به حل این مشکل کمک می کند. این محصول به عنوان نقطه مرکزی در معماری استقرار در نظر گرفته شده است. نحوه پیکربندی محیط - شرح داده شده در این مقاله در Habré و اطلاعات برای حل مشکل کافی است. MiniFi در ارتباط با سرور C2 به طور خودکار پیکربندی خود را به روز می کند. تنها ایراد این روش این است که باید در سرور C2 قالب ایجاد کنید، یک commit ساده به رجیستری کافی نیست.

گزینه ای که در مقاله بالا توضیح داده شد کار می کند و پیاده سازی آن دشوار نیست، اما نباید موارد زیر را فراموش کنیم:

  1. minifi همه پردازنده های nifi را ندارد
  2. نسخه های CPU در Minifi از نسخه های CPU در NiFi عقب هستند.

در زمان نوشتن، آخرین نسخه NiFi 1.9.2 است. نسخه پردازنده آخرین نسخه MiniFi 1.7.0 است. پردازنده‌ها را می‌توان به MiniFi اضافه کرد، اما به دلیل عدم تطابق نسخه بین پردازنده‌های NiFi و MiniFi، ممکن است این کار کار نکند.

NiFi CLI

قضاوت کردن توسط شرح ابزار موجود در وب سایت رسمی، این ابزاری برای خودکارسازی تعامل بین NiFI و NiFi Registry در زمینه تحویل جریان یا مدیریت فرآیند است. برای شروع این ابزار را دانلود کنید. از این رو.

برنامه کاربردی را اجرا کنید

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

برای اینکه بتوانیم جریان لازم را از رجیستری بارگیری کنیم، باید شناسه های سبد (شناسه سطل) و خود جریان (شناسه جریان) را بشناسیم. این داده ها را می توان از طریق cli یا در رابط وب رجیستری NiFi به دست آورد. رابط وب به شکل زیر است:

اتوماسیون تحویل جریان در Apache NiFi

با استفاده از CLI، این کار را انجام می دهید:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

گروه فرآیند واردات را از رجیستری اجرا کنید:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

یک نکته مهم این است که هر نمونه nifi را می توان به عنوان میزبانی که گروه پردازش را روی آن رول می کنیم، مشخص کرد.

گروه فرآیند با پردازنده های متوقف شده اضافه شده است، آنها باید راه اندازی شوند

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

عالی، پردازنده ها شروع شده اند. با این حال، با توجه به شرایط مشکل، ما به نمونه های NiFi برای ارسال داده به نمونه های دیگر نیاز داریم. بیایید فرض کنیم که روش Push برای انتقال داده ها به سرور انتخاب شده است. برای سازماندهی انتقال داده، لازم است انتقال داده (فعال کردن انتقال) را در گروه پردازش از راه دور اضافه شده (RPG) فعال کنید، که قبلاً در جریان ما گنجانده شده است.

اتوماسیون تحویل جریان در Apache NiFi

در اسناد موجود در CLI و سایر منابع، راهی برای فعال کردن انتقال داده پیدا نکردم. اگر می دانید چگونه این کار را انجام دهید، لطفاً در نظرات بنویسید.

از آنجایی که ما bash داریم و آماده هستیم تا به آخر برویم، راه خروجی خواهیم یافت! برای حل این مشکل می توانید از NiFi API استفاده کنید. بیایید از روش زیر استفاده کنیم، شناسه را از مثال های بالا می گیریم (در مورد ما 7f522a13-016e-1000-e504-d5b15587f2f3 است). شرح روش های NiFi API اینجا.

اتوماسیون تحویل جریان در Apache NiFi
در بدنه، باید JSON به شکل زیر را پاس کنید:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

پارامترهایی که برای "کار کردن" باید پر شوند:
بود - وضعیت انتقال داده TRANSMITTING موجود برای فعال کردن انتقال داده، STOPPED برای غیرفعال کردن
نسخه - نسخه پردازنده

نسخه به طور پیش فرض در هنگام ایجاد 0 خواهد بود، اما این پارامترها را می توان با استفاده از روش به دست آورد

اتوماسیون تحویل جریان در Apache NiFi

برای دوستداران اسکریپت bash، این روش ممکن است مناسب به نظر برسد، اما برای من سخت است - اسکریپت های bash مورد علاقه من نیستند. راه بعدی به نظر من جالب تر و راحت تر است.

NiPyAPI

NiPyAPI یک کتابخانه پایتون برای تعامل با نمونه های NiFi است. صفحه مستندات حاوی اطلاعات لازم برای کار با کتابخانه است. شروع سریع در شرح داده شده است پیش نویس در github.

اسکریپت ما برای اجرای پیکربندی یک برنامه پایتون است. بریم سراغ کدنویسی.
تنظیمات را برای کار بیشتر تنظیم کنید. ما به پارامترهای زیر نیاز خواهیم داشت:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

در ادامه نام روش های این کتابخانه را درج می کنم که شرح داده شده است اینجا.

ما رجیستری را با استفاده از نمونه nifi وصل می کنیم

nipyapi.versioning.create_registry_client

در این مرحله همچنین می توانید چکی اضافه کنید که رجیستری قبلاً به نمونه اضافه شده است، برای این کار می توانید از روش استفاده کنید

nipyapi.versioning.list_registry_clients

ما سطل را برای جستجوی بیشتر برای جریان در سبد پیدا می کنیم

nipyapi.versioning.get_registry_bucket

با توجه به سطل پیدا شده، ما به دنبال جریان هستیم

nipyapi.versioning.get_flow_in_bucket

در مرحله بعد، مهم است که بفهمیم آیا این گروه فرآیند قبلاً اضافه شده است یا خیر. گروه فرآیند بر اساس مختصات قرار می گیرد و زمانی ممکن است که گروه دوم بر روی یکی قرار گیرد موقعیتی ایجاد شود. من بررسی کردم، می تواند 🙂 برای دریافت تمام گروه فرآیند اضافه شده، از روش استفاده کنید

nipyapi.canvas.list_all_process_groups

و سپس می توانیم مثلاً با نام جستجو کنیم.

من روند به روز رسانی قالب را شرح نمی دهم، فقط می گویم که اگر پردازنده ها در نسخه جدید قالب اضافه شوند، هیچ مشکلی با وجود پیام در صف ها وجود ندارد. اما اگر پردازنده ها حذف شوند، ممکن است مشکلاتی ایجاد شود (نیفی اجازه حذف پردازنده را نمی دهد اگر یک صف پیام در مقابل آن جمع شده باشد). اگر علاقه مند هستید که چگونه این مشکل را حل کردم - برای من بنویسید، لطفاً در این مورد بحث خواهیم کرد. مخاطبین در انتهای مقاله بیایید به مرحله اضافه کردن یک گروه فرآیند برویم.

هنگام اشکال زدایی اسکریپت، با ویژگی مواجه شدم که آخرین نسخه جریان همیشه بالا نمی رود، بنابراین توصیه می کنم ابتدا این نسخه را روشن کنید:

nipyapi.versioning.get_latest_flow_ver

استقرار گروه فرآیند:

nipyapi.versioning.deploy_flow_version

ما پردازنده ها را شروع می کنیم:

nipyapi.canvas.schedule_process_group

در بلوک در مورد CLI نوشته شده بود که انتقال اطلاعات در گروه پردازش از راه دور به طور خودکار فعال نمی شود؟ هنگام پیاده سازی اسکریپت نیز با این مشکل مواجه شدم. در آن زمان، نمی‌توانستم انتقال داده را با استفاده از API شروع کنم و تصمیم گرفتم با توسعه‌دهنده کتابخانه NiPyAPI بنویسم و ​​از مشاوره / کمک بخواهم. توسعه دهنده به من پاسخ داد، ما در مورد مشکل صحبت کردیم و او نوشت که برای "بررسی چیزی" به زمان نیاز دارد. و الان چند روز بعد یه ایمیل میاد که توش یه تابع پایتون نوشته که مشکل راه اندازی من رو حل میکنه!!! در آن زمان نسخه NiPyAPI 0.13.3 بود و البته چیزی از این دست در آن وجود نداشت. اما در نسخه 0.14.0 که به تازگی منتشر شده است، این تابع قبلاً در کتابخانه گنجانده شده است. ملاقات

nipyapi.canvas.set_remote_process_group_transmission

بنابراین، با کمک کتابخانه NiPyAPI، ما رجیستری را متصل کردیم، جریان را جمع‌آوری کردیم و حتی پردازشگرها و انتقال داده‌ها را شروع کردیم. سپس می توانید کد را شانه کنید، انواع چک ها، ورود به سیستم را اضافه کنید و تمام. اما این یک داستان کاملا متفاوت است.

از میان گزینه‌های اتوماسیونی که در نظر گرفتم، دومی کارآمدترین به نظرم رسید. اولا، این هنوز هم کد پایتون است که می توانید کد برنامه کمکی را در آن جاسازی کنید و از مزایای زبان برنامه نویسی نهایت استفاده را ببرید. در مرحله دوم، پروژه NiPyAPI به طور فعال در حال توسعه است و در صورت بروز مشکل می توانید به توسعه دهنده نامه بنویسید. ثالثاً، NiPyAPI هنوز یک ابزار انعطاف‌پذیرتر برای تعامل با NiFi در حل مشکلات پیچیده است. به عنوان مثال، در تعیین اینکه آیا صف های پیام در حال حاضر در جریان خالی هستند و آیا امکان به روز رسانی گروه فرآیند وجود دارد یا خیر.

همین. من 3 رویکرد را برای خودکار کردن تحویل جریان در NiFi شرح دادم، مشکلاتی که یک توسعه دهنده ممکن است با آن مواجه شود و یک کد کاری برای تحویل خودکار ارائه کردم. اگر شما هم مثل من به این موضوع علاقه مند هستید - نوشتن!

منبع: www.habr.com

اضافه کردن نظر