Автоматизація доставки flow в Apache NiFi

Всім привіт!

Автоматизація доставки flow в Apache NiFi

Завдання полягає в наступному - є flow, представлений на зображенні вище, який треба розкотити на N серверів з Apache NiFi. Flow тестовий - йде генерація файлу та відправка в іншу інстанс NiFi. Передача даних відбувається за допомогою протоколу NiFi Site to Site.

NiFi Site to Site (S2S) - безпечний, легко настроюється спосіб передачі даних між інстансами NiFi. Як працює S2S дивіться документації і важливо не забути налаштувати інстанс NiFi, щоб дозволити S2S дивіться тут.

У тих випадках, коли йдеться про передачу даних за допомогою S2S, один інстанс називається клієнтським, другий серверним. Клієнтський надсилає дані, серверний - приймає. Два способи налаштувати передачу даних між ними:

  1. Штовхати. З клієнтського інстансу дані надсилаються за допомогою Remote Process Group (RPG). На серверному інстансі дані приймаються за допомогою Input Port
  2. Потягнути. Сервер приймає дані за допомогою RPG, клієнт надсилає за допомогою Output port.


Flow для розкочування зберігаємо в Apache Registry.

Apache NiFi Registry - підпроект Apache NiFi, що представляє інструмент для зберігання flow та керування версіями. Такий собі GIT. Інформацію про встановлення, налаштування та роботу з registry можна знайти в офіційної документації. Flow для зберігання об'єднується в process group і у такому вигляді зберігається в registry. Далі у статті до цього ще повернемось.

На старті, коли N мале число, flow доставляється та актуалізується руками за прийнятний час.

Але зі зростанням N, проблем стає більше:

  1. на актуалізацію flow йде більше часу. Треба зайти на всі сервери
  2. виникають помилки актуалізації шаблонів. Ось тут оновили, а тут забули
  3. помилки людини під час виконання великої кількості однотипних операцій

Все це підводить нас до того, що треба автоматизувати процес. Я пробував такі способи вирішення цього завдання:

  1. Використовувати MiNiFi замість NiFi
  2. NiFi CLI
  3. NiPyAPI

Використання MiNiFi

Apache MiNiFy - Підпроект Apache NiFi. MiNiFy - компактний агент, що використовує ті ж процесори, що і NiFi, що дозволяє створювати ті ж flow, що і в NiFi. Легкість агента досягається в тому числі за рахунок того, що у MiNiFy немає графічного інтерфейсу для конфігурації flow. Відсутність графічного інтерфейсу MiNiFy означає, що необхідно вирішувати проблему доставки flow в minifi. Оскільки, MiNiFy активно використовується в IOT, багато компонентів і процес доставки flow до кінцевих екземплярів minifi треба автоматизувати. Знайоме завдання, правда?

Вирішити таке завдання допоможе ще один підпроект – MiNiFi C2 Server. Цей продукт призначений для того, щоб бути центральною точкою в архітектурі розкочування конфігурацій. Як налаштувати оточення - описано в цієї статті на Хабре та інформації достатньо для вирішення поставленого завдання. MiNiFi у зв'язку з C2 server автоматичному режимі оновлює конфігурацію у себе. Єдиний недолік такого підходу - доводиться створювати шаблони на C2 Server, простого комміта в registry не достатньо.

Варіант, описаний у статті вище робітник і не складний для реалізації, але не слід забувати наступне:

  1. У minifi є не всі процесори з nifi
  2. Версії процесорів у Minifi відстають від версій процесорів у NiFi.

На момент написання публікації остання версія NiFi - 1.9.2. Версія процесорів останньої версії MiNiFi - 1.7.0. Процесори можна додавати до MiNiFi, але через розбіжність версій між процесорами NiFi та MiNiFi це може не спрацювати.

NiFi CLI

Судячи з описом інструменту на офіційному сайті, це інструмент для автоматизації взаємодії NiFI та NiFi Registry в області доставки flow або управління процесами. Для початку роботи цей інструмент необхідно завантажити звідси.

Запускаємо утиліту

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Для того, щоб нам підвантажити необхідний flow із registry, нам треба знати ідентифікатори кошика (bucket identifier) ​​та самого flow (flow identifier). Ці дані можна отримати через cli, або у веб-інтерфейсі NiFi registry. У веб-інтерфейсі виглядає так:

Автоматизація доставки flow в Apache NiFi

За допомогою CLI робиться так:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Запускаємо імпорт process group із registry:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Важливий момент - як хост, на який ми накочуємо process group може бути вказаний будь-який інстанс nifi.

Process group доданий зі стопнутими процесорами, їх треба запустити

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Добре, процесори стартанули. Проте, за умовами завдання треба, щоб інстанси NiFi відправляли дані інші інстанси. Припустимо, що передачі даних на сервер вибрали спосіб Push. Для того, щоб організувати передачу даних, треба на доданому Remote Process Group (RPG), який вже включений до нашого flow включити передачу даних (Enable transmitting).

Автоматизація доставки flow в Apache NiFi

У документації CLI та інших джерел я не знайшов способу включити передачу даних. Якщо ви знаєте як це зробити – напишіть, будь ласка, у коментарях.

Раз у нас bash і ми готові йти до кінця — знайдемо вихід! Можна скористатися NiFi API для вирішення цієї проблеми. Скористаємося наступним методом, ID беремо з прикладів вище (у нашому випадку це 7f522a13-016e-1000-e504-d5b15587f2f3). Опис методів NiFi API тут.

Автоматизація доставки flow в Apache NiFi
В body треба передати JSON, такого вигляду:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Параметри, які треба заповнити, щоб “запрацювало”:
були - Статус передачі даних. Доступно TRANSMITTING для увімкнення передачі даних, STOPPED для вимикання
версія - версія процесора

version за замовчуванням буде 0 при створенні, але ці параметри можна отримати за допомогою методу

Автоматизація доставки flow в Apache NiFi

Для любителів bash скриптів цей метод може здатися придатним, але мені важкувато - bash скрипти не моє улюблене. Наступний спосіб цікавіший і зручніший на мій погляд.

NiPyAPI

NiPyAPI — бібліотека мови Python для взаємодії з інстансами NiFi. Сторінка із документацією містить необхідну інформацію для роботи з бібліотекою. Quick start описаний у проекті github.

Наш скрипт для розкочування конфігурації - програма на мові Python. Переходимо до кодингу.
Налаштовуємо конфіги для подальшої роботи. Нам знадобляться такі параметри:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Далі вставлятиму назви методів цієї бібліотеки, які описані тут.

Підключаємо registry до інстансу nifi за допомогою

nipyapi.versioning.create_registry_client

На цьому кроці можна додати перевірку того, що registry вже до інстансу доданий, для цього можна скористатися методом

nipyapi.versioning.list_registry_clients

Знаходимо bucket для подальшого пошуку flow у кошику

nipyapi.versioning.get_registry_bucket

Знайденим bucket шукаємо flow

nipyapi.versioning.get_flow_in_bucket

Далі важливо зрозуміти, чи не доданий вже цей process group. Process group розміщується координатами і може скластися ситуація, коли поверх одного компонента накладеться другий. Я перевіряв, чи таке може бути 🙂 Щоб отримати всі додані process group використовуємо метод

nipyapi.canvas.list_all_process_groups

і далі можемо пошукати, наприклад, на ім'я.

Я не описуватиму процес оновлення шаблону, скажу лише, що якщо в новій версії шаблону процесори додаються, то проблем з наявністю повідомлень у чергах немає. А от якщо процесори видаляються, то проблеми можуть виникнути (nifi не дає видаляти процесор, якщо перед ним зібралася черга повідомлень). Якщо вам цікаво, як я вирішив цю проблему — напишіть мені, будь ласка, обговоримо цей момент. Контакти наприкінці статті. Перейдемо до кроку додавання group process.

При налагодженні скрипту я зіткнувся з особливістю, що не завжди підтягується остання версія flow, тому рекомендую цю версію уточнити:

nipyapi.versioning.get_latest_flow_ver

Деплоїм process group:

nipyapi.versioning.deploy_flow_version

Запускаємо процесори:

nipyapi.canvas.schedule_process_group

У блоці про CLI було написано, що remote process group автоматично не включається передача даних? При реалізації скрипту я зіштовхнувся із цією проблемою теж. На той момент, запустити передачу даних за допомогою API у мене не вдалося і я вирішив написати розробнику бібліотеки NiPyAPI і запитати поради/допомоги. Розробник мені відповів, ми обговорили проблему і він написав, що йому треба час перевірити дещо. І ось, через пару днів приходить лист, в якому написано функцію на Python, що вирішує мою проблему запуску! На той момент версія NiPyAPI була 0.13.3 і в ній, звичайно, нічого такого не було. А ось у версію 0.14.0, яка вийшла нещодавно, ця функція вже увійшла до складу бібліотеки. Зустрічайте,

nipyapi.canvas.set_remote_process_group_transmission

Отже, за допомогою бібліотеки NiPyAPI підключили registre, накотили flow і навіть запустили процесори та передачу даних. Далі можна зачісувати код, додавати всілякі перевірки, логування і це все. Але це вже зовсім інша історія.

З розглянутих мною варіантів автоматизації останній мені здався найпрацездатнішим. По-перше, це все ж таки код на python, в який можна вбудовувати допоміжний програмний код і користуватися всіма перевагами мови програмування. По-друге, проект NiPyAPI активно розвивається і у разі проблем можна написати розробнику. По-третє, NiPyAPI все ж таки більш гнучкий інструмент для взаємодії з NiFi у вирішенні складних завдань. Наприклад, у визначенні того чи порожні черги повідомлень зараз у flow і чи можна оновлювати process group.

На цьому все. Я описав три підходи до автоматизації доставки flow в NiFi, підводні камені, з якими може зіткнутися розробник і навів робочий код для автоматизації доставки. Якщо вас так само, як і мене цікавить ця тема пишіть!

Джерело: habr.com

Додати коментар або відгук