Здраво на сите!
Задачата е како што следува - има проток прикажан на сликата погоре, кој треба да се префрли на N сервери со
NiFi Site to Site (S2S) е безбеден, високо приспособлив начин за пренос на податоци помеѓу примероци на NiFi. Погледнете како функционира S2S
Кога станува збор за пренос на податоци со помош на S2S, еден пример се нарекува клиент, вториот е сервер. Клиентот испраќа податоци, серверот ги прима. Два начини да поставите пренос на податоци меѓу нив:
- Притисни. Податоците се испраќаат од инстанцата на клиентот со помош на група за далечински процеси (RPG). На примерот на серверот, податоците се примаат со помош на влезната порта
- Повлечете. Серверот прима податоци користејќи RPG, клиентот испраќа со помош на излезната порта.
Протокот за тркалање е зачуван во регистарот на Apache.
Регистарот на Apache NiFi е потпроект на Apache NiFi кој обезбедува алатка за складирање на проток и верзија. Еден вид GIT. Информации за инсталирање, конфигурирање и работа со регистарот може да се најдат во
На почетокот, кога N е мал број, протокот се испорачува и ажурира рачно во разумно време.
Но, како што расте N, има повеќе проблеми:
- потребно е повеќе време за ажурирање на протокот. Треба да отидете на сите сервери
- има грешки при ажурирањето на шаблоните. Овде ажурираа, но овде заборавија
- човечка грешка при извршување на голем број слични операции
Сето ова нè доведува до фактот дека е неопходно да се автоматизира процесот. Се обидов на следниве начини да го решам овој проблем:
- Користете MiNiFi наместо NiFi
- NiFi CLI
- NiPyAPI
Користење на MiniFi
Друг подпроект, MiNiFi C2 Server, ќе помогне да се реши овој проблем. Овој производ е наменет да биде централна точка во архитектурата на распоредување. Како да ја конфигурирате околината - опишано во
Опцијата опишана во написот погоре работи и не е тешка за имплементација, но не смееме да го заборавиме следново:
- minifi ги нема сите процесори од nifi
- Верзиите на процесорот во Minifi заостануваат зад верзиите на процесорот во NiFi.
Во моментот на пишување, најновата верзија на NiFi е 1.9.2. Процесорската верзија на најновата верзија на MiNiFi е 1.7.0. Процесорите може да се додадат на MiNiFi, но поради несовпаѓање на верзијата помеѓу NiFi и MiniFi процесорите, ова може да не работи.
NiFi CLI
Судејќи според
Стартувај ја алатката
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
За да го вчитаме потребниот проток од регистарот, треба да ги знаеме идентификаторите на корпата (идентификатор на кофа) и самиот проток (идентификатор на проток). Овие податоци може да се добијат или преку cli или во веб-интерфејсот на регистарот NiFi. Веб интерфејсот изгледа вака:
Користејќи го CLI, го правите ова:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Извршете ја групата на процеси за увоз од регистарот:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Важна точка е што секој примерок на nifi може да биде наведен како домаќин на кој ја превртуваме процесната група.
Додадена е процесна група со запрени процесори, тие треба да се стартуваат
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Одлично, процесорите почнаа. Сепак, според условите на проблемот, потребни ни се NiFi инстанци за испраќање податоци до други инстанци. Да претпоставиме дека методот Push е избран за пренос на податоци на серверот. За да се организира пренос на податоци, неопходно е да се овозможи пренос на податоци (Овозможи пренос) на додадената група за далечински процеси (RPG), која веќе е вклучена во нашиот тек.
Во документацијата во CLI и други извори, не најдов начин да овозможам пренос на податоци. Ако знаете како да го направите ова, пишете во коментарите.
Бидејќи имаме баш и сме спремни да одиме до крај, ќе најдеме излез! Можете да го користите NiFi API за да го решите овој проблем. Да го користиме следниов метод, го земаме ID од примерите погоре (во нашиот случај тоа е 7f522a13-016e-1000-e504-d5b15587f2f3). Опис на методите на NiFi API
Во телото, треба да поминете JSON, од следнава форма:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Параметри што мора да се пополнат за да „работат“:
беа — статус на пренос на податоци. Достапно TRANSMITTING за да се овозможи пренос на податоци, STOPPED за да се оневозможи
верзија - верзија на процесорот
верзијата стандардно ќе биде 0 кога ќе се креира, но овие параметри може да се добијат со користење на методот
За љубителите на баш скрипти, овој метод може да изгледа соодветен, но тешко ми е - баш скриптите не ми се омилени. Следниот начин е поинтересен и попогоден според мене.
NiPyAPI
NiPyAPI е библиотека на Python за интеракција со NiFi инстанци.
Нашата скрипта за воведување на конфигурацијата е програма на Python. Ајде да преминеме на кодирање.
Поставете конфигурации за понатамошна работа. Ќе ни требаат следниве параметри:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Понатаму ќе ги внесам имињата на методите на оваа библиотека, кои се опишани
Ние го поврзуваме регистарот со примерот nifi користејќи
nipyapi.versioning.create_registry_client
На овој чекор, можете исто така да додадете проверка дека регистарот е веќе додаден во примерокот, за ова можете да го користите методот
nipyapi.versioning.list_registry_clients
Ја наоѓаме корпата за дополнително да го бараме протокот во корпата
nipyapi.versioning.get_registry_bucket
Според најдената кофа бараме проток
nipyapi.versioning.get_flow_in_bucket
Следно, важно е да се разбере дали оваа процесна група е веќе додадена. Процесната група се поставува по координати и може да се појави ситуација кога втора е надредена над една. Проверив, може да биде 🙂 За да ја добиете целата додадена процесна група, користете го методот
nipyapi.canvas.list_all_process_groups
а потоа можеме да бараме, на пример, по име.
Нема да го опишам процесот на ажурирање на шаблонот, само ќе кажам дека ако се додадат процесори во новата верзија на шаблонот, тогаш нема проблеми со присуството на пораки во редиците. Но, ако процесорите се отстранат, тогаш може да се појават проблеми (nifi не дозволува отстранување на процесорот ако се акумулира редица за пораки пред него). Ако ве интересира како го решив овој проблем - пишете ми, ве молам, ќе разговараме за оваа точка. Контакти на крајот од статијата. Ајде да преминеме на чекорот за додавање процесна група.
При дебагирање на скриптата, наидов на карактеристика дека најновата верзија на протокот не секогаш се повлекува, па затоа препорачувам прво да ја разјасните оваа верзија:
nipyapi.versioning.get_latest_flow_ver
Процесна група за распоредување:
nipyapi.versioning.deploy_flow_version
Ги започнуваме процесорите:
nipyapi.canvas.schedule_process_group
Во блокот за CLI, пишуваше дека преносот на податоци не е автоматски овозможен во далечинската процесна група? При спроведувањето на скриптата, наидов и на овој проблем. Во тоа време, не можев да започнам со пренос на податоци со помош на API и решив да му напишам на развивачот на библиотеката NiPyAPI и да побарам совет / помош. Програмерот ми одговори, разговаравме за проблемот и тој напиша дека му треба време да „провери нешто“. И сега, неколку дена подоцна, пристигнува е-пошта во која е напишана функцијата Python што ми го решава проблемот со стартувањето !!! Во тоа време, верзијата NiPyAPI беше 0.13.3 и, се разбира, немаше ништо од тој вид во неа. Но, во верзијата 0.14.0, која беше објавена неодамна, оваа функција е веќе вклучена во библиотеката. Запознајте се
nipyapi.canvas.set_remote_process_group_transmission
Така, со помош на библиотеката NiPyAPI, го поврзавме регистарот, го навивавме протокот, па дури и ги започнавме процесорите и преносот на податоци. Потоа можете да го чешлате кодот, да додавате секакви проверки, логирање, и тоа е тоа. Но, тоа е сосема друга приказна.
Од опциите за автоматизација што ги разгледав, второто ми се чинеше најефикасно. Прво, ова е сè уште python код, во кој можете да вградите помошен програмски код и целосно да ги искористите предностите на програмскиот јазик. Второ, проектот NiPyAPI активно се развива и во случај на проблеми можете да му пишете на развивачот. Трето, NiPyAPI е сè уште пофлексибилна алатка за интеракција со NiFi при решавање на сложени проблеми. На пример, при одредувањето дали редиците за пораки се моментално празни во текот и дали е можно да се ажурира процесната група.
Тоа е се. Јас опишав 3 пристапи за автоматизирање на испораката на проток во NiFi, стапици со кои може да се сретне развивачот и дадов работен код за автоматизирање на испораката. Ако сте исто толку заинтересирани за оваа тема како мене -
Извор: www.habr.com