Apache NiFi'de Akış Dağıtımı Otomasyonu

Herkese Merhaba!

Apache NiFi'de Akış Dağıtımı Otomasyonu

Görev şu şekildedir - yukarıdaki resimde gösterilen ve N sunucuya dağıtılması gereken bir akış vardır. Apache NiFi. Akış testi - bir dosya oluşturuluyor ve başka bir NiFi örneğine gönderiliyor. Veri aktarımı, NiFi Siteden Siteye protokolü kullanılarak gerçekleşir.

NiFi Siteden Siteye (S2S), NiFi bulut sunucuları arasında veri aktarmanın güvenli, son derece özelleştirilebilir bir yoludur. S2S'nin nasıl çalıştığını görün belgeleme ve NiFi örneğinizi S2S'nin görebilmesine izin verecek şekilde ayarlamayı unutmamanız önemlidir. burada.

S2S kullanarak veri aktarımı söz konusu olduğunda, bir örnek istemci, ikinci örnek ise sunucu olarak adlandırılır. İstemci veri gönderir, sunucu onu alır. Aralarında veri aktarımını ayarlamanın iki yolu:

  1. Itmek. Veriler istemci örneğinden Uzak İşlem Grubu (RPG) kullanılarak gönderilir. Sunucu örneğinde veriler Giriş Bağlantı Noktası kullanılarak alınır
  2. Çekme. Sunucu verileri RPG'yi kullanarak alır, istemci ise Çıkış bağlantı noktasını kullanarak gönderir.


Yuvarlama akışı Apache Kayıt Defterinde saklanır.

Apache NiFi Registry, Apache NiFi'nin akış depolama ve sürüm oluşturma aracı sağlayan bir alt projesidir. Bir çeşit GIT. Kayıt defterini yükleme, yapılandırma ve çalışmayla ilgili bilgileri şu adreste bulabilirsiniz: resmi belgeler. Depolama akışı bir süreç grubunda birleştirilir ve kayıt defterinde bu formda depolanır. Bu konuya yazının ilerleyen kısımlarında döneceğiz.

Başlangıçta N küçük bir sayı olduğunda akış makul bir sürede elle iletilir ve güncellenir.

Ancak N büyüdükçe daha fazla sorun ortaya çıkıyor:

  1. akışı güncellemek daha fazla zaman alır. Tüm sunuculara gitmeniz gerekiyor
  2. şablonları güncellerken hatalar var. Burada güncellendiler ama burada unuttular
  3. Çok sayıda benzer işlemi gerçekleştirirken insan hatası

Bütün bunlar bizi süreci otomatikleştirmenin gerekli olduğu gerçeğine getiriyor. Bu sorunu çözmek için aşağıdaki yolları denedim:

  1. NiFi yerine MiNiFi kullanın
  2. NiFi CLI
  3. NiPyAPI

MiNiFi'yi kullanma

ApacheMiNify Apache NiFi'nin bir alt projesidir. MiNiFy, NiFi ile aynı işlemcileri kullanan ve NiFi ile aynı akışı oluşturmanıza olanak tanıyan kompakt bir aracıdır. Aracın hafifliği, diğer şeylerin yanı sıra, MiNiFy'nin akış konfigürasyonu için grafiksel bir arayüze sahip olmaması nedeniyle elde edilir. MiNiFy'nin grafiksel bir arayüze sahip olmaması, minifi'de akış dağıtım sorununu çözmenin gerekli olduğu anlamına gelir. MiNiFy, IoT'de aktif olarak kullanıldığından, birçok bileşen vardır ve son minifi örneklerine akış sağlama sürecinin otomatikleştirilmesi gerekir. Tanıdık bir görev, değil mi?

Başka bir alt proje bu sorunun çözülmesine yardımcı olacaktır - MiNiFi C2 Sunucusu. Bu ürünün dağıtım mimarisinin merkezi noktası olması amaçlanmıştır. Ortamın nasıl yapılandırılacağı - bölümünde açıklanmıştır Bu makalede Habré hakkında ve bilgi sorunu çözmek için yeterli. MiNiFi, C2 sunucusuyla birlikte yapılandırmasını otomatik olarak günceller. Bu yaklaşımın tek dezavantajı C2 Sunucusunda şablonlar oluşturmanız gerekmesidir; kayıt defterine basit bir taahhüt yeterli değildir.

Yukarıdaki makalede anlatılan seçenek işe yarıyor ve uygulanması zor değil ancak şunu unutmamalıyız:

  1. minifi, nifi'nin tüm işlemcilerine sahip değil
  2. Minifi'deki CPU sürümleri, NiFi'deki CPU sürümlerinin gerisinde kalıyor.

Bu yazının yazıldığı sırada NiFi'nin en son sürümü 1.9.2'dir. MiNiFi'nin son sürümünün işlemci versiyonu 1.7.0'dır. MiNiFi'ye işlemciler eklenebilir ancak NiFi ve MiNiFi işlemciler arasındaki sürüm farklılıklarından dolayı bu çalışmayabilir.

NiFi CLI

Bakılırsa tanım resmi web sitesindeki araç, akış dağıtımı veya süreç yönetimi alanında NiFI ile NiFi Registry arasındaki etkileşimi otomatikleştirmek için bir araçtır. Başlamak için bu aracı indirin. bundan dolayı.

Yardımcı programı çalıştırın

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Kayıt defterinden gerekli akışı yükleyebilmemiz için sepetin tanımlayıcılarını (kova tanımlayıcısı) ve akışın kendisini (akış tanımlayıcısı) bilmemiz gerekir. Bu veriler cli aracılığıyla veya NiFi kayıt defteri web arayüzünden elde edilebilir. Web arayüzü şuna benzer:

Apache NiFi'de Akış Dağıtımı Otomasyonu

CLI'yi kullanarak şunu yaparsınız:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

İçe aktarma işlem grubunu kayıt defterinden çalıştırın:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Önemli bir nokta, herhangi bir nifi örneğinin, süreç grubunu yürüteceğimiz ana bilgisayar olarak belirtilebilmesidir.

Durdurulan işlemcilerle birlikte eklenen süreç grubu, bunların başlatılması gerekiyor

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Harika, işlemciler başladı. Ancak sorunun şartlarına göre diğer örneklere veri gönderebilmek için NiFi örneklerine ihtiyacımız var. Verileri sunucuya aktarmak için Push yönteminin seçildiğini varsayalım. Veri aktarımını organize etmek için halihazırda akışımıza dahil olan, eklenen Uzak İşlem Grubu (RPG) üzerinde veri aktarımını etkinleştirmek (İletimi etkinleştir) gerekir.

Apache NiFi'de Akış Dağıtımı Otomasyonu

CLI ve diğer kaynaklardaki belgelerde veri aktarımını etkinleştirmenin bir yolunu bulamadım. Bunu nasıl yapacağınızı biliyorsanız lütfen yorumlara yazın.

Madem ki bash'ımız var ve sonuna kadar gitmeye hazırız, bir çıkış yolu bulacağız! Bu sorunu çözmek için NiFi API'yi kullanabilirsiniz. Aşağıdaki yöntemi kullanalım, yukarıdaki örneklerden ID'yi alıyoruz (bizim durumumuzda 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API Yöntemlerinin Açıklaması burada.

Apache NiFi'de Akış Dağıtımı Otomasyonu
Gövdede, aşağıdaki biçimde JSON'u aktarmanız gerekir:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

“Çalışmak” için doldurulması gereken parametreler:
belirtmek, bildirmek — veri aktarım durumu. Veri aktarımını etkinleştirmek için İLETİM mevcut, devre dışı bırakmak için DURDURULDU
versiyon - işlemci sürümü

sürüm oluşturulduğunda varsayılan olarak 0 olacaktır, ancak bu parametreler yöntem kullanılarak elde edilebilir.

Apache NiFi'de Akış Dağıtımı Otomasyonu

Bash betiklerini sevenler için bu yöntem uygun görünebilir, ancak benim için zor - bash betikleri benim favorim değil. Bir sonraki yol bence daha ilginç ve daha kullanışlı.

NiPyAPI

NiPyAPI, NiFi örnekleriyle etkileşim kurmaya yönelik bir Python kitaplığıdır. Dokümantasyon sayfası Kütüphaneyle çalışmak için gerekli bilgileri içerir. Hızlı başlatma bölümünde açıklanmıştır. proje github'da.

Yapılandırmayı kullanıma sunmaya yönelik betiğimiz bir Python programıdır. Hadi kodlamaya geçelim.
Daha fazla çalışma için yapılandırmaları ayarlayın. Aşağıdaki parametrelere ihtiyacımız olacak:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Ayrıca bu kütüphanenin açıklanan yöntemlerinin adlarını ekleyeceğim. burada.

Kayıt defterini kullanarak nifi örneğine bağlarız.

nipyapi.versioning.create_registry_client

Bu adımda, kayıt defterinin örneğe zaten eklenmiş olup olmadığına dair bir kontrol de ekleyebilirsiniz, bunun için yöntemi kullanabilirsiniz.

nipyapi.versioning.list_registry_clients

Sepetteki akışı daha fazla aramak için kovayı buluyoruz

nipyapi.versioning.get_registry_bucket

Bulunan kovaya göre akış arıyoruz

nipyapi.versioning.get_flow_in_bucket

Daha sonra, bu süreç grubunun zaten eklenip eklenmediğini anlamak önemlidir. Süreç grubu koordinatlara göre yerleştirilir ve ikincisinin bir üst üste bindirilmesi durumunda bir durum ortaya çıkabilir. Kontrol ettim, olabilir 🙂 Eklenen tüm süreç gruplarını almak için yöntemi kullanın

nipyapi.canvas.list_all_process_groups

ve sonra örneğin ada göre arama yapabiliriz.

Şablonu güncelleme sürecini anlatmayacağım, sadece şablonun yeni sürümüne işlemciler eklenirse kuyruklarda mesajların bulunmasında herhangi bir sorun olmayacağını söyleyeceğim. Ancak işlemciler çıkarılırsa sorunlar ortaya çıkabilir (önünde bir mesaj kuyruğu birikmişse nifi, işlemcinin kaldırılmasına izin vermez). Bu sorunu nasıl çözdüğümle ilgileniyorsanız - lütfen bana yazın, bu konuyu tartışacağız. İletişim bilgileri makalenin sonunda. Süreç grubu ekleme adımına geçelim.

Komut dosyasında hata ayıklarken, akışın en son sürümünün her zaman yüklenmediği bir özellikle karşılaştım, bu nedenle öncelikle bu sürümü netleştirmenizi öneririm:

nipyapi.versioning.get_latest_flow_ver

Süreç grubunu dağıtın:

nipyapi.versioning.deploy_flow_version

İşlemcileri başlatıyoruz:

nipyapi.canvas.schedule_process_group

CLI ile ilgili blokta uzak işlem grubunda veri aktarımının otomatik olarak etkinleştirilmediği yazıyordu. Senaryoyu uygularken ben de bu sorunla karşılaştım. O zamanlar API kullanarak veri aktarımını başlatamıyordum ve NiPyAPI kütüphanesinin geliştiricisine yazıp tavsiye/yardım istemeye karar verdim. Geliştirici bana cevap verdi, sorunu tartıştık ve "bir şeyi kontrol etmek" için zamana ihtiyacı olduğunu yazdı. Ve şimdi, birkaç gün sonra, başlangıç ​​sorunumu çözen bir Python fonksiyonunun yazıldığı bir e-posta geliyor !!! O zamanlar NiPyAPI sürümü 0.13.3'tü ve elbette içinde buna benzer hiçbir şey yoktu. Ancak yakın zamanda piyasaya sürülen 0.14.0 sürümünde bu işlev zaten kitaplığa dahil edilmiştir. Tanışmak

nipyapi.canvas.set_remote_process_group_transmission

Böylece NiPyAPI kütüphanesinin yardımıyla kayıt defterini bağladık, akışı hızlandırdık ve hatta işlemcileri ve veri aktarımını başlattık. Daha sonra kodu tarayabilir, her türlü kontrolü ekleyebilir, günlüğe kaydedebilirsiniz, hepsi bu. Ama bu tamamen farklı bir hikaye.

Göz önünde bulundurduğum otomasyon seçeneklerinden ikincisi bana en verimlisi gibi geldi. İlk olarak, bu hala yardımcı program kodunu gömebileceğiniz ve programlama dilinin tüm avantajlarından yararlanabileceğiniz python kodudur. İkincisi, NiPyAPI projesi aktif olarak gelişiyor ve sorun olması durumunda geliştiriciye yazabilirsiniz. Üçüncüsü, NiPyAPI, karmaşık sorunları çözmede NiFi ile etkileşime geçmek için hala daha esnek bir araçtır. Örneğin, akışta mesaj kuyruklarının şu anda boş olup olmadığının ve süreç grubunun güncellenmesinin mümkün olup olmadığının belirlenmesinde.

Bu kadar. NiFi'de akış dağıtımını otomatikleştirmeye yönelik 3 yaklaşımı, bir geliştiricinin karşılaşabileceği tuzakları anlattım ve dağıtımı otomatikleştirmek için bir çalışma kodu sağladım. Eğer siz de bu konuya benim kadar meraklıysanız - yazmak!

Kaynak: habr.com

Yorum ekle