Apache NiFi жүйесінде ағынды жеткізуді автоматтандыру

Привет!

Apache NiFi жүйесінде ағынды жеткізуді автоматтандыру

Тапсырма келесідей - жоғарыдағы суретте көрсетілген ағын бар, оны N серверге шығару керек. Apache NiFi. Ағын сынағы - файл жасалуда және басқа NiFi данасына жіберілуде. Деректерді тасымалдау NiFi сайтынан сайтқа протоколы арқылы жүзеге асырылады.

NiFi Site to Site (S2S) – NiFi даналары арасында деректерді тасымалдаудың қауіпсіз, оңай конфигурацияланатын жолы. S2S қалай жұмыс істейді, қараңыз құжаттама және S2S рұқсат беру үшін NiFi данасын конфигурациялауды ұмытпау маңызды, қараңыз осында.

S2S көмегімен деректерді беру туралы айтатын болсақ, бір дананы клиент, екінші сервер деп атайды. Клиент деректерді жібереді, сервер қабылдайды. Олардың арасында деректерді тасымалдауды конфигурациялаудың екі жолы:

  1. Басыңыз. Клиент данасында деректер қашықтағы процестер тобы (RPG) арқылы жіберіледі. Сервер данасында деректер енгізу порты арқылы қабылданады
  2. Тарт. Сервер деректерді RPG көмегімен алады, клиент шығыс порты арқылы жібереді.


Шығару ағыны Apache тізілімінде сақталады.

Apache NiFi Registry - бұл ағынды сақтау және нұсқаны басқару құралын қамтамасыз ететін Apache NiFi қосалқы жобасы. GIT түрі. Орнату, конфигурациялау және тізіліммен жұмыс істеу туралы ақпаратты мына жерден табуға болады ресми құжаттама. Сақтауға арналған ағын процестер тобына біріктіріліп, тізілімде осы пішінде сақталады. Бұған кейінірек мақалада қайта ораламыз.

Бастапқыда, N аз сан болғанда, ағын қолайлы уақытта қолмен жеткізіледі және жаңартылады.

Бірақ N ұлғайған сайын проблемалар көбейеді:

  1. ағынды жаңарту үшін көбірек уақыт қажет. Сіз барлық серверлерге кіруіңіз керек
  2. Үлгіні жаңарту қателері орын алады. Мұнда олар оны жаңартты, бірақ мұнда олар ұмытып кетті
  3. ұқсас операциялардың үлкен санын орындау кезіндегі адам қателері

Мұның бәрі бізді процесті автоматтандыру керек екеніне әкеледі. Мен бұл мәселені шешудің келесі жолдарын көрдім:

  1. NiFi орнына MiNiFi пайдаланыңыз
  2. NiFi CLI
  3. NiPyAPI

MiNiFi пайдалану

Apache MiNiFy - Apache NiFi қосалқы жобасы. MiNiFy - NiFi сияқты бірдей процессорларды пайдаланатын ықшам агент, ол NiFi сияқты бірдей ағындарды жасауға мүмкіндік береді. Агенттің жеңіл сипатына, басқалармен қатар, MiNiFy-де ағынды конфигурациялау үшін графикалық интерфейс жоқ болғандықтан қол жеткізіледі. MiNiFy-де графикалық интерфейстің болмауы ағынды minifi-ге жеткізу мәселесін шешу қажет екенін білдіреді. MiNiFy IOT-те белсенді қолданылғандықтан, көптеген құрамдас бөліктер бар және ағынды соңғы minifi даналарына жеткізу процесін автоматтандыру қажет. Таныс тапсырма, солай ма?

Басқа қосалқы жоба бұл мәселені шешуге көмектеседі - MiNiFi C2 Server. Бұл өнім конфигурацияны шығару архитектурасының орталық нүктесі болуға арналған. Ортаны қалай конфигурациялау керек - бөлімде сипатталған Бұл мақала Мәселені шешу үшін Хабре туралы ақпарат жеткілікті. MiNiFi C2 серверімен бірге оның конфигурациясын автоматты түрде жаңартады. Бұл тәсілдің жалғыз кемшілігі - C2 серверінде үлгілерді жасау керек, тізілімге қарапайым міндеттеме жеткіліксіз.

Жоғарыдағы мақалада сипатталған опция жұмыс істейді және оны жүзеге асыру қиын емес, бірақ біз мынаны ұмытпауымыз керек:

  1. minifi-де nifi-дің барлық процессорлары жоқ
  2. Minifi процессорларының нұсқалары NiFi процессорларының нұсқаларынан артта қалады.

Жазу кезінде NiFi соңғы нұсқасы 1.9.2. MiNiFi процессорының соңғы нұсқасы 1.7.0. Процессорларды MiNiFi жүйесіне қосуға болады, бірақ NiFi және MiNiFi процессорлары арасындағы нұсқалардың сәйкессіздігіне байланысты бұл жұмыс істемеуі мүмкін.

NiFi CLI

Айтуға қарағанда сипаттамасы ресми веб-сайттағы құрал, бұл ағынды жеткізу немесе процесті басқару саласындағы NiFI және NiFi Registry арасындағы өзара әрекеттесуді автоматтандыруға арналған құрал. Бастау үшін бұл құралды жүктеп алу керек. мұнда.

Утилитаны іске қосыңыз

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Қажетті ағынды тізілімнен жүктеу үшін біз шелектің идентификаторларын (шелек идентификаторы) және ағынның өзін (ағын идентификаторы) білуіміз керек. Бұл деректерді cli арқылы немесе NiFi тізілімінің веб-интерфейсінен алуға болады. Веб-интерфейсте ол келесідей көрінеді:

Apache NiFi жүйесінде ағынды жеткізуді автоматтандыру

CLI көмегімен бұл орындалады:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Тіркеуден импорттау процесі тобын іске қосыңыз:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Маңызды мәселе - кез келген nifi данасын біз процесс тобын жіберетін хост ретінде көрсетуге болады.

Тоқтатылған процессорлармен қосылған процесс тобы, оларды іске қосу керек

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Керемет, процессорлар іске қосылды. Дегенмен, тапсырма шарттарына сәйкес, деректерді басқа даналарға жіберу үшін бізге NiFi даналары қажет. Деректерді серверге тасымалдау үшін Push әдісін таңдадыңыз делік. Деректерді тасымалдауды ұйымдастыру үшін ағынымызға қосылған Қашықтағы процестер тобына (RPG) деректерді тасымалдауды қосу керек.

Apache NiFi жүйесінде ағынды жеткізуді автоматтандыру

CLI және басқа көздердегі құжаттамада деректерді тасымалдауды қосу жолын таппадым. Мұны қалай жасау керектігін білсеңіз, түсініктемелерде жазыңыз.

Бізде bash бар және біз соңына дейін баруға дайын болғандықтан, біз шығудың жолын табамыз! Бұл мәселені шешу үшін NiFi API пайдалануға болады. Келесі әдісті қолданайық, біз жоғарыдағы мысалдардан идентификаторды аламыз (біздің жағдайда бұл 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API әдістерінің сипаттамасы осында.

Apache NiFi жүйесінде ағынды жеткізуді автоматтандыру
Денеде сізге JSON өту керек, мысалы:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Оның жұмыс істеуі үшін толтыру қажет параметрлер:
мемлекет — деректерді беру күйі. Деректерді тасымалдауды қосу үшін TRANSMITTING, ал өшіру үшін ТОҚТАДЫ
нұсқа - процессор нұсқасы

нұсқасы жасалған кезде әдепкі бойынша 0 болады, бірақ бұл параметрлерді әдіс арқылы алуға болады

Apache NiFi жүйесінде ағынды жеткізуді автоматтандыру

Bash сценарийлерінің жанкүйерлері үшін бұл әдіс қолайлы болып көрінуі мүмкін, бірақ мен үшін бұл біршама қиын - bash сценарийлері менің сүйікті емес. Келесі әдіс менің ойымша қызықтырақ және ыңғайлы.

NiPyAPI

NiPyAPI - NiFi даналарымен әрекеттесу үшін Python кітапханасы. Құжаттар беті кітапханамен жұмыс істеуге қажетті ақпаратты қамтиды. Жылдам бастау бөлімінде сипатталған жоба github сайтында.

Конфигурацияны шығаруға арналған сценарийіміз Python тіліндегі бағдарлама. Кодтауға көшейік.
Біз одан әрі жұмыс үшін конфигурацияларды орнаттық. Бізге келесі параметрлер қажет:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Содан кейін мен осы кітапхананың сипатталған әдістерінің атауларын енгіземін осында.

Біз тізілімді nifi данасына қосамыз

nipyapi.versioning.create_registry_client

Бұл қадамда тізілімнің данаға қосылғанын тексеруді де қосуға болады; ол үшін әдісті пайдалануға болады.

nipyapi.versioning.list_registry_clients

Біз себеттегі ағынды одан әрі іздеу үшін шелек табамыз

nipyapi.versioning.get_registry_bucket

Табылған шелектің көмегімен біз ағынды іздейміз

nipyapi.versioning.get_flow_in_bucket

Әрі қарай, бұл процесс тобының қосылғанын түсіну маңызды. Процесс тобы координаттар арқылы орналастырылады және екіншісі біреудің үстіне қойылған кезде жағдай туындауы мүмкін. Мен тексердім, ол болуы мүмкін 🙂 Барлық қосылған процесс тобын алу үшін әдісті пайдаланыңыз

nipyapi.canvas.list_all_process_groups

содан кейін біз, мысалы, аты бойынша іздей аламыз.

Мен шаблонды жаңарту процесін сипаттамаймын, тек үлгінің жаңа нұсқасында процессорлар қосылса, онда кезектердегі хабарламалардың болуына байланысты проблемалар жоқ екенін айтайын. Бірақ егер процессорлар жойылса, проблемалар туындауы мүмкін (егер оның алдында хабарлама кезегі жиналса, nifi процессорды жоюға мүмкіндік бермейді). Егер сізді бұл мәселені қалай шешкенім қызықтырса - маған жазыңыз, біз бұл мәселені талқылаймыз. Мақаланың соңындағы байланыстар. Процесс тобын қосу қадамына көшейік.

Сценарийді жөндеу кезінде мен ағынның соңғы нұсқасы әрдайым тартыла бермейтін ерекшелікке тап болдым, сондықтан алдымен осы нұсқаны тексеруді ұсынамын:

nipyapi.versioning.get_latest_flow_ver

Процесс тобын орналастыру:

nipyapi.versioning.deploy_flow_version

Біз процессорларды іске қосамыз:

nipyapi.canvas.schedule_process_group

CLI туралы блокта қашықтағы процестер тобында деректерді беру автоматты түрде қосылмаған деп жазылған? Сценарийді іске асыру кезінде мен бұл мәселеге тап болдым. Ол кезде API арқылы деректерді тасымалдауды бастай алмадым және NiPyAPI кітапханасының әзірлеушісіне хат жазып, кеңес/көмек сұрауды шештім. Әзірлеуші ​​маған жауап берді, біз мәселені талқыладық және ол «бір нәрсені тексеруге» уақыт қажет екенін жазды. Содан кейін, бірнеше күннен кейін, менің іске қосу мәселесін шешетін Python тілінде функция жазылған хат келеді!!! Ол кезде NiPyAPI нұсқасы 0.13.3 болды және, әрине, мұндай ештеңе болған жоқ. Бірақ жақында шыққан 0.14.0 нұсқасында бұл функция кітапханаға енгізілген. Танысу,

nipyapi.canvas.set_remote_process_group_transmission

Сонымен, NiPyAPI кітапханасын пайдалана отырып, біз тізілімді қостық, ағынды шығардық, тіпті процессорлар мен деректерді тасымалдауды бастадық. Содан кейін сіз кодты тарай аласыз, чектердің барлық түрлерін қоса аласыз, журналды тіркей аласыз және бұл бәрі. Бірақ бұл мүлдем басқа әңгіме.

Мен қарастырған автоматтандыру нұсқаларының ішінде соңғысы маған ең тиімді болып көрінді. Біріншіден, бұл әлі де python коды, оған қосалқы бағдарлама кодын енгізуге және бағдарламалау тілінің барлық артықшылықтарын пайдалануға болады. Екіншіден, NiPyAPI жобасы белсенді дамып келеді және проблемалар туындаған жағдайда әзірлеушіге жаза аласыз. Үшіншіден, NiPyAPI әлі де күрделі мәселелерді шешуде NiFi-мен өзара әрекеттесу үшін икемді құрал болып табылады. Мысалы, хабар кезегі ағында енді бос екенін және процесс тобын жаңартуға болатынын анықтауда.

Бар болғаны. Мен NiFi жүйесінде ағынды жеткізуді автоматтандырудың 3 тәсілін сипаттадым, әзірлеушіге тап болуы мүмкін қателер және жеткізуді автоматтандыру үшін жұмыс кодын ұсындым. Егер сіз мен сияқты бұл тақырыпқа қызығушылық танытсаңыз - жаз!

Ақпарат көзі: www.habr.com

пікір қалдыру