Автоматизација за испорака на проток во Apache NiFi

Здраво на сите!

Автоматизација за испорака на проток во Apache NiFi

Задачата е како што следува - има проток прикажан на сликата погоре, кој треба да се префрли на N сервери со Apache NiFi. Тест на проток - се генерира датотека и се испраќа до друг примерок на NiFi. Преносот на податоци се случува со помош на протоколот NiFi Site to Site.

NiFi Site to Site (S2S) е безбеден, високо приспособлив начин за пренос на податоци помеѓу примероци на NiFi. Погледнете како функционира S2S документација и важно е да запомните да го поставите вашиот примерок на NiFi за да дозволите S2S да гледа тука.

Кога станува збор за пренос на податоци со помош на S2S, еден пример се нарекува клиент, вториот е сервер. Клиентот испраќа податоци, серверот ги прима. Два начини да поставите пренос на податоци меѓу нив:

  1. Притисни. Податоците се испраќаат од инстанцата на клиентот со помош на група за далечински процеси (RPG). На примерот на серверот, податоците се примаат со помош на влезната порта
  2. Повлечете. Серверот прима податоци користејќи RPG, клиентот испраќа со помош на излезната порта.


Протокот за тркалање е зачуван во регистарот на Apache.

Регистарот на Apache NiFi е потпроект на Apache NiFi кој обезбедува алатка за складирање на проток и верзија. Еден вид GIT. Информации за инсталирање, конфигурирање и работа со регистарот може да се најдат во официјална документација. Протокот за складирање е комбиниран во процесна група и се складира во регистарот во оваа форма. Ќе се вратиме на ова подоцна во статијата.

На почетокот, кога N е мал број, протокот се испорачува и ажурира рачно во разумно време.

Но, како што расте N, има повеќе проблеми:

  1. потребно е повеќе време за ажурирање на протокот. Треба да отидете на сите сервери
  2. има грешки при ажурирањето на шаблоните. Овде ажурираа, но овде заборавија
  3. човечка грешка при извршување на голем број слични операции

Сето ова нè доведува до фактот дека е неопходно да се автоматизира процесот. Се обидов на следниве начини да го решам овој проблем:

  1. Користете MiNiFi наместо NiFi
  2. NiFi CLI
  3. NiPyAPI

Користење на MiniFi

ApacheMiNify е потпроект на Apache NiFi. MiNiFy е компактен агент кој ги користи истите процесори како NiFi, што ви овозможува да го креирате истиот проток како во NiFi. Леснотијата на агентот се постигнува, меѓу другото, и поради фактот што MiNiFy нема графички интерфејс за конфигурацијата на протокот. Недостатокот на графички интерфејс на MiNiFy значи дека е неопходно да се реши проблемот со испораката на проток во минифи. Бидејќи MiNiFy активно се користи во IOT, има многу компоненти и процесот на доставување проток до конечните минифи инстанци мора да биде автоматизиран. Позната задача, нели?

Друг подпроект, MiNiFi C2 Server, ќе помогне да се реши овој проблем. Овој производ е наменет да биде централна точка во архитектурата на распоредување. Како да ја конфигурирате околината - опишано во овој напис на Хабре и информациите се доволни за да се реши проблемот. MiNiFi во врска со серверот C2 автоматски ја ажурира својата конфигурација. Единствениот недостаток на овој пристап е што треба да креирате шаблони на серверот C2, едноставното обврзување на регистарот не е доволно.

Опцијата опишана во написот погоре работи и не е тешка за имплементација, но не смееме да го заборавиме следново:

  1. minifi ги нема сите процесори од nifi
  2. Верзиите на процесорот во Minifi заостануваат зад верзиите на процесорот во NiFi.

Во моментот на пишување, најновата верзија на NiFi е 1.9.2. Процесорската верзија на најновата верзија на MiNiFi е 1.7.0. Процесорите може да се додадат на MiNiFi, но поради несовпаѓање на верзијата помеѓу NiFi и MiniFi процесорите, ова може да не работи.

NiFi CLI

Судејќи според опис алатка на официјалната веб-страница, ова е алатка за автоматизирање на интеракцијата помеѓу NiFI и NiFi регистарот во областа на испорака на проток или управување со процеси. Преземете ја оваа алатка за да започнете. оттука.

Стартувај ја алатката

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

За да го вчитаме потребниот проток од регистарот, треба да ги знаеме идентификаторите на корпата (идентификатор на кофа) и самиот проток (идентификатор на проток). Овие податоци може да се добијат или преку cli или во веб-интерфејсот на регистарот NiFi. Веб интерфејсот изгледа вака:

Автоматизација за испорака на проток во Apache NiFi

Користејќи го CLI, го правите ова:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Извршете ја групата на процеси за увоз од регистарот:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Важна точка е што секој примерок на nifi може да биде наведен како домаќин на кој ја превртуваме процесната група.

Додадена е процесна група со запрени процесори, тие треба да се стартуваат

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Одлично, процесорите почнаа. Сепак, според условите на проблемот, потребни ни се NiFi инстанци за испраќање податоци до други инстанци. Да претпоставиме дека методот Push е избран за пренос на податоци на серверот. За да се организира пренос на податоци, неопходно е да се овозможи пренос на податоци (Овозможи пренос) на додадената група за далечински процеси (RPG), која веќе е вклучена во нашиот тек.

Автоматизација за испорака на проток во Apache NiFi

Во документацијата во CLI и други извори, не најдов начин да овозможам пренос на податоци. Ако знаете како да го направите ова, пишете во коментарите.

Бидејќи имаме баш и сме спремни да одиме до крај, ќе најдеме излез! Можете да го користите NiFi API за да го решите овој проблем. Да го користиме следниов метод, го земаме ID од примерите погоре (во нашиот случај тоа е 7f522a13-016e-1000-e504-d5b15587f2f3). Опис на методите на NiFi API тука.

Автоматизација за испорака на проток во Apache NiFi
Во телото, треба да поминете JSON, од следнава форма:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Параметри што мора да се пополнат за да „работат“:
беа — статус на пренос на податоци. Достапно TRANSMITTING за да се овозможи пренос на податоци, STOPPED за да се оневозможи
верзија - верзија на процесорот

верзијата стандардно ќе биде 0 кога ќе се креира, но овие параметри може да се добијат со користење на методот

Автоматизација за испорака на проток во Apache NiFi

За љубителите на баш скрипти, овој метод може да изгледа соодветен, но тешко ми е - баш скриптите не ми се омилени. Следниот начин е поинтересен и попогоден според мене.

NiPyAPI

NiPyAPI е библиотека на Python за интеракција со NiFi инстанци. Страница за документација ги содржи потребните информации за работа со библиотеката. Брзиот почеток е опишан во нацрт на github.

Нашата скрипта за воведување на конфигурацијата е програма на Python. Ајде да преминеме на кодирање.
Поставете конфигурации за понатамошна работа. Ќе ни требаат следниве параметри:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Понатаму ќе ги внесам имињата на методите на оваа библиотека, кои се опишани тука.

Ние го поврзуваме регистарот со примерот nifi користејќи

nipyapi.versioning.create_registry_client

На овој чекор, можете исто така да додадете проверка дека регистарот е веќе додаден во примерокот, за ова можете да го користите методот

nipyapi.versioning.list_registry_clients

Ја наоѓаме корпата за дополнително да го бараме протокот во корпата

nipyapi.versioning.get_registry_bucket

Според најдената кофа бараме проток

nipyapi.versioning.get_flow_in_bucket

Следно, важно е да се разбере дали оваа процесна група е веќе додадена. Процесната група се поставува по координати и може да се појави ситуација кога втора е надредена над една. Проверив, може да биде 🙂 За да ја добиете целата додадена процесна група, користете го методот

nipyapi.canvas.list_all_process_groups

а потоа можеме да бараме, на пример, по име.

Нема да го опишам процесот на ажурирање на шаблонот, само ќе кажам дека ако се додадат процесори во новата верзија на шаблонот, тогаш нема проблеми со присуството на пораки во редиците. Но, ако процесорите се отстранат, тогаш може да се појават проблеми (nifi не дозволува отстранување на процесорот ако се акумулира редица за пораки пред него). Ако ве интересира како го решив овој проблем - пишете ми, ве молам, ќе разговараме за оваа точка. Контакти на крајот од статијата. Ајде да преминеме на чекорот за додавање процесна група.

При дебагирање на скриптата, наидов на карактеристика дека најновата верзија на протокот не секогаш се повлекува, па затоа препорачувам прво да ја разјасните оваа верзија:

nipyapi.versioning.get_latest_flow_ver

Процесна група за распоредување:

nipyapi.versioning.deploy_flow_version

Ги започнуваме процесорите:

nipyapi.canvas.schedule_process_group

Во блокот за CLI, пишуваше дека преносот на податоци не е автоматски овозможен во далечинската процесна група? При спроведувањето на скриптата, наидов и на овој проблем. Во тоа време, не можев да започнам со пренос на податоци со помош на API и решив да му напишам на развивачот на библиотеката NiPyAPI и да побарам совет / помош. Програмерот ми одговори, разговаравме за проблемот и тој напиша дека му треба време да „провери нешто“. И сега, неколку дена подоцна, пристигнува е-пошта во која е напишана функцијата Python што ми го решава проблемот со стартувањето !!! Во тоа време, верзијата NiPyAPI беше 0.13.3 и, се разбира, немаше ништо од тој вид во неа. Но, во верзијата 0.14.0, која беше објавена неодамна, оваа функција е веќе вклучена во библиотеката. Запознајте се

nipyapi.canvas.set_remote_process_group_transmission

Така, со помош на библиотеката NiPyAPI, го поврзавме регистарот, го навивавме протокот, па дури и ги започнавме процесорите и преносот на податоци. Потоа можете да го чешлате кодот, да додавате секакви проверки, логирање, и тоа е тоа. Но, тоа е сосема друга приказна.

Од опциите за автоматизација што ги разгледав, второто ми се чинеше најефикасно. Прво, ова е сè уште python код, во кој можете да вградите помошен програмски код и целосно да ги искористите предностите на програмскиот јазик. Второ, проектот NiPyAPI активно се развива и во случај на проблеми можете да му пишете на развивачот. Трето, NiPyAPI е сè уште пофлексибилна алатка за интеракција со NiFi при решавање на сложени проблеми. На пример, при одредувањето дали редиците за пораки се моментално празни во текот и дали е можно да се ажурира процесната група.

Тоа е се. Јас опишав 3 пристапи за автоматизирање на испораката на проток во NiFi, стапици со кои може да се сретне развивачот и дадов работен код за автоматизирање на испораката. Ако сте исто толку заинтересирани за оваа тема како мене - пишувај!

Извор: www.habr.com

Додадете коментар