Привет!
Тапсырма келесідей - жоғарыдағы суретте көрсетілген ағын бар, оны N серверге шығару керек.
NiFi Site to Site (S2S) – NiFi даналары арасында деректерді тасымалдаудың қауіпсіз, оңай конфигурацияланатын жолы. S2S қалай жұмыс істейді, қараңыз
S2S көмегімен деректерді беру туралы айтатын болсақ, бір дананы клиент, екінші сервер деп атайды. Клиент деректерді жібереді, сервер қабылдайды. Олардың арасында деректерді тасымалдауды конфигурациялаудың екі жолы:
- Басыңыз. Клиент данасында деректер қашықтағы процестер тобы (RPG) арқылы жіберіледі. Сервер данасында деректер енгізу порты арқылы қабылданады
- Тарт. Сервер деректерді RPG көмегімен алады, клиент шығыс порты арқылы жібереді.
Шығару ағыны Apache тізілімінде сақталады.
Apache NiFi Registry - бұл ағынды сақтау және нұсқаны басқару құралын қамтамасыз ететін Apache NiFi қосалқы жобасы. GIT түрі. Орнату, конфигурациялау және тізіліммен жұмыс істеу туралы ақпаратты мына жерден табуға болады
Бастапқыда, N аз сан болғанда, ағын қолайлы уақытта қолмен жеткізіледі және жаңартылады.
Бірақ N ұлғайған сайын проблемалар көбейеді:
- ағынды жаңарту үшін көбірек уақыт қажет. Сіз барлық серверлерге кіруіңіз керек
- Үлгіні жаңарту қателері орын алады. Мұнда олар оны жаңартты, бірақ мұнда олар ұмытып кетті
- ұқсас операциялардың үлкен санын орындау кезіндегі адам қателері
Мұның бәрі бізді процесті автоматтандыру керек екеніне әкеледі. Мен бұл мәселені шешудің келесі жолдарын көрдім:
- NiFi орнына MiNiFi пайдаланыңыз
- NiFi CLI
- NiPyAPI
MiNiFi пайдалану
Басқа қосалқы жоба бұл мәселені шешуге көмектеседі - MiNiFi C2 Server. Бұл өнім конфигурацияны шығару архитектурасының орталық нүктесі болуға арналған. Ортаны қалай конфигурациялау керек - бөлімде сипатталған
Жоғарыдағы мақалада сипатталған опция жұмыс істейді және оны жүзеге асыру қиын емес, бірақ біз мынаны ұмытпауымыз керек:
- minifi-де nifi-дің барлық процессорлары жоқ
- Minifi процессорларының нұсқалары NiFi процессорларының нұсқаларынан артта қалады.
Жазу кезінде NiFi соңғы нұсқасы 1.9.2. MiNiFi процессорының соңғы нұсқасы 1.7.0. Процессорларды MiNiFi жүйесіне қосуға болады, бірақ NiFi және MiNiFi процессорлары арасындағы нұсқалардың сәйкессіздігіне байланысты бұл жұмыс істемеуі мүмкін.
NiFi CLI
Айтуға қарағанда
Утилитаны іске қосыңыз
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Қажетті ағынды тізілімнен жүктеу үшін біз шелектің идентификаторларын (шелек идентификаторы) және ағынның өзін (ағын идентификаторы) білуіміз керек. Бұл деректерді cli арқылы немесе NiFi тізілімінің веб-интерфейсінен алуға болады. Веб-интерфейсте ол келесідей көрінеді:
CLI көмегімен бұл орындалады:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Тіркеуден импорттау процесі тобын іске қосыңыз:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Маңызды мәселе - кез келген nifi данасын біз процесс тобын жіберетін хост ретінде көрсетуге болады.
Тоқтатылған процессорлармен қосылған процесс тобы, оларды іске қосу керек
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Керемет, процессорлар іске қосылды. Дегенмен, тапсырма шарттарына сәйкес, деректерді басқа даналарға жіберу үшін бізге NiFi даналары қажет. Деректерді серверге тасымалдау үшін Push әдісін таңдадыңыз делік. Деректерді тасымалдауды ұйымдастыру үшін ағынымызға қосылған Қашықтағы процестер тобына (RPG) деректерді тасымалдауды қосу керек.
CLI және басқа көздердегі құжаттамада деректерді тасымалдауды қосу жолын таппадым. Мұны қалай жасау керектігін білсеңіз, түсініктемелерде жазыңыз.
Бізде bash бар және біз соңына дейін баруға дайын болғандықтан, біз шығудың жолын табамыз! Бұл мәселені шешу үшін NiFi API пайдалануға болады. Келесі әдісті қолданайық, біз жоғарыдағы мысалдардан идентификаторды аламыз (біздің жағдайда бұл 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API әдістерінің сипаттамасы
Денеде сізге JSON өту керек, мысалы:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Оның жұмыс істеуі үшін толтыру қажет параметрлер:
мемлекет — деректерді беру күйі. Деректерді тасымалдауды қосу үшін TRANSMITTING, ал өшіру үшін ТОҚТАДЫ
нұсқа - процессор нұсқасы
нұсқасы жасалған кезде әдепкі бойынша 0 болады, бірақ бұл параметрлерді әдіс арқылы алуға болады
Bash сценарийлерінің жанкүйерлері үшін бұл әдіс қолайлы болып көрінуі мүмкін, бірақ мен үшін бұл біршама қиын - bash сценарийлері менің сүйікті емес. Келесі әдіс менің ойымша қызықтырақ және ыңғайлы.
NiPyAPI
NiPyAPI - NiFi даналарымен әрекеттесу үшін Python кітапханасы.
Конфигурацияны шығаруға арналған сценарийіміз Python тіліндегі бағдарлама. Кодтауға көшейік.
Біз одан әрі жұмыс үшін конфигурацияларды орнаттық. Бізге келесі параметрлер қажет:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Содан кейін мен осы кітапхананың сипатталған әдістерінің атауларын енгіземін
Біз тізілімді nifi данасына қосамыз
nipyapi.versioning.create_registry_client
Бұл қадамда тізілімнің данаға қосылғанын тексеруді де қосуға болады; ол үшін әдісті пайдалануға болады.
nipyapi.versioning.list_registry_clients
Біз себеттегі ағынды одан әрі іздеу үшін шелек табамыз
nipyapi.versioning.get_registry_bucket
Табылған шелектің көмегімен біз ағынды іздейміз
nipyapi.versioning.get_flow_in_bucket
Әрі қарай, бұл процесс тобының қосылғанын түсіну маңызды. Процесс тобы координаттар арқылы орналастырылады және екіншісі біреудің үстіне қойылған кезде жағдай туындауы мүмкін. Мен тексердім, ол болуы мүмкін 🙂 Барлық қосылған процесс тобын алу үшін әдісті пайдаланыңыз
nipyapi.canvas.list_all_process_groups
содан кейін біз, мысалы, аты бойынша іздей аламыз.
Мен шаблонды жаңарту процесін сипаттамаймын, тек үлгінің жаңа нұсқасында процессорлар қосылса, онда кезектердегі хабарламалардың болуына байланысты проблемалар жоқ екенін айтайын. Бірақ егер процессорлар жойылса, проблемалар туындауы мүмкін (егер оның алдында хабарлама кезегі жиналса, nifi процессорды жоюға мүмкіндік бермейді). Егер сізді бұл мәселені қалай шешкенім қызықтырса - маған жазыңыз, біз бұл мәселені талқылаймыз. Мақаланың соңындағы байланыстар. Процесс тобын қосу қадамына көшейік.
Сценарийді жөндеу кезінде мен ағынның соңғы нұсқасы әрдайым тартыла бермейтін ерекшелікке тап болдым, сондықтан алдымен осы нұсқаны тексеруді ұсынамын:
nipyapi.versioning.get_latest_flow_ver
Процесс тобын орналастыру:
nipyapi.versioning.deploy_flow_version
Біз процессорларды іске қосамыз:
nipyapi.canvas.schedule_process_group
CLI туралы блокта қашықтағы процестер тобында деректерді беру автоматты түрде қосылмаған деп жазылған? Сценарийді іске асыру кезінде мен бұл мәселеге тап болдым. Ол кезде API арқылы деректерді тасымалдауды бастай алмадым және NiPyAPI кітапханасының әзірлеушісіне хат жазып, кеңес/көмек сұрауды шештім. Әзірлеуші маған жауап берді, біз мәселені талқыладық және ол «бір нәрсені тексеруге» уақыт қажет екенін жазды. Содан кейін, бірнеше күннен кейін, менің іске қосу мәселесін шешетін Python тілінде функция жазылған хат келеді!!! Ол кезде NiPyAPI нұсқасы 0.13.3 болды және, әрине, мұндай ештеңе болған жоқ. Бірақ жақында шыққан 0.14.0 нұсқасында бұл функция кітапханаға енгізілген. Танысу,
nipyapi.canvas.set_remote_process_group_transmission
Сонымен, NiPyAPI кітапханасын пайдалана отырып, біз тізілімді қостық, ағынды шығардық, тіпті процессорлар мен деректерді тасымалдауды бастадық. Содан кейін сіз кодты тарай аласыз, чектердің барлық түрлерін қоса аласыз, журналды тіркей аласыз және бұл бәрі. Бірақ бұл мүлдем басқа әңгіме.
Мен қарастырған автоматтандыру нұсқаларының ішінде соңғысы маған ең тиімді болып көрінді. Біріншіден, бұл әлі де python коды, оған қосалқы бағдарлама кодын енгізуге және бағдарламалау тілінің барлық артықшылықтарын пайдалануға болады. Екіншіден, NiPyAPI жобасы белсенді дамып келеді және проблемалар туындаған жағдайда әзірлеушіге жаза аласыз. Үшіншіден, NiPyAPI әлі де күрделі мәселелерді шешуде NiFi-мен өзара әрекеттесу үшін икемді құрал болып табылады. Мысалы, хабар кезегі ағында енді бос екенін және процесс тобын жаңартуға болатынын анықтауда.
Бар болғаны. Мен NiFi жүйесінде ағынды жеткізуді автоматтандырудың 3 тәсілін сипаттадым, әзірлеушіге тап болуы мүмкін қателер және жеткізуді автоматтандыру үшін жұмыс кодын ұсындым. Егер сіз мен сияқты бұл тақырыпқа қызығушылық танытсаңыз -
Ақпарат көзі: www.habr.com