Всем привет!
Задача заключается в следующем — есть flow, представленный на картинке выше, который надо раскатить на N серверов с
NiFi Site to Site (S2S) — безопасный, легко настраиваемый способ передачи данных между инстансами NiFi. Как работает S2S смотрите в
В тех случаях, когда речь идет о передачи данных с помощью S2S — один инстанс называется клиентским, второй серверным. Клиентский отправляет данные, серверный — принимает. Два способа настроить передачу данных между ними:
- Push. С клиентского инстанса данные отправляются с помощью Remote Process Group (RPG). На серверном инстансе данные принимаются с помощью Input Port
- Pull. Сервер принимает данные с помощью RPG, клиент отправляет с помощью Output port.
Flow для раскатки храним в Apache Registry.
Apache NiFi Registry — подпроект Apache NiFi, представляющий инструмент для хранения flow и управления версиями. Этакий GIT. Информацию об установке, настройке и работе с registry можно найти в
На старте, когда N малое число, flow доставляется и актуализируется руками за приемлемое время.
Но с ростом N, проблем становится больше:
- на актуализацию flow уходит больше времени. Надо зайти на все сервера
- возникают ошибки актуализации шаблонов. Вот тут обновили, а тут забыли
- ошибки человека при выполнении большого количества однотипных операций
Всё это подводит нас к тому, что надо автоматизировать процесс. Я пробовал следующие способы решения этой задачи:
- Использовать MiNiFi вместо NiFi
- NiFi CLI
- NiPyAPI
Использование MiNiFi
Решить такую задачу поможет еще один подпроект — MiNiFi C2 Server. Этот продукт предназначен для того, чтобы быть центральной точкой в архитектуре раскатки конфигураций. Как сконфигурировать окружение — описано в
Вариант, описанный в статье выше рабочий и не сложный для реализации, но надо не забывать следующее:
- В minifi есть не все процессоры из nifi
- Версии процессоров в Minifi отстают от версий процессоров в NiFi.
На момент написания публикации последняя версия NiFi — 1.9.2. Версия процессоров последней версии MiNiFi — 1.7.0. Процессоры можно добавлять в MiNiFi, но из-за расхождения версий между процессорами NiFi и MiNiFi это может не сработать.
NiFi CLI
Судя по
Запускаем утилиту
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Для того, чтобы нам подгрузить необходимый flow из registry, нам надо знать идентификаторы корзины (bucket identifier) и самого flow (flow identifier). Эти данные можно получить либо через cli, либо в веб-интерфейсе NiFi registry. В веб интерфейсе выглядит так:
С помощью CLI делается так:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Запускаем импорт process group из registry:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Важный момент — в качестве хоста, на который мы накатываем process group может быть указан любой инстанс nifi.
Process group добавлен со стопнутыми процессорами, их надо запустить
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Отлично, процессоры стартанули. Однако, нам по условиям задачи надо, чтобы инстансы NiFi отправляли данные на другие инстансы. Предположим, что для передачи данных на сервер выбрали способ Push. Для того, чтобы организовать передачу данных, надо на добавленном Remote Process Group (RPG), который уже включен в наш flow включить передачу данных (Enable transmitting).
В документации в CLI и других источниках я не нашел способа включить передачу данных. Если вы знаете как это сделать — напишите, пожалуйста, в комментариях.
Раз уж у нас bash и мы готовы идти до конца — найдем выход! Можно воспользоваться NiFi API для решения этой проблемы. Воспользуемся следующим методом, ID берем из примеров выше (в нашем случае это 7f522a13-016e-1000-e504-d5b15587f2f3). Описание методов NiFi API
В body надо передать JSON, следующего вида:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Параметры, которые надо заполнить, чтобы “заработало”:
state — статус передачи данных. Доступно TRANSMITTING для включения передачи данных, STOPPED для выключения
version — версия процессора
version по умолчанию будет 0 при создании, но эти параметры можно получить используя метод
Для любителей bash скриптов данный метод может показаться пригодным, но мне тяжеловато — bash скрипты не самое мое любимое. Следующий способ поинтереснее и поудобнее на мой взгляд.
NiPyAPI
NiPyAPI — библиотека для языка Python для взаимодействия с инстансами NiFi.
Наш скрипт для раскатки конфигурации — программка на языке Python. Переходим к кодингу.
Настраиваем конфиги для дальнейшей работы. Нам понадобятся следующие параметры:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Дальше буду вставлять названия методов этой библиотеки, которые описаны
Подключаем registry к инстансу nifi с помощью
nipyapi.versioning.create_registry_client
На этом шаге можно еще добавить проверку того, что registry уже к инстансу добавлен, для этого можно воспользоваться методом
nipyapi.versioning.list_registry_clients
Находим bucket для дальнейшего поиска flow в корзине
nipyapi.versioning.get_registry_bucket
По найденному bucket ищем flow
nipyapi.versioning.get_flow_in_bucket
Дальше важно понять а не добавлен ли уже этот process group. Process group размещается по координатам и может сложиться ситуация, когда поверх одного компонента наложится второй. Я проверял, такое может быть 🙂 Чтобы получить все добавленные process group используем метод
nipyapi.canvas.list_all_process_groups
и дальше можем поискать, например по имени.
Я не буду описывать процесс обновления шаблона, скажу лишь, что если в новой версии шаблона процессоры добавляются, то проблем с наличием сообщений в очередях нет. А вот если процессоры удаляются, то проблемы могут возникнуть (nifi не дает удалять процессор, если перед ним скопилась очередь сообщений). Если вам интересно как я решил эту проблему — напишите мне, пожалуйста, обсудим этот момент. Контакты в конце статьи. Перейдем к шагу добавления process group.
При отладке скрипта я столкнулся с особенностью, что не всегда подтягивается последняя версия flow, поэтому рекомендую сначала эту версию уточнить:
nipyapi.versioning.get_latest_flow_ver
Деплоим process group:
nipyapi.versioning.deploy_flow_version
Запускаем процессоры:
nipyapi.canvas.schedule_process_group
В блоке про CLI было написано, что в remote process group автоматически не включается передача данных? При реализации скрипта я столкнулся с этой проблемой тоже. На тот момент, запустить передачу данных с помощью API у меня не получилось и я решил написать разработчику библиотеки NiPyAPI и спросить совета/помощи. Разработчик мне ответил, мы обсудили проблему и он написал, что ему надо время “проверить кое-что”. И вот, спустя пару дней приходит письмо, в котором написана функция на Python, решающая мою проблему запуска!!! На тот момент версия NiPyAPI была 0.13.3 и в ней, конечно же, ничего такого не было. А вот в версию 0.14.0, которая вышла совсем недавно, эта функция уже вошла в составе библиотеки. Встречайте,
nipyapi.canvas.set_remote_process_group_transmission
Итак, с помощью библиотеки NiPyAPI подключили registry, накатили flow и даже запустили процессоры и передачу данных. Дальше можно причесывать код, добавлять всевозможные проверки, логирование и вот это всё. Но это уже совсем другая история.
Из рассмотренных мною вариантов автоматизации последний мне показался самым работоспособным. Во-первых, это все же код на python, в который можно встраивать вспомогательный программный код и пользоваться всеми преимуществами языка программирования. Во-вторых, проект NiPyAPI активно развивается и в случае проблем можно написать разработчику. В-третьих, NiPyAPI все же более гибкий инструмент для взаимодействия с NiFi в решении сложных задач. Например, в определении того пустые ли очереди сообщений сейчас в flow и можно ли обновлять process group.
На этом все. Я описал 3 подхода к автоматизации доставки flow в NiFi, подводные камни, с которыми может столкнуться разработчик и привел рабочий код для автоматизации доставки. Если вас так же, как и меня интересует эта тема —
Источник: habr.com