Pozdrav svima!
Zadatak je sljedeći - postoji tok, prikazan na gornjoj slici, koji je potrebno razvući na N servera sa
NiFi Site to Site (S2S) je siguran, lako podesiv način za prijenos podataka između NiFi instanci. Kako S2S radi, pogledajte
U slučajevima kada je riječ o prijenosu podataka pomoću S2S-a, jedna instanca se zove klijent, druga server. Klijent šalje podatke, server prima. Dva načina za konfiguriranje prijenosa podataka između njih:
- Guranje. Iz instance klijenta, podaci se šalju pomoću Remote Process Group (RPG). Na instanci servera, podaci se primaju pomoću ulaznog porta
- povući. Server prima podatke koristeći RPG, klijent šalje koristeći izlazni port.
Tok za uvođenje je pohranjen u Apache Registry.
Apache NiFi Registry je podprojekat Apache NiFi koji pruža alat za skladištenje toka i kontrolu verzija. Neka vrsta GIT-a. Informacije o instalaciji, konfiguraciji i radu sa registrom možete pronaći u
Na početku, kada je N mali broj, protok se isporučuje i ažurira ručno u prihvatljivom vremenu.
Ali kako N raste, problemi postaju sve brojniji:
- potrebno je više vremena za ažuriranje toka. Morate se prijaviti na sve servere
- Došlo je do grešaka pri ažuriranju šablona. Ovdje su ga ažurirali, ali ovdje su zaboravili
- ljudske greške pri izvođenju velikog broja sličnih operacija
Sve nas to dovodi do činjenice da moramo automatizirati proces. Pokušao sam na sljedeće načine riješiti ovaj problem:
- Koristite MiNiFi umjesto NiFi
- NiFi CLI
- NiPyAPI
Koristeći MiniFi
Još jedan podprojekat će pomoći u rješavanju ovog problema - MiNiFi C2 Server. Ovaj proizvod je namijenjen da bude središnja točka u arhitekturi uvođenja konfiguracije. Kako konfigurirati okruženje - opisano u
Opcija opisana u gornjem članku radi i nije teška za implementaciju, ali ne smijemo zaboraviti sljedeće:
- Minifi nema sve nifi procesore
- Verzije Minifi procesora zaostaju za verzijama NiFi procesora.
U vrijeme pisanja, najnovija verzija NiFi je 1.9.2. Najnovija verzija MiNiFi procesora je 1.7.0. Procesori se mogu dodati u MiNiFi, ali zbog razlika u verzijama između NiFi i MiNiFi procesora, ovo možda neće raditi.
NiFi CLI
Sudeći po
Pokrenite uslužni program
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Da bismo učitali traženi tok iz registra, moramo znati identifikatore bucketa (bucket identifier) i samog toka (flow identifier). Ovi podaci se mogu dobiti ili putem CLI-a ili u web interfejsu NiFi registra. U web interfejsu to izgleda ovako:
Koristeći CLI ovo se radi:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Počinjemo da uvozimo procesnu grupu iz registra:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Važna stvar je da bilo koja nifi instanca može biti specificirana kao host na koji smo skupljali procesnu grupu.
Grupa procesa je dodana sa zaustavljenim procesorima, potrebno ih je pokrenuti
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Odlično, procesori su počeli. Međutim, prema uslovima zadatka, potrebne su nam NiFi instance za slanje podataka drugim instancama. Pretpostavimo da ste odabrali Push metod za prijenos podataka na server. Da biste organizirali prijenos podataka, morate omogućiti prijenos podataka na dodanoj Remote Process Group (RPG), koja je već uključena u naš tok.
U dokumentaciji u CLI-u i drugim izvorima nisam našao način da omogućim prijenos podataka. Ako znate kako to učiniti, napišite u komentarima.
Pošto imamo bash i spremni smo da idemo do kraja, naći ćemo izlaz! Za rješavanje ovog problema možete koristiti NiFi API. Koristimo sljedeću metodu, uzmimo ID iz gornjih primjera (u našem slučaju to je 7f522a13-016e-1000-e504-d5b15587f2f3). Opis NiFi API metoda
U tijelu trebate proslijediti JSON, ovako:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametri koje je potrebno popuniti da bi funkcionisao:
stanje — status prenosa podataka. Dostupno: PRENOS za omogućavanje prijenosa podataka, ZAUSTAVLJENO za onemogućavanje
verzija - verzija procesora
verzija će biti zadana na 0 kada se kreira, ali ovi parametri se mogu dobiti pomoću metode
Za ljubitelje bash skripti ova metoda može izgledati prikladno, ali mi je malo teško - bash skripte mi nisu omiljene. Sljedeća metoda je po mom mišljenju zanimljivija i praktičnija.
NiPyAPI
NiPyAPI je Python biblioteka za interakciju sa NiFi instancama.
Naša skripta za uvođenje konfiguracije je program u Pythonu. Pređimo na kodiranje.
Postavili smo konfiguracije za dalji rad. Biće nam potrebni sledeći parametri:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Zatim ću umetnuti nazive metoda ove biblioteke, koje su opisane
Povežite registar sa nifi instancom koristeći
nipyapi.versioning.create_registry_client
U ovom koraku možete dodati i provjeru da li je registar već dodan instanci, za to možete koristiti metodu
nipyapi.versioning.list_registry_clients
Pronalazimo kantu za dalje traženje protoka u korpi
nipyapi.versioning.get_registry_bucket
Koristeći pronađenu kantu, tražimo protok
nipyapi.versioning.get_flow_in_bucket
Zatim je važno razumjeti da li je ova procesna grupa već dodana. Grupa Proces je postavljena prema koordinatama i može nastati situacija kada se druga komponenta nadoveže na jednu. Provjerio sam, ovo se može dogoditi :) Da bismo dobili sve dodane grupe procesa koristimo metodu
nipyapi.canvas.list_all_process_groups
Možemo dalje pretraživati, na primjer, po imenu.
Neću opisivati proces ažuriranja predloška, samo ću reći da ako se dodaju procesori u novu verziju predloška, onda nema problema s prisustvom poruka u redovima. Ali ako se procesori uklone, onda mogu nastati problemi (nifi vam ne dozvoljava da uklonite procesor ako se ispred njega nakupio red poruka). Ako vas zanima kako sam riješio ovaj problem, pišite mi i razgovaraćemo o ovom pitanju. Kontakti na kraju članka. Pređimo na korak dodavanja grupe procesa.
Kada sam otklanjao greške u skripti, naišao sam na posebnost da se najnovija verzija flowa ne povlači uvijek, pa preporučujem da prvo provjerite ovu verziju:
nipyapi.versioning.get_latest_flow_ver
Grupa procesa implementacije:
nipyapi.versioning.deploy_flow_version
Pokrećemo procesore:
nipyapi.canvas.schedule_process_group
U bloku o CLI je napisano da se prijenos podataka ne omogućava automatski u udaljenoj procesnoj grupi? Prilikom implementacije skripte naišao sam i na ovaj problem. U to vrijeme nisam mogao započeti prijenos podataka pomoću API-ja i odlučio sam da pišem programeru biblioteke NiPyAPI i zatražim savjet/pomoć. Programer mi je odgovorio, razgovarali smo o problemu i on je napisao da mu treba vremena da “nešto provjeri”. A onda, par dana kasnije, stiže pismo u kojem je napisana funkcija u Pythonu koja rješava moj problem sa lansiranjem!!! U to vrijeme, NiPyAPI verzija je bila 0.13.3 i, naravno, ništa slično nije bilo. Ali u verziji 0.14.0, koja je nedavno objavljena, ova funkcija je već bila uključena u biblioteku. upoznaj,
nipyapi.canvas.set_remote_process_group_transmission
Dakle, koristeći NiPyAPI biblioteku, povezali smo registar, pokrenuli tok, pa čak i pokrenuli procesore i prenos podataka. Zatim možete pročešljati kod, dodati sve vrste provjera, evidentiranje i to je sve. Ali to je sasvim druga priča.
Od opcija automatizacije koje sam razmatrao, posljednja mi se učinila najefikasnijom. Prvo, ovo je još uvijek Python kod, u koji možete ugraditi pomoćni programski kod i iskoristiti sve prednosti programskog jezika. Drugo, projekt NiPyAPI se aktivno razvija i u slučaju problema možete pisati programeru. Treće, NiPyAPI je i dalje fleksibilniji alat za interakciju s NiFi u rješavanju složenih problema. Na primjer, u određivanju da li su redovi poruka sada prazni u toku i da li se procesna grupa može ažurirati.
To je sve. Opisao sam 3 pristupa automatizaciji isporuke protoka u NiFi-ju, zamke na koje programer može naići i dao radni kod za automatizaciju isporuke. Ako ste zainteresovani za ovu temu kao i ja -
izvor: www.habr.com