Hello everyone!
Առաջադրանքը հետևյալն է. կա մի հոսք, որը ներկայացված է վերևի նկարում, որը պետք է փոխանցվի N սերվերներ
NiFi Site to Site (S2S) անվտանգ, հեշտությամբ կարգավորելի միջոց է տվյալների փոխանցման NiFi ատյանների միջև: Ինչպես է աշխատում S2S-ը, տես
Այն դեպքերում, երբ խոսքը S2S-ի միջոցով տվյալների փոխանցման մասին է, մի օրինակը կոչվում է հաճախորդ, երկրորդ սերվեր: Հաճախորդը ուղարկում է տվյալներ, սերվերը ստանում է: Նրանց միջև տվյալների փոխանցումը կարգավորելու երկու եղանակ.
- Հրեք. Հաճախորդի օրինակից տվյալները ուղարկվում են հեռավոր գործընթացների խմբի (RPG) միջոցով: Սերվերի օրինակում տվյալները ստացվում են մուտքագրման պորտի միջոցով
- Քաշեք. Սերվերը տվյալներ է ստանում RPG-ի միջոցով, հաճախորդը ուղարկում է Output port-ի միջոցով:
Գործարկման հոսքը պահվում է Apache ռեեստրում:
Apache NiFi Registry-ը Apache NiFi-ի ենթածրագր է, որն ապահովում է հոսքի պահպանման և տարբերակների վերահսկման գործիք: Մի տեսակ GIT: Տեղադրման, կազմաձևման և ռեեստրի հետ աշխատելու մասին տեղեկությունները կարելի է գտնել այստեղ
Սկզբում, երբ N-ը փոքր թիվ է, հոսքը մատակարարվում և թարմացվում է ձեռքով ընդունելի ժամանակում:
Բայց քանի որ N-ն աճում է, խնդիրներն ավելի շատ են դառնում.
- հոսքը թարմացնելու համար ավելի շատ ժամանակ է պահանջվում: Դուք պետք է մուտք գործեք բոլոր սերվերներ
- Կաղապարի թարմացման սխալներ են տեղի ունենում: Այստեղ թարմացրել են, բայց այստեղ մոռացել են
- մարդկային սխալները մեծ թվով նմանատիպ գործողություններ կատարելիս
Այս ամենը մեզ բերում է նրան, որ մենք պետք է ավտոմատացնենք գործընթացը։ Ես փորձեցի այս խնդիրը լուծելու հետևյալ ուղիները.
- Օգտագործեք MiNiFi-ի փոխարեն NiFi-ի փոխարեն
- NiFi CLI
- NiPyAPI
Օգտագործելով MiNiFi
Մեկ այլ ենթանախագծ կօգնի լուծել այս խնդիրը՝ MiNiFi C2 Server: Այս արտադրանքը նախատեսված է որպես կենտրոնական կետ կոնֆիգուրացիայի ներդրման ճարտարապետության մեջ: Ինչպես կարգավորել միջավայրը - նկարագրված է
Վերոնշյալ հոդվածում նկարագրված տարբերակը աշխատում է և դժվար չէ իրականացնել, բայց չպետք է մոռանալ հետևյալը.
- Minifi-ն չունի nifi-ի բոլոր պրոցեսորները
- Minifi պրոցեսորների տարբերակները զիջում են NiFi պրոցեսորների տարբերակներին:
Գրելու պահին NiFi-ի վերջին տարբերակը 1.9.2-ն է: MiniFi պրոցեսորի վերջին տարբերակը 1.7.0 է: Պրոցեսորները կարող են ավելացվել MiNiFi-ին, սակայն NiFi-ի և MiniFi-ի պրոցեսորների տարբերակների անհամապատասխանության պատճառով դա կարող է չաշխատել:
NiFi CLI
Դատելով
Գործարկեք կոմունալ ծրագիրը
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Որպեսզի մենք ռեեստրից բեռնենք պահանջվող հոսքը, մենք պետք է իմանանք դույլի նույնացուցիչները (դույլի նույնացուցիչը) և ինքնին հոսքը (հոսքի նույնացուցիչը): Այս տվյալները կարելի է ձեռք բերել կամ cli-ի կամ NiFi ռեեստրի վեբ ինտերֆեյսի միջոցով: Վեբ ինտերֆեյսում այն ունի հետևյալ տեսքը.
CLI-ի միջոցով սա արվում է.
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Մենք սկսում ենք ներմուծել գործընթացի խումբ ռեեստրից.
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Կարևոր կետ այն է, որ ցանկացած nifi օրինակ կարող է նշվել որպես հյուրընկալող, որի վրա մենք գլորում ենք գործընթացի խումբը:
Գործընթացների խումբն ավելացվել է դադարեցված պրոցեսորներով, դրանք պետք է սկսել
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Հիանալի է, պրոցեսորները սկսվել են: Այնուամենայնիվ, առաջադրանքի պայմանների համաձայն, մեզ անհրաժեշտ են NiFi ատյաններ՝ այլ ատյաններ տվյալներ ուղարկելու համար։ Ենթադրենք, որ դուք ընտրել եք Push մեթոդը՝ տվյալները սերվեր փոխանցելու համար։ Տվյալների փոխանցումը կազմակերպելու համար դուք պետք է ակտիվացնեք տվյալների փոխանցումը ավելացված Remote Process Group (RPG) վրա, որն արդեն ներառված է մեր հոսքում:
CLI-ի և այլ աղբյուրների փաստաթղթերում ես չեմ գտել տվյալների փոխանցումը միացնելու միջոց: Եթե գիտեք, թե ինչպես դա անել, գրեք մեկնաբանություններում։
Քանի որ մենք ունենք բաշ և պատրաստ ենք գնալ մինչև վերջ, ելք կգտնենք։ Այս խնդիրը լուծելու համար կարող եք օգտագործել NiFi API-ն: Եկեք օգտագործենք հետևյալ մեթոդը, վերը նշված օրինակներից վերցնենք ID-ն (մեր դեպքում դա 7f522a13-016e-1000-e504-d5b15587f2f3 է): NiFi API մեթոդների նկարագրությունը
Մարմնում դուք պետք է փոխանցեք JSON, այսպես.
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Պարամետրերը, որոնք պետք է լրացվեն, որպեսզի այն աշխատի.
էին - տվյալների փոխանցման կարգավիճակը: Հասանելի է՝ TRANSMITTING՝ տվյալների փոխանցումը միացնելու համար, STOPPED՝ անջատելու համար
տարբերակ - պրոցեսորի տարբերակ
տարբերակը լռելյայն կլինի 0, երբ ստեղծվի, բայց այս պարամետրերը կարելի է ձեռք բերել մեթոդի միջոցով
Բաշ սկրիպտների սիրահարների համար այս մեթոդը կարող է հարմար թվալ, բայց ինձ համար մի փոքր դժվար է. bash սցենարներն իմ սիրելին չեն: Հաջորդ մեթոդն իմ կարծիքով ավելի հետաքրքիր ու հարմար է։
NiPyAPI
NiPyAPI-ն Python գրադարան է NiFi-ի օրինակների հետ շփվելու համար:
Կազմաձևը տարածելու մեր սցենարը Python-ի ծրագիր է: Անցնենք կոդավորմանը։
Մենք կարգավորում ենք հետագա աշխատանքի համար: Մեզ անհրաժեշտ կլինեն հետևյալ պարամետրերը.
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Հաջորդիվ կտեղադրեմ այս գրադարանի մեթոդների անվանումները, որոնք նկարագրված են
Միացրեք ռեեստրը nifi օրինակին, օգտագործելով
nipyapi.versioning.create_registry_client
Այս քայլում կարող եք նաև ավելացնել ստուգում, որ ռեեստրն արդեն ավելացվել է օրինակին, դրա համար կարող եք օգտագործել մեթոդը.
nipyapi.versioning.list_registry_clients
Մենք գտնում ենք դույլը զամբյուղում հոսքի հետագա որոնման համար
nipyapi.versioning.get_registry_bucket
Օգտագործելով գտնված դույլը, մենք փնտրում ենք հոսք
nipyapi.versioning.get_flow_in_bucket
Հաջորդը, կարևոր է հասկանալ, թե արդյոք այս գործընթացի խումբն արդեն ավելացվել է: Գործընթացների խումբը տեղադրվում է ըստ կոորդինատների և կարող է առաջանալ իրավիճակ, երբ երկրորդ բաղադրիչը դրվում է մեկի վրա: Ես ստուգեցի, դա կարող է պատահել :) Բոլոր ավելացված գործընթացային խմբերը ստանալու համար մենք օգտագործում ենք մեթոդը
nipyapi.canvas.list_all_process_groups
Մենք կարող ենք հետագայում որոնել, օրինակ, անունով:
Չեմ նկարագրի կաղապարի թարմացման գործընթացը, միայն կասեմ, որ եթե կաղապարի նոր տարբերակում ավելացվեն պրոցեսորներ, ապա հերթերում հաղորդագրությունների առկայության հետ կապված խնդիրներ չկան։ Բայց եթե պրոցեսորները հեռացվեն, ապա կարող են խնդիրներ առաջանալ (nifi-ն թույլ չի տալիս հեռացնել պրոցեսորը, եթե դրա դիմաց հաղորդագրությունների հերթ է կուտակվել): Եթե ձեզ հետաքրքրում է, թե ինչպես լուծեցի այս խնդիրը, խնդրում եմ գրեք ինձ, և մենք կքննարկենք այս հարցը։ Կոնտակտներ հոդվածի վերջում: Եկեք անցնենք գործընթացի խումբ ավելացնելու քայլին:
Սցենարը վրիպազերծելիս ես հանդիպեցի մի յուրահատկության, որ հոսքի վերջին տարբերակը միշտ չէ, որ վեր է հանվում, ուստի խորհուրդ եմ տալիս նախ ստուգել այս տարբերակը.
nipyapi.versioning.get_latest_flow_ver
Տեղակայման գործընթացի խումբ.
nipyapi.versioning.deploy_flow_version
Մենք սկսում ենք պրոցեսորները.
nipyapi.canvas.schedule_process_group
CLI-ի մասին բլոկում գրված էր, որ տվյալների փոխանցումը ավտոմատ միացված չէ հեռավոր պրոցեսների խմբում? Սցենարն իրականացնելիս ես նույնպես հանդիպեցի այս խնդրին։ Այդ ժամանակ ես չկարողացա սկսել տվյալների փոխանցումը API-ի միջոցով և որոշեցի գրել NiPyAPI գրադարանի մշակողին և խնդրել խորհուրդ/օգնություն: Մշակողը պատասխանեց ինձ, մենք քննարկեցինք խնդիրը և նա գրեց, որ իրեն ժամանակ է պետք «ինչ-որ բան ստուգելու համար»: Եվ հետո մի երկու օր հետո գալիս է նամակ, որտեղ Python-ում գրված է մի ֆունկցիա, որը լուծում է իմ գործարկման խնդիրը!!! Այն ժամանակ NiPyAPI-ի տարբերակը 0.13.3 էր ու, բնականաբար, նման բան չկար։ Բայց բոլորովին վերջերս թողարկված 0.14.0 տարբերակում այս ֆունկցիան արդեն ներառված էր գրադարանում։ Հանդիպել,
nipyapi.canvas.set_remote_process_group_transmission
Այսպիսով, օգտագործելով NiPyAPI գրադարանը, մենք միացրինք ռեեստրը, շրջանառության մեջ դրեցինք հոսքը և նույնիսկ սկսեցինք պրոցեսորներ և տվյալների փոխանցում: Այնուհետև կարող եք սանրել ծածկագիրը, ավելացնել բոլոր տեսակի ստուգումներ, գրանցումներ և վերջ: Բայց դա բոլորովին այլ պատմություն է:
Իմ դիտարկած ավտոմատացման տարբերակներից վերջինն ինձ թվում էր ամենաարդյունավետը: Նախ, սա դեռ python կոդ է, որի մեջ կարող եք տեղադրել օժանդակ ծրագրի կոդը և օգտվել ծրագրավորման լեզվի բոլոր առավելություններից: Երկրորդ, NiPyAPI նախագիծը ակտիվորեն զարգանում է, և խնդիրների դեպքում կարող եք գրել ծրագրավորողին: Երրորդ, NiPyAPI-ն դեռևս ավելի ճկուն գործիք է NiFi-ի հետ շփվելու բարդ խնդիրների լուծման համար: Օրինակ՝ որոշելու, թե արդյոք հաղորդագրության հերթերն այժմ դատարկ են հոսքի մեջ, և արդյոք գործընթացի խումբը կարող է թարմացվել:
Այսքանը: Ես նկարագրեցի NiFi-ում հոսքի մատակարարման ավտոմատացման 3 մոտեցում, որոգայթներ, որոնց կարող է հանդիպել մշակողը, և տրամադրեցի աշխատանքային կոդ՝ առաքումն ավտոմատացնելու համար: Եթե դուք նույնքան հետաքրքրված եք այս թեմայով, որքան ես,
Source: www.habr.com