Herkese Merhaba!
Görev şu şekildedir - yukarıdaki resimde gösterilen ve N sunucuya dağıtılması gereken bir akış vardır.
NiFi Siteden Siteye (S2S), NiFi bulut sunucuları arasında veri aktarmanın güvenli, son derece özelleştirilebilir bir yoludur. S2S'nin nasıl çalıştığını görün
S2S kullanarak veri aktarımı söz konusu olduğunda, bir örnek istemci, ikinci örnek ise sunucu olarak adlandırılır. İstemci veri gönderir, sunucu onu alır. Aralarında veri aktarımını ayarlamanın iki yolu:
- Itmek. Veriler istemci örneğinden Uzak İşlem Grubu (RPG) kullanılarak gönderilir. Sunucu örneğinde veriler Giriş Bağlantı Noktası kullanılarak alınır
- Çekme. Sunucu verileri RPG'yi kullanarak alır, istemci ise Çıkış bağlantı noktasını kullanarak gönderir.
Yuvarlama akışı Apache Kayıt Defterinde saklanır.
Apache NiFi Registry, Apache NiFi'nin akış depolama ve sürüm oluşturma aracı sağlayan bir alt projesidir. Bir çeşit GIT. Kayıt defterini yükleme, yapılandırma ve çalışmayla ilgili bilgileri şu adreste bulabilirsiniz:
Başlangıçta N küçük bir sayı olduğunda akış makul bir sürede elle iletilir ve güncellenir.
Ancak N büyüdükçe daha fazla sorun ortaya çıkıyor:
- akışı güncellemek daha fazla zaman alır. Tüm sunuculara gitmeniz gerekiyor
- şablonları güncellerken hatalar var. Burada güncellendiler ama burada unuttular
- Çok sayıda benzer işlemi gerçekleştirirken insan hatası
Bütün bunlar bizi süreci otomatikleştirmenin gerekli olduğu gerçeğine getiriyor. Bu sorunu çözmek için aşağıdaki yolları denedim:
- NiFi yerine MiNiFi kullanın
- NiFi CLI
- NiPyAPI
MiNiFi'yi kullanma
Başka bir alt proje bu sorunun çözülmesine yardımcı olacaktır - MiNiFi C2 Sunucusu. Bu ürünün dağıtım mimarisinin merkezi noktası olması amaçlanmıştır. Ortamın nasıl yapılandırılacağı - bölümünde açıklanmıştır
Yukarıdaki makalede anlatılan seçenek işe yarıyor ve uygulanması zor değil ancak şunu unutmamalıyız:
- minifi, nifi'nin tüm işlemcilerine sahip değil
- Minifi'deki CPU sürümleri, NiFi'deki CPU sürümlerinin gerisinde kalıyor.
Bu yazının yazıldığı sırada NiFi'nin en son sürümü 1.9.2'dir. MiNiFi'nin son sürümünün işlemci versiyonu 1.7.0'dır. MiNiFi'ye işlemciler eklenebilir ancak NiFi ve MiNiFi işlemciler arasındaki sürüm farklılıklarından dolayı bu çalışmayabilir.
NiFi CLI
Bakılırsa
Yardımcı programı çalıştırın
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Kayıt defterinden gerekli akışı yükleyebilmemiz için sepetin tanımlayıcılarını (kova tanımlayıcısı) ve akışın kendisini (akış tanımlayıcısı) bilmemiz gerekir. Bu veriler cli aracılığıyla veya NiFi kayıt defteri web arayüzünden elde edilebilir. Web arayüzü şuna benzer:
CLI'yi kullanarak şunu yaparsınız:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
İçe aktarma işlem grubunu kayıt defterinden çalıştırın:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Önemli bir nokta, herhangi bir nifi örneğinin, süreç grubunu yürüteceğimiz ana bilgisayar olarak belirtilebilmesidir.
Durdurulan işlemcilerle birlikte eklenen süreç grubu, bunların başlatılması gerekiyor
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Harika, işlemciler başladı. Ancak sorunun şartlarına göre diğer örneklere veri gönderebilmek için NiFi örneklerine ihtiyacımız var. Verileri sunucuya aktarmak için Push yönteminin seçildiğini varsayalım. Veri aktarımını organize etmek için halihazırda akışımıza dahil olan, eklenen Uzak İşlem Grubu (RPG) üzerinde veri aktarımını etkinleştirmek (İletimi etkinleştir) gerekir.
CLI ve diğer kaynaklardaki belgelerde veri aktarımını etkinleştirmenin bir yolunu bulamadım. Bunu nasıl yapacağınızı biliyorsanız lütfen yorumlara yazın.
Madem ki bash'ımız var ve sonuna kadar gitmeye hazırız, bir çıkış yolu bulacağız! Bu sorunu çözmek için NiFi API'yi kullanabilirsiniz. Aşağıdaki yöntemi kullanalım, yukarıdaki örneklerden ID'yi alıyoruz (bizim durumumuzda 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API Yöntemlerinin Açıklaması
Gövdede, aşağıdaki biçimde JSON'u aktarmanız gerekir:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
“Çalışmak” için doldurulması gereken parametreler:
belirtmek, bildirmek — veri aktarım durumu. Veri aktarımını etkinleştirmek için İLETİM mevcut, devre dışı bırakmak için DURDURULDU
versiyon - işlemci sürümü
sürüm oluşturulduğunda varsayılan olarak 0 olacaktır, ancak bu parametreler yöntem kullanılarak elde edilebilir.
Bash betiklerini sevenler için bu yöntem uygun görünebilir, ancak benim için zor - bash betikleri benim favorim değil. Bir sonraki yol bence daha ilginç ve daha kullanışlı.
NiPyAPI
NiPyAPI, NiFi örnekleriyle etkileşim kurmaya yönelik bir Python kitaplığıdır.
Yapılandırmayı kullanıma sunmaya yönelik betiğimiz bir Python programıdır. Hadi kodlamaya geçelim.
Daha fazla çalışma için yapılandırmaları ayarlayın. Aşağıdaki parametrelere ihtiyacımız olacak:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Ayrıca bu kütüphanenin açıklanan yöntemlerinin adlarını ekleyeceğim.
Kayıt defterini kullanarak nifi örneğine bağlarız.
nipyapi.versioning.create_registry_client
Bu adımda, kayıt defterinin örneğe zaten eklenmiş olup olmadığına dair bir kontrol de ekleyebilirsiniz, bunun için yöntemi kullanabilirsiniz.
nipyapi.versioning.list_registry_clients
Sepetteki akışı daha fazla aramak için kovayı buluyoruz
nipyapi.versioning.get_registry_bucket
Bulunan kovaya göre akış arıyoruz
nipyapi.versioning.get_flow_in_bucket
Daha sonra, bu süreç grubunun zaten eklenip eklenmediğini anlamak önemlidir. Süreç grubu koordinatlara göre yerleştirilir ve ikincisinin bir üst üste bindirilmesi durumunda bir durum ortaya çıkabilir. Kontrol ettim, olabilir 🙂 Eklenen tüm süreç gruplarını almak için yöntemi kullanın
nipyapi.canvas.list_all_process_groups
ve sonra örneğin ada göre arama yapabiliriz.
Şablonu güncelleme sürecini anlatmayacağım, sadece şablonun yeni sürümüne işlemciler eklenirse kuyruklarda mesajların bulunmasında herhangi bir sorun olmayacağını söyleyeceğim. Ancak işlemciler çıkarılırsa sorunlar ortaya çıkabilir (önünde bir mesaj kuyruğu birikmişse nifi, işlemcinin kaldırılmasına izin vermez). Bu sorunu nasıl çözdüğümle ilgileniyorsanız - lütfen bana yazın, bu konuyu tartışacağız. İletişim bilgileri makalenin sonunda. Süreç grubu ekleme adımına geçelim.
Komut dosyasında hata ayıklarken, akışın en son sürümünün her zaman yüklenmediği bir özellikle karşılaştım, bu nedenle öncelikle bu sürümü netleştirmenizi öneririm:
nipyapi.versioning.get_latest_flow_ver
Süreç grubunu dağıtın:
nipyapi.versioning.deploy_flow_version
İşlemcileri başlatıyoruz:
nipyapi.canvas.schedule_process_group
CLI ile ilgili blokta uzak işlem grubunda veri aktarımının otomatik olarak etkinleştirilmediği yazıyordu. Senaryoyu uygularken ben de bu sorunla karşılaştım. O zamanlar API kullanarak veri aktarımını başlatamıyordum ve NiPyAPI kütüphanesinin geliştiricisine yazıp tavsiye/yardım istemeye karar verdim. Geliştirici bana cevap verdi, sorunu tartıştık ve "bir şeyi kontrol etmek" için zamana ihtiyacı olduğunu yazdı. Ve şimdi, birkaç gün sonra, başlangıç sorunumu çözen bir Python fonksiyonunun yazıldığı bir e-posta geliyor !!! O zamanlar NiPyAPI sürümü 0.13.3'tü ve elbette içinde buna benzer hiçbir şey yoktu. Ancak yakın zamanda piyasaya sürülen 0.14.0 sürümünde bu işlev zaten kitaplığa dahil edilmiştir. Tanışmak
nipyapi.canvas.set_remote_process_group_transmission
Böylece NiPyAPI kütüphanesinin yardımıyla kayıt defterini bağladık, akışı hızlandırdık ve hatta işlemcileri ve veri aktarımını başlattık. Daha sonra kodu tarayabilir, her türlü kontrolü ekleyebilir, günlüğe kaydedebilirsiniz, hepsi bu. Ama bu tamamen farklı bir hikaye.
Göz önünde bulundurduğum otomasyon seçeneklerinden ikincisi bana en verimlisi gibi geldi. İlk olarak, bu hala yardımcı program kodunu gömebileceğiniz ve programlama dilinin tüm avantajlarından yararlanabileceğiniz python kodudur. İkincisi, NiPyAPI projesi aktif olarak gelişiyor ve sorun olması durumunda geliştiriciye yazabilirsiniz. Üçüncüsü, NiPyAPI, karmaşık sorunları çözmede NiFi ile etkileşime geçmek için hala daha esnek bir araçtır. Örneğin, akışta mesaj kuyruklarının şu anda boş olup olmadığının ve süreç grubunun güncellenmesinin mümkün olup olmadığının belirlenmesinde.
Bu kadar. NiFi'de akış dağıtımını otomatikleştirmeye yönelik 3 yaklaşımı, bir geliştiricinin karşılaşabileceği tuzakları anlattım ve dağıtımı otomatikleştirmek için bir çalışma kodu sağladım. Eğer siz de bu konuya benim kadar meraklıysanız -
Kaynak: habr.com