Pozdrav!
Zadatak je sljedeći - postoji tok, prikazan na gornjoj slici, koji treba razviti na N poslužitelja s
NiFi Site to Site (S2S) je siguran, lako konfigurabilan način za prijenos podataka između NiFi instanci. Kako radi S2S, pogledajte
U slučajevima kada govorimo o prijenosu podataka pomoću S2S, jedna instanca se naziva klijent, a druga server. Klijent šalje podatke, poslužitelj ih prima. Dva načina za konfiguriranje prijenosa podataka između njih:
- Gurati. Iz instance klijenta podaci se šalju pomoću grupe udaljenih procesa (RPG). Na instanci poslužitelja podaci se primaju pomoću ulaznog priključka
- Vuci. Poslužitelj prima podatke pomoću RPG-a, klijent ih šalje pomoću izlaznog porta.
Tijek za izvođenje pohranjen je u Apache registru.
Apache NiFi Registar je potprojekt Apache NiFi koji pruža alat za pohranjivanje protoka i kontrolu verzija. Neka vrsta GIT-a. Informacije o instaliranju, konfiguraciji i radu s registrom možete pronaći u
Na početku, kada je N mali broj, protok se isporučuje i ažurira ručno u prihvatljivom vremenu.
Ali kako N raste, problemi postaju sve brojniji:
- potrebno je više vremena za ažuriranje toka. Morate se prijaviti na sve poslužitelje
- Javljaju se pogreške pri ažuriranju predloška. Ovdje su ažurirali, a ovdje su zaboravili
- ljudske pogreške pri izvođenju velikog broja sličnih operacija
Sve to nas dovodi do činjenice da moramo automatizirati proces. Pokušao sam riješiti ovaj problem na sljedeće načine:
- Koristite MiNiFi umjesto NiFi
- NiFi CLI
- NiPyAPI
Korištenje MiNiFi
Još jedan potprojekt pomoći će riješiti ovaj problem - MiNiFi C2 Server. Ovaj proizvod je namijenjen da bude središnja točka u arhitekturi postavljanja konfiguracije. Kako konfigurirati okruženje - opisano u
Opcija opisana u gornjem članku radi i nije je teško implementirati, ali ne smijemo zaboraviti sljedeće:
- Minifi nema sve procesore od nifi
- Inačice Minifi procesora zaostaju za verzijama NiFi procesora.
U vrijeme pisanja, najnovija verzija NiFi-ja je 1.9.2. Najnovija verzija MiNiFi procesora je 1.7.0. Procesori se mogu dodati u MiNiFi, ali zbog razlika u verzijama između NiFi i MiNiFi procesora, to možda neće raditi.
NiFi CLI
Sudeći po
Pokrenite uslužni program
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Da bismo mogli učitati traženi protok iz registra, moramo znati identifikatore spremnika (identifikator spremnika) i sam tok (identifikator protoka). Ovi se podaci mogu dobiti ili putem cli-a ili u web sučelju NiFi registra. U web sučelju to izgleda ovako:
Korištenjem CLI-ja ovo se radi:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Počinjemo uvoziti grupu procesa iz registra:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Važna točka je da se bilo koja nifi instanca može specificirati kao host na koji prenosimo grupu procesa.
Dodana je grupa procesa sa zaustavljenim procesorima, potrebno ih je pokrenuti
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Super, procesori su krenuli. Međutim, prema uvjetima zadatka, potrebne su nam NiFi instance za slanje podataka drugim instancama. Pretpostavimo da ste odabrali Push metodu za prijenos podataka na poslužitelj. Kako biste organizirali prijenos podataka, morate omogućiti prijenos podataka na dodanoj Remote Process Group (RPG), koja je već uključena u naš tijek.
U dokumentaciji u CLI i drugim izvorima nisam našao način da omogućim prijenos podataka. Ako znate kako to učiniti, napišite u komentarima.
Pošto imamo bash i spremni smo ići do kraja, naći ćemo izlaz! Za rješavanje ovog problema možete koristiti NiFi API. Upotrijebimo sljedeću metodu, uzmimo ID iz gornjih primjera (u našem slučaju to je 7f522a13-016e-1000-e504-d5b15587f2f3). Opis NiFi API metoda
U tijelu trebate proslijediti JSON, ovako:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametri koje je potrebno ispuniti da bi radio:
su — status prijenosa podataka. Dostupno: TRANSMITTING za omogućavanje prijenosa podataka, STOPPED za onemogućavanje
verzija - verzija procesora
verzija će biti postavljena na 0 kada se stvori, ali ti se parametri mogu dobiti pomoću metode
Za ljubitelje bash skripti ova se metoda može činiti prikladnom, ali meni je malo teška - bash skripte mi nisu najdraže. Sljedeća metoda je po mom mišljenju zanimljivija i praktičnija.
NiPyAPI
NiPyAPI je Python biblioteka za interakciju s NiFi instancama.
Naša skripta za izvođenje konfiguracije je program u Pythonu. Prijeđimo na kodiranje.
Postavili smo konfiguracije za daljnji rad. Trebat će nam sljedeći parametri:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Zatim ću umetnuti nazive metoda ove biblioteke, koje su opisane
Povežite registar s nifi instancom pomoću
nipyapi.versioning.create_registry_client
U ovom koraku također možete dodati provjeru da je registar već dodan instanci; za to možete koristiti metodu
nipyapi.versioning.list_registry_clients
Pronalazimo kantu za daljnju potragu za protokom u košari
nipyapi.versioning.get_registry_bucket
Pomoću pronađene kante tražimo protok
nipyapi.versioning.get_flow_in_bucket
Zatim je važno razumjeti je li ova grupa procesa već dodana. Grupa Process postavljena je prema koordinatama i može doći do situacije kada se druga komponenta postavi na jednu. Provjerio sam, ovo se može dogoditi :) Da bismo dobili sve dodane grupe procesa koristimo metodu
nipyapi.canvas.list_all_process_groups
Možemo dalje pretraživati, na primjer, po imenu.
Neću opisivati postupak ažuriranja predloška, samo ću reći da ako se procesori dodaju u novu verziju predloška, tada nema problema s prisutnošću poruka u redovima čekanja. Ali ako se procesori uklone, tada mogu nastati problemi (nifi vam ne dopušta uklanjanje procesora ako se ispred njega nakupio red poruka). Ako vas zanima kako sam riješio ovaj problem, pišite mi i razgovarat ćemo o ovom problemu. Kontakti na kraju članka. Prijeđimo na korak dodavanja grupe procesa.
Prilikom otklanjanja pogrešaka u skripti naišao sam na neobičnost da se najnovija verzija flowa ne povlači uvijek, pa preporučujem da prvo provjerite ovu verziju:
nipyapi.versioning.get_latest_flow_ver
Implementacija grupe procesa:
nipyapi.versioning.deploy_flow_version
Pokrećemo procesore:
nipyapi.canvas.schedule_process_group
U bloku o CLI je napisano da prijenos podataka nije automatski omogućen u grupi udaljenih procesa? Prilikom implementacije skripte i ja sam naišao na ovaj problem. U to vrijeme nisam mogao pokrenuti prijenos podataka pomoću API-ja i odlučio sam pisati programeru biblioteke NiPyAPI i zatražiti savjet/pomoć. Programer mi je odgovorio, razgovarali smo o problemu i napisao je da mu treba vremena da "nešto provjeri". I onda, par dana kasnije, stiže pismo u kojem je napisana funkcija u Pythonu koja rješava moj problem pokretanja!!! Tada je verzija NiPyAPI bila 0.13.3 i, naravno, nije bilo ništa od toga. Ali u verziji 0.14.0, koja je nedavno objavljena, ova je funkcija već bila uključena u biblioteku. Upoznaj,
nipyapi.canvas.set_remote_process_group_transmission
Dakle, pomoću biblioteke NiPyAPI, povezali smo registar, pokrenuli tok i čak pokrenuli procesore i prijenos podataka. Zatim možete pročešljati kod, dodati sve vrste provjera, zapisivanja i to je sve. Ali to je sasvim druga priča.
Od opcija automatizacije koje sam razmatrao, zadnja mi se činila najučinkovitijom. Prvo, ovo je još uvijek python kod u koji možete ugraditi pomoćni programski kod i iskoristiti sve prednosti programskog jezika. Drugo, projekt NiPyAPI se aktivno razvija i u slučaju problema možete pisati programeru. Treće, NiPyAPI je još uvijek fleksibilniji alat za interakciju s NiFi-jem u rješavanju složenih problema. Na primjer, u određivanju jesu li redovi poruka sada prazni u toku i može li se ažurirati grupa procesa.
To je sve. Opisao sam 3 pristupa automatizaciji isporuke protoka u NiFi-ju, zamke na koje programer može naići i pružio radni kod za automatizaciju isporuke. Ako ste zainteresirani za ovu temu kao i mene -
Izvor: www.habr.com