Всім привіт!
Завдання полягає в наступному - є flow, представлений на зображенні вище, який треба розкотити на N серверів з
NiFi Site to Site (S2S) - безпечний, легко настроюється спосіб передачі даних між інстансами NiFi. Як працює S2S дивіться
У тих випадках, коли йдеться про передачу даних за допомогою S2S, один інстанс називається клієнтським, другий серверним. Клієнтський надсилає дані, серверний - приймає. Два способи налаштувати передачу даних між ними:
- Штовхати. З клієнтського інстансу дані надсилаються за допомогою Remote Process Group (RPG). На серверному інстансі дані приймаються за допомогою Input Port
- Потягнути. Сервер приймає дані за допомогою RPG, клієнт надсилає за допомогою Output port.
Flow для розкочування зберігаємо в Apache Registry.
Apache NiFi Registry - підпроект Apache NiFi, що представляє інструмент для зберігання flow та керування версіями. Такий собі GIT. Інформацію про встановлення, налаштування та роботу з registry можна знайти в
На старті, коли N мале число, flow доставляється та актуалізується руками за прийнятний час.
Але зі зростанням N, проблем стає більше:
- на актуалізацію flow йде більше часу. Треба зайти на всі сервери
- виникають помилки актуалізації шаблонів. Ось тут оновили, а тут забули
- помилки людини під час виконання великої кількості однотипних операцій
Все це підводить нас до того, що треба автоматизувати процес. Я пробував такі способи вирішення цього завдання:
- Використовувати MiNiFi замість NiFi
- NiFi CLI
- NiPyAPI
Використання MiNiFi
Вирішити таке завдання допоможе ще один підпроект – MiNiFi C2 Server. Цей продукт призначений для того, щоб бути центральною точкою в архітектурі розкочування конфігурацій. Як налаштувати оточення - описано в
Варіант, описаний у статті вище робітник і не складний для реалізації, але не слід забувати наступне:
- У minifi є не всі процесори з nifi
- Версії процесорів у Minifi відстають від версій процесорів у NiFi.
На момент написання публікації остання версія NiFi - 1.9.2. Версія процесорів останньої версії MiNiFi - 1.7.0. Процесори можна додавати до MiNiFi, але через розбіжність версій між процесорами NiFi та MiNiFi це може не спрацювати.
NiFi CLI
Судячи з
Запускаємо утиліту
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Для того, щоб нам підвантажити необхідний flow із registry, нам треба знати ідентифікатори кошика (bucket identifier) та самого flow (flow identifier). Ці дані можна отримати через cli, або у веб-інтерфейсі NiFi registry. У веб-інтерфейсі виглядає так:
За допомогою CLI робиться так:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Запускаємо імпорт process group із registry:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Важливий момент - як хост, на який ми накочуємо process group може бути вказаний будь-який інстанс nifi.
Process group доданий зі стопнутими процесорами, їх треба запустити
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Добре, процесори стартанули. Проте, за умовами завдання треба, щоб інстанси NiFi відправляли дані інші інстанси. Припустимо, що передачі даних на сервер вибрали спосіб Push. Для того, щоб організувати передачу даних, треба на доданому Remote Process Group (RPG), який вже включений до нашого flow включити передачу даних (Enable transmitting).
У документації CLI та інших джерел я не знайшов способу включити передачу даних. Якщо ви знаєте як це зробити – напишіть, будь ласка, у коментарях.
Раз у нас bash і ми готові йти до кінця — знайдемо вихід! Можна скористатися NiFi API для вирішення цієї проблеми. Скористаємося наступним методом, ID беремо з прикладів вище (у нашому випадку це 7f522a13-016e-1000-e504-d5b15587f2f3). Опис методів NiFi API
В body треба передати JSON, такого вигляду:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Параметри, які треба заповнити, щоб “запрацювало”:
були - Статус передачі даних. Доступно TRANSMITTING для увімкнення передачі даних, STOPPED для вимикання
версія - версія процесора
version за замовчуванням буде 0 при створенні, але ці параметри можна отримати за допомогою методу
Для любителів bash скриптів цей метод може здатися придатним, але мені важкувато - bash скрипти не моє улюблене. Наступний спосіб цікавіший і зручніший на мій погляд.
NiPyAPI
NiPyAPI — бібліотека мови Python для взаємодії з інстансами NiFi.
Наш скрипт для розкочування конфігурації - програма на мові Python. Переходимо до кодингу.
Налаштовуємо конфіги для подальшої роботи. Нам знадобляться такі параметри:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Далі вставлятиму назви методів цієї бібліотеки, які описані
Підключаємо registry до інстансу nifi за допомогою
nipyapi.versioning.create_registry_client
На цьому кроці можна додати перевірку того, що registry вже до інстансу доданий, для цього можна скористатися методом
nipyapi.versioning.list_registry_clients
Знаходимо bucket для подальшого пошуку flow у кошику
nipyapi.versioning.get_registry_bucket
Знайденим bucket шукаємо flow
nipyapi.versioning.get_flow_in_bucket
Далі важливо зрозуміти, чи не доданий вже цей process group. Process group розміщується координатами і може скластися ситуація, коли поверх одного компонента накладеться другий. Я перевіряв, чи таке може бути 🙂 Щоб отримати всі додані process group використовуємо метод
nipyapi.canvas.list_all_process_groups
і далі можемо пошукати, наприклад, на ім'я.
Я не описуватиму процес оновлення шаблону, скажу лише, що якщо в новій версії шаблону процесори додаються, то проблем з наявністю повідомлень у чергах немає. А от якщо процесори видаляються, то проблеми можуть виникнути (nifi не дає видаляти процесор, якщо перед ним зібралася черга повідомлень). Якщо вам цікаво, як я вирішив цю проблему — напишіть мені, будь ласка, обговоримо цей момент. Контакти наприкінці статті. Перейдемо до кроку додавання group process.
При налагодженні скрипту я зіткнувся з особливістю, що не завжди підтягується остання версія flow, тому рекомендую цю версію уточнити:
nipyapi.versioning.get_latest_flow_ver
Деплоїм process group:
nipyapi.versioning.deploy_flow_version
Запускаємо процесори:
nipyapi.canvas.schedule_process_group
У блоці про CLI було написано, що remote process group автоматично не включається передача даних? При реалізації скрипту я зіштовхнувся із цією проблемою теж. На той момент, запустити передачу даних за допомогою API у мене не вдалося і я вирішив написати розробнику бібліотеки NiPyAPI і запитати поради/допомоги. Розробник мені відповів, ми обговорили проблему і він написав, що йому треба час перевірити дещо. І ось, через пару днів приходить лист, в якому написано функцію на Python, що вирішує мою проблему запуску! На той момент версія NiPyAPI була 0.13.3 і в ній, звичайно, нічого такого не було. А ось у версію 0.14.0, яка вийшла нещодавно, ця функція вже увійшла до складу бібліотеки. Зустрічайте,
nipyapi.canvas.set_remote_process_group_transmission
Отже, за допомогою бібліотеки NiPyAPI підключили registre, накотили flow і навіть запустили процесори та передачу даних. Далі можна зачісувати код, додавати всілякі перевірки, логування і це все. Але це вже зовсім інша історія.
З розглянутих мною варіантів автоматизації останній мені здався найпрацездатнішим. По-перше, це все ж таки код на python, в який можна вбудовувати допоміжний програмний код і користуватися всіма перевагами мови програмування. По-друге, проект NiPyAPI активно розвивається і у разі проблем можна написати розробнику. По-третє, NiPyAPI все ж таки більш гнучкий інструмент для взаємодії з NiFi у вирішенні складних завдань. Наприклад, у визначенні того чи порожні черги повідомлень зараз у flow і чи можна оновлювати process group.
На цьому все. Я описав три підходи до автоматизації доставки flow в NiFi, підводні камені, з якими може зіткнутися розробник і навів робочий код для автоматизації доставки. Якщо вас так само, як і мене цікавить ця тема
Джерело: habr.com