Flow Delivery Automation Apache NiFi:ssä

Hei kaikille!

Flow Delivery Automation Apache NiFi:ssä

Tehtävä on seuraava - yllä olevassa kuvassa näkyy virtaus, joka on levitettävä N palvelimelle Apache NiFi. Flow-testi - tiedostoa luodaan ja lähetetään toiseen NiFi-esiintymään. Tiedonsiirto tapahtuu käyttämällä NiFi Site to Site -protokollaa.

NiFi Site to Site (S2S) on turvallinen, hyvin muokattavissa oleva tapa siirtää tietoja NiFi-instanssien välillä. Katso kuinka S2S toimii dokumentointi ja on tärkeää muistaa määrittää NiFi-instanssi sallimaan S2S-näkemys täällä.

Mitä tulee tiedonsiirtoon S2S:n avulla, yhtä ilmentymää kutsutaan asiakkaaksi ja toista palvelimeksi. Asiakas lähettää dataa, palvelin vastaanottaa sen. Kaksi tapaa määrittää tiedonsiirto niiden välillä:

  1. Työnnä. Tiedot lähetetään asiakasinstanssista Remote Process Group (RPG) -protokollan avulla. Palvelininstanssissa tiedot vastaanotetaan tuloportin kautta
  2. Vedä. Palvelin vastaanottaa dataa RPG:n avulla, asiakas lähettää Output-portin kautta.


Flow for rolling on tallennettu Apache-rekisteriin.

Apache NiFi Registry on Apache NiFi:n aliprojekti, joka tarjoaa virtauksen tallennus- ja versiointityökalun. Eräänlainen GIT. Tietoja rekisterin asentamisesta, määrittämisestä ja sen kanssa työskentelystä löytyy osoitteesta virallinen dokumentaatio. Tallennusvirta yhdistetään prosessiryhmäksi ja tallennetaan rekisteriin tässä muodossa. Palaamme tähän myöhemmin artikkelissa.

Alussa, kun N on pieni luku, virtaus toimitetaan ja päivitetään käsin kohtuullisessa ajassa.

Mutta kun N kasvaa, ongelmia tulee lisää:

  1. kulun päivittäminen vie enemmän aikaa. Sinun on mentävä kaikille palvelimille
  2. mallien päivityksessä on virheitä. Täällä he päivittivät, mutta täällä he unohtivat
  3. inhimillinen virhe suoritettaessa useita samankaltaisia ​​toimintoja

Kaikki tämä johtaa meidät siihen tosiasiaan, että prosessi on tarpeen automatisoida. Olen yrittänyt ratkaista tämän ongelman seuraavilla tavoilla:

  1. Käytä MiNiFiä NiFin sijaan
  2. NiFi CLI
  3. NiPyAPI

MiNiFin käyttö

ApacheMiNify on Apache NiFi:n osaprojekti. MiNiFy on kompakti agentti, joka käyttää samoja prosessoreita kuin NiFi, joten voit luoda saman virtauksen kuin NiFi:ssä. Agentin keveys saavutetaan muun muassa siitä syystä, että MiNiFyssä ei ole graafista käyttöliittymää virtauskonfiguraatiolle. MiNiFyn graafisen käyttöliittymän puute tarkoittaa, että minifi-virtauksen toimitusongelma on ratkaistava. Koska MiNiFyä käytetään aktiivisesti IOT:ssa, komponentteja on monia ja prosessi, jolla virtauksen toimittaminen lopullisiin minifi-instansseihin on automatisoitava. Tuttu tehtävä, eikö?

Toinen aliprojekti, MiNiFi C2 Server, auttaa ratkaisemaan tämän ongelman. Tämä tuote on tarkoitettu käyttöönottoarkkitehtuurin keskeiseksi pisteeksi. Kuinka määrittää ympäristö - kuvataan kohdassa tässä artikkelissa Habrésta ja tieto riittää ratkaisemaan ongelman. MiNiFi yhdessä C2-palvelimen kanssa päivittää kokoonpanonsa automaattisesti. Tämän lähestymistavan ainoa haittapuoli on, että sinun on luotava malleja C2-palvelimelle, pelkkä rekisteriin sitoutuminen ei riitä.

Yllä olevassa artikkelissa kuvattu vaihtoehto toimii, eikä sitä ole vaikea toteuttaa, mutta emme saa unohtaa seuraavaa:

  1. minifi ei sisällä kaikkia nifin prosessoreita
  2. Minifin prosessoriversiot ovat jäljessä NiFi:n prosessoriversioista.

Kirjoitushetkellä NiFi:n uusin versio on 1.9.2. Uusimman MiNiFi-version prosessoriversio on 1.7.0. Prosessoreita voidaan lisätä MiNifiin, mutta NiFi- ja MiNiFi-suorittimien versioeroista johtuen tämä ei välttämättä toimi.

NiFi CLI

Tuomari kuvaus työkalu virallisella verkkosivustolla, tämä on työkalu NiFI:n ja NiFi-rekisterin välisen vuorovaikutuksen automatisoimiseen virtauksen toimituksen tai prosessinhallinnan alalla. Lataa tämä työkalu aloittaaksesi. siten.

Suorita apuohjelma

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Jotta voimme ladata tarvittavan virtauksen rekisteristä, meidän on tiedettävä korin tunnisteet (ämpäritunniste) ja itse virtaus (virran tunniste). Nämä tiedot voidaan saada joko cli:n kautta tai NiFi-rekisterin verkkoliittymästä. Verkkokäyttöliittymä näyttää tältä:

Flow Delivery Automation Apache NiFi:ssä

Käyttämällä CLI:tä teet näin:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Suorita tuontiprosessiryhmä rekisteristä:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Tärkeä asia on, että mikä tahansa nifi-ilmentymä voidaan määrittää isännäksi, johon prosessiryhmä rullataan.

Prosessiryhmä lisätty pysäytettyjen prosessorien kanssa, ne on käynnistettävä

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Hienoa, prosessorit ovat alkaneet. Ongelman ehtojen mukaan tarvitsemme kuitenkin NiFi-esiintymiä tietojen lähettämiseen muille instansseille. Oletetaan, että tiedon siirtämiseen palvelimelle valittiin Push-menetelmä. Tiedonsiirron järjestämiseksi on tarpeen ottaa käyttöön tiedonsiirto (Ota lähetys käyttöön) lisätyssä Remote Process Groupissa (RPG), joka sisältyy jo tietovirtaamme.

Flow Delivery Automation Apache NiFi:ssä

CLI:n ja muiden lähteiden dokumentaatiosta en löytänyt tapaa mahdollistaa tiedonsiirto. Jos tiedät kuinka tehdä tämä, kirjoita kommentteihin.

Koska meillä on bash ja olemme valmiita menemään loppuun asti, löydämme tien ulos! Voit ratkaista tämän ongelman NiFi API:n avulla. Käytämme seuraavaa menetelmää, otamme tunnuksen yllä olevista esimerkeistä (tapauksessamme se on 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API -menetelmien kuvaus täällä.

Flow Delivery Automation Apache NiFi:ssä
Rungossa sinun on välitettävä JSON seuraavassa muodossa:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametrit, jotka on täytettävä, jotta ne "toimisivat":
olivat — tiedonsiirron tila. Käytettävissä TRANSMITTING tiedonsiirron mahdollistamiseksi, STOPPED poistaaksesi käytöstä
versio - prosessoriversio

version oletusarvo on 0 luotaessa, mutta nämä parametrit voidaan saada menetelmällä

Flow Delivery Automation Apache NiFi:ssä

Bash-skriptien ystäville tämä menetelmä saattaa tuntua sopivalta, mutta se on minulle vaikeaa - bash-skriptit eivät ole suosikkini. Seuraava tapa on mielestäni mielenkiintoisempi ja kätevämpi.

NiPyAPI

NiPyAPI on Python-kirjasto vuorovaikutukseen NiFi-instanssien kanssa. Dokumentaatiosivu sisältää tarvittavat tiedot työskennelläksesi kirjaston kanssa. Pikakäynnistys on kuvattu kohdassa projekti githubissa.

Skriptimme kokoonpanon käyttöönottamiseksi on Python-ohjelma. Siirrytään koodaukseen.
Määritä asetukset jatkotyötä varten. Tarvitsemme seuraavat parametrit:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Lisään edelleen tämän kirjaston menetelmien nimet, jotka on kuvattu täällä.

Yhdistämme rekisterin nifi-instanssiin käyttämällä

nipyapi.versioning.create_registry_client

Tässä vaiheessa voit myös lisätä tarkistuksen, että rekisteri on jo lisätty ilmentymään, tähän voit käyttää menetelmää

nipyapi.versioning.list_registry_clients

Löydämme ämpärin jatkaaksemme korin virtauksen etsimistä

nipyapi.versioning.get_registry_bucket

Löydetyn kauhan mukaan etsimme virtausta

nipyapi.versioning.get_flow_in_bucket

Seuraavaksi on tärkeää ymmärtää, onko tämä prosessiryhmä jo lisätty. Prosessiryhmä sijoitetaan koordinaattien mukaan ja voi syntyä tilanne, kun toisen päälle asetetaan toinen. Tarkistin, se voi olla 🙂 Saadaksesi kaikki lisätyt prosessiryhmät, käytä menetelmää

nipyapi.canvas.list_all_process_groups

ja sitten voimme etsiä esimerkiksi nimellä.

En kuvaile mallin päivitysprosessia, sanon vain, että jos prosessorit lisätään mallin uuteen versioon, viestien esiintymisessä jonoissa ei ole ongelmia. Mutta jos prosessorit poistetaan, voi syntyä ongelmia (nifi ei salli prosessorin poistamista, jos sen eteen on kertynyt viestijono). Jos olet kiinnostunut siitä, kuinka ratkaisin tämän ongelman - kirjoita minulle, keskustelemme tästä asiasta. Yhteystiedot artikkelin lopussa. Siirrytään vaiheeseen, jossa lisätään prosessiryhmä.

Komentosarjan virheenkorjauksen aikana törmäsin ominaisuuteen, että virtauksen uusin versio ei aina vedä esiin, joten suosittelen, että selvennät ensin tätä versiota:

nipyapi.versioning.get_latest_flow_ver

Ota käyttöön prosessiryhmä:

nipyapi.versioning.deploy_flow_version

Käynnistämme prosessorit:

nipyapi.canvas.schedule_process_group

CLI-lohkossa kirjoitettiin, että tiedonsiirto ei ole automaattisesti käytössä etäprosessiryhmässä? Käsikirjoitusta toteutettaessa törmäsin myös tähän ongelmaan. Tuolloin en voinut aloittaa tiedonsiirtoa API:lla ja päätin kirjoittaa NiPyAPI-kirjaston kehittäjälle ja pyytää neuvoja / apua. Kehittäjä vastasi minulle, keskustelimme ongelmasta ja hän kirjoitti, että hän tarvitsi aikaa "tarkistaa jotain". Ja nyt, pari päivää myöhemmin, tulee sähköposti, jossa on kirjoitettu Python-funktio, joka ratkaisee käynnistysongelmani !!! NiPyAPI-versio oli tuolloin 0.13.3, eikä siinä tietenkään ollut mitään vastaavaa. Mutta versiossa 0.14.0, joka julkaistiin aivan hiljattain, tämä toiminto on jo sisällytetty kirjastoon. Tavata

nipyapi.canvas.set_remote_process_group_transmission

Joten NiPyAPI-kirjaston avulla liitimme rekisterin, rullasimme virran ja jopa aloitimme prosessorit ja tiedonsiirron. Sitten voit kammata koodin, lisätä kaikenlaisia ​​tarkastuksia, kirjauksia ja siinä se. Mutta se on täysin eri tarina.

Harkimistani automaatiovaihtoehdoista jälkimmäinen vaikutti minusta tehokkaimmalta. Ensinnäkin tämä on edelleen python-koodi, johon voit upottaa apuohjelmakoodin ja nauttia kaikista ohjelmointikielen eduista. Toiseksi NiPyAPI-projekti kehittyy aktiivisesti ja ongelmatilanteissa voit kirjoittaa kehittäjälle. Kolmanneksi NiPyAPI on edelleen joustavampi työkalu vuorovaikutukseen NiFin kanssa monimutkaisten ongelmien ratkaisemisessa. Esimerkiksi määritettäessä, ovatko viestijonot tällä hetkellä tyhjiä kulussa ja onko mahdollista päivittää prosessiryhmä.

Siinä kaikki. Kuvasin kolme lähestymistapaa virtauksen toimittamisen automatisoimiseen NiFi:ssä, sudenkuopat, joita kehittäjä saattaa kohdata, ja toimitin toimivan koodin toimituksen automatisoimiseksi. Jos olet yhtä kiinnostunut tästä aiheesta kuin minä - kirjoittaa!

Lähde: will.com

Lisää kommentti