Apache NiFi ичинде агымды жеткирүү автоматташтырылган

Баарына салам!

Apache NiFi ичинде агымды жеткирүү автоматташтырылган

Тапшырма төмөнкүдөй - жогорудагы сүрөттө көрсөтүлгөн агым бар, аны N серверге жайылтуу керек. Apache NiFi. Агым тести - файл түзүлүп, башка NiFi инстанциясына жөнөтүлүүдө. Маалыматтарды өткөрүү NiFi Сайттан Сайт протоколу аркылуу ишке ашат.

NiFi Сайттан Сайтка (S2S) NiFi инстанцияларынын ортосунда маалыматтарды өткөрүп берүүнүн коопсуз, өтө ыңгайлаштырылган жолу. S2S кантип иштээрин көрүңүз документтер жана S2S көрүүгө уруксат берүү үчүн NiFi инстанцияңызды орнотууну унутпаңыз бул жерде.

S2S аркылуу берилиштерди өткөрүүгө келгенде, бир инстанция кардар, экинчиси сервер деп аталат. Кардар маалыматтарды жөнөтөт, сервер аны кабыл алат. Алардын ортосунда маалымат алмашууну орнотуунун эки жолу:

  1. түртүү. Маалыматтар Remote Process Group (RPG) аркылуу кардар инстанциясынан жөнөтүлөт. Сервер инстанциясында маалыматтар киргизүү порту аркылуу кабыл алынат
  2. тартуу. Сервер RPG аркылуу маалыматтарды кабыл алат, кардар Output портунун жардамы менен жөнөтөт.


Прокат үчүн агым Apache реестринде сакталат.

Apache NiFi Реестри - бул Apache NiFi чакан долбоору, ал агымды сактоо жана версиялоо куралын камсыз кылат. GIT бир түрү. Орнотуу, конфигурациялоо жана реестр менен иштөө жөнүндө маалыматты төмөнкү жерден тапса болот расмий документтер. Сактоо үчүн агым процесстик топко бириктирилет жана реестрде ушул формада сакталат. Бул тууралуу кийинчерээк макалада кайрылабыз.

Башында, N аз сан болгондо, агым акылга сыярлык убакытта кол менен жеткирилет жана жаңыртылат.

Бирок N көбөйгөн сайын, дагы көп көйгөйлөр бар:

  1. агымды жаңыртуу үчүн көбүрөөк убакыт талап кылынат. Сиз бардык серверлерге барышыңыз керек
  2. шаблондорду жаңыртууда каталар бар. Бул жерде алар жаңыртышты, бирок бул жерде алар унутуп калышты
  3. көп сандагы окшош операцияларды аткарууда адамдын катасы

Мына ушулардын бардыгы бизди процессти автоматташтыруу зарыл экендигине алып келет. Мен бул көйгөйдү чечүү үчүн төмөнкү жолдорду аракет кылдым:

  1. NiFi ордуна MiNiFi колдонуңуз
  2. NiFi CLI
  3. NiPyAPI

MiNiFi колдонуу

ApacheMiNify Apache NiFi чакан долбоору болуп саналат. MiNiFy - NiFi сыяктуу эле процессорлорду колдонгон, NiFiдагыдай эле агымды түзүүгө мүмкүндүк берген компакт агент. Агенттин жеңилдигине, башка нерселер менен катар MiNiFyде агым конфигурациясынын графикалык интерфейси жок болгондуктан жетишилет. MiNiFyнин графикалык интерфейсинин жоктугу минифиде агымды жеткирүү маселесин чечүү керектигин билдирет. MiNiFy IOTте жигердүү колдонулгандыктан, көптөгөн компоненттер бар жана агымдын акыркы minifi инстанцияларына жеткирүү процесси автоматташтырылышы керек. Тааныш тапшырма, туурабы?

Дагы бир чакан долбоор, MiNiFi C2 Server бул көйгөйдү чечүүгө жардам берет. Бул продукт жайгаштыруу архитектурасынын борбордук пункту болууга арналган. Айлана-чөйрөнү кантип конфигурациялоо керек - сүрөттөлгөн бул макалада Habré боюнча жана маалымат көйгөйдү чечүү үчүн жетиштүү. MiNiFi C2 сервери менен бирге анын конфигурациясын автоматтык түрдө жаңылайт. Бул ыкманын бирден-бир кемчилиги - сиз C2 серверинде шаблондорду түзүшүңүз керек, реестрге жөнөкөй милдеттеме жетишсиз.

Жогорудагы макалада сүрөттөлгөн вариант иштеп жатат жана аны ишке ашыруу кыйын эмес, бирок биз төмөнкүлөрдү унутпашыбыз керек:

  1. minifiде nifiден бардык процессорлор жок
  2. Minifiдеги CPU версиялары NiFiдагы CPU версияларынан артта калат.

Жазуу учурунда, NiFi акыркы версиясы 1.9.2. Акыркы MiNiFi версиясынын процессордук версиясы 1.7.0. Процессорлорду MiNiFi'га кошууга болот, бирок NiFi жана MiNiFi процессорлорунун ортосундагы версиянын дал келбестигинен улам, бул иштебей калышы мүмкүн.

NiFi CLI

сот тарабынан сүрөттөмө расмий сайтында курал, бул NiFI жана NiFi реестринин агымын жеткирүү же процессти башкаруу жаатындагы өз ара аракеттенүүсүн автоматташтыруу куралы. Баштоо үчүн бул куралды жүктөп алыңыз. бул жерде.

Утилитаны иштетиңиз

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Реестрден керектүү агымды жүктөө үчүн, биз себеттин идентификаторлорун (чака идентификатору) жана агымдын өзүн (агымдын идентификатору) билишибиз керек. Бул маалыматтарды cli аркылуу же NiFi реестринин веб интерфейсинен алса болот. Веб интерфейси мындай көрүнөт:

Apache NiFi ичинде агымды жеткирүү автоматташтырылган

CLI колдонуп, сиз муну жасайсыз:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Реестрден импорт процессинин тобун иштетүү:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Маанилүү жагдай, ар кандай nifi инстанциясы процесс тобун жылдырган хост катары көрсөтүлүшү мүмкүн.

Процесс тобу токтотулган процессорлор менен кошулду, аларды баштоо керек

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Жакшы, процессорлор башталды. Бирок, маселенин шарттарына ылайык, башка инстанцияларга маалыматтарды жөнөтүү үчүн бизге NiFi инстанциялары керек. Маалыматтарды серверге өткөрүү үчүн Push ыкмасы тандалды деп коёлу. Маалыматтарды өткөрүп берүүнү уюштуруу үчүн, буга чейин биздин агымга киргизилген Remote Process Group (RPG) боюнча маалыматтарды өткөрүп берүүнү (Өткөрүүнү иштетүү) иштетүү керек.

Apache NiFi ичинде агымды жеткирүү автоматташтырылган

CLI жана башка булактардагы документтерде мен маалыматтарды берүүнү иштетүүнүн жолун тапкан жокмун. Муну кантип жасоону билсеңиз, комментарийге жазыңыз.

Башыбыз бар жана биз акырына чейин барууга даярбыз, андан чыгуунун жолун табабыз! Бул көйгөйдү чечүү үчүн NiFi API колдоно аласыз. Келгиле, төмөнкү ыкманы колдонолу, биз жогорудагы мисалдардан ID алабыз (биздин учурда бул 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API методдорунун сүрөттөлүшү бул жерде.

Apache NiFi ичинде агымды жеткирүү автоматташтырылган
Денеде сиз төмөнкү формадагы JSON өтүшүңүз керек:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

"Иштөө" үчүн толтурулушу керек болгон параметрлер:
мамлекет — маалыматтарды берүү абалы. Берилиштерди өткөрүүнү иштетүү үчүн TRANSMITTING жеткиликтүү, өчүрүү үчүн ТОКТОТУЛГАН
версия - процессордун версиясы

версия түзүлгөндө демейки 0 болот, бирок бул параметрлерди метод аркылуу алууга болот

Apache NiFi ичинде агымды жеткирүү автоматташтырылган

Bash скрипттерин сүйүүчүлөр үчүн бул ыкма ылайыктуу көрүнүшү мүмкүн, бирок мен үчүн бул кыйын - bash скрипттери менин сүйүктүүм эмес. Кийинки жол менимче кызыктуураак жана ыңгайлуураак.

NiPyAPI

NiPyAPI - NiFi инстанциялары менен иштешүү үчүн Python китепканасы. Документтер барагы китепкана менен иштөө үчүн зарыл болгон маалыматтарды камтыйт. Тез баштоо сүрөттөлгөн долбоор github боюнча.

Конфигурацияны жайылтуу үчүн биздин скрипт Python программасы. Келгиле, коддоого өтөлү.
Андан ары иштөө үчүн конфигурацияларды орнотуңуз. Бизге төмөнкү параметрлер керек болот:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Андан ары мен сүрөттөлгөн бул китепкананын ыкмаларынын аттарын киргизем бул жерде.

Биз реестрди nifi инстанциясына туташтырабыз

nipyapi.versioning.create_registry_client

Бул кадамда сиз реестр мурунтан эле инстанцияга кошулганын текшерүүнү кошо аласыз, бул үчүн сиз ыкманы колдонсоңуз болот.

nipyapi.versioning.list_registry_clients

Биз себеттеги агымды андан ары издөө үчүн чаканы табабыз

nipyapi.versioning.get_registry_bucket

Табылган чака боюнча биз агымды издеп жатабыз

nipyapi.versioning.get_flow_in_bucket

Андан кийин, бул процесс тобу буга чейин кошулганын түшүнүү маанилүү. Процесс тобу координаттар менен жайгаштырылат жана биринин үстүнө экинчиси коюлганда кырдаал келип чыгышы мүмкүн. Мен текшердим, ал 🙂 бардык кошулган процесс тобун алуу үчүн, ыкманы колдонуңуз

nipyapi.canvas.list_all_process_groups

анан биз, мисалы, аты боюнча издей алабыз.

Мен шаблонду жаңыртуу процессин сүрөттөбөй эле коёюн, эгер шаблондун жаңы версиясында процессорлор кошулса, анда кезектердеги билдирүүлөрдүн болушуна байланыштуу көйгөйлөр жок экенин гана айтайын. Бирок, эгерде процессорлор алынып салынса, анда көйгөйлөр пайда болушу мүмкүн (эгерде анын алдында билдирүү кезеги топтолсо, nifi процессорду алып салууга жол бербейт). Эгер сиз бул маселени кантип чечкениме кызыксаңыз - мага жазыңыз, сураныч, биз бул маселени талкуулайбыз. Макаланын аягындагы байланыштар. Процесс тобун кошуу кадамына өтөбүз.

Скрипттин мүчүлүштүктөрүн оңдоодо, мен агымдын акыркы версиясы дайыма эле көтөрүлбөй турган өзгөчөлүккө туш болдум, андыктан алгач ушул версияны тактап алууну сунуштайм:

nipyapi.versioning.get_latest_flow_ver

Процесс тобун жайылтуу:

nipyapi.versioning.deploy_flow_version

Биз процессорлорду баштайбыз:

nipyapi.canvas.schedule_process_group

CLI жөнүндө блокто, алыскы процесстер тобунда маалыматтарды берүү автоматтык түрдө иштетилбейт деп жазылган? Сценарийди ишке ашырууда мен да бул көйгөйгө туш болдум. Ошол учурда, мен API аркылуу берилиштерди өткөрүүнү баштай алган жокмун жана NiPyAPI китепканасынын иштеп чыгуучусуна жазып, кеңеш/жардам суроону чечтим. Иштеп чыгуучу мага жооп берди, биз маселени талкууладык жана ал "бир нерсени текшерүүгө" убакыт керек деп жазды. Эми, бир нече күндөн кийин, менин баштоо көйгөйүмдү чечкен Python функциясы жазылган электрондук кат келди !!! Ошол убакта NiPyAPI версиясы 0.13.3 болчу жана, албетте, анда мындай эч нерсе болгон эмес. Бирок жакында чыккан 0.14.0 версиясында бул функция китепканага мурунтан эле киргизилген. Жолугушуу

nipyapi.canvas.set_remote_process_group_transmission

Ошентип, NiPyAPI китепканасынын жардамы менен биз реестрди туташтырдык, агымды жыйнап, ал тургай процессорлорду жана маалыматтарды өткөрүп баштадык. Андан кийин сиз кодду тарай аласыз, бардык текшерүүлөрдү, журналдарды кошсоңуз болот, ушуну менен. Бирок бул таптакыр башка окуя.

Мен караган автоматташтыруу варианттарынын ичинен акыркысы мага эң эффективдүү көрүндү. Биринчиден, бул дагы эле Python коду, ага сиз көмөкчү программа кодун киргизип, программалоо тилинин бардык мүмкүнчүлүктөрүн пайдалана аласыз. Экинчиден, NiPyAPI долбоору жигердүү өнүгүп жатат жана көйгөйлөр пайда болгон учурда иштеп чыгуучуга жазсаңыз болот. Үчүнчүдөн, NiPyAPI дагы эле татаал маселелерди чечүүдө NiFi менен өз ара аракеттенүү үчүн ийкемдүү курал болуп саналат. Мисалы, учурда агымда билдирүү кезеги бош экендигин жана процесс тобун жаңыртуу мүмкүнбү же жокпу аныктоодо.

Баары болду. Мен NiFi'де агымдын жеткирилишин автоматташтыруунун 3 ыкмасын, иштеп чыгуучу туш болушу мүмкүн болгон тузактарды сүрөттөп бердим жана жеткирүүнү автоматташтыруу үчүн жумушчу кодду бердим. Эгер сиз да мен сыяктуу бул темага кызыксаңыз - жаз!

Source: www.habr.com

Комментарий кошуу