Otomatisasi pengiriman aliran di Apache NiFi

Hello!

Otomatisasi pengiriman aliran di Apache NiFi

Tugasnya adalah sebagai berikut - ada aliran yang ditunjukkan pada gambar di atas, yang perlu diluncurkan ke N server dengan Apache NiFi. Tes aliran - file sedang dibuat dan dikirim ke instance NiFi lain. Transfer data terjadi menggunakan protokol NiFi Site to Site.

NiFi Site to Site (S2S) adalah cara yang aman dan mudah dikonfigurasi untuk mentransfer data antar instans NiFi. Cara kerja S2S, lihat dokumentasi dan penting untuk tidak lupa mengkonfigurasi instance NiFi untuk mengizinkan S2S, lihat di sini.

Dalam kasus di mana kita berbicara tentang transfer data menggunakan S2S, satu contoh disebut klien, yang kedua disebut server. Klien mengirim data, server menerima. Dua cara untuk mengkonfigurasi transfer data di antara keduanya:

  1. Dorong. Data dikirim dari instance klien menggunakan Remote Process Group (RPG). Pada contoh server, data diterima menggunakan Port Input
  2. Tarik. Server menerima data menggunakan RPG, klien mengirimkan menggunakan port Output.


Alur untuk bergulir disimpan di Apache Registry.

Apache NiFi Registry adalah subproyek Apache NiFi yang menyediakan alat untuk penyimpanan aliran dan kontrol versi. Semacam GIT. Informasi tentang instalasi, konfigurasi, dan bekerja dengan registri dapat ditemukan di dokumentasi resmi. Aliran penyimpanan digabungkan ke dalam grup proses dan disimpan dalam registri dalam formulir ini. Kami akan kembali membahasnya nanti di artikel.

Pada awalnya, ketika N adalah angka kecil, aliran dikirimkan dan diperbarui secara manual dalam waktu yang dapat diterima.

Namun seiring dengan pertumbuhan N, permasalahan menjadi semakin banyak:

  1. dibutuhkan lebih banyak waktu untuk memperbarui alur. Anda harus masuk ke semua server
  2. Terjadi kesalahan pembaruan templat. Di sini mereka memperbaruinya, tetapi di sini mereka lupa
  3. kesalahan manusia saat melakukan sejumlah besar operasi serupa

Semua ini membawa kita pada fakta bahwa kita perlu mengotomatiskan prosesnya. Saya mencoba cara berikut untuk mengatasi masalah ini:

  1. Gunakan MiNiFi, bukan NiFi
  2. CLI NiFi
  3. NiPyAPI

Menggunakan MiNiFi

ApacheMiNify adalah subproyek Apache NiFi. MiNiFy adalah agen ringkas yang menggunakan prosesor yang sama dengan NiFi, memungkinkan Anda membuat aliran yang sama seperti di NiFi. Ringannya agen dicapai, antara lain, karena MiNiFy tidak memiliki antarmuka grafis untuk konfigurasi aliran. Kurangnya antarmuka grafis MiNiFy berarti masalah pengiriman aliran di minifi perlu diselesaikan. Karena MiNiFy secara aktif digunakan dalam IOT, ada banyak komponen dan proses pengiriman aliran ke minifi instance akhir harus diotomatisasi. Tugas yang familiar, bukan?

Subproyek lain, MiNiFi C2 Server, akan membantu mengatasi masalah ini. Produk ini dimaksudkan untuk menjadi titik sentral dalam arsitektur penerapan. Cara mengkonfigurasi lingkungan - dijelaskan di Artikel ini tentang Habré dan informasinya cukup untuk memecahkan masalah tersebut. MiNiFi bersama dengan server C2 secara otomatis memperbarui konfigurasinya. Satu-satunya kelemahan dari pendekatan ini adalah Anda harus membuat templat di Server C2; komit sederhana ke registri saja tidak cukup.

Opsi yang dijelaskan dalam artikel di atas berfungsi dan tidak sulit untuk diterapkan, namun kita tidak boleh melupakan hal berikut:

  1. Minifi tidak memiliki semua prosesor dari nifi
  2. Versi CPU di Minifi tertinggal dibandingkan versi CPU di NiFi.

Pada saat penulisan, versi terbaru NiFi adalah 1.9.2. Versi prosesor MiNiFi terbaru adalah 1.7.0. Prosesor dapat ditambahkan ke MiNiFi, namun karena perbedaan versi antara prosesor NiFi dan MiNiFi, hal ini mungkin tidak berfungsi.

CLI NiFi

Dilihat oleh keterangan alat di situs resminya, ini adalah alat untuk mengotomatiskan interaksi antara NiFI dan NiFi Registry di bidang pengiriman aliran atau manajemen proses. Unduh alat ini untuk memulai. karenanya.

Jalankan utilitasnya

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Agar kita dapat memuat aliran yang diperlukan dari registri, kita perlu mengetahui pengidentifikasi ember (pengidentifikasi ember) dan aliran itu sendiri (pengidentifikasi aliran). Data ini dapat diperoleh melalui cli atau di antarmuka web registri NiFi. Di antarmuka web tampilannya seperti ini:

Otomatisasi pengiriman aliran di Apache NiFi

Dengan menggunakan CLI, hal ini dilakukan:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Jalankan grup proses impor dari registri:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Poin penting adalah bahwa setiap instance nifi dapat ditentukan sebagai host tempat kita menjalankan grup proses.

Grup proses ditambahkan dengan prosesor yang dihentikan, mereka harus dimulai

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Hebat, prosesornya sudah dimulai. Namun, sesuai dengan kondisi masalahnya, kita memerlukan instance NiFi untuk mengirim data ke instance lain. Misalkan metode Push dipilih untuk mentransfer data ke server. Untuk mengatur transfer data, perlu untuk mengaktifkan transfer data (Aktifkan transmisi) pada Grup Proses Jarak Jauh (RPG) yang ditambahkan, yang sudah termasuk dalam aliran kami.

Otomatisasi pengiriman aliran di Apache NiFi

Dalam dokumentasi di CLI dan sumber lain, saya tidak menemukan cara untuk mengaktifkan transfer data. Jika Anda tahu cara melakukannya, silakan tulis di komentar.

Karena kita sudah mengadakan pesta dan siap untuk mencapai akhir, kita akan menemukan jalan keluarnya! Anda dapat menggunakan NiFi API untuk mengatasi masalah ini. Mari kita gunakan cara berikut, kita ambil ID dari contoh di atas (dalam kasus kita adalah 7f522a13-016e-1000-e504-d5b15587f2f3). Deskripsi Metode NiFi API di sini.

Otomatisasi pengiriman aliran di Apache NiFi
Di badan, Anda harus meneruskan JSON, dengan bentuk berikut:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameter yang perlu diisi agar berfungsi:
negara — status transfer data. Tersedia TRANSMITTING untuk mengaktifkan transfer data, STOPPED untuk menonaktifkan
versi - versi prosesor

versi akan default ke 0 saat dibuat, tetapi parameter ini dapat diperoleh dengan menggunakan metode ini

Otomatisasi pengiriman aliran di Apache NiFi

Bagi penggemar skrip bash, cara ini mungkin tampak cocok, tetapi bagi saya agak sulit - skrip bash bukanlah favorit saya. Cara selanjutnya menurut saya lebih menarik dan nyaman.

NiPyAPI

NiPyAPI adalah perpustakaan Python untuk berinteraksi dengan instance NiFi. Halaman dokumentasi berisi informasi yang diperlukan untuk bekerja dengan perpustakaan. Mulai cepat dijelaskan dalam proyek di github.

Skrip kami untuk meluncurkan konfigurasi adalah program Python. Mari beralih ke pengkodean.
Siapkan konfigurasi untuk pekerjaan lebih lanjut. Kami membutuhkan parameter berikut:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Selanjutnya saya akan memasukkan nama metode perpustakaan ini, yang dijelaskan di sini.

Kami menghubungkan registri ke instance nifi menggunakan

nipyapi.versioning.create_registry_client

Pada langkah ini, Anda juga dapat menambahkan tanda centang bahwa registri telah ditambahkan ke instance; untuk ini Anda dapat menggunakan metode ini

nipyapi.versioning.list_registry_clients

Kami menemukan ember untuk mencari lebih lanjut aliran di keranjang

nipyapi.versioning.get_registry_bucket

Dengan menggunakan ember yang ditemukan, kami mencari aliran

nipyapi.versioning.get_flow_in_bucket

Selanjutnya, penting untuk memahami apakah grup proses ini telah ditambahkan. Grup proses ditempatkan berdasarkan koordinat dan situasi mungkin muncul ketika yang kedua ditumpangkan di atas yang satu. Sudah saya cek, bisa 🙂 Untuk mendapatkan semua grup proses yang ditambahkan, gunakan metode tersebut

nipyapi.canvas.list_all_process_groups

lalu kita bisa mencari, misalnya berdasarkan nama.

Saya tidak akan menjelaskan proses update template, saya hanya akan mengatakan bahwa jika prosesor ditambahkan di template versi baru, maka tidak ada masalah dengan keberadaan pesan di antrian. Tetapi jika prosesor dilepas, maka masalah mungkin timbul (nifi tidak mengizinkan Anda menghapus prosesor jika antrian pesan menumpuk di depannya). Jika Anda tertarik dengan cara saya memecahkan masalah ini, silakan kirim surat kepada saya dan kami akan membahas masalah ini. Kontak di akhir artikel. Mari beralih ke langkah menambahkan grup proses.

Saat men-debug skrip, saya menemukan kekhasan bahwa aliran versi terbaru tidak selalu ditarik, jadi saya sarankan untuk memeriksa versi ini terlebih dahulu:

nipyapi.versioning.get_latest_flow_ver

Sebarkan grup proses:

nipyapi.versioning.deploy_flow_version

Kami memulai prosesor:

nipyapi.canvas.schedule_process_group

Di blok tentang CLI tertulis bahwa transfer data tidak diaktifkan secara otomatis di grup proses jarak jauh? Saat mengimplementasikan skrip, saya juga mengalami masalah ini. Pada saat itu, saya tidak dapat memulai transfer data menggunakan API dan saya memutuskan untuk menulis surat kepada pengembang perpustakaan NiPyAPI dan meminta saran/bantuan. Pengembang menjawab saya, kami mendiskusikan masalahnya dan dia menulis bahwa dia memerlukan waktu untuk "memeriksa sesuatu". Dan sekarang, beberapa hari kemudian, sebuah email masuk berisi fungsi Python tertulis yang memecahkan masalah startup saya!!! Saat itu, versi NiPyAPI adalah 0.13.3 dan tentu saja tidak ada yang seperti itu. Namun di versi 0.14.0 yang dirilis baru-baru ini, fungsi ini sudah disertakan di perpustakaan. Bertemu

nipyapi.canvas.set_remote_process_group_transmission

Jadi, dengan menggunakan perpustakaan NiPyAPI, kami menghubungkan registri, meluncurkan aliran, dan bahkan memulai prosesor dan transfer data. Kemudian Anda dapat menyisir kodenya, menambahkan segala macam pemeriksaan, logging, dan itu saja. Tapi itu cerita yang sama sekali berbeda.

Dari opsi otomasi yang saya pertimbangkan, yang terakhir menurut saya paling efisien. Pertama, ini masih berupa kode python, di mana Anda dapat menyematkan kode program tambahan dan memanfaatkan semua manfaat bahasa pemrograman tersebut. Kedua, proyek NiPyAPI secara aktif berkembang dan jika ada masalah, Anda dapat menulis ke pengembang. Ketiga, NiPyAPI masih merupakan alat yang lebih fleksibel untuk berinteraksi dengan NiFi dalam memecahkan masalah yang kompleks. Misalnya, dalam menentukan apakah antrian pesan sekarang kosong dalam aliran dan apakah grup proses dapat diperbarui.

Itu saja. Saya menjelaskan 3 pendekatan untuk mengotomatisasi pengiriman aliran di NiFi, kendala yang mungkin dihadapi pengembang dan memberikan kode yang berfungsi untuk mengotomatisasi pengiriman. Jika Anda sama tertariknya dengan topik ini seperti saya - е!

Sumber: www.habr.com

Tambah komentar