Усім прывітанне!
Задача заключаецца ў наступным – ёсць flow, прадстаўлены на малюнку вышэй, які трэба раскаціць на N сервераў з
NiFi Site to Site (S2S) - бяспечны, лёгка наладжвальны спосаб перадачы дадзеных паміж інстансамі NiFi. Як працуе S2S глядзіце ў
У тых выпадках, калі гаворка ідзе аб перадачы дадзеных з дапамогай S2S – адзін інстанс называецца кліенцкім, другі серверным. Кліенцкі адпраўляе дадзеныя, серверны - прымае. Два спосабы наладзіць перадачу дадзеных паміж імі:
- Штурхаць. З кліенцкага інстансу дадзеныя адпраўляюцца з дапамогай Remote Process Group (RPG). На сервернай інстансе дадзеныя прымаюцца з дапамогай Input Port
- Пацягнуць. Сервер прымае дадзеныя з дапамогай RPG, кліент дасылае з дапамогай Output port.
Flow для раскочвання захоўваемы ў Apache Registry.
Apache NiFi Registry - падпраект Apache NiFi, які прадстаўляе інструмент для захоўвання flow і кіравання версіямі. Гэткі GIT. Інфармацыю аб усталёўцы, наладзе і працы з registry можна знайсці ў
На старце, калі N малы лік, flow дастаўляецца і актуалізуецца рукамі за прымальны час.
Але з ростам N, праблем становіцца больш:
- на актуалізацыю flow сыходзіць больш часу. Трэба зайсці на ўсе серверы
- узнікаюць памылкі актуалізацыі шаблонаў. Вось тут абнавілі, а тут забыліся
- памылкі чалавека пры выкананні вялікай колькасці аднатыпных аперацый
Усё гэта падводзіць нас да таго, што трэба аўтаматызаваць працэс. Я спрабаваў наступныя спосабы рашэння гэтай задачы:
- Выкарыстоўваць MiNiFi замест NiFi
- NiFi CLI
- NiPyAPI
Выкарыстанне MiNiFi
Вырашыць такую задачу дапаможа яшчэ адзін падпраект – MiNiFi C2 Server. Гэты прадукт прызначаны для таго, каб быць цэнтральнай кропкай у архітэктуры раскочвання канфігурацый. Як сканфігураваць асяроддзе - апісана ў
Варыянт, апісаны ў артыкуле вышэй працоўны і не складаны для рэалізацыі, але трэба не забываць наступнае:
- У minifi ёсць не ўсе працэсары з nifi
- Версіі працэсараў у Minifi адстаюць ад версій працэсараў у NiFi.
На момант напісання публікацыі апошняя версія NiFi – 1.9.2. Версія працэсараў апошняй версіі MiNiFi – 1.7.0. Працэсары можна дадаваць у MiNiFi, але з-за разыходжанні версій паміж працэсарамі NiFi і MiNiFi гэта можа не спрацаваць.
NiFi CLI
Мяркуючы па
Запускаем утыліту
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Для таго, каб нам падгрузіць неабходны flow з registry, нам трэба ведаць ідэнтыфікатары кошыка (bucket identifier) і самога flow (flow identifier). Гэтыя дадзеныя можна атрымаць альбо праз cli, альбо ў вэб-інтэрфейсе NiFi registry. У вэб інтэрфейсе выглядае так:
З дапамогай CLI робіцца так:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Запускаем імпарт process group з registry:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Важны момант - у якасці хаста, на які мы накатваем process group можа быць паказаны любы інстанс nifi.
Process group дададзены са стопнутымі працэсарамі, іх трэба запусціць
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Выдатна, працэсары стартанулі. Аднак, нам па ўмовах задачы трэба, каб інстансы NiFi адпраўлялі дадзеныя на іншыя інстансы. Выкажам здагадку, што для перадачы дадзеных на сервер абралі спосаб Push. Для таго, каб арганізаваць перадачу дадзеных, трэба на дададзеным Remote Process Group (RPG), які ўжо ўключаны ў наш flow уключыць перадачу дадзеных (Enable transmitting).
У дакументацыі ў CLI і іншых крыніцах я не знайшоў спосабу ўключыць перадачу даных. Калі вы ведаеце як гэта зрабіць - напішыце, калі ласка, у каментарах.
Калі ўжо ў нас bash і мы гатовы ісці да канца — знойдзем выйсце! Можна скарыстацца NiFi API для вырашэння гэтай праблемы. Скарыстаемся наступным метадам, ID бярэм з прыкладаў вышэй (у нашым выпадку гэта 7f522a13-016e-1000-e504-d5b15587f2f3). Апісанне метадаў NiFi API
У body трэба перадаць JSON, наступнага выгляду:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Параметры, якія трэба запоўніць, каб "зарабіла":
былі - Статус перадачы дадзеных. Даступна TRANSMITTING для ўключэння перадачы дадзеных, STOPPED для выключэння
версія - версія працэсара
version па змаўчанні будзе 0 пры стварэнні, але гэтыя параметры можна атрымаць выкарыстоўваючы метад
Для аматараў bash скрыптоў дадзены метад можа здацца прыдатным, але мне цяжкавата – bash скрыпты не самае маё каханае. Наступны спосаб больш цікавы і зручней на мой погляд.
NiPyAPI
NiPyAPI - бібліятэка для мовы Python для ўзаемадзеяння з інстансамі NiFi.
Наш скрыпт для раскочвання канфігурацыі – праграма на мове Python. Пераходзім да кодынгу.
Наладжваем канфігі для далейшай працы. Нам спатрэбяцца наступныя параметры:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Далей буду ўстаўляць назвы метадаў гэтай бібліятэкі, якія апісаны.
Падлучальны registry да инстансу nifi з дапамогай
nipyapi.versioning.create_registry_client
На гэтым кроку можна яшчэ дадаць праверку таго, што registry ужо да інстансу дададзены, для гэтага можна скарыстацца метадам
nipyapi.versioning.list_registry_clients
Знаходзім bucket для далейшага пошуку flow у кошыку
nipyapi.versioning.get_registry_bucket
Па знойдзеным bucket шукаем flow
nipyapi.versioning.get_flow_in_bucket
Далей важна зразумець а ці не дададзены ўжо гэты process group. Process group размяшчаецца па каардынатах і можа скласціся сітуацыя, калі па-над адным кампанентам накладзецца другі. Я правяраў, такое можа быць 🙂 Каб атрымаць усе дададзеныя process group выкарыстоўваем метад
nipyapi.canvas.list_all_process_groups
і далей можам пашукаць, напрыклад па імі.
Я не буду апісваць працэс абнаўлення шаблону, скажу толькі, што калі ў новай версіі шаблону працэсары дадаюцца, то праблем з наяўнасцю паведамленняў у чэргах няма. А вось калі працэсары выдаляюцца, то праблемы могуць узнікнуць (nifi не дае выдаляць працэсар, калі перад ім сабралася чарга паведамленняў). Калі вам цікава як я вырашыў гэтую праблему - напішыце мне, калі ласка, абмяркуем гэты момант. Кантакты ў канцы артыкула. Пяройдзем да кроку дадання process group.
Пры адладцы скрыпту я сутыкнуўся з асаблівасцю, што не заўсёды падцягваецца апошняя версія flow, таму рэкамендую спачатку гэтую версію ўдакладніць:
nipyapi.versioning.get_latest_flow_ver
Дэплоім process group:
nipyapi.versioning.deploy_flow_version
Запускаем працэсары:
nipyapi.canvas.schedule_process_group
У блоку пра CLI было напісана, што ў remote process group аўтаматычна не ўключаецца перадача даных? Пры рэалізацыі скрыпту я сутыкнуўся з гэтай праблемай таксама. На той момант, запусціць перадачу дадзеных з дапамогай API у мяне не атрымалася і я вырашыў напісаць распрацоўніку бібліятэкі NiPyAPI і спытаць рады/дапамогі. Распрацоўнік мне адказаў, мы абмеркавалі праблему і ён напісаў, што яму трэба час "праверыць сёе-тое". І вось, праз пару дзён прыходзіць ліст, у якім напісана функцыя на Python, вырашальная маю праблему запуску!!! На той момант версія NiPyAPI была 0.13.3 і ў ёй, вядома, нічога такога не было. А вось у версію 0.14.0, якая выйшла зусім нядаўна, гэтая функцыя ўжо ўвайшла ў складзе бібліятэкі. Сустракайце,
nipyapi.canvas.set_remote_process_group_transmission
Такім чынам, з дапамогай бібліятэкі NiPyAPI падлучылі registry, накацілі flow і нават запусцілі працэсары і перадачу дадзеных. Далей можна прычэсваць код, дадаваць разнастайныя праверкі, лагіраванне і вось гэта ўсё. Але гэта ўжо зусім іншая гісторыя.
З разгледжаных мною варыянтаў аўтаматызацыі апошні мне падаўся самым працаздольным. Па-першае, гэта ўсё ж код на python, у які можна ўбудоўваць дапаможны праграмны код і карыстацца ўсімі перавагамі мовы праграмавання. Па-другое, праект NiPyAPI актыўна развіваецца і ў выпадку праблем можна напісаць распрацоўніку. Па-трэцяе, NiPyAPI усё ж больш гнуткая прылада для ўзаемадзеяння з NiFi у рашэнні складаных задач. Напрыклад, у вызначэнні таго ці пустыя чэргі паведамленняў зараз у flow і ці можна абнаўляць process group.
На гэтым усё. Я апісаў 3 падыходу да аўтаматызацыі дастаўкі flow у NiFi, падводныя камяні, з якімі можа сутыкнуцца распрацоўшчык і прывёў працоўны код для аўтаматызацыі дастаўкі. Калі вас гэтак жа, як і мяне цікавіць гэтая тэма -
Крыніца: habr.com