Հոսքի առաքման ավտոմատացում Apache NiFi-ում

Hello everyone!

Հոսքի առաքման ավտոմատացում Apache NiFi-ում

Առաջադրանքը հետևյալն է. կա մի հոսք, որը ներկայացված է վերևի նկարում, որը պետք է փոխանցվի N սերվերներ Apache NiFi. Հոսքի փորձարկում - ֆայլը ստեղծվում և ուղարկվում է մեկ այլ NiFi օրինակ: Տվյալների փոխանցումը կատարվում է NiFi Site to Site արձանագրության միջոցով:

NiFi Site to Site (S2S) անվտանգ, հեշտությամբ կարգավորելի միջոց է տվյալների փոխանցման NiFi ատյանների միջև: Ինչպես է աշխատում S2S-ը, տես փաստաթղթավորում և կարևոր է չմոռանալ կարգավորել NiFi օրինակը S2S-ին թույլ տալու համար, տես այստեղ.

Այն դեպքերում, երբ խոսքը S2S-ի միջոցով տվյալների փոխանցման մասին է, մի օրինակը կոչվում է հաճախորդ, երկրորդ սերվեր: Հաճախորդը ուղարկում է տվյալներ, սերվերը ստանում է: Նրանց միջև տվյալների փոխանցումը կարգավորելու երկու եղանակ.

  1. Հրեք. Հաճախորդի օրինակից տվյալները ուղարկվում են հեռավոր գործընթացների խմբի (RPG) միջոցով: Սերվերի օրինակում տվյալները ստացվում են մուտքագրման պորտի միջոցով
  2. Քաշեք. Սերվերը տվյալներ է ստանում RPG-ի միջոցով, հաճախորդը ուղարկում է Output port-ի միջոցով:


Գործարկման հոսքը պահվում է Apache ռեեստրում:

Apache NiFi Registry-ը Apache NiFi-ի ենթածրագր է, որն ապահովում է հոսքի պահպանման և տարբերակների վերահսկման գործիք: Մի տեսակ GIT: Տեղադրման, կազմաձևման և ռեեստրի հետ աշխատելու մասին տեղեկությունները կարելի է գտնել այստեղ պաշտոնական փաստաթղթեր. Պահպանման համար հոսքը համակցված է գործընթացի խմբի մեջ և այս ձևով պահվում է ռեեստրում: Այս մասին մենք կանդրադառնանք ավելի ուշ հոդվածում:

Սկզբում, երբ N-ը փոքր թիվ է, հոսքը մատակարարվում և թարմացվում է ձեռքով ընդունելի ժամանակում:

Բայց քանի որ N-ն աճում է, խնդիրներն ավելի շատ են դառնում.

  1. հոսքը թարմացնելու համար ավելի շատ ժամանակ է պահանջվում: Դուք պետք է մուտք գործեք բոլոր սերվերներ
  2. Կաղապարի թարմացման սխալներ են տեղի ունենում: Այստեղ թարմացրել են, բայց այստեղ մոռացել են
  3. մարդկային սխալները մեծ թվով նմանատիպ գործողություններ կատարելիս

Այս ամենը մեզ բերում է նրան, որ մենք պետք է ավտոմատացնենք գործընթացը։ Ես փորձեցի այս խնդիրը լուծելու հետևյալ ուղիները.

  1. Օգտագործեք MiNiFi-ի փոխարեն NiFi-ի փոխարեն
  2. NiFi CLI
  3. NiPyAPI

Օգտագործելով MiNiFi

Apache MiNiFy - Apache NiFi-ի ենթածրագիր: MiNiFy-ը կոմպակտ գործակալ է, որն օգտագործում է նույն պրոցեսորները, ինչ NiFi-ը, ինչը թույլ է տալիս ստեղծել նույն հոսքերը, ինչ NiFi-ում: Գործակալի թեթև բնույթը ձեռք է բերվում, ի թիվս այլ բաների, նրանով, որ MiNiFy-ը չունի հոսքի կազմաձևման գրաֆիկական ինտերֆեյս: MiNiFy-ում գրաֆիկական ինտերֆեյսի բացակայությունը նշանակում է, որ անհրաժեշտ է լուծել մինիֆի հոսքը հասցնելու խնդիրը։ Քանի որ MiNiFy-ն ակտիվորեն օգտագործվում է IOT-ում, կան բազմաթիվ բաղադրիչներ, և անհրաժեշտ է ավտոմատացնել հոսքը վերջնական մինիֆի ատյաններ հասցնելու գործընթացը: Ծանոթ առաջադրանք, չէ՞:

Մեկ այլ ենթանախագծ կօգնի լուծել այս խնդիրը՝ MiNiFi C2 Server: Այս արտադրանքը նախատեսված է որպես կենտրոնական կետ կոնֆիգուրացիայի ներդրման ճարտարապետության մեջ: Ինչպես կարգավորել միջավայրը - նկարագրված է այս հոդվածը Խնդիրը լուծելու համար բավականաչափ տեղեկություններ կան Հաբրեի մասին։ MiNiFi-ը C2 սերվերի հետ համատեղ ավտոմատ կերպով թարմացնում է իր կոնֆիգուրացիան: Այս մոտեցման միակ թերությունն այն է, որ դուք պետք է կաղապարներ ստեղծեք C2 սերվերի վրա, ռեեստրի պարզ հավատարմությունը բավարար չէ:

Վերոնշյալ հոդվածում նկարագրված տարբերակը աշխատում է և դժվար չէ իրականացնել, բայց չպետք է մոռանալ հետևյալը.

  1. Minifi-ն չունի nifi-ի բոլոր պրոցեսորները
  2. Minifi պրոցեսորների տարբերակները զիջում են NiFi պրոցեսորների տարբերակներին:

Գրելու պահին NiFi-ի վերջին տարբերակը 1.9.2-ն է: MiniFi պրոցեսորի վերջին տարբերակը 1.7.0 է: Պրոցեսորները կարող են ավելացվել MiNiFi-ին, սակայն NiFi-ի և MiniFi-ի պրոցեսորների տարբերակների անհամապատասխանության պատճառով դա կարող է չաշխատել:

NiFi CLI

Դատելով նկարագրությունը գործիք պաշտոնական կայքում, սա գործիք է՝ ավտոմատացնելու NiFI-ի և NiFi ռեգիստրի միջև փոխգործակցությունը հոսքերի մատակարարման կամ գործընթացների կառավարման ոլորտում: Սկսելու համար դուք պետք է ներբեռնեք այս գործիքը: ուստի.

Գործարկեք կոմունալ ծրագիրը

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Որպեսզի մենք ռեեստրից բեռնենք պահանջվող հոսքը, մենք պետք է իմանանք դույլի նույնացուցիչները (դույլի նույնացուցիչը) և ինքնին հոսքը (հոսքի նույնացուցիչը): Այս տվյալները կարելի է ձեռք բերել կամ cli-ի կամ NiFi ռեեստրի վեբ ինտերֆեյսի միջոցով: Վեբ ինտերֆեյսում այն ​​ունի հետևյալ տեսքը.

Հոսքի առաքման ավտոմատացում Apache NiFi-ում

CLI-ի միջոցով սա արվում է.

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Մենք սկսում ենք ներմուծել գործընթացի խումբ ռեեստրից.

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Կարևոր կետ այն է, որ ցանկացած nifi օրինակ կարող է նշվել որպես հյուրընկալող, որի վրա մենք գլորում ենք գործընթացի խումբը:

Գործընթացների խումբն ավելացվել է դադարեցված պրոցեսորներով, դրանք պետք է սկսել

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Հիանալի է, պրոցեսորները սկսվել են: Այնուամենայնիվ, առաջադրանքի պայմանների համաձայն, մեզ անհրաժեշտ են NiFi ատյաններ՝ այլ ատյաններ տվյալներ ուղարկելու համար։ Ենթադրենք, որ դուք ընտրել եք Push մեթոդը՝ տվյալները սերվեր փոխանցելու համար։ Տվյալների փոխանցումը կազմակերպելու համար դուք պետք է ակտիվացնեք տվյալների փոխանցումը ավելացված Remote Process Group (RPG) վրա, որն արդեն ներառված է մեր հոսքում:

Հոսքի առաքման ավտոմատացում Apache NiFi-ում

CLI-ի և այլ աղբյուրների փաստաթղթերում ես չեմ գտել տվյալների փոխանցումը միացնելու միջոց: Եթե ​​գիտեք, թե ինչպես դա անել, գրեք մեկնաբանություններում։

Քանի որ մենք ունենք բաշ և պատրաստ ենք գնալ մինչև վերջ, ելք կգտնենք։ Այս խնդիրը լուծելու համար կարող եք օգտագործել NiFi API-ն: Եկեք օգտագործենք հետևյալ մեթոդը, վերը նշված օրինակներից վերցնենք ID-ն (մեր դեպքում դա 7f522a13-016e-1000-e504-d5b15587f2f3 է): NiFi API մեթոդների նկարագրությունը այստեղ.

Հոսքի առաքման ավտոմատացում Apache NiFi-ում
Մարմնում դուք պետք է փոխանցեք JSON, այսպես.

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Պարամետրերը, որոնք պետք է լրացվեն, որպեսզի այն աշխատի.
էին - տվյալների փոխանցման կարգավիճակը: Հասանելի է՝ TRANSMITTING՝ տվյալների փոխանցումը միացնելու համար, STOPPED՝ անջատելու համար
տարբերակ - պրոցեսորի տարբերակ

տարբերակը լռելյայն կլինի 0, երբ ստեղծվի, բայց այս պարամետրերը կարելի է ձեռք բերել մեթոդի միջոցով

Հոսքի առաքման ավտոմատացում Apache NiFi-ում

Բաշ սկրիպտների սիրահարների համար այս մեթոդը կարող է հարմար թվալ, բայց ինձ համար մի փոքր դժվար է. bash սցենարներն իմ սիրելին չեն: Հաջորդ մեթոդն իմ կարծիքով ավելի հետաքրքիր ու հարմար է։

NiPyAPI

NiPyAPI-ն Python գրադարան է NiFi-ի օրինակների հետ շփվելու համար: Փաստաթղթերի էջ պարունակում է գրադարանի հետ աշխատելու համար անհրաժեշտ տեղեկատվություն: Արագ մեկնարկը նկարագրված է նախագիծը github-ում:

Կազմաձևը տարածելու մեր սցենարը Python-ի ծրագիր է: Անցնենք կոդավորմանը։
Մենք կարգավորում ենք հետագա աշխատանքի համար: Մեզ անհրաժեշտ կլինեն հետևյալ պարամետրերը.

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Հաջորդիվ կտեղադրեմ այս գրադարանի մեթոդների անվանումները, որոնք նկարագրված են այստեղ.

Միացրեք ռեեստրը nifi օրինակին, օգտագործելով

nipyapi.versioning.create_registry_client

Այս քայլում կարող եք նաև ավելացնել ստուգում, որ ռեեստրն արդեն ավելացվել է օրինակին, դրա համար կարող եք օգտագործել մեթոդը.

nipyapi.versioning.list_registry_clients

Մենք գտնում ենք դույլը զամբյուղում հոսքի հետագա որոնման համար

nipyapi.versioning.get_registry_bucket

Օգտագործելով գտնված դույլը, մենք փնտրում ենք հոսք

nipyapi.versioning.get_flow_in_bucket

Հաջորդը, կարևոր է հասկանալ, թե արդյոք այս գործընթացի խումբն արդեն ավելացվել է: Գործընթացների խումբը տեղադրվում է ըստ կոորդինատների և կարող է առաջանալ իրավիճակ, երբ երկրորդ բաղադրիչը դրվում է մեկի վրա: Ես ստուգեցի, դա կարող է պատահել :) Բոլոր ավելացված գործընթացային խմբերը ստանալու համար մենք օգտագործում ենք մեթոդը

nipyapi.canvas.list_all_process_groups

Մենք կարող ենք հետագայում որոնել, օրինակ, անունով:

Չեմ նկարագրի կաղապարի թարմացման գործընթացը, միայն կասեմ, որ եթե կաղապարի նոր տարբերակում ավելացվեն պրոցեսորներ, ապա հերթերում հաղորդագրությունների առկայության հետ կապված խնդիրներ չկան։ Բայց եթե պրոցեսորները հեռացվեն, ապա կարող են խնդիրներ առաջանալ (nifi-ն թույլ չի տալիս հեռացնել պրոցեսորը, եթե դրա դիմաց հաղորդագրությունների հերթ է կուտակվել): Եթե ​​ձեզ հետաքրքրում է, թե ինչպես լուծեցի այս խնդիրը, խնդրում եմ գրեք ինձ, և մենք կքննարկենք այս հարցը։ Կոնտակտներ հոդվածի վերջում: Եկեք անցնենք գործընթացի խումբ ավելացնելու քայլին:

Սցենարը վրիպազերծելիս ես հանդիպեցի մի յուրահատկության, որ հոսքի վերջին տարբերակը միշտ չէ, որ վեր է հանվում, ուստի խորհուրդ եմ տալիս նախ ստուգել այս տարբերակը.

nipyapi.versioning.get_latest_flow_ver

Տեղակայման գործընթացի խումբ.

nipyapi.versioning.deploy_flow_version

Մենք սկսում ենք պրոցեսորները.

nipyapi.canvas.schedule_process_group

CLI-ի մասին բլոկում գրված էր, որ տվյալների փոխանցումը ավտոմատ միացված չէ հեռավոր պրոցեսների խմբում? Սցենարն իրականացնելիս ես նույնպես հանդիպեցի այս խնդրին։ Այդ ժամանակ ես չկարողացա սկսել տվյալների փոխանցումը API-ի միջոցով և որոշեցի գրել NiPyAPI գրադարանի մշակողին և խնդրել խորհուրդ/օգնություն: Մշակողը պատասխանեց ինձ, մենք քննարկեցինք խնդիրը և նա գրեց, որ իրեն ժամանակ է պետք «ինչ-որ բան ստուգելու համար»: Եվ հետո մի երկու օր հետո գալիս է նամակ, որտեղ Python-ում գրված է մի ֆունկցիա, որը լուծում է իմ գործարկման խնդիրը!!! Այն ժամանակ NiPyAPI-ի տարբերակը 0.13.3 էր ու, բնականաբար, նման բան չկար։ Բայց բոլորովին վերջերս թողարկված 0.14.0 տարբերակում այս ֆունկցիան արդեն ներառված էր գրադարանում։ Հանդիպել,

nipyapi.canvas.set_remote_process_group_transmission

Այսպիսով, օգտագործելով NiPyAPI գրադարանը, մենք միացրինք ռեեստրը, շրջանառության մեջ դրեցինք հոսքը և նույնիսկ սկսեցինք պրոցեսորներ և տվյալների փոխանցում: Այնուհետև կարող եք սանրել ծածկագիրը, ավելացնել բոլոր տեսակի ստուգումներ, գրանցումներ և վերջ: Բայց դա բոլորովին այլ պատմություն է:

Իմ դիտարկած ավտոմատացման տարբերակներից վերջինն ինձ թվում էր ամենաարդյունավետը: Նախ, սա դեռ python կոդ է, որի մեջ կարող եք տեղադրել օժանդակ ծրագրի կոդը և օգտվել ծրագրավորման լեզվի բոլոր առավելություններից: Երկրորդ, NiPyAPI նախագիծը ակտիվորեն զարգանում է, և խնդիրների դեպքում կարող եք գրել ծրագրավորողին: Երրորդ, NiPyAPI-ն դեռևս ավելի ճկուն գործիք է NiFi-ի հետ շփվելու բարդ խնդիրների լուծման համար: Օրինակ՝ որոշելու, թե արդյոք հաղորդագրության հերթերն այժմ դատարկ են հոսքի մեջ, և արդյոք գործընթացի խումբը կարող է թարմացվել:

Այսքանը: Ես նկարագրեցի NiFi-ում հոսքի մատակարարման ավտոմատացման 3 մոտեցում, որոգայթներ, որոնց կարող է հանդիպել մշակողը, և տրամադրեցի աշխատանքային կոդ՝ առաքումն ավտոմատացնելու համար: Եթե ​​դուք նույնքան հետաքրքրված եք այս թեմայով, որքան ես, գրել!

Source: www.habr.com

Добавить комментарий