Sveiki visiem!
Uzdevums ir šāds - ir plūsma, kas parādīta attēlā augstāk, kuru nepieciešams izrullēt uz N serveriem ar
NiFi vietne uz vietni (S2S) ir drošs, viegli konfigurējams veids, kā pārsūtīt datus starp NiFi gadījumiem. Kā darbojas S2S, skatiet
Gadījumos, kad mēs runājam par datu pārsūtīšanu, izmantojot S2S, vienu gadījumu sauc par klientu, otru sauc par serveri. Klients sūta datus, serveris saņem. Divi veidi, kā konfigurēt datu pārraidi starp tām:
- Push. No klienta instances dati tiek nosūtīti, izmantojot attālo procesu grupu (RPG). Servera instancē dati tiek saņemti, izmantojot ievades portu
- Vilkt. Serveris saņem datus, izmantojot RPG, klients sūta, izmantojot Output portu.
Izklāšanas plūsma tiek glabāta Apache reģistrā.
Apache NiFi Registry ir Apache NiFi apakšprojekts, kas nodrošina plūsmas glabāšanas un versiju kontroles rīku. Sava veida GIT. Informāciju par reģistra instalēšanu, konfigurēšanu un darbu ar to var atrast
Sākumā, kad N ir mazs skaitlis, plūsma tiek piegādāta un manuāli atjaunināta pieņemamā laikā.
Bet, pieaugot N, problēmu kļūst arvien vairāk:
- plūsmas atjaunināšana prasa vairāk laika. Jums ir jāpiesakās visos serveros
- Veidnes atjaunināšanas kļūdas. Šeit viņi to atjaunināja, bet šeit viņi aizmirsa
- cilvēka kļūdas, veicot lielu skaitu līdzīgu darbību
Tas viss noved pie tā, ka mums ir jāautomatizē process. Es mēģināju šādus veidus, kā atrisināt šo problēmu:
- Izmantojiet MiNiFi, nevis NiFi
- NiFi CLI
- NiPyAPI
Izmantojot MiNiFi
Vēl viens apakšprojekts palīdzēs atrisināt šo problēmu - MiNiFi C2 Server. Šis produkts ir paredzēts kā centrālais punkts konfigurācijas izlaišanas arhitektūrā. Kā konfigurēt vidi - aprakstīts sadaļā
Iepriekš rakstā aprakstītā opcija darbojas un nav grūti īstenojama, taču mēs nedrīkstam aizmirst:
- Minifi nav visi nifi procesori
- Minifi procesoru versijas atpaliek no NiFi procesora versijām.
Rakstīšanas laikā jaunākā NiFi versija ir 1.9.2. Jaunākā MiNiFi procesora versija ir 1.7.0. Procesorus var pievienot MiNiFi, taču NiFi un MiNiFi procesoru versiju neatbilstības dēļ tas var nedarboties.
NiFi CLI
Spriežot pēc
Palaidiet utilītu
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Lai mēs varētu ielādēt nepieciešamo plūsmu no reģistra, mums ir jāzina segmenta identifikatori (kopas identifikators) un pati plūsma (plūsmas identifikators). Šos datus var iegūt, izmantojot cli vai NiFi reģistra tīmekļa saskarni. Tīmekļa saskarnē tas izskatās šādi:
Izmantojot CLI, tas tiek darīts:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Mēs sākam importēt procesu grupu no reģistra:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Svarīgi ir tas, ka jebkuru nifi gadījumu var norādīt kā resursdatoru, uz kuru mēs pārvēršam procesu grupu.
Procesu grupa ir pievienota ar apturētiem procesoriem, tie ir jāsāk
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Lieliski, procesori ir sākušies. Tomēr saskaņā ar uzdevuma noteikumiem mums ir nepieciešami NiFi gadījumi, lai nosūtītu datus uz citiem gadījumiem. Pieņemsim, ka datu pārsūtīšanai uz serveri esat izvēlējies Push metodi. Lai organizētu datu pārsūtīšanu, ir jāiespējo datu pārsūtīšana pievienotajā Remote Process Group (RPG), kas jau ir iekļauta mūsu plūsmā.
CLI un citos avotos esošajā dokumentācijā es neatradu veidu, kā iespējot datu pārsūtīšanu. Ja zināt, kā to izdarīt, lūdzu, rakstiet komentāros.
Tā kā mums ir bash un esam gatavi iet līdz galam, tad atradīsim izeju! Lai atrisinātu šo problēmu, varat izmantot NiFi API. Izmantosim šādu metodi, ņemsim ID no iepriekš minētajiem piemēriem (mūsu gadījumā tas ir 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API metožu apraksts
Pamattekstā jums ir jānodod JSON, piemēram:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametri, kas jāaizpilda, lai tas darbotos:
bija — datu pārsūtīšanas statuss. Pieejams: TRANSMITTING, lai iespējotu datu pārsūtīšanu, STOPPED, lai atspējotu
versija - procesora versija
versijas izveides laikā noklusējuma vērtība būs 0, taču šos parametrus var iegūt, izmantojot metodi
Bash skriptu cienītājiem šī metode var šķist piemērota, taču man tā ir nedaudz sarežģīta - bash skripti nav mans mīļākais. Nākamā metode, manuprāt, ir interesantāka un ērtāka.
NiPyAPI
NiPyAPI ir Python bibliotēka mijiedarbībai ar NiFi gadījumiem.
Mūsu skripts konfigurācijas izvēršanai ir programma Python. Pāriesim pie kodēšanas.
Mēs iestatījām konfigurācijas turpmākajam darbam. Mums būs nepieciešami šādi parametri:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Tālāk es ievietošu šīs bibliotēkas metožu nosaukumus, kas ir aprakstīti
Savienojiet reģistru ar nifi gadījumu, izmantojot
nipyapi.versioning.create_registry_client
Šajā darbībā varat arī pievienot pārbaudi, vai reģistrs jau ir pievienots instancei; šim nolūkam varat izmantot metodi
nipyapi.versioning.list_registry_clients
Mēs atrodam spaini tālākai plūsmas meklēšanai grozā
nipyapi.versioning.get_registry_bucket
Izmantojot atrasto spaini, mēs meklējam plūsmu
nipyapi.versioning.get_flow_in_bucket
Tālāk ir svarīgi saprast, vai šī procesa grupa jau ir pievienota. Procesu grupa tiek novietota pēc koordinātām, un var rasties situācija, kad vienai virsū tiek uzlikts otrs komponents. Es pārbaudīju, tas var notikt :) Lai iegūtu visas pievienotās procesu grupas, mēs izmantojam metodi
nipyapi.canvas.list_all_process_groups
Mēs varam tālāk meklēt, piemēram, pēc nosaukuma.
Veidnes atjaunināšanas procesu neaprakstīšu, teikšu tikai to, ja jaunajā veidnes versijā ir pievienoti procesori, tad ar ziņojumu klātbūtni rindās nav problēmu. Bet, ja procesori tiek noņemti, var rasties problēmas (nifi neļauj noņemt procesoru, ja tā priekšā ir uzkrājusies ziņojumu rinda). Ja jūs interesē, kā es atrisināju šo problēmu, lūdzu, rakstiet man un mēs apspriedīsim šo jautājumu. Kontakti raksta beigās. Pāriesim pie procesa grupas pievienošanas.
Atkļūdojot skriptu, saskāros ar īpatnību, ka ne vienmēr tiek uzvilkta jaunākā plūsmas versija, tāpēc iesaku vispirms pārbaudīt šo versiju:
nipyapi.versioning.get_latest_flow_ver
Izvietot procesu grupu:
nipyapi.versioning.deploy_flow_version
Mēs sākam procesorus:
nipyapi.canvas.schedule_process_group
Blokā par CLI bija rakstīts, ka attālo procesu grupā datu pārsūtīšana netiek automātiski iespējota? Īstenojot skriptu, arī es saskāros ar šo problēmu. Tobrīd nevarēju uzsākt datu pārsūtīšanu, izmantojot API, un nolēmu rakstīt NiPyAPI bibliotēkas izstrādātājam un lūgt padomu/palīdzību. Izstrādātājs man atbildēja, mēs apspriedām problēmu, un viņš rakstīja, ka viņam ir vajadzīgs laiks, lai "kaut ko pārbaudītu". Un tad pēc pāris dienām pienāk vēstule, kurā Python ir ierakstīta funkcija, kas atrisina manu palaišanas problēmu!!! Toreiz NiPyAPI versija bija 0.13.3 un, protams, nekā tāda nebija. Bet versijā 0.14.0, kas tika izlaista pavisam nesen, šī funkcija jau bija iekļauta bibliotēkā. Iepazīstieties,
nipyapi.canvas.set_remote_process_group_transmission
Tātad, izmantojot NiPyAPI bibliotēku, mēs savienojām reģistru, izvērsām plūsmu un pat sākām procesorus un datu pārsūtīšanu. Pēc tam varat ķemmēt kodu, pievienot visu veidu pārbaudes, reģistrēšanu, un tas arī viss. Bet tas ir pavisam cits stāsts.
No manis apsvērtajām automatizācijas iespējām pēdējā man šķita visefektīvākā. Pirmkārt, tas joprojām ir python kods, kurā varat iegult palīgprogrammas kodu un izmantot visas programmēšanas valodas priekšrocības. Otrkārt, NiPyAPI projekts aktīvi attīstās un problēmu gadījumā varat rakstīt izstrādātājam. Treškārt, NiPyAPI joprojām ir elastīgāks rīks mijiedarbībai ar NiFi sarežģītu problēmu risināšanā. Piemēram, nosakot, vai ziņojumu rindas tagad plūsmā ir tukšas un vai procesu grupu var atjaunināt.
Tas ir viss. Es aprakstīju 3 pieejas plūsmas piegādes automatizēšanai NiFi, nepilnības, ar kurām var saskarties izstrādātājs, un nodrošināju darba kodu piegādes automatizēšanai. Ja šī tēma jūs interesē tikpat kā mani -
Avots: www.habr.com