Hola a todos!
Zeregin hau da: fluxu bat dago, goiko irudian aurkezten dena, N zerbitzarietara zabaldu behar dena.
NiFi Site to Site (S2S) NiFi instantzien artean datuak transferitzeko modu seguru eta erraz konfiguragarria da. S2S nola funtzionatzen duen, ikusi
S2S erabiliz datuen transferentziari buruz ari garen kasuetan, instantzia bati bezeroa deitzen zaio, bigarren zerbitzariari. Bezeroak datuak bidaltzen ditu, zerbitzariak jasotzen ditu. Bien arteko datu-transferentzia konfiguratzeko bi modu:
- Bultza. Bezeroaren instantziatik, datuak Urruneko Prozesu Taldea (RPG) erabiliz bidaltzen dira. Zerbitzariaren instantzian, datuak sarrerako ataka erabiliz jasotzen dira
- Pull. Zerbitzariak datuak RPG erabiliz jasotzen ditu, bezeroak Irteera ataka erabiliz bidaltzen ditu.
Abian jartzeko fluxua Apache Erregistroan gordetzen da.
Apache NiFi Registry Apache NiFiren azpiproiektua da, fluxua biltegiratzeko eta bertsioen kontrolarako tresna eskaintzen duena. GIT moduko bat. Erregistroarekin instalatzeari, konfiguratzeari eta lan egiteari buruzko informazioa hemen aurki daiteke
Hasieran, N zenbaki txikia denean, fluxua eskuz entregatu eta eguneratzen da denbora onargarrian.
Baina N hazi ahala, arazoak ugariagoak dira:
- denbora gehiago behar da fluxua eguneratzeko. Zerbitzari guztietan saioa hasi behar duzu
- Txantiloiak eguneratzean erroreak gertatzen dira. Hemen eguneratu dute, baina hemen ahaztu egin dira
- giza akatsak antzeko eragiketa ugari egitean
Horrek guztiak prozesua automatizatu behar dugula ezagutzera garamatza. Arazo hau konpontzeko modu hauek probatu ditut:
- Erabili MiNiFi NiFiren ordez
- NiFi CLI
- NiPyAPI
MiNiFi erabiliz
Beste azpiproiektu batek arazo hau konpontzen lagunduko du - MiNiFi C2 Server. Produktu hau konfigurazio-inplementazio-arkitekturako puntu nagusia izan nahi du. Nola konfiguratu ingurunea - atalean deskribatua
Goiko artikuluan deskribatutako aukera funtzionatzen du eta ez da zaila ezartzeko, baina ez dugu ahaztu behar honako hau:
- Minifi-k ez ditu nifi-ren prozesadore guztiak
- Minifi prozesadorearen bertsioak NiFi prozesadoreen bertsioen atzean geratzen dira.
Idazteko unean, NiFiren azken bertsioa 1.9.2 da. MiNiFi prozesadorearen azken bertsioa 1.7.0 da. MiNiFi-ra prozesadoreak gehi daitezke, baina NiFi eta MiNiFi prozesadoreen arteko bertsio-desberdintasunen ondorioz, baliteke honek ez funtzionatzea.
NiFi CLI
arabera
Abiarazi utilitatea
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Erregistrotik beharrezkoa den fluxua kargatzeko, kuboaren identifikatzaileak (ontziaren identifikatzailea) eta fluxua bera (fluxuaren identifikatzailea) ezagutu behar ditugu. Datu hauek cli bidez edo NiFi erregistroko web interfazean lor daitezke. Web interfazean honela ikusten da:
CLI erabiliz hau egiten da:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Prozesu taldea erregistrotik inportatzen hasten gara:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Puntu garrantzitsu bat da edozein nifi instantzia prozesu-taldea eramango dugun ostalari gisa zehaztu daitekeela.
Prozesu taldea gelditutako prozesadoreekin gehitu da, hasi behar dira
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Bikaina, prozesadoreak hasi dira. Hala ere, zereginaren baldintzen arabera, NiFi instantziak behar ditugu datuak beste instantzia batzuetara bidaltzeko. Demagun Push metodoa aukeratu duzula zerbitzariari datuak transferitzeko. Datu-transferentzia antolatzeko, datu-transferentzia gaitu behar duzu gehitutako Remote Process Group (RPG), dagoeneko gure fluxuan sartuta dagoena.
CLI-ko eta beste iturri batzuetako dokumentazioan, ez nuen datuen transferentzia gaitzeko modurik aurkitu. Hau nola egin badakizu, mesedez idatzi iruzkinetan.
Bash daukagunez eta amaierara joateko prest gaudenez, aterabide bat aurkituko dugu! NiFi APIa erabil dezakezu arazo hau konpontzeko. Erabili dezagun metodo hau, har dezagun goiko adibideetako IDa (gure kasuan 7f522a13-016e-1000-e504-d5b15587f2f3 da). NiFi API metodoen deskribapena
Gorputzean JSON pasatu behar duzu, honela:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Funtzionatzeko bete beharreko parametroak:
egoera β Datu transferentziaren egoera. Eskuragarri: TRANSMITTING datuen transferentzia gaitzeko, STOPED desgaitzeko
bertsioa - prozesadorearen bertsioa
bertsioa 0 izango da lehenetsita sortzen denean, baina parametro hauek metodoa erabiliz lor daitezke
Bash scripten zaleentzat, metodo hau egokia iruditu daiteke, baina pixka bat zaila da niretzat - bash scriptak ez dira nire gogokoenak. Hurrengo metodoa interesgarriagoa eta erosoagoa da nire ustez.
NiPyAPI
NiPyAPI NiFi instantziekin elkarreragiteko Python liburutegi bat da.
Konfigurazioa zabaltzeko gure scripta Python-eko programa bat da. Goazen kodeketara.
Lan gehiago egiteko konfigurazioak konfiguratzen ditugu. Parametro hauek beharko ditugu:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #ΠΏΡΡΡ Π΄ΠΎ nifi-api ΠΈΠ½ΡΡΠ°Π½ΡΠ°, Π½Π° ΠΊΠΎΡΠΎΡΠΎΠΌ ΡΠ°Π·Π²ΠΎΡΠ°ΡΠΈΠ²Π°Π΅ΠΌ process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #ΠΏΡΡΡ Π΄ΠΎ nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ registry, ΠΊΠ°ΠΊ Π±ΡΠ΄Π΅Ρ Π½Π°Π·ΡΠ²Π°ΡΡΡΡ Π² ΠΈΠ½ΡΡΠ°Π½ΡΠ΅ nifi
nipyapi.config.bucket_name = 'BucketName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ bucket, ΠΈΠ· ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΠΏΠΎΠ΄ΡΡΠ³ΠΈΠ²Π°Π΅ΠΌ flow
nipyapi.config.flow_name = 'FlowName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ flow, ΠΊΠΎΡΠΎΡΠΎΠ΅ ΠΏΠΎΠ΄ΡΡΠ³ΠΈΠ²Π°Π΅ΠΌ
Jarraian, liburutegi honen metodoen izenak txertatuko ditut, deskribatzen direnak
Konektatu erregistroa nifi instantziara erabiliz
nipyapi.versioning.create_registry_client
Urrats honetan, erregistroa instantzian dagoeneko gehitu dela egiaztatzea ere gehi dezakezu; horretarako metodoa erabil dezakezu
nipyapi.versioning.list_registry_clients
Saskian fluxua gehiago bilatzeko kuboa aurkituko dugu
nipyapi.versioning.get_registry_bucket
Aurkitutako ontzia erabiliz, fluxua bilatzen dugu
nipyapi.versioning.get_flow_in_bucket
Ondoren, garrantzitsua da prozesu-talde hori dagoeneko gehitu den ulertzea. Prozesu taldea koordenatuen arabera kokatzen da eta egoera bat sor daiteke bigarren osagai bat baten gainean gainjartzean. Egiaztatu dut, hau gerta daiteke :) Gehitutako prozesu-talde guztiak lortzeko metodoa erabiltzen dugu
nipyapi.canvas.list_all_process_groups
Gehiago bilatu dezakegu, adibidez, izenaren arabera.
Ez dut txantiloia eguneratzeko prozesua deskribatuko, soilik esango dut txantiloiaren bertsio berrian prozesadoreak gehitzen badira, ez dagoela arazorik ilaretan mezuak egotearekin. Baina prozesadoreak kentzen badira, arazoak sor daitezke (nifi-k ez dizu prozesadore bat kentzen uzten, horren aurrean mezu-ilara bat pilatu bada). Arazo hau nola konpondu dudan interesatzen bazaizu, idatzi iezadazu eta arazo hau eztabaidatuko dugu. Kontaktuak artikuluaren amaieran. Goazen prozesu-talde bat gehitzeko urratsera.
Script-a araztean, fluxuaren azken bertsioa beti ez dela ateratzen berezitasun batekin egin nuen topo, beraz bertsio hau lehenbailehen egiaztatzea gomendatzen dut:
nipyapi.versioning.get_latest_flow_ver
Inplementatu prozesu taldea:
nipyapi.versioning.deploy_flow_version
Prozesadoreak abiarazten ditugu:
nipyapi.canvas.schedule_process_group
CLIri buruzko blokean idatzi zen datuen transferentzia ez dela automatikoki gaituta urruneko prozesu taldean? Scripta ezartzerakoan, arazo hau ere aurkitu dut. Garai hartan, ezin izan nuen APIa erabiliz datuen transferentzia hasi eta NiPyAPI liburutegiaren garatzaileari idaztea erabaki nuen eta aholku/laguntza eskatzea. Garatzaileak erantzun zidan, arazoaz eztabaidatu genuen eta "zerbait egiaztatzeko" denbora behar zuela idatzi zuen. Eta gero, pare bat egun geroago, gutun bat iristen da eta bertan Python-en idatzitako funtzio bat nire abiarazte arazoa konpontzen da!!! Garai hartan, NiPyAPI bertsioa 0.13.3 zen eta, noski, ez zegoen horrelakorik. Baina duela gutxi kaleratu zen 0.14.0 bertsioan, funtzio hau liburutegian sartuta zegoen jada. Ezagutu,
nipyapi.canvas.set_remote_process_group_transmission
Beraz, NiPyAPI liburutegia erabiliz, erregistroa konektatu genuen, fluxua zabaldu genuen eta prozesadoreak eta datu-transferentziak ere hasi genituen. Ondoren, kodea orraztu, era guztietako egiaztapenak gehitu, saioa erregistratu eta kito. Baina hori guztiz bestelako istorio bat da.
Kontuan izan ditudan automatizazio aukeren artean, azkena iruditu zait eraginkorrena. Lehenik eta behin, hau python kodea da oraindik, eta programa-kode laguntzailea txertatu dezakezu eta programazio-lengoaiaren abantaila guztiak aprobetxatu. Bigarrenik, NiPyAPI proiektua aktiboki garatzen ari da eta arazoak izanez gero garatzaileari idatzi dezakezu. Hirugarrenik, NiPyAPI tresna malguagoa da oraindik NiFirekin elkarreraginean arazo konplexuak konpontzeko. Adibidez, mezu-ilarak fluxuan hutsik dauden eta prozesu-taldea eguneratu daitekeen zehaztean.
Hori da dena. NiFi-n fluxu-entrega automatizatzeko 3 ikuspegi deskribatu nituen, garatzaile batek aurki ditzakeen akatsak, eta entrega automatizatzeko lan-kodea eman nuen. Gai hau ni bezain interesatzen bazaizu -
Iturria: www.habr.com