Automasi Penghantaran Aliran dalam Apache NiFi

Hello!

Automasi Penghantaran Aliran dalam Apache NiFi

Tugasnya adalah seperti berikut - terdapat aliran yang ditunjukkan dalam gambar di atas, yang perlu dilancarkan ke pelayan N dengan Apache NiFi. Ujian aliran - fail sedang dijana dan dihantar ke contoh NiFi yang lain. Pemindahan data berlaku menggunakan protokol Tapak ke Tapak NiFi.

NiFi Site to Site (S2S) ialah cara yang selamat dan boleh disesuaikan untuk memindahkan data antara kejadian NiFi. Lihat cara S2S berfungsi dokumentasi dan penting untuk diingat untuk menyediakan contoh NiFi anda untuk membolehkan S2S melihat di sini.

Apabila ia datang kepada pemindahan data menggunakan S2S, satu contoh dipanggil klien, yang kedua adalah pelayan. Pelanggan menghantar data, pelayan menerimanya. Dua cara untuk menyediakan pemindahan data antara mereka:

  1. Tolak. Data dihantar daripada contoh klien menggunakan Kumpulan Proses Jauh (RPG). Pada contoh pelayan, data diterima menggunakan Port Input
  2. Tarik. Pelayan menerima data menggunakan RPG, pelanggan menghantar menggunakan port Output.


Aliran untuk rolling disimpan dalam Apache Registry.

Apache NiFi Registry ialah subprojek Apache NiFi yang menyediakan alat storan aliran dan versi. Sejenis GIT. Maklumat tentang memasang, mengkonfigurasi dan bekerja dengan pendaftaran boleh didapati dalam dokumentasi rasmi. Aliran untuk storan digabungkan ke dalam kumpulan proses dan disimpan dalam pendaftaran dalam borang ini. Kami akan kembali kepada perkara ini kemudian dalam artikel.

Pada permulaan, apabila N ialah nombor kecil, aliran dihantar dan dikemas kini dengan tangan dalam masa yang munasabah.

Tetapi apabila N berkembang, terdapat lebih banyak masalah:

  1. memerlukan lebih banyak masa untuk mengemas kini aliran. Anda perlu pergi ke semua pelayan
  2. terdapat ralat mengemas kini templat. Di sini mereka mengemas kini, tetapi di sini mereka terlupa
  3. kesilapan manusia apabila melakukan sejumlah besar operasi serupa

Semua ini membawa kita kepada fakta bahawa ia adalah perlu untuk mengautomasikan proses. Saya telah mencuba cara berikut untuk menyelesaikan masalah ini:

  1. Gunakan MiNiFi dan bukannya NiFi
  2. NiFi CLI
  3. NiPyAPI

Menggunakan MiNiFi

ApacheMiNify ialah subprojek Apache NiFi. MiNiFy ialah ejen padat yang menggunakan pemproses yang sama seperti NiFi, membolehkan anda mencipta aliran yang sama seperti dalam NiFi. Keringanan ejen dicapai, antara lain, disebabkan oleh fakta bahawa MiNiFy tidak mempunyai antara muka grafik untuk konfigurasi aliran. Kekurangan antara muka grafik MiNiFy bermakna adalah perlu untuk menyelesaikan masalah penghantaran aliran dalam minifi. Memandangkan MiNiFy digunakan secara aktif dalam IOT, terdapat banyak komponen dan proses penyampaian aliran kepada contoh minifi akhir mesti diautomasikan. Tugas biasa, bukan?

Subprojek lain, Pelayan MiNiFi C2, akan membantu menyelesaikan masalah ini. Produk ini bertujuan untuk menjadi titik pusat dalam seni bina penggunaan. Cara mengkonfigurasi persekitaran - diterangkan dalam artikel ini pada HabrΓ© dan maklumat itu sudah cukup untuk menyelesaikan masalah. MiNiFi bersama dengan pelayan C2 mengemas kini konfigurasinya secara automatik. Satu-satunya kelemahan pendekatan ini ialah anda perlu membuat templat pada Pelayan C2, komitmen mudah untuk pendaftaran tidak mencukupi.

Pilihan yang diterangkan dalam artikel di atas berfungsi dan tidak sukar untuk dilaksanakan, tetapi kita tidak boleh melupakan perkara berikut:

  1. minifi tidak mempunyai semua pemproses daripada nifi
  2. Versi CPU dalam Minifi ketinggalan di belakang versi CPU dalam NiFi.

Pada masa penulisan, versi terkini NiFi ialah 1.9.2. Versi pemproses versi MiNiFi terkini ialah 1.7.0. Pemproses boleh ditambahkan pada MiNiFi, tetapi disebabkan percanggahan versi antara pemproses NiFi dan MiNiFi, ini mungkin tidak berfungsi.

NiFi CLI

Dilihat oleh penerangan alat di laman web rasmi, ini adalah alat untuk mengautomasikan interaksi antara NiFI dan Registry NiFi dalam bidang penghantaran aliran atau pengurusan proses. Muat turun alat ini untuk bermula. oleh itu.

Jalankan utiliti

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Untuk kami memuatkan aliran yang diperlukan dari pendaftaran, kami perlu mengetahui pengecam bakul (pengecam baldi) dan aliran itu sendiri (pengecam aliran). Data ini boleh diperolehi sama ada melalui cli atau dalam antara muka web pendaftaran NiFi. Antara muka web kelihatan seperti ini:

Automasi Penghantaran Aliran dalam Apache NiFi

Menggunakan CLI, anda melakukan ini:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Jalankan kumpulan proses import dari pendaftaran:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Perkara penting ialah mana-mana tika nifi boleh ditentukan sebagai hos tempat kami melancarkan kumpulan proses.

Kumpulan proses ditambah dengan pemproses yang dihentikan, mereka perlu dimulakan

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Hebat, pemproses telah bermula. Walau bagaimanapun, mengikut syarat masalah, kami memerlukan contoh NiFi untuk menghantar data ke kejadian lain. Mari kita anggap bahawa kaedah Push telah dipilih untuk memindahkan data ke pelayan. Untuk mengatur pemindahan data, adalah perlu untuk membolehkan pemindahan data (Dayakan penghantaran) pada Kumpulan Proses Jauh (RPG) tambahan, yang telah disertakan dalam aliran kami.

Automasi Penghantaran Aliran dalam Apache NiFi

Dalam dokumentasi dalam CLI dan sumber lain, saya tidak menemui jalan untuk membolehkan pemindahan data. Jika anda tahu bagaimana untuk melakukan ini, sila tulis dalam komen.

Oleh kerana kami mempunyai bash dan kami bersedia untuk pergi ke penghujung, kami akan mencari jalan keluar! Anda boleh menggunakan API NiFi untuk menyelesaikan masalah ini. Mari gunakan kaedah berikut, kami mengambil ID daripada contoh di atas (dalam kes kami ialah 7f522a13-016e-1000-e504-d5b15587f2f3). Penerangan Kaedah API NiFi di sini.

Automasi Penghantaran Aliran dalam Apache NiFi
Dalam badan, anda perlu lulus JSON, dalam bentuk berikut:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameter yang mesti diisi untuk "berfungsi":
adalah β€” status pemindahan data. TRANSMITTING tersedia untuk membolehkan pemindahan data, BERHENTI untuk melumpuhkan
versi - versi pemproses

versi akan lalai kepada 0 apabila dibuat, tetapi parameter ini boleh diperoleh menggunakan kaedah tersebut

Automasi Penghantaran Aliran dalam Apache NiFi

Bagi pencinta skrip bash, kaedah ini mungkin kelihatan sesuai, tetapi sukar bagi saya - skrip bash bukanlah kegemaran saya. Cara seterusnya adalah lebih menarik dan lebih mudah pada pendapat saya.

NiPyAPI

NiPyAPI ialah perpustakaan Python untuk berinteraksi dengan contoh NiFi. Halaman dokumentasi mengandungi maklumat yang diperlukan untuk bekerja dengan perpustakaan. Permulaan pantas diterangkan dalam projek pada github.

Skrip kami untuk melancarkan konfigurasi ialah program Python. Mari kita beralih kepada pengekodan.
Sediakan konfigurasi untuk kerja selanjutnya. Kami memerlukan parameter berikut:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-api инстанса, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ Ρ€Π°Π·Π²ΠΎΡ€Π°Ρ‡ΠΈΠ²Π°Π΅ΠΌ process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ registry, ΠΊΠ°ΠΊ Π±ΡƒΠ΄Π΅Ρ‚ Π½Π°Π·Ρ‹Π²Π°Ρ‚ΡŒΡΡ Π² инстансС nifi
nipyapi.config.bucket_name = 'BucketName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ bucket, ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ подтягиваСм flow
nipyapi.config.flow_name = 'FlowName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ flow, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ подтягиваСм

Selanjutnya saya akan memasukkan nama kaedah perpustakaan ini, yang diterangkan di sini.

Kami menyambungkan pendaftaran kepada contoh nifi menggunakan

nipyapi.versioning.create_registry_client

Pada langkah ini, anda juga boleh menambah semakan bahawa pendaftaran telah ditambahkan pada contoh, untuk ini anda boleh menggunakan kaedah

nipyapi.versioning.list_registry_clients

Kami mencari baldi untuk mencari lebih lanjut aliran dalam bakul

nipyapi.versioning.get_registry_bucket

Mengikut baldi yang ditemui, kami sedang mencari aliran

nipyapi.versioning.get_flow_in_bucket

Seterusnya, adalah penting untuk memahami jika kumpulan proses ini telah ditambahkan. Kumpulan proses diletakkan mengikut koordinat dan situasi mungkin timbul apabila kumpulan kedua diletakkan di atas satu. Saya semak, boleh jadi πŸ™‚ Untuk mendapatkan semua kumpulan proses tambahan, gunakan kaedah

nipyapi.canvas.list_all_process_groups

dan kemudian kita boleh mencari, sebagai contoh, dengan nama.

Saya tidak akan menerangkan proses mengemas kini templat, saya hanya akan mengatakan bahawa jika pemproses ditambah dalam versi templat baru, maka tidak ada masalah dengan kehadiran mesej dalam baris gilir. Tetapi jika pemproses dialih keluar, maka masalah mungkin timbul (nifi tidak membenarkan penyingkiran pemproses jika baris gilir mesej telah terkumpul di hadapannya). Jika anda berminat dengan cara saya menyelesaikan masalah ini - tulis kepada saya, sila, kami akan membincangkan perkara ini. Kenalan di akhir artikel. Mari kita teruskan ke langkah menambah kumpulan proses.

Semasa menyahpepijat skrip, saya menjumpai ciri yang versi aliran terbaharu tidak selalu dikeluarkan, jadi saya mengesyorkan agar anda menjelaskan versi ini terlebih dahulu:

nipyapi.versioning.get_latest_flow_ver

Sebarkan kumpulan proses:

nipyapi.versioning.deploy_flow_version

Kami memulakan pemproses:

nipyapi.canvas.schedule_process_group

Dalam blok tentang CLI, telah ditulis bahawa pemindahan data tidak didayakan secara automatik dalam kumpulan proses jauh? Semasa melaksanakan skrip, saya juga menghadapi masalah ini. Pada masa itu, saya tidak dapat memulakan pemindahan data menggunakan API dan saya memutuskan untuk menulis kepada pembangun perpustakaan NiPyAPI dan meminta nasihat / bantuan. Pemaju menjawab saya, kami membincangkan masalah itu dan dia menulis bahawa dia memerlukan masa untuk "menyemak sesuatu". Dan kini, beberapa hari kemudian, e-mel tiba di mana fungsi Python ditulis yang menyelesaikan masalah permulaan saya !!! Pada masa itu, versi NiPyAPI ialah 0.13.3 dan, sudah tentu, tiada apa-apa jenis di dalamnya. Tetapi dalam versi 0.14.0, yang dikeluarkan baru-baru ini, fungsi ini telah dimasukkan ke dalam perpustakaan. jumpa

nipyapi.canvas.set_remote_process_group_transmission

Jadi, dengan bantuan perpustakaan NiPyAPI, kami menyambungkan pendaftaran, melancarkan aliran, dan juga memulakan pemproses dan pemindahan data. Kemudian anda boleh menyikat kod, menambah semua jenis cek, pengelogan, dan itu sahaja. Tetapi itu cerita yang sama sekali berbeza.

Daripada pilihan automasi yang saya pertimbangkan, yang kedua nampaknya paling berkesan bagi saya. Pertama, ini masih kod python, di mana anda boleh membenamkan kod program tambahan dan menikmati semua faedah bahasa pengaturcaraan. Kedua, projek NiPyAPI sedang giat dibangunkan dan sekiranya terdapat masalah anda boleh menulis surat kepada pembangun. Ketiga, NiPyAPI masih merupakan alat yang lebih fleksibel untuk berinteraksi dengan NiFi dalam menyelesaikan masalah yang kompleks. Sebagai contoh, dalam menentukan sama ada baris gilir mesej pada masa ini kosong dalam aliran dan sama ada mungkin untuk mengemas kini kumpulan proses.

Itu sahaja. Saya menerangkan 3 pendekatan untuk mengautomasikan penghantaran aliran dalam NiFi, masalah yang mungkin dihadapi oleh pembangun dan menyediakan kod berfungsi untuk mengautomasikan penghantaran. Jika anda berminat dengan topik ini seperti saya - tulis!

Sumber: www.habr.com

Tambah komen