Аўтаматызацыя дастаўкі flow у Apache NiFi

Усім прывітанне!

Аўтаматызацыя дастаўкі flow у Apache NiFi

Задача заключаецца ў наступным – ёсць flow, прадстаўлены на малюнку вышэй, які трэба раскаціць на N сервераў з Apache NiFi. Flow тэставы - ідзе генерацыя файла і адпраўка ў іншы інстанс NiFi. Перадача даных адбываецца з дапамогай пратакола NiFi Site to Site.

NiFi Site to Site (S2S) - бяспечны, лёгка наладжвальны спосаб перадачы дадзеных паміж інстансамі NiFi. Як працуе S2S глядзіце ў дакументацыі і важна не забыцца наладзіць інстанс NiFi, каб дазволіць S2S глядзіце тут.

У тых выпадках, калі гаворка ідзе аб перадачы дадзеных з дапамогай S2S – адзін інстанс называецца кліенцкім, другі серверным. Кліенцкі адпраўляе дадзеныя, серверны - прымае. Два спосабы наладзіць перадачу дадзеных паміж імі:

  1. Штурхаць. З кліенцкага інстансу дадзеныя адпраўляюцца з дапамогай Remote Process Group (RPG). На сервернай інстансе дадзеныя прымаюцца з дапамогай Input Port
  2. Пацягнуць. Сервер прымае дадзеныя з дапамогай RPG, кліент дасылае з дапамогай Output port.


Flow для раскочвання захоўваемы ў Apache Registry.

Apache NiFi Registry - падпраект Apache NiFi, які прадстаўляе інструмент для захоўвання flow і кіравання версіямі. Гэткі GIT. Інфармацыю аб усталёўцы, наладзе і працы з registry можна знайсці ў афіцыйнай дакументацыі. Flow для захоўвання аб'ядноўваецца ў process group і ў такім выглядзе захоўваецца ў registry. Далей у артыкуле да гэтага яшчэ вернемся.

На старце, калі N малы лік, flow дастаўляецца і актуалізуецца рукамі за прымальны час.

Але з ростам N, праблем становіцца больш:

  1. на актуалізацыю flow сыходзіць больш часу. Трэба зайсці на ўсе серверы
  2. узнікаюць памылкі актуалізацыі шаблонаў. Вось тут абнавілі, а тут забыліся
  3. памылкі чалавека пры выкананні вялікай колькасці аднатыпных аперацый

Усё гэта падводзіць нас да таго, што трэба аўтаматызаваць працэс. Я спрабаваў наступныя спосабы рашэння гэтай задачы:

  1. Выкарыстоўваць MiNiFi замест NiFi
  2. NiFi CLI
  3. NiPyAPI

Выкарыстанне MiNiFi

Apache MiNiFy - падпраект Apache NiFi. MiNiFy - кампактны агент, які выкарыстоўвае тыя ж самыя працэсары, што і NiFi, які дазваляе ствараць тыя ж flow, што і ў NiFi. Лёгкавага агента дасягаецца ў тым ліку за кошт таго, што ў MiNiFy няма графічнага інтэрфейсу для канфігурацыі flow. Адсутнасць графічнага інтэрфейсу ў MiNiFy азначае, што неабходна вырашаць праблему дастаўкі flow у minifi. Паколькі, MiNiFy актыўна выкарыстоўваецца ў IOT, кампанентаў шмат і працэс дастаўкі flow да канчатковых асобнікаў minifi трэба аўтаматызаваць. Знаёмая задача, праўда?

Вырашыць такую ​​задачу дапаможа яшчэ адзін падпраект – MiNiFi C2 Server. Гэты прадукт прызначаны для таго, каб быць цэнтральнай кропкай у архітэктуры раскочвання канфігурацый. Як сканфігураваць асяроддзе - апісана ў гэтым артыкуле на Хабры і інфармацыі дастаткова для вырашэння пастаўленай задачы. MiNiFi у звязку з C2 server аўтаматычным рэжыме абнаўляе канфігурацыю ў сябе. Адзіны недахоп такога падыходу – даводзіцца ствараць шаблоны на C2 Server, простага коміта ў registry не дастаткова.

Варыянт, апісаны ў артыкуле вышэй працоўны і не складаны для рэалізацыі, але трэба не забываць наступнае:

  1. У minifi ёсць не ўсе працэсары з nifi
  2. Версіі працэсараў у Minifi адстаюць ад версій працэсараў у NiFi.

На момант напісання публікацыі апошняя версія NiFi – 1.9.2. Версія працэсараў апошняй версіі MiNiFi – 1.7.0. Працэсары можна дадаваць у MiNiFi, але з-за разыходжанні версій паміж працэсарамі NiFi і MiNiFi гэта можа не спрацаваць.

NiFi CLI

Мяркуючы па апісанні прылады на афіцыйным сайце, гэта прылада для аўтаматызацыі ўзаемадзеяння NiFI і NiFi Registry у вобласці дастаўкі flow ці кіраванні працэсамі. Для пачатку працы гэты інструмент неабходна спампаваць адсюль.

Запускаем утыліту

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Для таго, каб нам падгрузіць неабходны flow з registry, нам трэба ведаць ідэнтыфікатары кошыка (bucket identifier) ​​і самога flow (flow identifier). Гэтыя дадзеныя можна атрымаць альбо праз cli, альбо ў вэб-інтэрфейсе NiFi registry. У вэб інтэрфейсе выглядае так:

Аўтаматызацыя дастаўкі flow у Apache NiFi

З дапамогай CLI робіцца так:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Запускаем імпарт process group з registry:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Важны момант - у якасці хаста, на які мы накатваем process group можа быць паказаны любы інстанс nifi.

Process group дададзены са стопнутымі працэсарамі, іх трэба запусціць

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Выдатна, працэсары стартанулі. Аднак, нам па ўмовах задачы трэба, каб інстансы NiFi адпраўлялі дадзеныя на іншыя інстансы. Выкажам здагадку, што для перадачы дадзеных на сервер абралі спосаб Push. Для таго, каб арганізаваць перадачу дадзеных, трэба на дададзеным Remote Process Group (RPG), які ўжо ўключаны ў наш flow уключыць перадачу дадзеных (Enable transmitting).

Аўтаматызацыя дастаўкі flow у Apache NiFi

У дакументацыі ў CLI і іншых крыніцах я не знайшоў спосабу ўключыць перадачу даных. Калі вы ведаеце як гэта зрабіць - напішыце, калі ласка, у каментарах.

Калі ўжо ў нас bash і мы гатовы ісці да канца — знойдзем выйсце! Можна скарыстацца NiFi API для вырашэння гэтай праблемы. Скарыстаемся наступным метадам, ID бярэм з прыкладаў вышэй (у нашым выпадку гэта 7f522a13-016e-1000-e504-d5b15587f2f3). Апісанне метадаў NiFi API тут.

Аўтаматызацыя дастаўкі flow у Apache NiFi
У body трэба перадаць JSON, наступнага выгляду:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Параметры, якія трэба запоўніць, каб "зарабіла":
былі - Статус перадачы дадзеных. Даступна TRANSMITTING для ўключэння перадачы дадзеных, STOPPED для выключэння
версія - версія працэсара

version па змаўчанні будзе 0 пры стварэнні, але гэтыя параметры можна атрымаць выкарыстоўваючы метад

Аўтаматызацыя дастаўкі flow у Apache NiFi

Для аматараў bash скрыптоў дадзены метад можа здацца прыдатным, але мне цяжкавата – bash скрыпты не самае маё каханае. Наступны спосаб больш цікавы і зручней на мой погляд.

NiPyAPI

NiPyAPI - бібліятэка для мовы Python для ўзаемадзеяння з інстансамі NiFi. Старонка з дакументацыяй змяшчае неабходную інфармацыю для работы з бібліятэкай. Quick start апісаны ў праекце на github.

Наш скрыпт для раскочвання канфігурацыі – праграма на мове Python. Пераходзім да кодынгу.
Наладжваем канфігі для далейшай працы. Нам спатрэбяцца наступныя параметры:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Далей буду ўстаўляць назвы метадаў гэтай бібліятэкі, якія апісаны. тут.

Падлучальны registry да инстансу nifi з дапамогай

nipyapi.versioning.create_registry_client

На гэтым кроку можна яшчэ дадаць праверку таго, што registry ужо да інстансу дададзены, для гэтага можна скарыстацца метадам

nipyapi.versioning.list_registry_clients

Знаходзім bucket для далейшага пошуку flow у кошыку

nipyapi.versioning.get_registry_bucket

Па знойдзеным bucket шукаем flow

nipyapi.versioning.get_flow_in_bucket

Далей важна зразумець а ці не дададзены ўжо гэты process group. Process group размяшчаецца па каардынатах і можа скласціся сітуацыя, калі па-над адным кампанентам накладзецца другі. Я правяраў, такое можа быць 🙂 Каб атрымаць усе дададзеныя process group выкарыстоўваем метад

nipyapi.canvas.list_all_process_groups

і далей можам пашукаць, напрыклад па імі.

Я не буду апісваць працэс абнаўлення шаблону, скажу толькі, што калі ў новай версіі шаблону працэсары дадаюцца, то праблем з наяўнасцю паведамленняў у чэргах няма. А вось калі працэсары выдаляюцца, то праблемы могуць узнікнуць (nifi не дае выдаляць працэсар, калі перад ім сабралася чарга паведамленняў). Калі вам цікава як я вырашыў гэтую праблему - напішыце мне, калі ласка, абмяркуем гэты момант. Кантакты ў канцы артыкула. Пяройдзем да кроку дадання process group.

Пры адладцы скрыпту я сутыкнуўся з асаблівасцю, што не заўсёды падцягваецца апошняя версія flow, таму рэкамендую спачатку гэтую версію ўдакладніць:

nipyapi.versioning.get_latest_flow_ver

Дэплоім process group:

nipyapi.versioning.deploy_flow_version

Запускаем працэсары:

nipyapi.canvas.schedule_process_group

У блоку пра CLI было напісана, што ў remote process group аўтаматычна не ўключаецца перадача даных? Пры рэалізацыі скрыпту я сутыкнуўся з гэтай праблемай таксама. На той момант, запусціць перадачу дадзеных з дапамогай API у мяне не атрымалася і я вырашыў напісаць распрацоўніку бібліятэкі NiPyAPI і спытаць рады/дапамогі. Распрацоўнік мне адказаў, мы абмеркавалі праблему і ён напісаў, што яму трэба час "праверыць сёе-тое". І вось, праз пару дзён прыходзіць ліст, у якім напісана функцыя на Python, вырашальная маю праблему запуску!!! На той момант версія NiPyAPI была 0.13.3 і ў ёй, вядома, нічога такога не было. А вось у версію 0.14.0, якая выйшла зусім нядаўна, гэтая функцыя ўжо ўвайшла ў складзе бібліятэкі. Сустракайце,

nipyapi.canvas.set_remote_process_group_transmission

Такім чынам, з дапамогай бібліятэкі NiPyAPI падлучылі registry, накацілі flow і нават запусцілі працэсары і перадачу дадзеных. Далей можна прычэсваць код, дадаваць разнастайныя праверкі, лагіраванне і вось гэта ўсё. Але гэта ўжо зусім іншая гісторыя.

З разгледжаных мною варыянтаў аўтаматызацыі апошні мне падаўся самым працаздольным. Па-першае, гэта ўсё ж код на python, у які можна ўбудоўваць дапаможны праграмны код і карыстацца ўсімі перавагамі мовы праграмавання. Па-другое, праект NiPyAPI актыўна развіваецца і ў выпадку праблем можна напісаць распрацоўніку. Па-трэцяе, NiPyAPI усё ж больш гнуткая прылада для ўзаемадзеяння з NiFi у рашэнні складаных задач. Напрыклад, у вызначэнні таго ці пустыя чэргі паведамленняў зараз у flow і ці можна абнаўляць process group.

На гэтым усё. Я апісаў 3 падыходу да аўтаматызацыі дастаўкі flow у NiFi, падводныя камяні, з якімі можа сутыкнуцца распрацоўшчык і прывёў працоўны код для аўтаматызацыі дастаўкі. Калі вас гэтак жа, як і мяне цікавіць гэтая тэма - пішыце!

Крыніца: habr.com

Дадаць каментар