Автоматизация на доставката на поток в Apache NiFi

Здравейте на всички!

Автоматизация на доставката на поток в Apache NiFi

Задачата е следната - има поток, показан на снимката по-горе, който трябва да бъде разгърнат на N сървъра с Apache NiFi. Тест на потока - генерира се файл и се изпраща към друг екземпляр на NiFi. Прехвърлянето на данни се осъществява с помощта на протокола NiFi Site to Site.

NiFi Site to Site (S2S) е сигурен, силно адаптивен начин за прехвърляне на данни между NiFi екземпляри. Вижте как работи S2S документация и е важно да не забравяте да настроите вашия NiFi екземпляр, за да позволи S2S see тук.

Когато става въпрос за пренос на данни чрез S2S, един екземпляр се нарича клиент, вторият е сървър. Клиентът изпраща данни, сървърът ги получава. Два начина за настройка на трансфер на данни между тях:

  1. Тласък. Данните се изпращат от екземпляра на клиента с помощта на Remote Process Group (RPG). На екземпляра на сървъра данните се получават чрез входния порт
  2. Дърпам. Сървърът получава данни с помощта на RPG, клиентът изпраща с помощта на изходния порт.


Потокът за преобръщане се съхранява в регистъра на Apache.

Регистърът на Apache NiFi е подпроект на Apache NiFi, който предоставя инструмент за съхранение на поток и версии. Един вид GIT. Информация за инсталиране, конфигуриране и работа с регистъра можете да намерите в официална документация. Потокът за съхранение се комбинира в група процеси и се съхранява в регистъра в тази форма. Ще се върнем към това по-късно в статията.

В началото, когато N е малко число, потокът се доставя и актуализира ръчно за разумно време.

Но с нарастването на N има повече проблеми:

  1. отнема повече време за актуализиране на потока. Трябва да отидете на всички сървъри
  2. има грешки при актуализиране на шаблони. Тук са актуализирали, но тук са забравили
  3. човешка грешка при извършване на голям брой подобни операции

Всичко това ни води до факта, че е необходимо процесът да се автоматизира. Опитах следните начини за разрешаване на този проблем:

  1. Използвайте MiNiFi вместо NiFi
  2. NiFi CLI
  3. NiPyAPI

Използване на MiNiFi

ApacheMiNify е подпроект на Apache NiFi. MiNiFy е компактен агент, който използва същите процесори като NiFi, което ви позволява да създадете същия поток като в NiFi. Лекотата на агента се постига, наред с други неща, поради факта, че MiNiFy няма графичен интерфейс за конфигурацията на потока. Липсата на графичен интерфейс на MiNiFy означава, че е необходимо да се реши проблемът с доставката на потока в minifi. Тъй като MiNiFy се използва активно в IOT, има много компоненти и процесът на доставяне на поток до крайните инстанции на minifi трябва да бъде автоматизиран. Позната задача, нали?

Друг подпроект, MiNiFi C2 Server, ще помогне за решаването на този проблем. Този продукт е предназначен да бъде централната точка в архитектурата за внедряване. Как да конфигурирате средата - описано в тази статия на Хабре и информацията е достатъчна за решаване на проблема. MiNiFi във връзка със сървъра C2 автоматично актуализира своята конфигурация. Единственият недостатък на този подход е, че трябва да създавате шаблони на C2 сървъра, просто ангажимент към регистъра не е достатъчен.

Опцията, описана в статията по-горе, работи и не е трудна за изпълнение, но не трябва да забравяме следното:

  1. minifi няма всички процесори от nifi
  2. Версиите на процесора в Minifi изостават от версиите на процесора в NiFi.

Към момента на писане най-новата версия на NiFi е 1.9.2. Версията на процесора на последната версия на MiNiFi е 1.7.0. Процесорите могат да се добавят към MiNiFi, но поради несъответствия във версиите между процесорите NiFi и MiNiFi това може да не работи.

NiFi CLI

Съдейки по описание инструмент на официалния уебсайт, това е инструмент за автоматизиране на взаимодействието между NiFI и NiFi Registry в областта на доставката на потока или управлението на процеси. Изтеглете този инструмент, за да започнете. следователно.

Стартирайте помощната програма

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

За да заредим необходимия поток от регистъра, трябва да знаем идентификаторите на кошницата (идентификатор на кофата) и самия поток (идентификатор на потока). Тези данни могат да бъдат получени или чрез cli, или в уеб интерфейса на регистъра NiFi. Уеб интерфейсът изглежда така:

Автоматизация на доставката на поток в Apache NiFi

С помощта на CLI правите следното:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Стартирайте група процеси за импортиране от системния регистър:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Важен момент е, че всяко nifi копие може да бъде посочено като хост, на който прехвърляме групата процеси.

Добавена е група процеси със спрени процесори, те трябва да бъдат стартирани

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Супер, процесорите започнаха. Въпреки това, според условията на проблема, имаме нужда от NiFi инстанции, за да изпращаме данни до други инстанции. Да приемем, че методът Push е избран за прехвърляне на данни към сървъра. За да организирате прехвърлянето на данни, е необходимо да активирате прехвърлянето на данни (Разрешаване на предаване) на добавената група за отдалечени процеси (RPG), която вече е включена в нашия поток.

Автоматизация на доставката на поток в Apache NiFi

В документацията в CLI и други източници не намерих начин да активирам трансфер на данни. Ако знаете как да направите това, моля, напишете в коментарите.

Тъй като имаме баш и сме готови да отидем до края, ще намерим изход! Можете да използвате NiFi API, за да разрешите този проблем. Нека използваме следния метод, вземаме ID от примерите по-горе (в нашия случай това е 7f522a13-016e-1000-e504-d5b15587f2f3). Описание на NiFi API методи тук.

Автоматизация на доставката на поток в Apache NiFi
В тялото трябва да подадете JSON в следната форма:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Параметри, които трябва да бъдат попълнени, за да „работи“:
са — статус на трансфер на данни. Налично ПРЕДАВАНЕ за активиране на прехвърлянето на данни, СПРЯНО за деактивиране
версия - версия на процесора

версията по подразбиране ще бъде 0, когато бъде създадена, но тези параметри могат да бъдат получени с помощта на метода

Автоматизация на доставката на поток в Apache NiFi

За любителите на bash скриптове този метод може да изглежда подходящ, но за мен е трудно - bash скриптовете не са ми любими. Следващият начин е по-интересен и по-удобен според мен.

NiPyAPI

NiPyAPI е библиотека на Python за взаимодействие с екземпляри на NiFi. Страница с документация съдържа необходимата информация за работа с библиотеката. Бързият старт е описан в проект в github.

Нашият скрипт за внедряване на конфигурацията е програма на Python. Да преминем към кодирането.
Настройте конфигурации за по-нататъшна работа. Ще ни трябват следните параметри:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

По-нататък ще вмъкна имената на методите на тази библиотека, които са описани тук.

Свързваме регистъра към екземпляра nifi, използвайки

nipyapi.versioning.create_registry_client

На тази стъпка можете също да добавите проверка, че регистърът вече е добавен към екземпляра, за това можете да използвате метода

nipyapi.versioning.list_registry_clients

Намираме кофата за по-нататъшно търсене на потока в кошницата

nipyapi.versioning.get_registry_bucket

Според намерената кофа търсим течение

nipyapi.versioning.get_flow_in_bucket

След това е важно да разберете дали тази група процеси вече е добавена. Процесната група е разположена по координати и може да възникне ситуация, когато втора се насложи върху една. Проверих, може да бъде 🙂 За да получите цялата добавена група процеси, използвайте метода

nipyapi.canvas.list_all_process_groups

и тогава можем да търсим, например, по име.

Няма да описвам процеса на актуализиране на шаблона, ще кажа само, че ако в новата версия на шаблона се добавят процесори, тогава няма проблеми с наличието на съобщения в опашките. Но ако процесорите бъдат премахнати, тогава могат да възникнат проблеми (nifi не позволява премахването на процесора, ако пред него се е натрупала опашка от съобщения). Ако се интересувате как реших този проблем - пишете ми, моля, ще обсъдим тази точка. Контакти в края на статията. Нека да преминем към стъпката на добавяне на група процеси.

При отстраняване на грешки в скрипта се натъкнах на функция, че най-новата версия на flow не винаги се изтегля, така че ви препоръчвам първо да изясните тази версия:

nipyapi.versioning.get_latest_flow_ver

Разположете група процеси:

nipyapi.versioning.deploy_flow_version

Стартираме процесорите:

nipyapi.canvas.schedule_process_group

В блока за CLI беше написано, че прехвърлянето на данни не се активира автоматично в отдалечената група процеси? При внедряването на скрипта се сблъсках и с този проблем. По това време не можах да започна прехвърляне на данни с помощта на API и реших да пиша на разработчика на библиотеката NiPyAPI и да помоля за съвет/помощ. Разработчикът ми отговори, обсъдихме проблема и той написа, че има нужда от време, за да „провери нещо“. И сега, няколко дни по-късно, пристига имейл, в който е написана функция на Python, която решава проблема ми при стартиране !!! По това време версията на NiPyAPI беше 0.13.3 и, разбира се, нямаше нищо подобно в нея. Но във версия 0.14.0, която беше пусната съвсем наскоро, тази функция вече е включена в библиотеката. Среща

nipyapi.canvas.set_remote_process_group_transmission

И така, с помощта на библиотеката NiPyAPI свързахме регистъра, навихме потока и дори стартирахме процесорите и трансфера на данни. След това можете да разресвате кода, да добавяте всякакви проверки, регистриране и това е всичко. Но това е съвсем различна история.

От вариантите за автоматизация, които разгледах, последният ми се стори най-ефективен. Първо, това все още е код на Python, в който можете да вградите спомагателен програмен код и да се насладите на всички предимства на езика за програмиране. Второ, проектът NiPyAPI се развива активно и в случай на проблеми можете да пишете на разработчика. Трето, NiPyAPI все още е по-гъвкав инструмент за взаимодействие с NiFi при решаване на сложни проблеми. Например при определяне дали опашките със съобщения в момента са празни в потока и дали е възможно да се актуализира групата процеси.

Това е всичко. Описах 3 подхода за автоматизиране на доставката на потока в NiFi, клопките, които може да срещне разработчикът, и предоставих работещ код за автоматизиране на доставката. Ако и вие се интересувате от тази тема като мен - пиши!

Източник: www.habr.com

Добавяне на нов коментар