Здравейте на всички!
Задачата е следната - има поток, показан на снимката по-горе, който трябва да бъде разгърнат на N сървъра с
NiFi Site to Site (S2S) е сигурен, силно адаптивен начин за прехвърляне на данни между NiFi екземпляри. Вижте как работи S2S
Когато става въпрос за пренос на данни чрез S2S, един екземпляр се нарича клиент, вторият е сървър. Клиентът изпраща данни, сървърът ги получава. Два начина за настройка на трансфер на данни между тях:
- Тласък. Данните се изпращат от екземпляра на клиента с помощта на Remote Process Group (RPG). На екземпляра на сървъра данните се получават чрез входния порт
- Дърпам. Сървърът получава данни с помощта на RPG, клиентът изпраща с помощта на изходния порт.
Потокът за преобръщане се съхранява в регистъра на Apache.
Регистърът на Apache NiFi е подпроект на Apache NiFi, който предоставя инструмент за съхранение на поток и версии. Един вид GIT. Информация за инсталиране, конфигуриране и работа с регистъра можете да намерите в
В началото, когато N е малко число, потокът се доставя и актуализира ръчно за разумно време.
Но с нарастването на N има повече проблеми:
- отнема повече време за актуализиране на потока. Трябва да отидете на всички сървъри
- има грешки при актуализиране на шаблони. Тук са актуализирали, но тук са забравили
- човешка грешка при извършване на голям брой подобни операции
Всичко това ни води до факта, че е необходимо процесът да се автоматизира. Опитах следните начини за разрешаване на този проблем:
- Използвайте MiNiFi вместо NiFi
- NiFi CLI
- NiPyAPI
Използване на MiNiFi
Друг подпроект, MiNiFi C2 Server, ще помогне за решаването на този проблем. Този продукт е предназначен да бъде централната точка в архитектурата за внедряване. Как да конфигурирате средата - описано в
Опцията, описана в статията по-горе, работи и не е трудна за изпълнение, но не трябва да забравяме следното:
- minifi няма всички процесори от nifi
- Версиите на процесора в Minifi изостават от версиите на процесора в NiFi.
Към момента на писане най-новата версия на NiFi е 1.9.2. Версията на процесора на последната версия на MiNiFi е 1.7.0. Процесорите могат да се добавят към MiNiFi, но поради несъответствия във версиите между процесорите NiFi и MiNiFi това може да не работи.
NiFi CLI
Съдейки по
Стартирайте помощната програма
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
За да заредим необходимия поток от регистъра, трябва да знаем идентификаторите на кошницата (идентификатор на кофата) и самия поток (идентификатор на потока). Тези данни могат да бъдат получени или чрез cli, или в уеб интерфейса на регистъра NiFi. Уеб интерфейсът изглежда така:
С помощта на CLI правите следното:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Стартирайте група процеси за импортиране от системния регистър:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Важен момент е, че всяко nifi копие може да бъде посочено като хост, на който прехвърляме групата процеси.
Добавена е група процеси със спрени процесори, те трябва да бъдат стартирани
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Супер, процесорите започнаха. Въпреки това, според условията на проблема, имаме нужда от NiFi инстанции, за да изпращаме данни до други инстанции. Да приемем, че методът Push е избран за прехвърляне на данни към сървъра. За да организирате прехвърлянето на данни, е необходимо да активирате прехвърлянето на данни (Разрешаване на предаване) на добавената група за отдалечени процеси (RPG), която вече е включена в нашия поток.
В документацията в CLI и други източници не намерих начин да активирам трансфер на данни. Ако знаете как да направите това, моля, напишете в коментарите.
Тъй като имаме баш и сме готови да отидем до края, ще намерим изход! Можете да използвате NiFi API, за да разрешите този проблем. Нека използваме следния метод, вземаме ID от примерите по-горе (в нашия случай това е 7f522a13-016e-1000-e504-d5b15587f2f3). Описание на NiFi API методи
В тялото трябва да подадете JSON в следната форма:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Параметри, които трябва да бъдат попълнени, за да „работи“:
са — статус на трансфер на данни. Налично ПРЕДАВАНЕ за активиране на прехвърлянето на данни, СПРЯНО за деактивиране
версия - версия на процесора
версията по подразбиране ще бъде 0, когато бъде създадена, но тези параметри могат да бъдат получени с помощта на метода
За любителите на bash скриптове този метод може да изглежда подходящ, но за мен е трудно - bash скриптовете не са ми любими. Следващият начин е по-интересен и по-удобен според мен.
NiPyAPI
NiPyAPI е библиотека на Python за взаимодействие с екземпляри на NiFi.
Нашият скрипт за внедряване на конфигурацията е програма на Python. Да преминем към кодирането.
Настройте конфигурации за по-нататъшна работа. Ще ни трябват следните параметри:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
По-нататък ще вмъкна имената на методите на тази библиотека, които са описани
Свързваме регистъра към екземпляра nifi, използвайки
nipyapi.versioning.create_registry_client
На тази стъпка можете също да добавите проверка, че регистърът вече е добавен към екземпляра, за това можете да използвате метода
nipyapi.versioning.list_registry_clients
Намираме кофата за по-нататъшно търсене на потока в кошницата
nipyapi.versioning.get_registry_bucket
Според намерената кофа търсим течение
nipyapi.versioning.get_flow_in_bucket
След това е важно да разберете дали тази група процеси вече е добавена. Процесната група е разположена по координати и може да възникне ситуация, когато втора се насложи върху една. Проверих, може да бъде 🙂 За да получите цялата добавена група процеси, използвайте метода
nipyapi.canvas.list_all_process_groups
и тогава можем да търсим, например, по име.
Няма да описвам процеса на актуализиране на шаблона, ще кажа само, че ако в новата версия на шаблона се добавят процесори, тогава няма проблеми с наличието на съобщения в опашките. Но ако процесорите бъдат премахнати, тогава могат да възникнат проблеми (nifi не позволява премахването на процесора, ако пред него се е натрупала опашка от съобщения). Ако се интересувате как реших този проблем - пишете ми, моля, ще обсъдим тази точка. Контакти в края на статията. Нека да преминем към стъпката на добавяне на група процеси.
При отстраняване на грешки в скрипта се натъкнах на функция, че най-новата версия на flow не винаги се изтегля, така че ви препоръчвам първо да изясните тази версия:
nipyapi.versioning.get_latest_flow_ver
Разположете група процеси:
nipyapi.versioning.deploy_flow_version
Стартираме процесорите:
nipyapi.canvas.schedule_process_group
В блока за CLI беше написано, че прехвърлянето на данни не се активира автоматично в отдалечената група процеси? При внедряването на скрипта се сблъсках и с този проблем. По това време не можах да започна прехвърляне на данни с помощта на API и реших да пиша на разработчика на библиотеката NiPyAPI и да помоля за съвет/помощ. Разработчикът ми отговори, обсъдихме проблема и той написа, че има нужда от време, за да „провери нещо“. И сега, няколко дни по-късно, пристига имейл, в който е написана функция на Python, която решава проблема ми при стартиране !!! По това време версията на NiPyAPI беше 0.13.3 и, разбира се, нямаше нищо подобно в нея. Но във версия 0.14.0, която беше пусната съвсем наскоро, тази функция вече е включена в библиотеката. Среща
nipyapi.canvas.set_remote_process_group_transmission
И така, с помощта на библиотеката NiPyAPI свързахме регистъра, навихме потока и дори стартирахме процесорите и трансфера на данни. След това можете да разресвате кода, да добавяте всякакви проверки, регистриране и това е всичко. Но това е съвсем различна история.
От вариантите за автоматизация, които разгледах, последният ми се стори най-ефективен. Първо, това все още е код на Python, в който можете да вградите спомагателен програмен код и да се насладите на всички предимства на езика за програмиране. Второ, проектът NiPyAPI се развива активно и в случай на проблеми можете да пишете на разработчика. Трето, NiPyAPI все още е по-гъвкав инструмент за взаимодействие с NiFi при решаване на сложни проблеми. Например при определяне дали опашките със съобщения в момента са празни в потока и дали е възможно да се актуализира групата процеси.
Това е всичко. Описах 3 подхода за автоматизиране на доставката на потока в NiFi, клопките, които може да срещне разработчикът, и предоставих работещ код за автоматизиране на доставката. Ако и вие се интересувате от тази тема като мен -
Източник: www.habr.com