Uwasilishaji wa otomatiki katika Apache NiFi

Hello kila mtu!

Uwasilishaji wa otomatiki katika Apache NiFi

Kazi ni kama ifuatavyo - kuna mtiririko, uliowasilishwa kwenye picha hapo juu, ambayo inahitaji kupitishwa kwa seva za N na Apache NiFi. Jaribio la mtiririko - faili inatolewa na kutumwa kwa mfano mwingine wa NiFi. Uhamisho wa data hutokea kwa kutumia itifaki ya Tovuti ya NiFi hadi Tovuti.

Tovuti ya NiFi hadi Tovuti (S2S) ni njia salama, inayoweza kusanidiwa kwa urahisi ya kuhamisha data kati ya matukio ya NiFi. Jinsi S2S inavyofanya kazi, ona nyaraka na ni muhimu usisahau kusanidi mfano wa NiFi ili kuruhusu S2S, ona hapa.

Katika hali ambapo tunazungumza juu ya uhamishaji wa data kwa kutumia S2S, mfano mmoja unaitwa mteja, seva ya pili. Mteja hutuma data, seva inapokea. Njia mbili za kusanidi uhamishaji wa data kati yao:

  1. Kushinikiza. Kutoka kwa mfano wa mteja, data hutumwa kwa kutumia Kikundi cha Mchakato wa Mbali (RPG). Katika mfano wa seva, data inapokelewa kwa kutumia Mlango wa Kuingiza Data
  2. Kuvuta. Seva hupokea data kwa kutumia RPG, mteja hutuma kwa kutumia mlango wa Pato.


Mtiririko wa kusambaza huhifadhiwa kwenye Usajili wa Apache.

Usajili wa Apache NiFi ni mradi mdogo wa Apache NiFi ambao hutoa zana ya uhifadhi wa mtiririko na udhibiti wa toleo. Aina ya GIT. Habari juu ya kusanikisha, kusanidi na kufanya kazi na Usajili inaweza kupatikana katika nyaraka rasmi. Mtiririko wa uhifadhi unajumuishwa katika kikundi cha mchakato na kuhifadhiwa katika fomu hii kwenye Usajili. Tutarudi kwa hili baadaye katika makala.

Mwanzoni, wakati N ni nambari ndogo, mtiririko hutolewa na kusasishwa mwenyewe kwa wakati unaokubalika.

Lakini kadiri N inavyokua, shida zinakuwa nyingi zaidi:

  1. inachukua muda zaidi kusasisha mtiririko. Unahitaji kuingia kwenye seva zote
  2. Hitilafu za kusasisha kiolezo hutokea. Hapa walisasisha, lakini hapa walisahau
  3. makosa ya kibinadamu wakati wa kufanya idadi kubwa ya shughuli zinazofanana

Haya yote yanatuleta kwenye ukweli kwamba tunahitaji kufanya mchakato otomatiki. Nilijaribu njia zifuatazo za kutatua shida hii:

  1. Tumia MiNiFi badala ya NiFi
  2. NiFi CLI
  3. NiPyAPI

Kutumia MiNiFi

Apache MiNiFy - mradi wa Apache NiFi. MiNiFy ni wakala wa kompakt anayetumia vichakataji sawa na NiFi, hukuruhusu kuunda mitiririko sawa na katika NiFi. Asili nyepesi ya wakala hupatikana, kati ya mambo mengine, na ukweli kwamba MiNiFy haina kielelezo cha picha kwa usanidi wa mtiririko. Ukosefu wa interface ya graphical katika MiNiFy ina maana kwamba ni muhimu kutatua tatizo la kutoa mtiririko kwa minifi. Kwa kuwa MiNiFy inatumiwa kikamilifu katika IOT, kuna vipengele vingi na mchakato wa kutoa mtiririko kwa matukio ya mwisho ya minifi inahitaji kuwa automatiska. Kazi inayojulikana, sivyo?

Subproject nyingine itasaidia kutatua tatizo hili - MiNiFi C2 Server. Bidhaa hii inakusudiwa kuwa sehemu kuu katika usanifu wa uchapishaji wa usanidi. Jinsi ya kusanidi mazingira - iliyoelezewa ndani Makala hii Kuna maelezo ya kutosha kuhusu Habre kutatua tatizo. MiNiFi, kwa kushirikiana na seva ya C2, inasasisha moja kwa moja usanidi wake. Upungufu pekee wa mbinu hii ni kwamba lazima uunda violezo kwenye Seva ya C2; ahadi rahisi kwa Usajili haitoshi.

Chaguo lililoelezewa katika kifungu hapo juu linafanya kazi na sio ngumu kutekeleza, lakini hatupaswi kusahau yafuatayo:

  1. Minifi haina vichakataji vyote kutoka kwa nifi
  2. Toleo za kichakataji cha Minifi ziko nyuma ya matoleo ya kichakataji cha NiFi.

Wakati wa kuandika, toleo la hivi karibuni la NiFi ni 1.9.2. Toleo la hivi karibuni la processor la MiNiFi ni 1.7.0. Wasindikaji wanaweza kuongezwa kwa MiNiFi, lakini kutokana na kutofautiana kwa toleo kati ya wasindikaji wa NiFi na MiNiFi, hii inaweza kufanya kazi.

NiFi CLI

Kwa kuzingatia maelezo chombo kwenye tovuti rasmi, hii ni zana ya kugeuza otomatiki mwingiliano kati ya NiFI na Usajili wa NiFi katika uwanja wa utoaji wa mtiririko au usimamizi wa mchakato. Ili kuanza, unahitaji kupakua chombo hiki. hivyo.

Zindua matumizi

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Ili sisi kupakia mtiririko unaohitajika kutoka kwa rejista, tunahitaji kujua vitambulisho vya ndoo (kitambulisho cha ndoo) na mtiririko yenyewe (kitambulisho cha mtiririko). Data hii inaweza kupatikana ama kupitia cli au kwenye kiolesura cha wavuti cha Usajili wa NiFi. Katika kiolesura cha wavuti inaonekana kama hii:

Uwasilishaji wa otomatiki katika Apache NiFi

Kutumia CLI hii inafanywa:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Tunaanza kuagiza kikundi cha mchakato kutoka kwa Usajili:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Hoja muhimu ni kwamba mfano wowote wa nifi unaweza kubainishwa kama mwenyeji ambaye tunapeleka kikundi cha mchakato.

Kikundi cha mchakato kimeongezwa na vichakataji vilivyosimamishwa, vinahitaji kuanzishwa

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Kubwa, wasindikaji wameanza. Hata hivyo, kwa mujibu wa masharti ya kazi, tunahitaji matukio ya NiFi kutuma data kwa matukio mengine. Hebu tuchukue kwamba umechagua njia ya Push ili kuhamisha data kwenye seva. Ili kupanga uhamishaji wa data, unahitaji kuwezesha uhamishaji data kwenye Kikundi kilichoongezwa cha Mchakato wa Mbali (RPG), ambacho tayari kimejumuishwa katika mtiririko wetu.

Uwasilishaji wa otomatiki katika Apache NiFi

Katika nyaraka katika CLI na vyanzo vingine, sikupata njia ya kuwezesha uhamisho wa data. Ikiwa unajua jinsi ya kufanya hivyo, tafadhali andika kwenye maoni.

Kwa kuwa tuna bash na tuko tayari kwenda hadi mwisho, tutapata njia ya kutoka! Unaweza kutumia API ya NiFi kutatua tatizo hili. Hebu tumia njia ifuatayo, chukua kitambulisho kutoka kwa mifano hapo juu (kwa upande wetu ni 7f522a13-016e-1000-e504-d5b15587f2f3). Maelezo ya njia za API za NiFi hapa.

Uwasilishaji wa otomatiki katika Apache NiFi
Kwenye mwili unahitaji kupitisha JSON, kama hii:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Vigezo vinavyohitaji kujazwa ili kufanya kazi:
walikuwa β€” hali ya uhamishaji data. Inapatikana: TRANSMITTING ili kuwezesha uhamishaji wa data, IMESIMAMA ili kuzima
version - toleo la processor

toleo litabadilika kuwa 0 linapoundwa, lakini vigezo hivi vinaweza kupatikana kwa kutumia mbinu

Uwasilishaji wa otomatiki katika Apache NiFi

Kwa mashabiki wa maandishi ya bash, njia hii inaweza kuonekana inafaa, lakini ni ngumu kwangu - maandishi ya bash sio ninayopenda. Njia inayofuata ni ya kuvutia zaidi na rahisi kwa maoni yangu.

NiPyAPI

NiPyAPI ni maktaba ya Python ya kuingiliana na matukio ya NiFi. Ukurasa wa hati ina habari muhimu kwa kufanya kazi na maktaba. Kuanza kwa haraka kunaelezewa ndani rasimu kwenye github.

Hati yetu ya kuzindua usanidi ni programu katika Python. Wacha tuendelee kwenye kuweka msimbo.
Tunaweka mipangilio kwa kazi zaidi. Tutahitaji vigezo vifuatavyo:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-api инстанса, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ Ρ€Π°Π·Π²ΠΎΡ€Π°Ρ‡ΠΈΠ²Π°Π΅ΠΌ process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ registry, ΠΊΠ°ΠΊ Π±ΡƒΠ΄Π΅Ρ‚ Π½Π°Π·Ρ‹Π²Π°Ρ‚ΡŒΡΡ Π² инстансС nifi
nipyapi.config.bucket_name = 'BucketName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ bucket, ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ подтягиваСм flow
nipyapi.config.flow_name = 'FlowName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ flow, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ подтягиваСм

Ifuatayo nitaingiza majina ya njia za maktaba hii, ambazo zimeelezewa hapa.

Unganisha Usajili kwa mfano wa nifi ukitumia

nipyapi.versioning.create_registry_client

Katika hatua hii, unaweza pia kuongeza cheki kwamba sajili tayari imeongezwa kwa mfano; kwa hili unaweza kutumia njia.

nipyapi.versioning.list_registry_clients

Tunapata ndoo kwa utafutaji zaidi wa mtiririko kwenye kikapu

nipyapi.versioning.get_registry_bucket

Kutumia ndoo iliyopatikana, tunatafuta mtiririko

nipyapi.versioning.get_flow_in_bucket

Ifuatayo, ni muhimu kuelewa ikiwa kikundi hiki cha mchakato tayari kimeongezwa. Kikundi cha Mchakato kinawekwa kulingana na kuratibu na hali inaweza kutokea wakati sehemu ya pili imewekwa juu ya moja. Niliangalia, hii inaweza kutokea :) Ili kupata vikundi vyote vya mchakato ulioongezwa tunatumia njia

nipyapi.canvas.list_all_process_groups

Tunaweza kutafuta zaidi, kwa mfano, kwa jina.

Sitaelezea mchakato wa uppdatering template, nitasema tu kwamba ikiwa wasindikaji wameongezwa katika toleo jipya la template, basi hakuna matatizo na kuwepo kwa ujumbe kwenye foleni. Lakini ikiwa wasindikaji huondolewa, basi matatizo yanaweza kutokea (nifi haikuruhusu kuondoa processor ikiwa foleni ya ujumbe imekusanya mbele yake). Ikiwa una nia ya jinsi nilivyotatua tatizo hili, tafadhali niandikie na tutajadili suala hili. Anwani mwishoni mwa makala. Wacha tuendelee kwenye hatua ya kuongeza kikundi cha mchakato.

Wakati wa kurekebisha hati, niligundua hali ya kipekee kwamba toleo la hivi karibuni la mtiririko halijatolewa kila wakati, kwa hivyo ninapendekeza uangalie toleo hili kwanza:

nipyapi.versioning.get_latest_flow_ver

Kundi la mchakato wa kusambaza:

nipyapi.versioning.deploy_flow_version

Tunaanza wasindikaji:

nipyapi.canvas.schedule_process_group

Katika block kuhusu CLI iliandikwa kwamba uhamisho wa data haujawezeshwa kiatomati katika kikundi cha mchakato wa mbali? Wakati wa kutekeleza maandishi, nilikutana na shida hii pia. Wakati huo, sikuweza kuanza kuhamisha data kwa kutumia API na niliamua kumwandikia msanidi programu wa maktaba ya NiPyAPI na kuomba ushauri/msaada. Msanidi programu alinijibu, tulijadili tatizo na aliandika kwamba alihitaji muda wa "kuangalia kitu". Na kisha, siku chache baadaye, barua inafika ambayo kazi imeandikwa katika Python ambayo hutatua shida yangu ya uzinduzi !!! Wakati huo, toleo la NiPyAPI lilikuwa 0.13.3 na, bila shaka, hakuna kitu kama hicho. Lakini katika toleo la 0.14.0, ambalo lilitolewa hivi karibuni, kazi hii ilikuwa tayari imejumuishwa kwenye maktaba. Kutana,

nipyapi.canvas.set_remote_process_group_transmission

Kwa hiyo, kwa kutumia maktaba ya NiPyAPI, tuliunganisha Usajili, tukasambaza mtiririko, na hata kuanza wasindikaji na uhamisho wa data. Kisha unaweza kuchana msimbo, kuongeza aina zote za hundi, ukataji miti, na ndivyo tu. Lakini hiyo ni hadithi tofauti kabisa.

Kati ya chaguzi za otomatiki nilizozingatia, ya mwisho ilionekana kwangu kuwa yenye ufanisi zaidi. Kwanza, hii bado ni msimbo wa python, ambayo unaweza kupachika nambari ya programu ya msaidizi na kuchukua faida ya faida zote za lugha ya programu. Pili, mradi wa NiPyAPI unaendelea kikamilifu na ikiwa kuna shida unaweza kumwandikia msanidi programu. Tatu, NiPyAPI bado ni chombo rahisi zaidi cha kuingiliana na NiFi katika kutatua matatizo magumu. Kwa mfano, katika kubainisha kama foleni za ujumbe sasa hazina mtiririko na kama kikundi cha mchakato kinaweza kusasishwa.

Ni hayo tu. Nilielezea mbinu 3 za uwasilishaji wa mtiririko kiotomatiki katika NiFi, mitego ambayo msanidi programu anaweza kukutana nayo, na nikatoa msimbo wa kufanya kazi kwa utoaji wa kiotomatiki. Ikiwa una nia ya mada hii kama mimi - andika!

Chanzo: mapenzi.com

Kuongeza maoni