Automatizacija isporuke protoka u Apache NiFi

Pozdrav svima!

Automatizacija isporuke protoka u Apache NiFi

Zadatak je sljedeći - postoji tok, prikazan na gornjoj slici, koji je potrebno razvući na N servera sa Apache NiFi. Test protoka - fajl se generiše i šalje na drugu NiFi instancu. Prijenos podataka se odvija korištenjem NiFi Site to Site protokola.

NiFi Site to Site (S2S) je siguran, lako podesiv način za prijenos podataka između NiFi instanci. Kako S2S radi, pogledajte dokumentaciju i važno je da ne zaboravite da konfigurišete NiFi instancu da dozvoli S2S, vidi ovdje.

U slučajevima kada je riječ o prijenosu podataka pomoću S2S-a, jedna instanca se zove klijent, druga server. Klijent šalje podatke, server prima. Dva načina za konfiguriranje prijenosa podataka između njih:

  1. Guranje. Iz instance klijenta, podaci se šalju pomoću Remote Process Group (RPG). Na instanci servera, podaci se primaju pomoću ulaznog porta
  2. povući. Server prima podatke koristeći RPG, klijent šalje koristeći izlazni port.


Tok za uvođenje je pohranjen u Apache Registry.

Apache NiFi Registry je podprojekat Apache NiFi koji pruža alat za skladištenje toka i kontrolu verzija. Neka vrsta GIT-a. Informacije o instalaciji, konfiguraciji i radu sa registrom možete pronaći u službena dokumentacija. Tok za skladištenje se kombinuje u procesnu grupu i pohranjuje u ovom obliku u registru. Na to ćemo se vratiti kasnije u članku.

Na početku, kada je N mali broj, protok se isporučuje i ažurira ručno u prihvatljivom vremenu.

Ali kako N raste, problemi postaju sve brojniji:

  1. potrebno je više vremena za ažuriranje toka. Morate se prijaviti na sve servere
  2. Došlo je do grešaka pri ažuriranju šablona. Ovdje su ga ažurirali, ali ovdje su zaboravili
  3. ljudske greške pri izvođenju velikog broja sličnih operacija

Sve nas to dovodi do činjenice da moramo automatizirati proces. Pokušao sam na sljedeće načine riješiti ovaj problem:

  1. Koristite MiNiFi umjesto NiFi
  2. NiFi CLI
  3. NiPyAPI

Koristeći MiniFi

Apache MiNiFy - podprojekat Apache NiFi. MiNiFy je kompaktan agent koji koristi iste procesore kao NiFi, omogućavajući vam da kreirate iste tokove kao u NiFi. Lagana priroda agenta je postignuta, između ostalog, činjenicom da MiNiFy nema grafički interfejs za konfiguraciju protoka. Nedostatak grafičkog interfejsa za MiNiFy znači da je neophodno rešiti problem isporuke toka na minifi. Budući da se MiNiFy aktivno koristi u IOT-u, postoji mnogo komponenti i proces isporuke toka do konačnih minifi instanci mora biti automatiziran. Poznat zadatak, zar ne?

Još jedan podprojekat će pomoći u rješavanju ovog problema - MiNiFi C2 Server. Ovaj proizvod je namijenjen da bude središnja točka u arhitekturi uvođenja konfiguracije. Kako konfigurirati okruženje - opisano u ovaj članak Na Habréu ima dovoljno informacija da se problem riješi. MiNiFi, u kombinaciji sa C2 serverom, automatski ažurira svoju konfiguraciju. Jedini nedostatak ovog pristupa je da morate kreirati šablone na C2 serveru nije dovoljno jednostavno urezivanje u registar.

Opcija opisana u gornjem članku radi i nije teška za implementaciju, ali ne smijemo zaboraviti sljedeće:

  1. Minifi nema sve nifi procesore
  2. Verzije Minifi procesora zaostaju za verzijama NiFi procesora.

U vrijeme pisanja, najnovija verzija NiFi je 1.9.2. Najnovija verzija MiNiFi procesora je 1.7.0. Procesori se mogu dodati u MiNiFi, ali zbog razlika u verzijama između NiFi i MiNiFi procesora, ovo možda neće raditi.

NiFi CLI

Sudeći po opis alat na službenoj web stranici, ovo je alat za automatizaciju interakcije između NiFI i NiFi Registry u oblasti isporuke toka ili upravljanja procesima. Da biste započeli, morate preuzeti ovaj alat. odavde.

Pokrenite uslužni program

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Da bismo učitali traženi tok iz registra, moramo znati identifikatore bucketa (bucket identifier) ​​i samog toka (flow identifier). Ovi podaci se mogu dobiti ili putem CLI-a ili u web interfejsu NiFi registra. U web interfejsu to izgleda ovako:

Automatizacija isporuke protoka u Apache NiFi

Koristeći CLI ovo se radi:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Počinjemo da uvozimo procesnu grupu iz registra:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Važna stvar je da bilo koja nifi instanca može biti specificirana kao host na koji smo skupljali procesnu grupu.

Grupa procesa je dodana sa zaustavljenim procesorima, potrebno ih je pokrenuti

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Odlično, procesori su počeli. Međutim, prema uslovima zadatka, potrebne su nam NiFi instance za slanje podataka drugim instancama. Pretpostavimo da ste odabrali Push metod za prijenos podataka na server. Da biste organizirali prijenos podataka, morate omogućiti prijenos podataka na dodanoj Remote Process Group (RPG), koja je već uključena u naš tok.

Automatizacija isporuke protoka u Apache NiFi

U dokumentaciji u CLI-u i drugim izvorima nisam našao način da omogućim prijenos podataka. Ako znate kako to učiniti, napišite u komentarima.

Pošto imamo bash i spremni smo da idemo do kraja, naći ćemo izlaz! Za rješavanje ovog problema možete koristiti NiFi API. Koristimo sljedeću metodu, uzmimo ID iz gornjih primjera (u našem slučaju to je 7f522a13-016e-1000-e504-d5b15587f2f3). Opis NiFi API metoda ovdje.

Automatizacija isporuke protoka u Apache NiFi
U tijelu trebate proslijediti JSON, ovako:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametri koje je potrebno popuniti da bi funkcionisao:
stanje — status prenosa podataka. Dostupno: PRENOS za omogućavanje prijenosa podataka, ZAUSTAVLJENO za onemogućavanje
verzija - verzija procesora

verzija će biti zadana na 0 kada se kreira, ali ovi parametri se mogu dobiti pomoću metode

Automatizacija isporuke protoka u Apache NiFi

Za ljubitelje bash skripti ova metoda može izgledati prikladno, ali mi je malo teško - bash skripte mi nisu omiljene. Sljedeća metoda je po mom mišljenju zanimljivija i praktičnija.

NiPyAPI

NiPyAPI je Python biblioteka za interakciju sa NiFi instancama. Stranica sa dokumentacijom sadrži potrebne informacije za rad sa bibliotekom. Brzi početak je opisan u projekt na github-u.

Naša skripta za uvođenje konfiguracije je program u Pythonu. Pređimo na kodiranje.
Postavili smo konfiguracije za dalji rad. Biće nam potrebni sledeći parametri:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Zatim ću umetnuti nazive metoda ove biblioteke, koje su opisane ovdje.

Povežite registar sa nifi instancom koristeći

nipyapi.versioning.create_registry_client

U ovom koraku možete dodati i provjeru da li je registar već dodan instanci, za to možete koristiti metodu

nipyapi.versioning.list_registry_clients

Pronalazimo kantu za dalje traženje protoka u korpi

nipyapi.versioning.get_registry_bucket

Koristeći pronađenu kantu, tražimo protok

nipyapi.versioning.get_flow_in_bucket

Zatim je važno razumjeti da li je ova procesna grupa već dodana. Grupa Proces je postavljena prema koordinatama i može nastati situacija kada se druga komponenta nadoveže na jednu. Provjerio sam, ovo se može dogoditi :) Da bismo dobili sve dodane grupe procesa koristimo metodu

nipyapi.canvas.list_all_process_groups

Možemo dalje pretraživati, na primjer, po imenu.

Neću opisivati ​​proces ažuriranja predloška, ​​samo ću reći da ako se dodaju procesori u novu verziju predloška, ​​onda nema problema s prisustvom poruka u redovima. Ali ako se procesori uklone, onda mogu nastati problemi (nifi vam ne dozvoljava da uklonite procesor ako se ispred njega nakupio red poruka). Ako vas zanima kako sam riješio ovaj problem, pišite mi i razgovaraćemo o ovom pitanju. Kontakti na kraju članka. Pređimo na korak dodavanja grupe procesa.

Kada sam otklanjao greške u skripti, naišao sam na posebnost da se najnovija verzija flowa ne povlači uvijek, pa preporučujem da prvo provjerite ovu verziju:

nipyapi.versioning.get_latest_flow_ver

Grupa procesa implementacije:

nipyapi.versioning.deploy_flow_version

Pokrećemo procesore:

nipyapi.canvas.schedule_process_group

U bloku o CLI je napisano da se prijenos podataka ne omogućava automatski u udaljenoj procesnoj grupi? Prilikom implementacije skripte naišao sam i na ovaj problem. U to vrijeme nisam mogao započeti prijenos podataka pomoću API-ja i odlučio sam da pišem programeru biblioteke NiPyAPI i zatražim savjet/pomoć. Programer mi je odgovorio, razgovarali smo o problemu i on je napisao da mu treba vremena da “nešto provjeri”. A onda, par dana kasnije, stiže pismo u kojem je napisana funkcija u Pythonu koja rješava moj problem sa lansiranjem!!! U to vrijeme, NiPyAPI verzija je bila 0.13.3 i, naravno, ništa slično nije bilo. Ali u verziji 0.14.0, koja je nedavno objavljena, ova funkcija je već bila uključena u biblioteku. upoznaj,

nipyapi.canvas.set_remote_process_group_transmission

Dakle, koristeći NiPyAPI biblioteku, povezali smo registar, pokrenuli tok, pa čak i pokrenuli procesore i prenos podataka. Zatim možete pročešljati kod, dodati sve vrste provjera, evidentiranje i to je sve. Ali to je sasvim druga priča.

Od opcija automatizacije koje sam razmatrao, posljednja mi se učinila najefikasnijom. Prvo, ovo je još uvijek Python kod, u koji možete ugraditi pomoćni programski kod i iskoristiti sve prednosti programskog jezika. Drugo, projekt NiPyAPI se aktivno razvija i u slučaju problema možete pisati programeru. Treće, NiPyAPI je i dalje fleksibilniji alat za interakciju s NiFi u rješavanju složenih problema. Na primjer, u određivanju da li su redovi poruka sada prazni u toku i da li se procesna grupa može ažurirati.

To je sve. Opisao sam 3 pristupa automatizaciji isporuke protoka u NiFi-ju, zamke na koje programer može naići i dao radni kod za automatizaciju isporuke. Ako ste zainteresovani za ovu temu kao i ja - pisati!

izvor: www.habr.com

Dodajte komentar