Apache NiFi-da axının çatdırılmasının avtomatlaşdırılması

Hello!

Apache NiFi-da axının çatdırılmasının avtomatlaşdırılması

Tapşırıq aşağıdakı kimidir - yuxarıdakı şəkildə təqdim olunan bir axın var, onu N serverlərə yaymaq lazımdır. Apache NiFi. Axın testi - fayl yaradılır və başqa NiFi nümunəsinə göndərilir. Məlumat ötürülməsi NiFi Saytdan Sayta protokolundan istifadə etməklə baş verir.

NiFi Saytdan Sayta (S2S) NiFi nümunələri arasında məlumat ötürmək üçün təhlükəsiz, asanlıqla konfiqurasiya edilə bilən bir yoldur. S2S necə işləyir, baxın sənədləşdirmə və S2S-ə icazə vermək üçün NiFi instansiyasını konfiqurasiya etməyi unutmamaq vacibdir, bax burada.

S2S istifadə edərək məlumat ötürülməsindən danışdığımız hallarda, bir nümunə müştəri, ikinci server adlanır. Müştəri məlumatları göndərir, server qəbul edir. Onlar arasında məlumat ötürülməsini konfiqurasiya etməyin iki yolu:

  1. Basmaq. Müştəri nümunəsindən məlumat Uzaqdan Proses Qrupundan (RPG) istifadə edərək göndərilir. Server nümunəsində məlumat Giriş Portundan istifadə etməklə qəbul edilir
  2. Çəkmək. Server RPG-dən istifadə edərək məlumatları qəbul edir, müştəri çıxış portundan istifadə edərək göndərir.


Yayılma üçün axın Apache Registry-də saxlanılır.

Apache NiFi Registry, axın saxlanması və versiyaya nəzarət üçün alət təmin edən Apache NiFi alt layihəsidir. Bir növ GIT. Quraşdırma, konfiqurasiya və reyestrlə işləmək haqqında məlumatı burada tapa bilərsiniz rəsmi sənədlər. Saxlama üçün axın bir proses qrupuna birləşdirilir və bu formada reyestrdə saxlanılır. Bu mövzuya daha sonra məqalədə qayıdacağıq.

Başlanğıcda, N kiçik rəqəm olduqda, axın məqbul vaxt ərzində əl ilə çatdırılır və yenilənir.

Lakin N böyüdükcə problemlər daha da çoxalır:

  1. axını yeniləmək üçün daha çox vaxt tələb olunur. Bütün serverlərə daxil olmalısınız
  2. Şablonun yenilənməsi xətaları baş verir. Burada onu yenilədilər, amma burada unutdular
  3. çox sayda oxşar əməliyyatları yerinə yetirərkən insan səhvləri

Bütün bunlar bizi prosesi avtomatlaşdırmağa ehtiyac duyduğumuza gətirir. Bu problemi həll etmək üçün aşağıdakı yolları sınadım:

  1. NiFi əvəzinə MiNiFi istifadə edin
  2. NiFi CLI
  3. NiPyAPI

MiNiFi istifadə

Apache MiNiFy - Apache NiFi alt layihəsi. MiNiFy NiFi ilə eyni prosessorlardan istifadə edən, NiFi-da olduğu kimi eyni axınları yaratmağa imkan verən yığcam agentdir. Agentin yüngül təbiəti, digər şeylər arasında, MiNiFy-nin axın konfiqurasiyası üçün qrafik interfeysinin olmaması ilə əldə edilir. MiNiFy-də qrafik interfeysin olmaması o deməkdir ki, minifi-yə axının çatdırılması problemini həll etmək lazımdır. MiNiFy IOT-da aktiv şəkildə istifadə edildiyi üçün çoxlu komponentlər var və axınların son minifi nümunələrinə çatdırılması prosesi avtomatlaşdırılmalıdır. Tanış tapşırıq, elə deyilmi?

Başqa bir alt layihə bu problemi həll etməyə kömək edəcək - MiNiFi C2 Server. Bu məhsul konfiqurasiyanın təqdimat arxitekturasında mərkəzi nöqtə olmaq üçün nəzərdə tutulub. Ətraf mühiti necə konfiqurasiya etmək olar - bölmədə təsvir edilmişdir Bu məqalə Problemi həll etmək üçün Habré haqqında kifayət qədər məlumat var. MiNiFi, C2 serveri ilə birlikdə onun konfiqurasiyasını avtomatik olaraq yeniləyir. Bu yanaşmanın yeganə çatışmazlığı odur ki, siz C2 Serverdə şablonlar yaratmalısınız, reyestr üçün sadə bir öhdəlik kifayət deyil.

Yuxarıdakı məqalədə təsvir olunan seçim işləyir və həyata keçirmək çətin deyil, lakin aşağıdakıları unutmamalıyıq:

  1. Minifi-də nifi-dən bütün prosessorlar yoxdur
  2. Minifi prosessor versiyaları NiFi prosessor versiyalarından geri qalır.

Yazı zamanı NiFi-nin ən son versiyası 1.9.2-dir. MiNiFi prosessorunun ən son versiyası 1.7.0-dır. Prosessorları MiNiFi-a əlavə etmək olar, lakin NiFi və MiNiFi prosessorları arasında versiya uyğunsuzluğu səbəbindən bu işləməyə bilər.

NiFi CLI

Hörmətlə təsviri rəsmi veb saytındakı alət, bu axının çatdırılması və ya prosesin idarə edilməsi sahəsində NiFI və NiFi Registry arasında qarşılıqlı əlaqəni avtomatlaşdırmaq üçün bir vasitədir. Başlamaq üçün bu aləti yükləməlisiniz. buradan.

Utiliti işə salın

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Reyestrdən tələb olunan axını yükləməyimiz üçün vedrənin identifikatorlarını (kovanın identifikatoru) və axının özünün (axın identifikatoru) bilməliyik. Bu məlumatları cli vasitəsilə və ya NiFi reyestrinin veb interfeysində əldə etmək olar. Veb interfeysində bu belə görünür:

Apache NiFi-da axının çatdırılmasının avtomatlaşdırılması

CLI istifadə edərək, bu edilir:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Proses qrupunu registrdən idxal etməyə başlayırıq:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Əhəmiyyətli bir məqam ondan ibarətdir ki, hər hansı bir nifi nümunəsi proses qrupunu daşıdığımız host kimi göstərilə bilər.

Proses qrupu dayandırılmış prosessorlarla əlavə edildi, onları işə salmaq lazımdır

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Əla, prosessorlar işə salınıb. Bununla belə, tapşırığın şərtlərinə görə, məlumatları digər instansiyalara göndərmək üçün bizə NiFi nümunələri lazımdır. Tutaq ki, verilənləri serverə ötürmək üçün Push metodunu seçmisiniz. Məlumat ötürülməsini təşkil etmək üçün siz artıq axınımıza daxil edilmiş əlavə Uzaqdan Proses Qrupunda (RPG) məlumat ötürülməsini aktivləşdirməlisiniz.

Apache NiFi-da axının çatdırılmasının avtomatlaşdırılması

CLI və digər mənbələrdəki sənədlərdə məlumat ötürülməsini aktivləşdirmək üçün bir yol tapmadım. Bunu necə edəcəyinizi bilirsinizsə, şərhlərdə yazın.

Bashımız olduğundan və sona qədər getməyə hazır olduğumuz üçün çıxış yolu tapacağıq! Bu problemi həll etmək üçün NiFi API istifadə edə bilərsiniz. Aşağıdakı üsuldan istifadə edək, yuxarıdakı nümunələrdən ID götürək (bizim vəziyyətimizdə 7f522a13-016e-1000-e504-d5b15587f2f3-dir). NiFi API metodlarının təsviri burada.

Apache NiFi-da axının çatdırılmasının avtomatlaşdırılması
Bədəndə JSON-u belə keçirməlisiniz:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Onun işləməsi üçün doldurulmalı olan parametrlər:
idi — məlumat ötürülməsi vəziyyəti. Mövcuddur: Məlumat ötürülməsini aktivləşdirmək üçün VERİLİR, deaktiv etmək üçün DAYANDIRILDI
variant - prosessor versiyası

versiya yaradılarkən standart olaraq 0 olacaq, lakin bu parametrlər metoddan istifadə etməklə əldə edilə bilər

Apache NiFi-da axının çatdırılmasının avtomatlaşdırılması

Bash skriptlərinin pərəstişkarları üçün bu üsul uyğun görünə bilər, amma mənim üçün bir az çətindir - bash skriptləri mənim sevimli deyil. Növbəti üsul mənim fikrimcə daha maraqlı və rahatdır.

NiPyAPI

NiPyAPI NiFi nümunələri ilə qarşılıqlı əlaqə üçün Python kitabxanasıdır. Sənədlər səhifəsi kitabxana ilə işləmək üçün lazım olan məlumatları ehtiva edir. Tez başlanğıcda təsvir edilmişdir layihə github-da.

Konfiqurasiyanı yaymaq üçün skriptimiz Python-da bir proqramdır. Gəlin kodlaşdırmaya keçək.
Sonrakı iş üçün konfiqurasiyaları quraşdırdıq. Aşağıdakı parametrlərə ehtiyacımız olacaq:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Sonra bu kitabxananın təsvir olunan metodlarının adlarını daxil edəcəyəm burada.

İstifadə edərək reyestri nifi nümunəsinə qoşun

nipyapi.versioning.create_registry_client

Bu addımda siz həmçinin qeyd dəftərinin artıq nümunəyə əlavə olunduğunu yoxlaya bilərsiniz; bunun üçün metoddan istifadə edə bilərsiniz.

nipyapi.versioning.list_registry_clients

Səbətdə axın üçün əlavə axtarış üçün vedrəni tapırıq

nipyapi.versioning.get_registry_bucket

Tapılan vedrədən istifadə edərək axını axtarırıq

nipyapi.versioning.get_flow_in_bucket

Sonra, bu proses qrupunun artıq əlavə edilib-edilmədiyini anlamaq vacibdir. Proses qrupu koordinatlara uyğun yerləşdirilir və birinin üzərinə ikinci komponent qoyulduqda vəziyyət yarana bilər. Yoxladım, bu baş verə bilər :) Bütün əlavə proses qruplarını əldə etmək üçün metoddan istifadə edirik

nipyapi.canvas.list_all_process_groups

Məsələn, adla daha çox axtarış edə bilərik.

Şablonun yenilənməsi prosesini təsvir etməyəcəyəm, yalnız onu deyim ki, şablonun yeni versiyasında prosessorlar əlavə olunarsa, o zaman növbələrdə mesajların olması ilə bağlı heç bir problem yoxdur. Ancaq prosessorlar çıxarılsa, problemlər yarana bilər (nifi qarşısında mesaj növbəsi yığılıbsa, prosessoru çıxarmağa imkan vermir). Bu problemi necə həll etdiyimlə maraqlanırsınızsa, mənə yazın, bu məsələni müzakirə edək. Məqalənin sonundakı əlaqə. Proses qrupunun əlavə edilməsi mərhələsinə keçək.

Skripti sazlayarkən, axının ən son versiyasının həmişə açılmaması ilə bağlı bir xüsusiyyətlə qarşılaşdım, ona görə də əvvəlcə bu versiyanı yoxlamağı məsləhət görürəm:

nipyapi.versioning.get_latest_flow_ver

Proses qrupunu yerləşdirin:

nipyapi.versioning.deploy_flow_version

Prosessorları işə salırıq:

nipyapi.canvas.schedule_process_group

CLI haqqında blokda məlumatların ötürülməsinin uzaq proses qrupunda avtomatik olaraq aktiv edilmədiyi yazılmışdı? Skripti həyata keçirərkən mən də bu problemlə qarşılaşdım. O zaman mən API-dən istifadə edərək məlumat ötürülməsinə başlaya bilmədim və NiPyAPI kitabxanasının tərtibatçısına yazıb məsləhət/kömək istəməyə qərar verdim. Tərtibatçı mənə cavab verdi, problemi müzakirə etdik və o, "nəyisə yoxlamaq" üçün vaxt lazım olduğunu yazdı. Və sonra, bir neçə gündən sonra, işə salınma problemimi həll edən Python-da bir funksiyanın yazıldığı bir məktub gəlir!!! O zaman NiPyAPI versiyası 0.13.3 idi və təbii ki, belə bir şey yox idi. Ancaq bu yaxınlarda buraxılmış 0.14.0 versiyasında bu funksiya artıq kitabxanaya daxil edilmişdir. Görüşmək,

nipyapi.canvas.set_remote_process_group_transmission

Beləliklə, NiPyAPI kitabxanasından istifadə edərək reyestri birləşdirdik, axını yaydıq və hətta prosessorlara və məlumat ötürülməsinə başladıq. Sonra kodu tara, hər cür çek əlavə edə, qeyd edə bilərsiniz və hamısı budur. Amma bu tamam başqa hekayədir.

Nəzərdən keçirdiyim avtomatlaşdırma variantlarından sonuncusu mənə ən səmərəli kimi göründü. Birincisi, bu hələ də köməkçi proqram kodunu daxil edə və proqramlaşdırma dilinin bütün üstünlüklərindən istifadə edə biləcəyiniz python kodudur. İkincisi, NiPyAPI layihəsi aktiv şəkildə inkişaf edir və problem yarandıqda tərtibatçıya yaza bilərsiniz. Üçüncüsü, NiPyAPI hələ də mürəkkəb problemlərin həllində NiFi ilə qarşılıqlı əlaqə üçün daha çevik bir vasitədir. Məsələn, mesaj növbələrinin indi axında boş olub-olmadığını və proses qrupunun yenilənə biləcəyini müəyyən etmək üçün.

Hamısı budur. NiFi-da axın çatdırılmasının avtomatlaşdırılması üçün 3 yanaşmanı, tərtibatçının qarşılaşa biləcəyi tələləri təsvir etdim və çatdırılmanın avtomatlaşdırılması üçün iş kodunu təqdim etdim. Bu mövzu ilə mənim qədər maraqlanırsınızsa - yaz!

Mənbə: www.habr.com

Добавить комментарий