Hello!
Tugasnya adalah seperti berikut - terdapat aliran yang ditunjukkan dalam gambar di atas, yang perlu dilancarkan ke pelayan N dengan
NiFi Site to Site (S2S) ialah cara yang selamat dan boleh disesuaikan untuk memindahkan data antara kejadian NiFi. Lihat cara S2S berfungsi
Apabila ia datang kepada pemindahan data menggunakan S2S, satu contoh dipanggil klien, yang kedua adalah pelayan. Pelanggan menghantar data, pelayan menerimanya. Dua cara untuk menyediakan pemindahan data antara mereka:
- Tolak. Data dihantar daripada contoh klien menggunakan Kumpulan Proses Jauh (RPG). Pada contoh pelayan, data diterima menggunakan Port Input
- Tarik. Pelayan menerima data menggunakan RPG, pelanggan menghantar menggunakan port Output.
Aliran untuk rolling disimpan dalam Apache Registry.
Apache NiFi Registry ialah subprojek Apache NiFi yang menyediakan alat storan aliran dan versi. Sejenis GIT. Maklumat tentang memasang, mengkonfigurasi dan bekerja dengan pendaftaran boleh didapati dalam
Pada permulaan, apabila N ialah nombor kecil, aliran dihantar dan dikemas kini dengan tangan dalam masa yang munasabah.
Tetapi apabila N berkembang, terdapat lebih banyak masalah:
- memerlukan lebih banyak masa untuk mengemas kini aliran. Anda perlu pergi ke semua pelayan
- terdapat ralat mengemas kini templat. Di sini mereka mengemas kini, tetapi di sini mereka terlupa
- kesilapan manusia apabila melakukan sejumlah besar operasi serupa
Semua ini membawa kita kepada fakta bahawa ia adalah perlu untuk mengautomasikan proses. Saya telah mencuba cara berikut untuk menyelesaikan masalah ini:
- Gunakan MiNiFi dan bukannya NiFi
- NiFi CLI
- NiPyAPI
Menggunakan MiNiFi
Subprojek lain, Pelayan MiNiFi C2, akan membantu menyelesaikan masalah ini. Produk ini bertujuan untuk menjadi titik pusat dalam seni bina penggunaan. Cara mengkonfigurasi persekitaran - diterangkan dalam
Pilihan yang diterangkan dalam artikel di atas berfungsi dan tidak sukar untuk dilaksanakan, tetapi kita tidak boleh melupakan perkara berikut:
- minifi tidak mempunyai semua pemproses daripada nifi
- Versi CPU dalam Minifi ketinggalan di belakang versi CPU dalam NiFi.
Pada masa penulisan, versi terkini NiFi ialah 1.9.2. Versi pemproses versi MiNiFi terkini ialah 1.7.0. Pemproses boleh ditambahkan pada MiNiFi, tetapi disebabkan percanggahan versi antara pemproses NiFi dan MiNiFi, ini mungkin tidak berfungsi.
NiFi CLI
Dilihat oleh
Jalankan utiliti
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Untuk kami memuatkan aliran yang diperlukan dari pendaftaran, kami perlu mengetahui pengecam bakul (pengecam baldi) dan aliran itu sendiri (pengecam aliran). Data ini boleh diperolehi sama ada melalui cli atau dalam antara muka web pendaftaran NiFi. Antara muka web kelihatan seperti ini:
Menggunakan CLI, anda melakukan ini:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Jalankan kumpulan proses import dari pendaftaran:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Perkara penting ialah mana-mana tika nifi boleh ditentukan sebagai hos tempat kami melancarkan kumpulan proses.
Kumpulan proses ditambah dengan pemproses yang dihentikan, mereka perlu dimulakan
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Hebat, pemproses telah bermula. Walau bagaimanapun, mengikut syarat masalah, kami memerlukan contoh NiFi untuk menghantar data ke kejadian lain. Mari kita anggap bahawa kaedah Push telah dipilih untuk memindahkan data ke pelayan. Untuk mengatur pemindahan data, adalah perlu untuk membolehkan pemindahan data (Dayakan penghantaran) pada Kumpulan Proses Jauh (RPG) tambahan, yang telah disertakan dalam aliran kami.
Dalam dokumentasi dalam CLI dan sumber lain, saya tidak menemui jalan untuk membolehkan pemindahan data. Jika anda tahu bagaimana untuk melakukan ini, sila tulis dalam komen.
Oleh kerana kami mempunyai bash dan kami bersedia untuk pergi ke penghujung, kami akan mencari jalan keluar! Anda boleh menggunakan API NiFi untuk menyelesaikan masalah ini. Mari gunakan kaedah berikut, kami mengambil ID daripada contoh di atas (dalam kes kami ialah 7f522a13-016e-1000-e504-d5b15587f2f3). Penerangan Kaedah API NiFi
Dalam badan, anda perlu lulus JSON, dalam bentuk berikut:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameter yang mesti diisi untuk "berfungsi":
adalah β status pemindahan data. TRANSMITTING tersedia untuk membolehkan pemindahan data, BERHENTI untuk melumpuhkan
versi - versi pemproses
versi akan lalai kepada 0 apabila dibuat, tetapi parameter ini boleh diperoleh menggunakan kaedah tersebut
Bagi pencinta skrip bash, kaedah ini mungkin kelihatan sesuai, tetapi sukar bagi saya - skrip bash bukanlah kegemaran saya. Cara seterusnya adalah lebih menarik dan lebih mudah pada pendapat saya.
NiPyAPI
NiPyAPI ialah perpustakaan Python untuk berinteraksi dengan contoh NiFi.
Skrip kami untuk melancarkan konfigurasi ialah program Python. Mari kita beralih kepada pengekodan.
Sediakan konfigurasi untuk kerja selanjutnya. Kami memerlukan parameter berikut:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #ΠΏΡΡΡ Π΄ΠΎ nifi-api ΠΈΠ½ΡΡΠ°Π½ΡΠ°, Π½Π° ΠΊΠΎΡΠΎΡΠΎΠΌ ΡΠ°Π·Π²ΠΎΡΠ°ΡΠΈΠ²Π°Π΅ΠΌ process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #ΠΏΡΡΡ Π΄ΠΎ nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ registry, ΠΊΠ°ΠΊ Π±ΡΠ΄Π΅Ρ Π½Π°Π·ΡΠ²Π°ΡΡΡΡ Π² ΠΈΠ½ΡΡΠ°Π½ΡΠ΅ nifi
nipyapi.config.bucket_name = 'BucketName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ bucket, ΠΈΠ· ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΠΏΠΎΠ΄ΡΡΠ³ΠΈΠ²Π°Π΅ΠΌ flow
nipyapi.config.flow_name = 'FlowName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ flow, ΠΊΠΎΡΠΎΡΠΎΠ΅ ΠΏΠΎΠ΄ΡΡΠ³ΠΈΠ²Π°Π΅ΠΌ
Selanjutnya saya akan memasukkan nama kaedah perpustakaan ini, yang diterangkan
Kami menyambungkan pendaftaran kepada contoh nifi menggunakan
nipyapi.versioning.create_registry_client
Pada langkah ini, anda juga boleh menambah semakan bahawa pendaftaran telah ditambahkan pada contoh, untuk ini anda boleh menggunakan kaedah
nipyapi.versioning.list_registry_clients
Kami mencari baldi untuk mencari lebih lanjut aliran dalam bakul
nipyapi.versioning.get_registry_bucket
Mengikut baldi yang ditemui, kami sedang mencari aliran
nipyapi.versioning.get_flow_in_bucket
Seterusnya, adalah penting untuk memahami jika kumpulan proses ini telah ditambahkan. Kumpulan proses diletakkan mengikut koordinat dan situasi mungkin timbul apabila kumpulan kedua diletakkan di atas satu. Saya semak, boleh jadi π Untuk mendapatkan semua kumpulan proses tambahan, gunakan kaedah
nipyapi.canvas.list_all_process_groups
dan kemudian kita boleh mencari, sebagai contoh, dengan nama.
Saya tidak akan menerangkan proses mengemas kini templat, saya hanya akan mengatakan bahawa jika pemproses ditambah dalam versi templat baru, maka tidak ada masalah dengan kehadiran mesej dalam baris gilir. Tetapi jika pemproses dialih keluar, maka masalah mungkin timbul (nifi tidak membenarkan penyingkiran pemproses jika baris gilir mesej telah terkumpul di hadapannya). Jika anda berminat dengan cara saya menyelesaikan masalah ini - tulis kepada saya, sila, kami akan membincangkan perkara ini. Kenalan di akhir artikel. Mari kita teruskan ke langkah menambah kumpulan proses.
Semasa menyahpepijat skrip, saya menjumpai ciri yang versi aliran terbaharu tidak selalu dikeluarkan, jadi saya mengesyorkan agar anda menjelaskan versi ini terlebih dahulu:
nipyapi.versioning.get_latest_flow_ver
Sebarkan kumpulan proses:
nipyapi.versioning.deploy_flow_version
Kami memulakan pemproses:
nipyapi.canvas.schedule_process_group
Dalam blok tentang CLI, telah ditulis bahawa pemindahan data tidak didayakan secara automatik dalam kumpulan proses jauh? Semasa melaksanakan skrip, saya juga menghadapi masalah ini. Pada masa itu, saya tidak dapat memulakan pemindahan data menggunakan API dan saya memutuskan untuk menulis kepada pembangun perpustakaan NiPyAPI dan meminta nasihat / bantuan. Pemaju menjawab saya, kami membincangkan masalah itu dan dia menulis bahawa dia memerlukan masa untuk "menyemak sesuatu". Dan kini, beberapa hari kemudian, e-mel tiba di mana fungsi Python ditulis yang menyelesaikan masalah permulaan saya !!! Pada masa itu, versi NiPyAPI ialah 0.13.3 dan, sudah tentu, tiada apa-apa jenis di dalamnya. Tetapi dalam versi 0.14.0, yang dikeluarkan baru-baru ini, fungsi ini telah dimasukkan ke dalam perpustakaan. jumpa
nipyapi.canvas.set_remote_process_group_transmission
Jadi, dengan bantuan perpustakaan NiPyAPI, kami menyambungkan pendaftaran, melancarkan aliran, dan juga memulakan pemproses dan pemindahan data. Kemudian anda boleh menyikat kod, menambah semua jenis cek, pengelogan, dan itu sahaja. Tetapi itu cerita yang sama sekali berbeza.
Daripada pilihan automasi yang saya pertimbangkan, yang kedua nampaknya paling berkesan bagi saya. Pertama, ini masih kod python, di mana anda boleh membenamkan kod program tambahan dan menikmati semua faedah bahasa pengaturcaraan. Kedua, projek NiPyAPI sedang giat dibangunkan dan sekiranya terdapat masalah anda boleh menulis surat kepada pembangun. Ketiga, NiPyAPI masih merupakan alat yang lebih fleksibel untuk berinteraksi dengan NiFi dalam menyelesaikan masalah yang kompleks. Sebagai contoh, dalam menentukan sama ada baris gilir mesej pada masa ini kosong dalam aliran dan sama ada mungkin untuk mengemas kini kumpulan proses.
Itu sahaja. Saya menerangkan 3 pendekatan untuk mengautomasikan penghantaran aliran dalam NiFi, masalah yang mungkin dihadapi oleh pembangun dan menyediakan kod berfungsi untuk mengautomasikan penghantaran. Jika anda berminat dengan topik ini seperti saya -
Sumber: www.habr.com