Баарына салам!
Тапшырма төмөнкүдөй - жогорудагы сүрөттө көрсөтүлгөн агым бар, аны N серверге жайылтуу керек.
NiFi Сайттан Сайтка (S2S) NiFi инстанцияларынын ортосунда маалыматтарды өткөрүп берүүнүн коопсуз, өтө ыңгайлаштырылган жолу. S2S кантип иштээрин көрүңүз
S2S аркылуу берилиштерди өткөрүүгө келгенде, бир инстанция кардар, экинчиси сервер деп аталат. Кардар маалыматтарды жөнөтөт, сервер аны кабыл алат. Алардын ортосунда маалымат алмашууну орнотуунун эки жолу:
- түртүү. Маалыматтар Remote Process Group (RPG) аркылуу кардар инстанциясынан жөнөтүлөт. Сервер инстанциясында маалыматтар киргизүү порту аркылуу кабыл алынат
- тартуу. Сервер RPG аркылуу маалыматтарды кабыл алат, кардар Output портунун жардамы менен жөнөтөт.
Прокат үчүн агым Apache реестринде сакталат.
Apache NiFi Реестри - бул Apache NiFi чакан долбоору, ал агымды сактоо жана версиялоо куралын камсыз кылат. GIT бир түрү. Орнотуу, конфигурациялоо жана реестр менен иштөө жөнүндө маалыматты төмөнкү жерден тапса болот
Башында, N аз сан болгондо, агым акылга сыярлык убакытта кол менен жеткирилет жана жаңыртылат.
Бирок N көбөйгөн сайын, дагы көп көйгөйлөр бар:
- агымды жаңыртуу үчүн көбүрөөк убакыт талап кылынат. Сиз бардык серверлерге барышыңыз керек
- шаблондорду жаңыртууда каталар бар. Бул жерде алар жаңыртышты, бирок бул жерде алар унутуп калышты
- көп сандагы окшош операцияларды аткарууда адамдын катасы
Мына ушулардын бардыгы бизди процессти автоматташтыруу зарыл экендигине алып келет. Мен бул көйгөйдү чечүү үчүн төмөнкү жолдорду аракет кылдым:
- NiFi ордуна MiNiFi колдонуңуз
- NiFi CLI
- NiPyAPI
MiNiFi колдонуу
Дагы бир чакан долбоор, MiNiFi C2 Server бул көйгөйдү чечүүгө жардам берет. Бул продукт жайгаштыруу архитектурасынын борбордук пункту болууга арналган. Айлана-чөйрөнү кантип конфигурациялоо керек - сүрөттөлгөн
Жогорудагы макалада сүрөттөлгөн вариант иштеп жатат жана аны ишке ашыруу кыйын эмес, бирок биз төмөнкүлөрдү унутпашыбыз керек:
- minifiде nifiден бардык процессорлор жок
- Minifiдеги CPU версиялары NiFiдагы CPU версияларынан артта калат.
Жазуу учурунда, NiFi акыркы версиясы 1.9.2. Акыркы MiNiFi версиясынын процессордук версиясы 1.7.0. Процессорлорду MiNiFi'га кошууга болот, бирок NiFi жана MiNiFi процессорлорунун ортосундагы версиянын дал келбестигинен улам, бул иштебей калышы мүмкүн.
NiFi CLI
сот тарабынан
Утилитаны иштетиңиз
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Реестрден керектүү агымды жүктөө үчүн, биз себеттин идентификаторлорун (чака идентификатору) жана агымдын өзүн (агымдын идентификатору) билишибиз керек. Бул маалыматтарды cli аркылуу же NiFi реестринин веб интерфейсинен алса болот. Веб интерфейси мындай көрүнөт:
CLI колдонуп, сиз муну жасайсыз:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Реестрден импорт процессинин тобун иштетүү:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Маанилүү жагдай, ар кандай nifi инстанциясы процесс тобун жылдырган хост катары көрсөтүлүшү мүмкүн.
Процесс тобу токтотулган процессорлор менен кошулду, аларды баштоо керек
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Жакшы, процессорлор башталды. Бирок, маселенин шарттарына ылайык, башка инстанцияларга маалыматтарды жөнөтүү үчүн бизге NiFi инстанциялары керек. Маалыматтарды серверге өткөрүү үчүн Push ыкмасы тандалды деп коёлу. Маалыматтарды өткөрүп берүүнү уюштуруу үчүн, буга чейин биздин агымга киргизилген Remote Process Group (RPG) боюнча маалыматтарды өткөрүп берүүнү (Өткөрүүнү иштетүү) иштетүү керек.
CLI жана башка булактардагы документтерде мен маалыматтарды берүүнү иштетүүнүн жолун тапкан жокмун. Муну кантип жасоону билсеңиз, комментарийге жазыңыз.
Башыбыз бар жана биз акырына чейин барууга даярбыз, андан чыгуунун жолун табабыз! Бул көйгөйдү чечүү үчүн NiFi API колдоно аласыз. Келгиле, төмөнкү ыкманы колдонолу, биз жогорудагы мисалдардан ID алабыз (биздин учурда бул 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API методдорунун сүрөттөлүшү
Денеде сиз төмөнкү формадагы JSON өтүшүңүз керек:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
"Иштөө" үчүн толтурулушу керек болгон параметрлер:
мамлекет — маалыматтарды берүү абалы. Берилиштерди өткөрүүнү иштетүү үчүн TRANSMITTING жеткиликтүү, өчүрүү үчүн ТОКТОТУЛГАН
версия - процессордун версиясы
версия түзүлгөндө демейки 0 болот, бирок бул параметрлерди метод аркылуу алууга болот
Bash скрипттерин сүйүүчүлөр үчүн бул ыкма ылайыктуу көрүнүшү мүмкүн, бирок мен үчүн бул кыйын - bash скрипттери менин сүйүктүүм эмес. Кийинки жол менимче кызыктуураак жана ыңгайлуураак.
NiPyAPI
NiPyAPI - NiFi инстанциялары менен иштешүү үчүн Python китепканасы.
Конфигурацияны жайылтуу үчүн биздин скрипт Python программасы. Келгиле, коддоого өтөлү.
Андан ары иштөө үчүн конфигурацияларды орнотуңуз. Бизге төмөнкү параметрлер керек болот:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Андан ары мен сүрөттөлгөн бул китепкананын ыкмаларынын аттарын киргизем
Биз реестрди nifi инстанциясына туташтырабыз
nipyapi.versioning.create_registry_client
Бул кадамда сиз реестр мурунтан эле инстанцияга кошулганын текшерүүнү кошо аласыз, бул үчүн сиз ыкманы колдонсоңуз болот.
nipyapi.versioning.list_registry_clients
Биз себеттеги агымды андан ары издөө үчүн чаканы табабыз
nipyapi.versioning.get_registry_bucket
Табылган чака боюнча биз агымды издеп жатабыз
nipyapi.versioning.get_flow_in_bucket
Андан кийин, бул процесс тобу буга чейин кошулганын түшүнүү маанилүү. Процесс тобу координаттар менен жайгаштырылат жана биринин үстүнө экинчиси коюлганда кырдаал келип чыгышы мүмкүн. Мен текшердим, ал 🙂 бардык кошулган процесс тобун алуу үчүн, ыкманы колдонуңуз
nipyapi.canvas.list_all_process_groups
анан биз, мисалы, аты боюнча издей алабыз.
Мен шаблонду жаңыртуу процессин сүрөттөбөй эле коёюн, эгер шаблондун жаңы версиясында процессорлор кошулса, анда кезектердеги билдирүүлөрдүн болушуна байланыштуу көйгөйлөр жок экенин гана айтайын. Бирок, эгерде процессорлор алынып салынса, анда көйгөйлөр пайда болушу мүмкүн (эгерде анын алдында билдирүү кезеги топтолсо, nifi процессорду алып салууга жол бербейт). Эгер сиз бул маселени кантип чечкениме кызыксаңыз - мага жазыңыз, сураныч, биз бул маселени талкуулайбыз. Макаланын аягындагы байланыштар. Процесс тобун кошуу кадамына өтөбүз.
Скрипттин мүчүлүштүктөрүн оңдоодо, мен агымдын акыркы версиясы дайыма эле көтөрүлбөй турган өзгөчөлүккө туш болдум, андыктан алгач ушул версияны тактап алууну сунуштайм:
nipyapi.versioning.get_latest_flow_ver
Процесс тобун жайылтуу:
nipyapi.versioning.deploy_flow_version
Биз процессорлорду баштайбыз:
nipyapi.canvas.schedule_process_group
CLI жөнүндө блокто, алыскы процесстер тобунда маалыматтарды берүү автоматтык түрдө иштетилбейт деп жазылган? Сценарийди ишке ашырууда мен да бул көйгөйгө туш болдум. Ошол учурда, мен API аркылуу берилиштерди өткөрүүнү баштай алган жокмун жана NiPyAPI китепканасынын иштеп чыгуучусуна жазып, кеңеш/жардам суроону чечтим. Иштеп чыгуучу мага жооп берди, биз маселени талкууладык жана ал "бир нерсени текшерүүгө" убакыт керек деп жазды. Эми, бир нече күндөн кийин, менин баштоо көйгөйүмдү чечкен Python функциясы жазылган электрондук кат келди !!! Ошол убакта NiPyAPI версиясы 0.13.3 болчу жана, албетте, анда мындай эч нерсе болгон эмес. Бирок жакында чыккан 0.14.0 версиясында бул функция китепканага мурунтан эле киргизилген. Жолугушуу
nipyapi.canvas.set_remote_process_group_transmission
Ошентип, NiPyAPI китепканасынын жардамы менен биз реестрди туташтырдык, агымды жыйнап, ал тургай процессорлорду жана маалыматтарды өткөрүп баштадык. Андан кийин сиз кодду тарай аласыз, бардык текшерүүлөрдү, журналдарды кошсоңуз болот, ушуну менен. Бирок бул таптакыр башка окуя.
Мен караган автоматташтыруу варианттарынын ичинен акыркысы мага эң эффективдүү көрүндү. Биринчиден, бул дагы эле Python коду, ага сиз көмөкчү программа кодун киргизип, программалоо тилинин бардык мүмкүнчүлүктөрүн пайдалана аласыз. Экинчиден, NiPyAPI долбоору жигердүү өнүгүп жатат жана көйгөйлөр пайда болгон учурда иштеп чыгуучуга жазсаңыз болот. Үчүнчүдөн, NiPyAPI дагы эле татаал маселелерди чечүүдө NiFi менен өз ара аракеттенүү үчүн ийкемдүү курал болуп саналат. Мисалы, учурда агымда билдирүү кезеги бош экендигин жана процесс тобун жаңыртуу мүмкүнбү же жокпу аныктоодо.
Баары болду. Мен NiFi'де агымдын жеткирилишин автоматташтыруунун 3 ыкмасын, иштеп чыгуучу туш болушу мүмкүн болгон тузактарды сүрөттөп бердим жана жеткирүүнү автоматташтыруу үчүн жумушчу кодду бердим. Эгер сиз да мен сыяктуу бул темага кызыксаңыз -
Source: www.habr.com