Automatizacija isporuke protoka u Apache NiFi

Pozdrav!

Automatizacija isporuke protoka u Apache NiFi

Zadatak je sljedeći - postoji tok, prikazan na gornjoj slici, koji treba razviti na N poslužitelja s Apache NiFi. Test protoka - datoteka se generira i šalje drugoj NiFi instanci. Prijenos podataka odvija se korištenjem NiFi Site to Site protokola.

NiFi Site to Site (S2S) je siguran, lako konfigurabilan način za prijenos podataka između NiFi instanci. Kako radi S2S, pogledajte dokumentacija i važno je ne zaboraviti konfigurirati NiFi instancu da dopusti S2S, vidi ovdje.

U slučajevima kada govorimo o prijenosu podataka pomoću S2S, jedna instanca se naziva klijent, a druga server. Klijent šalje podatke, poslužitelj ih prima. Dva načina za konfiguriranje prijenosa podataka između njih:

  1. Gurati. Iz instance klijenta podaci se šalju pomoću grupe udaljenih procesa (RPG). Na instanci poslužitelja podaci se primaju pomoću ulaznog priključka
  2. Vuci. Poslužitelj prima podatke pomoću RPG-a, klijent ih šalje pomoću izlaznog porta.


Tijek za izvođenje pohranjen je u Apache registru.

Apache NiFi Registar je potprojekt Apache NiFi koji pruža alat za pohranjivanje protoka i kontrolu verzija. Neka vrsta GIT-a. Informacije o instaliranju, konfiguraciji i radu s registrom možete pronaći u službena dokumentacija. Tijek za pohranu kombinira se u grupu procesa i u tom obliku pohranjuje u registar. Na to ćemo se vratiti kasnije u članku.

Na početku, kada je N mali broj, protok se isporučuje i ažurira ručno u prihvatljivom vremenu.

Ali kako N raste, problemi postaju sve brojniji:

  1. potrebno je više vremena za ažuriranje toka. Morate se prijaviti na sve poslužitelje
  2. Javljaju se pogreške pri ažuriranju predloška. Ovdje su ažurirali, a ovdje su zaboravili
  3. ljudske pogreške pri izvođenju velikog broja sličnih operacija

Sve to nas dovodi do činjenice da moramo automatizirati proces. Pokušao sam riješiti ovaj problem na sljedeće načine:

  1. Koristite MiNiFi umjesto NiFi
  2. NiFi CLI
  3. NiPyAPI

Korištenje MiNiFi

Apache MiNiFy - podprojekt Apache NiFi. MiNiFy je kompaktni agent koji koristi iste procesore kao NiFi, omogućujući vam stvaranje istih tokova kao u NiFi-ju. Lagana priroda agenta postignuta je, između ostalog, činjenicom da MiNiFy nema grafičko sučelje za konfiguraciju protoka. Nedostatak grafičkog sučelja u MiNiFy znači da je potrebno riješiti problem isporuke protoka na minifi. Budući da se MiNiFy aktivno koristi u IOT-u, postoje mnoge komponente i proces isporuke protoka do konačnih minifi instanci treba biti automatiziran. Poznat zadatak, zar ne?

Još jedan potprojekt pomoći će riješiti ovaj problem - MiNiFi C2 Server. Ovaj proizvod je namijenjen da bude središnja točka u arhitekturi postavljanja konfiguracije. Kako konfigurirati okruženje - opisano u ovaj članak Na Habréu ima dovoljno informacija za rješavanje problema. MiNiFi, u suradnji s C2 poslužiteljem, automatski ažurira svoju konfiguraciju. Jedina mana ovog pristupa je to što morate kreirati predloške na C2 poslužitelju; jednostavno uvrštavanje u registar nije dovoljno.

Opcija opisana u gornjem članku radi i nije je teško implementirati, ali ne smijemo zaboraviti sljedeće:

  1. Minifi nema sve procesore od nifi
  2. Inačice Minifi procesora zaostaju za verzijama NiFi procesora.

U vrijeme pisanja, najnovija verzija NiFi-ja je 1.9.2. Najnovija verzija MiNiFi procesora je 1.7.0. Procesori se mogu dodati u MiNiFi, ali zbog razlika u verzijama između NiFi i MiNiFi procesora, to možda neće raditi.

NiFi CLI

Sudeći po opis alat na službenoj web stranici, ovo je alat za automatizaciju interakcije između NiFI i NiFi registra u području isporuke protoka ili upravljanja procesima. Za početak morate preuzeti ovaj alat. stoga.

Pokrenite uslužni program

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Da bismo mogli učitati traženi protok iz registra, moramo znati identifikatore spremnika (identifikator spremnika) ​​i sam tok (identifikator protoka). Ovi se podaci mogu dobiti ili putem cli-a ili u web sučelju NiFi registra. U web sučelju to izgleda ovako:

Automatizacija isporuke protoka u Apache NiFi

Korištenjem CLI-ja ovo se radi:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Počinjemo uvoziti grupu procesa iz registra:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Važna točka je da se bilo koja nifi instanca može specificirati kao host na koji prenosimo grupu procesa.

Dodana je grupa procesa sa zaustavljenim procesorima, potrebno ih je pokrenuti

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Super, procesori su krenuli. Međutim, prema uvjetima zadatka, potrebne su nam NiFi instance za slanje podataka drugim instancama. Pretpostavimo da ste odabrali Push metodu za prijenos podataka na poslužitelj. Kako biste organizirali prijenos podataka, morate omogućiti prijenos podataka na dodanoj Remote Process Group (RPG), koja je već uključena u naš tijek.

Automatizacija isporuke protoka u Apache NiFi

U dokumentaciji u CLI i drugim izvorima nisam našao način da omogućim prijenos podataka. Ako znate kako to učiniti, napišite u komentarima.

Pošto imamo bash i spremni smo ići do kraja, naći ćemo izlaz! Za rješavanje ovog problema možete koristiti NiFi API. Upotrijebimo sljedeću metodu, uzmimo ID iz gornjih primjera (u našem slučaju to je 7f522a13-016e-1000-e504-d5b15587f2f3). Opis NiFi API metoda ovdje.

Automatizacija isporuke protoka u Apache NiFi
U tijelu trebate proslijediti JSON, ovako:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametri koje je potrebno ispuniti da bi radio:
su — status prijenosa podataka. Dostupno: TRANSMITTING za omogućavanje prijenosa podataka, STOPPED za onemogućavanje
verzija - verzija procesora

verzija će biti postavljena na 0 kada se stvori, ali ti se parametri mogu dobiti pomoću metode

Automatizacija isporuke protoka u Apache NiFi

Za ljubitelje bash skripti ova se metoda može činiti prikladnom, ali meni je malo teška - bash skripte mi nisu najdraže. Sljedeća metoda je po mom mišljenju zanimljivija i praktičnija.

NiPyAPI

NiPyAPI je Python biblioteka za interakciju s NiFi instancama. Stranica s dokumentacijom sadrži potrebne podatke za rad s knjižnicom. Brzi početak je opisan u projekt na githubu.

Naša skripta za izvođenje konfiguracije je program u Pythonu. Prijeđimo na kodiranje.
Postavili smo konfiguracije za daljnji rad. Trebat će nam sljedeći parametri:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Zatim ću umetnuti nazive metoda ove biblioteke, koje su opisane ovdje.

Povežite registar s nifi instancom pomoću

nipyapi.versioning.create_registry_client

U ovom koraku također možete dodati provjeru da je registar već dodan instanci; za to možete koristiti metodu

nipyapi.versioning.list_registry_clients

Pronalazimo kantu za daljnju potragu za protokom u košari

nipyapi.versioning.get_registry_bucket

Pomoću pronađene kante tražimo protok

nipyapi.versioning.get_flow_in_bucket

Zatim je važno razumjeti je li ova grupa procesa već dodana. Grupa Process postavljena je prema koordinatama i može doći do situacije kada se druga komponenta postavi na jednu. Provjerio sam, ovo se može dogoditi :) Da bismo dobili sve dodane grupe procesa koristimo metodu

nipyapi.canvas.list_all_process_groups

Možemo dalje pretraživati, na primjer, po imenu.

Neću opisivati ​​postupak ažuriranja predloška, ​​samo ću reći da ako se procesori dodaju u novu verziju predloška, ​​tada nema problema s prisutnošću poruka u redovima čekanja. Ali ako se procesori uklone, tada mogu nastati problemi (nifi vam ne dopušta uklanjanje procesora ako se ispred njega nakupio red poruka). Ako vas zanima kako sam riješio ovaj problem, pišite mi i razgovarat ćemo o ovom problemu. Kontakti na kraju članka. Prijeđimo na korak dodavanja grupe procesa.

Prilikom otklanjanja pogrešaka u skripti naišao sam na neobičnost da se najnovija verzija flowa ne povlači uvijek, pa preporučujem da prvo provjerite ovu verziju:

nipyapi.versioning.get_latest_flow_ver

Implementacija grupe procesa:

nipyapi.versioning.deploy_flow_version

Pokrećemo procesore:

nipyapi.canvas.schedule_process_group

U bloku o CLI je napisano da prijenos podataka nije automatski omogućen u grupi udaljenih procesa? Prilikom implementacije skripte i ja sam naišao na ovaj problem. U to vrijeme nisam mogao pokrenuti prijenos podataka pomoću API-ja i odlučio sam pisati programeru biblioteke NiPyAPI i zatražiti savjet/pomoć. Programer mi je odgovorio, razgovarali smo o problemu i napisao je da mu treba vremena da "nešto provjeri". I onda, par dana kasnije, stiže pismo u kojem je napisana funkcija u Pythonu koja rješava moj problem pokretanja!!! Tada je verzija NiPyAPI bila 0.13.3 i, naravno, nije bilo ništa od toga. Ali u verziji 0.14.0, koja je nedavno objavljena, ova je funkcija već bila uključena u biblioteku. Upoznaj,

nipyapi.canvas.set_remote_process_group_transmission

Dakle, pomoću biblioteke NiPyAPI, povezali smo registar, pokrenuli tok i čak pokrenuli procesore i prijenos podataka. Zatim možete pročešljati kod, dodati sve vrste provjera, zapisivanja i to je sve. Ali to je sasvim druga priča.

Od opcija automatizacije koje sam razmatrao, zadnja mi se činila najučinkovitijom. Prvo, ovo je još uvijek python kod u koji možete ugraditi pomoćni programski kod i iskoristiti sve prednosti programskog jezika. Drugo, projekt NiPyAPI se aktivno razvija i u slučaju problema možete pisati programeru. Treće, NiPyAPI je još uvijek fleksibilniji alat za interakciju s NiFi-jem u rješavanju složenih problema. Na primjer, u određivanju jesu li redovi poruka sada prazni u toku i može li se ažurirati grupa procesa.

To je sve. Opisao sam 3 pristupa automatizaciji isporuke protoka u NiFi-ju, zamke na koje programer može naići i pružio radni kod za automatizaciju isporuke. Ako ste zainteresirani za ovu temu kao i mene - pisati!

Izvor: www.habr.com

Dodajte komentar