Hei kaikille!
Tehtävä on seuraava - yllä olevassa kuvassa näkyy virtaus, joka on levitettävä N palvelimelle
NiFi Site to Site (S2S) on turvallinen, hyvin muokattavissa oleva tapa siirtää tietoja NiFi-instanssien välillä. Katso kuinka S2S toimii
Mitä tulee tiedonsiirtoon S2S:n avulla, yhtä ilmentymää kutsutaan asiakkaaksi ja toista palvelimeksi. Asiakas lähettää dataa, palvelin vastaanottaa sen. Kaksi tapaa määrittää tiedonsiirto niiden välillä:
- Työnnä. Tiedot lähetetään asiakasinstanssista Remote Process Group (RPG) -protokollan avulla. Palvelininstanssissa tiedot vastaanotetaan tuloportin kautta
- Vedä. Palvelin vastaanottaa dataa RPG:n avulla, asiakas lähettää Output-portin kautta.
Flow for rolling on tallennettu Apache-rekisteriin.
Apache NiFi Registry on Apache NiFi:n aliprojekti, joka tarjoaa virtauksen tallennus- ja versiointityökalun. Eräänlainen GIT. Tietoja rekisterin asentamisesta, määrittämisestä ja sen kanssa työskentelystä löytyy osoitteesta
Alussa, kun N on pieni luku, virtaus toimitetaan ja päivitetään käsin kohtuullisessa ajassa.
Mutta kun N kasvaa, ongelmia tulee lisää:
- kulun päivittäminen vie enemmän aikaa. Sinun on mentävä kaikille palvelimille
- mallien päivityksessä on virheitä. Täällä he päivittivät, mutta täällä he unohtivat
- inhimillinen virhe suoritettaessa useita samankaltaisia toimintoja
Kaikki tämä johtaa meidät siihen tosiasiaan, että prosessi on tarpeen automatisoida. Olen yrittänyt ratkaista tämän ongelman seuraavilla tavoilla:
- Käytä MiNiFiä NiFin sijaan
- NiFi CLI
- NiPyAPI
MiNiFin käyttö
Toinen aliprojekti, MiNiFi C2 Server, auttaa ratkaisemaan tämän ongelman. Tämä tuote on tarkoitettu käyttöönottoarkkitehtuurin keskeiseksi pisteeksi. Kuinka määrittää ympäristö - kuvataan kohdassa
Yllä olevassa artikkelissa kuvattu vaihtoehto toimii, eikä sitä ole vaikea toteuttaa, mutta emme saa unohtaa seuraavaa:
- minifi ei sisällä kaikkia nifin prosessoreita
- Minifin prosessoriversiot ovat jäljessä NiFi:n prosessoriversioista.
Kirjoitushetkellä NiFi:n uusin versio on 1.9.2. Uusimman MiNiFi-version prosessoriversio on 1.7.0. Prosessoreita voidaan lisätä MiNifiin, mutta NiFi- ja MiNiFi-suorittimien versioeroista johtuen tämä ei välttämättä toimi.
NiFi CLI
Tuomari
Suorita apuohjelma
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Jotta voimme ladata tarvittavan virtauksen rekisteristä, meidän on tiedettävä korin tunnisteet (ämpäritunniste) ja itse virtaus (virran tunniste). Nämä tiedot voidaan saada joko cli:n kautta tai NiFi-rekisterin verkkoliittymästä. Verkkokäyttöliittymä näyttää tältä:
Käyttämällä CLI:tä teet näin:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Suorita tuontiprosessiryhmä rekisteristä:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Tärkeä asia on, että mikä tahansa nifi-ilmentymä voidaan määrittää isännäksi, johon prosessiryhmä rullataan.
Prosessiryhmä lisätty pysäytettyjen prosessorien kanssa, ne on käynnistettävä
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Hienoa, prosessorit ovat alkaneet. Ongelman ehtojen mukaan tarvitsemme kuitenkin NiFi-esiintymiä tietojen lähettämiseen muille instansseille. Oletetaan, että tiedon siirtämiseen palvelimelle valittiin Push-menetelmä. Tiedonsiirron järjestämiseksi on tarpeen ottaa käyttöön tiedonsiirto (Ota lähetys käyttöön) lisätyssä Remote Process Groupissa (RPG), joka sisältyy jo tietovirtaamme.
CLI:n ja muiden lähteiden dokumentaatiosta en löytänyt tapaa mahdollistaa tiedonsiirto. Jos tiedät kuinka tehdä tämä, kirjoita kommentteihin.
Koska meillä on bash ja olemme valmiita menemään loppuun asti, löydämme tien ulos! Voit ratkaista tämän ongelman NiFi API:n avulla. Käytämme seuraavaa menetelmää, otamme tunnuksen yllä olevista esimerkeistä (tapauksessamme se on 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API -menetelmien kuvaus
Rungossa sinun on välitettävä JSON seuraavassa muodossa:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametrit, jotka on täytettävä, jotta ne "toimisivat":
olivat — tiedonsiirron tila. Käytettävissä TRANSMITTING tiedonsiirron mahdollistamiseksi, STOPPED poistaaksesi käytöstä
versio - prosessoriversio
version oletusarvo on 0 luotaessa, mutta nämä parametrit voidaan saada menetelmällä
Bash-skriptien ystäville tämä menetelmä saattaa tuntua sopivalta, mutta se on minulle vaikeaa - bash-skriptit eivät ole suosikkini. Seuraava tapa on mielestäni mielenkiintoisempi ja kätevämpi.
NiPyAPI
NiPyAPI on Python-kirjasto vuorovaikutukseen NiFi-instanssien kanssa.
Skriptimme kokoonpanon käyttöönottamiseksi on Python-ohjelma. Siirrytään koodaukseen.
Määritä asetukset jatkotyötä varten. Tarvitsemme seuraavat parametrit:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Lisään edelleen tämän kirjaston menetelmien nimet, jotka on kuvattu
Yhdistämme rekisterin nifi-instanssiin käyttämällä
nipyapi.versioning.create_registry_client
Tässä vaiheessa voit myös lisätä tarkistuksen, että rekisteri on jo lisätty ilmentymään, tähän voit käyttää menetelmää
nipyapi.versioning.list_registry_clients
Löydämme ämpärin jatkaaksemme korin virtauksen etsimistä
nipyapi.versioning.get_registry_bucket
Löydetyn kauhan mukaan etsimme virtausta
nipyapi.versioning.get_flow_in_bucket
Seuraavaksi on tärkeää ymmärtää, onko tämä prosessiryhmä jo lisätty. Prosessiryhmä sijoitetaan koordinaattien mukaan ja voi syntyä tilanne, kun toisen päälle asetetaan toinen. Tarkistin, se voi olla 🙂 Saadaksesi kaikki lisätyt prosessiryhmät, käytä menetelmää
nipyapi.canvas.list_all_process_groups
ja sitten voimme etsiä esimerkiksi nimellä.
En kuvaile mallin päivitysprosessia, sanon vain, että jos prosessorit lisätään mallin uuteen versioon, viestien esiintymisessä jonoissa ei ole ongelmia. Mutta jos prosessorit poistetaan, voi syntyä ongelmia (nifi ei salli prosessorin poistamista, jos sen eteen on kertynyt viestijono). Jos olet kiinnostunut siitä, kuinka ratkaisin tämän ongelman - kirjoita minulle, keskustelemme tästä asiasta. Yhteystiedot artikkelin lopussa. Siirrytään vaiheeseen, jossa lisätään prosessiryhmä.
Komentosarjan virheenkorjauksen aikana törmäsin ominaisuuteen, että virtauksen uusin versio ei aina vedä esiin, joten suosittelen, että selvennät ensin tätä versiota:
nipyapi.versioning.get_latest_flow_ver
Ota käyttöön prosessiryhmä:
nipyapi.versioning.deploy_flow_version
Käynnistämme prosessorit:
nipyapi.canvas.schedule_process_group
CLI-lohkossa kirjoitettiin, että tiedonsiirto ei ole automaattisesti käytössä etäprosessiryhmässä? Käsikirjoitusta toteutettaessa törmäsin myös tähän ongelmaan. Tuolloin en voinut aloittaa tiedonsiirtoa API:lla ja päätin kirjoittaa NiPyAPI-kirjaston kehittäjälle ja pyytää neuvoja / apua. Kehittäjä vastasi minulle, keskustelimme ongelmasta ja hän kirjoitti, että hän tarvitsi aikaa "tarkistaa jotain". Ja nyt, pari päivää myöhemmin, tulee sähköposti, jossa on kirjoitettu Python-funktio, joka ratkaisee käynnistysongelmani !!! NiPyAPI-versio oli tuolloin 0.13.3, eikä siinä tietenkään ollut mitään vastaavaa. Mutta versiossa 0.14.0, joka julkaistiin aivan hiljattain, tämä toiminto on jo sisällytetty kirjastoon. Tavata
nipyapi.canvas.set_remote_process_group_transmission
Joten NiPyAPI-kirjaston avulla liitimme rekisterin, rullasimme virran ja jopa aloitimme prosessorit ja tiedonsiirron. Sitten voit kammata koodin, lisätä kaikenlaisia tarkastuksia, kirjauksia ja siinä se. Mutta se on täysin eri tarina.
Harkimistani automaatiovaihtoehdoista jälkimmäinen vaikutti minusta tehokkaimmalta. Ensinnäkin tämä on edelleen python-koodi, johon voit upottaa apuohjelmakoodin ja nauttia kaikista ohjelmointikielen eduista. Toiseksi NiPyAPI-projekti kehittyy aktiivisesti ja ongelmatilanteissa voit kirjoittaa kehittäjälle. Kolmanneksi NiPyAPI on edelleen joustavampi työkalu vuorovaikutukseen NiFin kanssa monimutkaisten ongelmien ratkaisemisessa. Esimerkiksi määritettäessä, ovatko viestijonot tällä hetkellä tyhjiä kulussa ja onko mahdollista päivittää prosessiryhmä.
Siinä kaikki. Kuvasin kolme lähestymistapaa virtauksen toimittamisen automatisoimiseen NiFi:ssä, sudenkuopat, joita kehittäjä saattaa kohdata, ja toimitin toimivan koodin toimituksen automatisoimiseksi. Jos olet yhtä kiinnostunut tästä aiheesta kuin minä -
Lähde: will.com