Plūsmas piegādes automatizācija Apache NiFi

Sveiki visiem!

Plūsmas piegādes automatizācija Apache NiFi

Uzdevums ir šāds - ir plūsma, kas parādīta attēlā augstāk, kuru nepieciešams izrullēt uz N serveriem ar Apache NiFi. Plūsmas pārbaude — fails tiek ģenerēts un nosūtīts uz citu NiFi gadījumu. Datu pārsūtīšana notiek, izmantojot NiFi Site to Site protokolu.

NiFi vietne uz vietni (S2S) ir drošs, viegli konfigurējams veids, kā pārsūtīt datus starp NiFi gadījumiem. Kā darbojas S2S, skatiet dokumentācija un ir svarīgi neaizmirst konfigurēt NiFi gadījumu, lai atļautu S2S, sk šeit.

Gadījumos, kad mēs runājam par datu pārsūtīšanu, izmantojot S2S, vienu gadījumu sauc par klientu, otru sauc par serveri. Klients sūta datus, serveris saņem. Divi veidi, kā konfigurēt datu pārraidi starp tām:

  1. Push. No klienta instances dati tiek nosūtīti, izmantojot attālo procesu grupu (RPG). Servera instancē dati tiek saņemti, izmantojot ievades portu
  2. Vilkt. Serveris saņem datus, izmantojot RPG, klients sūta, izmantojot Output portu.


Izklāšanas plūsma tiek glabāta Apache reģistrā.

Apache NiFi Registry ir Apache NiFi apakšprojekts, kas nodrošina plūsmas glabāšanas un versiju kontroles rīku. Sava veida GIT. Informāciju par reģistra instalēšanu, konfigurēšanu un darbu ar to var atrast oficiālā dokumentācija. Uzglabāšanas plūsma tiek apvienota procesu grupā un tiek saglabāta šādā formā reģistrā. Mēs pie tā atgriezīsimies vēlāk rakstā.

Sākumā, kad N ir mazs skaitlis, plūsma tiek piegādāta un manuāli atjaunināta pieņemamā laikā.

Bet, pieaugot N, problēmu kļūst arvien vairāk:

  1. plūsmas atjaunināšana prasa vairāk laika. Jums ir jāpiesakās visos serveros
  2. Veidnes atjaunināšanas kļūdas. Šeit viņi to atjaunināja, bet šeit viņi aizmirsa
  3. cilvēka kļūdas, veicot lielu skaitu līdzīgu darbību

Tas viss noved pie tā, ka mums ir jāautomatizē process. Es mēģināju šādus veidus, kā atrisināt šo problēmu:

  1. Izmantojiet MiNiFi, nevis NiFi
  2. NiFi CLI
  3. NiPyAPI

Izmantojot MiNiFi

Apache MiNiFy - Apache NiFi apakšprojekts. MiNiFy ir kompakts aģents, kas izmanto tos pašus procesorus kā NiFi, ļaujot jums izveidot tādas pašas plūsmas kā NiFi. Aģenta vieglais raksturs cita starpā tiek panākts ar to, ka MiNiFy nav grafiskā interfeisa plūsmas konfigurēšanai. Grafiskā interfeisa trūkums programmā MiNiFy nozīmē, ka ir jāatrisina plūsmas piegādes problēma uz minifi. Tā kā MiNiFy aktīvi tiek izmantots IOT, ir daudz komponentu, un plūsmas piegādes process galīgajām minifi instancēm ir jāautomatizē. Pazīstams uzdevums, vai ne?

Vēl viens apakšprojekts palīdzēs atrisināt šo problēmu - MiNiFi C2 Server. Šis produkts ir paredzēts kā centrālais punkts konfigurācijas izlaišanas arhitektūrā. Kā konfigurēt vidi - aprakstīts sadaļā Šis raksts Ir pietiekami daudz informācijas par Habré, lai atrisinātu problēmu. MiNiFi kopā ar C2 serveri automātiski atjaunina tā konfigurāciju. Vienīgais šīs pieejas trūkums ir tas, ka jums ir jāizveido veidnes C2 serverī; ar vienkāršu reģistra apņemšanos nepietiek.

Iepriekš rakstā aprakstītā opcija darbojas un nav grūti īstenojama, taču mēs nedrīkstam aizmirst:

  1. Minifi nav visi nifi procesori
  2. Minifi procesoru versijas atpaliek no NiFi procesora versijām.

Rakstīšanas laikā jaunākā NiFi versija ir 1.9.2. Jaunākā MiNiFi procesora versija ir 1.7.0. Procesorus var pievienot MiNiFi, taču NiFi un MiNiFi procesoru versiju neatbilstības dēļ tas var nedarboties.

NiFi CLI

Spriežot pēc apraksts rīks oficiālajā vietnē, tas ir rīks NiFI un NiFi reģistra mijiedarbības automatizēšanai plūsmas piegādes vai procesu pārvaldības jomā. Lai sāktu, jums ir jālejupielādē šis rīks. tātad.

Palaidiet utilītu

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Lai mēs varētu ielādēt nepieciešamo plūsmu no reģistra, mums ir jāzina segmenta identifikatori (kopas identifikators) un pati plūsma (plūsmas identifikators). Šos datus var iegūt, izmantojot cli vai NiFi reģistra tīmekļa saskarni. Tīmekļa saskarnē tas izskatās šādi:

Plūsmas piegādes automatizācija Apache NiFi

Izmantojot CLI, tas tiek darīts:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Mēs sākam importēt procesu grupu no reģistra:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Svarīgi ir tas, ka jebkuru nifi gadījumu var norādīt kā resursdatoru, uz kuru mēs pārvēršam procesu grupu.

Procesu grupa ir pievienota ar apturētiem procesoriem, tie ir jāsāk

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Lieliski, procesori ir sākušies. Tomēr saskaņā ar uzdevuma noteikumiem mums ir nepieciešami NiFi gadījumi, lai nosūtītu datus uz citiem gadījumiem. Pieņemsim, ka datu pārsūtīšanai uz serveri esat izvēlējies Push metodi. Lai organizētu datu pārsūtīšanu, ir jāiespējo datu pārsūtīšana pievienotajā Remote Process Group (RPG), kas jau ir iekļauta mūsu plūsmā.

Plūsmas piegādes automatizācija Apache NiFi

CLI un citos avotos esošajā dokumentācijā es neatradu veidu, kā iespējot datu pārsūtīšanu. Ja zināt, kā to izdarīt, lūdzu, rakstiet komentāros.

Tā kā mums ir bash un esam gatavi iet līdz galam, tad atradīsim izeju! Lai atrisinātu šo problēmu, varat izmantot NiFi API. Izmantosim šādu metodi, ņemsim ID no iepriekš minētajiem piemēriem (mūsu gadījumā tas ir 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API metožu apraksts šeit.

Plūsmas piegādes automatizācija Apache NiFi
Pamattekstā jums ir jānodod JSON, piemēram:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametri, kas jāaizpilda, lai tas darbotos:
bija — datu pārsūtīšanas statuss. Pieejams: TRANSMITTING, lai iespējotu datu pārsūtīšanu, STOPPED, lai atspējotu
versija - procesora versija

versijas izveides laikā noklusējuma vērtība būs 0, taču šos parametrus var iegūt, izmantojot metodi

Plūsmas piegādes automatizācija Apache NiFi

Bash skriptu cienītājiem šī metode var šķist piemērota, taču man tā ir nedaudz sarežģīta - bash skripti nav mans mīļākais. Nākamā metode, manuprāt, ir interesantāka un ērtāka.

NiPyAPI

NiPyAPI ir Python bibliotēka mijiedarbībai ar NiFi gadījumiem. Dokumentācijas lapa satur nepieciešamo informāciju darbam ar bibliotēku. Ātrais sākums ir aprakstīts projektu vietnē github.

Mūsu skripts konfigurācijas izvēršanai ir programma Python. Pāriesim pie kodēšanas.
Mēs iestatījām konfigurācijas turpmākajam darbam. Mums būs nepieciešami šādi parametri:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Tālāk es ievietošu šīs bibliotēkas metožu nosaukumus, kas ir aprakstīti šeit.

Savienojiet reģistru ar nifi gadījumu, izmantojot

nipyapi.versioning.create_registry_client

Šajā darbībā varat arī pievienot pārbaudi, vai reģistrs jau ir pievienots instancei; šim nolūkam varat izmantot metodi

nipyapi.versioning.list_registry_clients

Mēs atrodam spaini tālākai plūsmas meklēšanai grozā

nipyapi.versioning.get_registry_bucket

Izmantojot atrasto spaini, mēs meklējam plūsmu

nipyapi.versioning.get_flow_in_bucket

Tālāk ir svarīgi saprast, vai šī procesa grupa jau ir pievienota. Procesu grupa tiek novietota pēc koordinātām, un var rasties situācija, kad vienai virsū tiek uzlikts otrs komponents. Es pārbaudīju, tas var notikt :) Lai iegūtu visas pievienotās procesu grupas, mēs izmantojam metodi

nipyapi.canvas.list_all_process_groups

Mēs varam tālāk meklēt, piemēram, pēc nosaukuma.

Veidnes atjaunināšanas procesu neaprakstīšu, teikšu tikai to, ja jaunajā veidnes versijā ir pievienoti procesori, tad ar ziņojumu klātbūtni rindās nav problēmu. Bet, ja procesori tiek noņemti, var rasties problēmas (nifi neļauj noņemt procesoru, ja tā priekšā ir uzkrājusies ziņojumu rinda). Ja jūs interesē, kā es atrisināju šo problēmu, lūdzu, rakstiet man un mēs apspriedīsim šo jautājumu. Kontakti raksta beigās. Pāriesim pie procesa grupas pievienošanas.

Atkļūdojot skriptu, saskāros ar īpatnību, ka ne vienmēr tiek uzvilkta jaunākā plūsmas versija, tāpēc iesaku vispirms pārbaudīt šo versiju:

nipyapi.versioning.get_latest_flow_ver

Izvietot procesu grupu:

nipyapi.versioning.deploy_flow_version

Mēs sākam procesorus:

nipyapi.canvas.schedule_process_group

Blokā par CLI bija rakstīts, ka attālo procesu grupā datu pārsūtīšana netiek automātiski iespējota? Īstenojot skriptu, arī es saskāros ar šo problēmu. Tobrīd nevarēju uzsākt datu pārsūtīšanu, izmantojot API, un nolēmu rakstīt NiPyAPI bibliotēkas izstrādātājam un lūgt padomu/palīdzību. Izstrādātājs man atbildēja, mēs apspriedām problēmu, un viņš rakstīja, ka viņam ir vajadzīgs laiks, lai "kaut ko pārbaudītu". Un tad pēc pāris dienām pienāk vēstule, kurā Python ir ierakstīta funkcija, kas atrisina manu palaišanas problēmu!!! Toreiz NiPyAPI versija bija 0.13.3 un, protams, nekā tāda nebija. Bet versijā 0.14.0, kas tika izlaista pavisam nesen, šī funkcija jau bija iekļauta bibliotēkā. Iepazīstieties,

nipyapi.canvas.set_remote_process_group_transmission

Tātad, izmantojot NiPyAPI bibliotēku, mēs savienojām reģistru, izvērsām plūsmu un pat sākām procesorus un datu pārsūtīšanu. Pēc tam varat ķemmēt kodu, pievienot visu veidu pārbaudes, reģistrēšanu, un tas arī viss. Bet tas ir pavisam cits stāsts.

No manis apsvērtajām automatizācijas iespējām pēdējā man šķita visefektīvākā. Pirmkārt, tas joprojām ir python kods, kurā varat iegult palīgprogrammas kodu un izmantot visas programmēšanas valodas priekšrocības. Otrkārt, NiPyAPI projekts aktīvi attīstās un problēmu gadījumā varat rakstīt izstrādātājam. Treškārt, NiPyAPI joprojām ir elastīgāks rīks mijiedarbībai ar NiFi sarežģītu problēmu risināšanā. Piemēram, nosakot, vai ziņojumu rindas tagad plūsmā ir tukšas un vai procesu grupu var atjaunināt.

Tas ir viss. Es aprakstīju 3 pieejas plūsmas piegādes automatizēšanai NiFi, nepilnības, ar kurām var saskarties izstrādātājs, un nodrošināju darba kodu piegādes automatizēšanai. Ja šī tēma jūs interesē tikpat kā mani - raksti!

Avots: www.habr.com

Pievieno komentāru