Аутоматизација испоруке протока у Апацхе НиФи

Поздрав свима!

Аутоматизација испоруке протока у Апацхе НиФи

Задатак је следећи - постоји ток, приказан на слици изнад, који треба да се развију на Н сервера са Апацхе НиФи. Тест протока - датотека се генерише и шаље другој НиФи инстанци. Пренос података се одвија коришћењем протокола НиФи Сите то Сите.

НиФи Сите то Сите (С2С) је сигуран, лако подесив начин за пренос података између НиФи инстанци. Како функционише С2С, погледајте документација и важно је да не заборавите да конфигуришете НиФи инстанцу да дозволи С2С, погледајте овде.

У случајевима када је реч о преносу података помоћу С2С, једна инстанца се зове клијент, друга сервер. Клијент шаље податке, сервер прима. Два начина да конфигуришете пренос података између њих:

  1. гурање. Са инстанце клијента, подаци се шаљу помоћу Ремоте Процесс Гроуп (РПГ). На инстанци сервера, подаци се примају помоћу улазног порта
  2. Вуци. Сервер прима податке користећи РПГ, клијент шаље користећи излазни порт.


Ток за увођење се чува у Апацхе регистру.

Апацхе НиФи Регистри је подпројекат Апацхе НиФи који обезбеђује алатку за складиштење тока и контролу верзија. Нека врста ГИТ-а. Информације о инсталацији, конфигурисању и раду са регистром можете пронаћи у званична документација. Ток за складиштење се комбинује у групу процеса и чува у овом облику у регистру. На ово ћемо се вратити касније у чланку.

На почетку, када је Н мали број, ток се испоручује и ажурира ручно у прихватљивом времену.

Али како Н расте, проблеми постају све бројнији:

  1. потребно је више времена за ажурирање тока. Морате се пријавити на све сервере
  2. Дошло је до грешака при ажурирању шаблона. Овде су га ажурирали, али овде су заборавили
  3. људске грешке при извођењу великог броја сличних операција

Све ово нас доводи до чињенице да је потребно да аутоматизујемо процес. Покушао сам на следеће начине да решим овај проблем:

  1. Користите МиНиФи уместо НиФи
  2. НиФи ЦЛИ
  3. НиПиАПИ

Користећи МиниФи

Апацхе МиНиФи - потпројекат Апацхе НиФи. МиНиФи је компактан агент који користи исте процесоре као НиФи, омогућавајући вам да креирате исте токове као у НиФи. Лагана природа агента се постиже, између осталог, чињеницом да МиНиФи нема графички интерфејс за конфигурацију тока. Недостатак графичког интерфејса у МиНиФи-у значи да је неопходно решити проблем испоруке тока на минифи. Пошто се МиНиФи активно користи у ИОТ-у, постоји много компоненти и процес испоруке тока коначним минифи инстанцама треба да буде аутоматизован. Познат задатак, зар не?

Још један подпројекат ће помоћи у решавању овог проблема - МиНиФи Ц2 Сервер. Овај производ је намењен да буде централна тачка у архитектури увођења конфигурације. Како конфигурисати окружење - описано у Овај чланак На Хабреу има довољно информација да се проблем реши. МиНиФи, заједно са Ц2 сервером, аутоматски ажурира своју конфигурацију. Једини недостатак овог приступа је што морате да креирате шаблоне на Ц2 серверу; једноставно урезивање у регистар није довољно.

Опција описана у горњем чланку ради и није тешка за имплементацију, али не смемо заборавити следеће:

  1. Минифи нема све нифи процесоре
  2. Верзије Минифи процесора заостају за верзијама НиФи процесора.

У време писања, најновија верзија НиФи је 1.9.2. Најновија верзија процесора МиНиФи је 1.7.0. Процесори се могу додати у МиНиФи, али због разлика у верзијама између НиФи и МиНиФи процесора, ово можда неће радити.

НиФи ЦЛИ

Судећи по Опис алат на званичном сајту, ово је алат за аутоматизацију интеракције између НиФИ и НиФи Регистра у области испоруке тока или управљања процесима. Да бисте започели, морате да преузмете овај алат. стога.

Покрените услужни програм

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Да бисмо учитали потребан ток из регистра, морамо да знамо идентификаторе буцкета (идентификатор буцкет) и самог тока (идентификатор тока). Ови подаци се могу добити или преко ЦЛИ-а или у веб интерфејсу НиФи регистра. У веб интерфејсу то изгледа овако:

Аутоматизација испоруке протока у Апацхе НиФи

Користећи ЦЛИ ово се ради:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Почињемо да увозимо процесну групу из регистра:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Важна ствар је да се било која нифи инстанца може навести као хост на који преносимо процесну групу.

Група процеса је додата са заустављеним процесорима, потребно их је покренути

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Одлично, процесори су почели. Међутим, према условима задатка, потребне су нам НиФи инстанце за слање података другим инстанцама. Претпоставимо да сте изабрали Пусх метод за пренос података на сервер. Да бисте организовали пренос података, потребно је да омогућите пренос података на додатој групи удаљених процеса (РПГ), која је већ укључена у наш ток.

Аутоматизација испоруке протока у Апацхе НиФи

У документацији у ЦЛИ и другим изворима нисам нашао начин да омогућим пренос података. Ако знате како то учинити, напишите у коментарима.

Пошто имамо баш и спремни смо да идемо до краја, наћи ћемо излаз! Можете користити НиФи АПИ да решите овај проблем. Хајде да употребимо следећи метод, узмимо ИД из примера изнад (у нашем случају то је 7ф522а13-016е-1000-е504-д5б15587ф2ф3). Опис метода НиФи АПИ овде.

Аутоматизација испоруке протока у Апацхе НиФи
У телу морате да проследите ЈСОН, овако:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Параметри које треба попунити да би функционисао:
били су — статус преноса података. Доступно: ПРЕНОС за омогућавање преноса података, ЗАУСТАВЉЕНО за онемогућавање
верзија - верзија процесора

верзија ће подразумевано бити 0 када се креира, али ови параметри се могу добити помоћу методе

Аутоматизација испоруке протока у Апацхе НиФи

За љубитеље басх скрипти, овај метод може изгледати прикладан, али ми је мало тежак - басх скрипте ми нису омиљене. Следећи метод је по мом мишљењу занимљивији и погоднији.

НиПиАПИ

НиПиАПИ је Питхон библиотека за интеракцију са НиФи инстанцама. Страница са документацијом садржи потребне информације за рад са библиотеком. Брзи почетак је описан у пројекат на гитхуб-у.

Наша скрипта за увођење конфигурације је програм у Питхон-у. Пређимо на кодирање.
Поставили смо конфигурације за даљи рад. Биће нам потребни следећи параметри:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Затим ћу уметнути називе метода ове библиотеке, које су описане овде.

Повежите регистар са нифи инстанцом користећи

nipyapi.versioning.create_registry_client

У овом кораку такође можете додати проверу да ли је регистар већ додат инстанци; за ово можете користити метод

nipyapi.versioning.list_registry_clients

У корпи налазимо канту за даље тражење протока

nipyapi.versioning.get_registry_bucket

Користећи пронађену канту, тражимо проток

nipyapi.versioning.get_flow_in_bucket

Затим је важно разумети да ли је ова група процеса већ додата. Група Процес је постављена према координатама и може настати ситуација када се друга компонента постави изнад једне. Проверио сам, ово може да се деси :) Да бисмо добили све додате групе процеса користимо метод

nipyapi.canvas.list_all_process_groups

Можемо даље претраживати, на пример, по имену.

Нећу описивати процес ажурирања шаблона, само ћу рећи да ако се процесори додају у нову верзију шаблона, онда нема проблема са присуством порука у редовима. Али ако се процесори уклоне, онда могу настати проблеми (нифи вам не дозвољава да уклоните процесор ако се испред њега накупио ред порука). Ако вас занима како сам решио овај проблем, пишите ми и разговараћемо о овом питању. Контакти на крају чланка. Пређимо на корак додавања групе процеса.

Када сам отклањао грешке у скрипти, наишао сам на посебност да се најновија верзија тока не повлачи увек, па препоручујем да прво проверите ову верзију:

nipyapi.versioning.get_latest_flow_ver

Група процеса распоређивања:

nipyapi.versioning.deploy_flow_version

Покрећемо процесоре:

nipyapi.canvas.schedule_process_group

У блоку о ЦЛИ је писало да се пренос података не омогућава аутоматски у групи удаљених процеса? Приликом имплементације скрипте, наишао сам и на овај проблем. У то време нисам могао да започнем пренос података помоћу АПИ-ја и одлучио сам да пишем програмеру библиотеке НиПиАПИ и затражим савет/помоћ. Програмер ми је одговорио, разговарали смо о проблему и он је написао да му треба времена да „нешто провери“. А онда, пар дана касније, стиже писмо у коме је написана функција у Питхон-у која решава мој проблем са покретањем!!! У то време, верзија НиПиАПИ била је 0.13.3 и, наравно, није било ништа слично. Али у верзији 0.14.0, која је недавно објављена, ова функција је већ била укључена у библиотеку. Сусрет,

nipyapi.canvas.set_remote_process_group_transmission

Дакле, користећи НиПиАПИ библиотеку, повезали смо регистар, покренули ток, па чак и покренули процесоре и пренос података. Затим можете прочешљати код, додати све врсте провера, евидентирати и то је све. Али то је сасвим друга прича.

Од опција аутоматизације које сам разматрао, последња ми се учинила најефикаснијом. Прво, ово је још увек Питхон код, у који можете да уградите помоћни програмски код и искористите све предности програмског језика. Друго, пројекат НиПиАПИ се активно развија и у случају проблема можете писати програмеру. Треће, НиПиАПИ је и даље флексибилнији алат за интеракцију са НиФи у решавању сложених проблема. На пример, приликом утврђивања да ли су редови порука сада празни у току и да ли се група процеса може ажурирати.

То је све. Описао сам 3 приступа аутоматизацији испоруке протока у НиФи-ју, замке на које програмер може наићи и дао радни код за аутоматизацију испоруке. Ако сте заинтересовани за ову тему као и ја - пиши!

Извор: ввв.хабр.цом

Додај коментар