Hello!
Tapşırıq aşağıdakı kimidir - yuxarıdakı şəkildə təqdim olunan bir axın var, onu N serverlərə yaymaq lazımdır.
NiFi Saytdan Sayta (S2S) NiFi nümunələri arasında məlumat ötürmək üçün təhlükəsiz, asanlıqla konfiqurasiya edilə bilən bir yoldur. S2S necə işləyir, baxın
S2S istifadə edərək məlumat ötürülməsindən danışdığımız hallarda, bir nümunə müştəri, ikinci server adlanır. Müştəri məlumatları göndərir, server qəbul edir. Onlar arasında məlumat ötürülməsini konfiqurasiya etməyin iki yolu:
- Basmaq. Müştəri nümunəsindən məlumat Uzaqdan Proses Qrupundan (RPG) istifadə edərək göndərilir. Server nümunəsində məlumat Giriş Portundan istifadə etməklə qəbul edilir
- Çəkmək. Server RPG-dən istifadə edərək məlumatları qəbul edir, müştəri çıxış portundan istifadə edərək göndərir.
Yayılma üçün axın Apache Registry-də saxlanılır.
Apache NiFi Registry, axın saxlanması və versiyaya nəzarət üçün alət təmin edən Apache NiFi alt layihəsidir. Bir növ GIT. Quraşdırma, konfiqurasiya və reyestrlə işləmək haqqında məlumatı burada tapa bilərsiniz
Başlanğıcda, N kiçik rəqəm olduqda, axın məqbul vaxt ərzində əl ilə çatdırılır və yenilənir.
Lakin N böyüdükcə problemlər daha da çoxalır:
- axını yeniləmək üçün daha çox vaxt tələb olunur. Bütün serverlərə daxil olmalısınız
- Şablonun yenilənməsi xətaları baş verir. Burada onu yenilədilər, amma burada unutdular
- çox sayda oxşar əməliyyatları yerinə yetirərkən insan səhvləri
Bütün bunlar bizi prosesi avtomatlaşdırmağa ehtiyac duyduğumuza gətirir. Bu problemi həll etmək üçün aşağıdakı yolları sınadım:
- NiFi əvəzinə MiNiFi istifadə edin
- NiFi CLI
- NiPyAPI
MiNiFi istifadə
Başqa bir alt layihə bu problemi həll etməyə kömək edəcək - MiNiFi C2 Server. Bu məhsul konfiqurasiyanın təqdimat arxitekturasında mərkəzi nöqtə olmaq üçün nəzərdə tutulub. Ətraf mühiti necə konfiqurasiya etmək olar - bölmədə təsvir edilmişdir
Yuxarıdakı məqalədə təsvir olunan seçim işləyir və həyata keçirmək çətin deyil, lakin aşağıdakıları unutmamalıyıq:
- Minifi-də nifi-dən bütün prosessorlar yoxdur
- Minifi prosessor versiyaları NiFi prosessor versiyalarından geri qalır.
Yazı zamanı NiFi-nin ən son versiyası 1.9.2-dir. MiNiFi prosessorunun ən son versiyası 1.7.0-dır. Prosessorları MiNiFi-a əlavə etmək olar, lakin NiFi və MiNiFi prosessorları arasında versiya uyğunsuzluğu səbəbindən bu işləməyə bilər.
NiFi CLI
Hörmətlə
Utiliti işə salın
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Reyestrdən tələb olunan axını yükləməyimiz üçün vedrənin identifikatorlarını (kovanın identifikatoru) və axının özünün (axın identifikatoru) bilməliyik. Bu məlumatları cli vasitəsilə və ya NiFi reyestrinin veb interfeysində əldə etmək olar. Veb interfeysində bu belə görünür:
CLI istifadə edərək, bu edilir:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Proses qrupunu registrdən idxal etməyə başlayırıq:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Əhəmiyyətli bir məqam ondan ibarətdir ki, hər hansı bir nifi nümunəsi proses qrupunu daşıdığımız host kimi göstərilə bilər.
Proses qrupu dayandırılmış prosessorlarla əlavə edildi, onları işə salmaq lazımdır
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Əla, prosessorlar işə salınıb. Bununla belə, tapşırığın şərtlərinə görə, məlumatları digər instansiyalara göndərmək üçün bizə NiFi nümunələri lazımdır. Tutaq ki, verilənləri serverə ötürmək üçün Push metodunu seçmisiniz. Məlumat ötürülməsini təşkil etmək üçün siz artıq axınımıza daxil edilmiş əlavə Uzaqdan Proses Qrupunda (RPG) məlumat ötürülməsini aktivləşdirməlisiniz.
CLI və digər mənbələrdəki sənədlərdə məlumat ötürülməsini aktivləşdirmək üçün bir yol tapmadım. Bunu necə edəcəyinizi bilirsinizsə, şərhlərdə yazın.
Bashımız olduğundan və sona qədər getməyə hazır olduğumuz üçün çıxış yolu tapacağıq! Bu problemi həll etmək üçün NiFi API istifadə edə bilərsiniz. Aşağıdakı üsuldan istifadə edək, yuxarıdakı nümunələrdən ID götürək (bizim vəziyyətimizdə 7f522a13-016e-1000-e504-d5b15587f2f3-dir). NiFi API metodlarının təsviri
Bədəndə JSON-u belə keçirməlisiniz:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Onun işləməsi üçün doldurulmalı olan parametrlər:
idi — məlumat ötürülməsi vəziyyəti. Mövcuddur: Məlumat ötürülməsini aktivləşdirmək üçün VERİLİR, deaktiv etmək üçün DAYANDIRILDI
variant - prosessor versiyası
versiya yaradılarkən standart olaraq 0 olacaq, lakin bu parametrlər metoddan istifadə etməklə əldə edilə bilər
Bash skriptlərinin pərəstişkarları üçün bu üsul uyğun görünə bilər, amma mənim üçün bir az çətindir - bash skriptləri mənim sevimli deyil. Növbəti üsul mənim fikrimcə daha maraqlı və rahatdır.
NiPyAPI
NiPyAPI NiFi nümunələri ilə qarşılıqlı əlaqə üçün Python kitabxanasıdır.
Konfiqurasiyanı yaymaq üçün skriptimiz Python-da bir proqramdır. Gəlin kodlaşdırmaya keçək.
Sonrakı iş üçün konfiqurasiyaları quraşdırdıq. Aşağıdakı parametrlərə ehtiyacımız olacaq:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Sonra bu kitabxananın təsvir olunan metodlarının adlarını daxil edəcəyəm
İstifadə edərək reyestri nifi nümunəsinə qoşun
nipyapi.versioning.create_registry_client
Bu addımda siz həmçinin qeyd dəftərinin artıq nümunəyə əlavə olunduğunu yoxlaya bilərsiniz; bunun üçün metoddan istifadə edə bilərsiniz.
nipyapi.versioning.list_registry_clients
Səbətdə axın üçün əlavə axtarış üçün vedrəni tapırıq
nipyapi.versioning.get_registry_bucket
Tapılan vedrədən istifadə edərək axını axtarırıq
nipyapi.versioning.get_flow_in_bucket
Sonra, bu proses qrupunun artıq əlavə edilib-edilmədiyini anlamaq vacibdir. Proses qrupu koordinatlara uyğun yerləşdirilir və birinin üzərinə ikinci komponent qoyulduqda vəziyyət yarana bilər. Yoxladım, bu baş verə bilər :) Bütün əlavə proses qruplarını əldə etmək üçün metoddan istifadə edirik
nipyapi.canvas.list_all_process_groups
Məsələn, adla daha çox axtarış edə bilərik.
Şablonun yenilənməsi prosesini təsvir etməyəcəyəm, yalnız onu deyim ki, şablonun yeni versiyasında prosessorlar əlavə olunarsa, o zaman növbələrdə mesajların olması ilə bağlı heç bir problem yoxdur. Ancaq prosessorlar çıxarılsa, problemlər yarana bilər (nifi qarşısında mesaj növbəsi yığılıbsa, prosessoru çıxarmağa imkan vermir). Bu problemi necə həll etdiyimlə maraqlanırsınızsa, mənə yazın, bu məsələni müzakirə edək. Məqalənin sonundakı əlaqə. Proses qrupunun əlavə edilməsi mərhələsinə keçək.
Skripti sazlayarkən, axının ən son versiyasının həmişə açılmaması ilə bağlı bir xüsusiyyətlə qarşılaşdım, ona görə də əvvəlcə bu versiyanı yoxlamağı məsləhət görürəm:
nipyapi.versioning.get_latest_flow_ver
Proses qrupunu yerləşdirin:
nipyapi.versioning.deploy_flow_version
Prosessorları işə salırıq:
nipyapi.canvas.schedule_process_group
CLI haqqında blokda məlumatların ötürülməsinin uzaq proses qrupunda avtomatik olaraq aktiv edilmədiyi yazılmışdı? Skripti həyata keçirərkən mən də bu problemlə qarşılaşdım. O zaman mən API-dən istifadə edərək məlumat ötürülməsinə başlaya bilmədim və NiPyAPI kitabxanasının tərtibatçısına yazıb məsləhət/kömək istəməyə qərar verdim. Tərtibatçı mənə cavab verdi, problemi müzakirə etdik və o, "nəyisə yoxlamaq" üçün vaxt lazım olduğunu yazdı. Və sonra, bir neçə gündən sonra, işə salınma problemimi həll edən Python-da bir funksiyanın yazıldığı bir məktub gəlir!!! O zaman NiPyAPI versiyası 0.13.3 idi və təbii ki, belə bir şey yox idi. Ancaq bu yaxınlarda buraxılmış 0.14.0 versiyasında bu funksiya artıq kitabxanaya daxil edilmişdir. Görüşmək,
nipyapi.canvas.set_remote_process_group_transmission
Beləliklə, NiPyAPI kitabxanasından istifadə edərək reyestri birləşdirdik, axını yaydıq və hətta prosessorlara və məlumat ötürülməsinə başladıq. Sonra kodu tara, hər cür çek əlavə edə, qeyd edə bilərsiniz və hamısı budur. Amma bu tamam başqa hekayədir.
Nəzərdən keçirdiyim avtomatlaşdırma variantlarından sonuncusu mənə ən səmərəli kimi göründü. Birincisi, bu hələ də köməkçi proqram kodunu daxil edə və proqramlaşdırma dilinin bütün üstünlüklərindən istifadə edə biləcəyiniz python kodudur. İkincisi, NiPyAPI layihəsi aktiv şəkildə inkişaf edir və problem yarandıqda tərtibatçıya yaza bilərsiniz. Üçüncüsü, NiPyAPI hələ də mürəkkəb problemlərin həllində NiFi ilə qarşılıqlı əlaqə üçün daha çevik bir vasitədir. Məsələn, mesaj növbələrinin indi axında boş olub-olmadığını və proses qrupunun yenilənə biləcəyini müəyyən etmək üçün.
Hamısı budur. NiFi-da axın çatdırılmasının avtomatlaşdırılması üçün 3 yanaşmanı, tərtibatçının qarşılaşa biləcəyi tələləri təsvir etdim və çatdırılmanın avtomatlaşdırılması üçün iş kodunu təqdim etdim. Bu mövzu ilə mənim qədər maraqlanırsınızsa -
Mənbə: www.habr.com