Witam wszystkich!
Zadanie jest następujące - istnieje przepływ pokazany na powyższym obrazku, który należy wdrożyć na N serwerów
NiFi Site to Site (S2S) to bezpieczny, wysoce konfigurowalny sposób przesyłania danych między instancjami NiFi. Zobacz jak działa S2S
Jeśli chodzi o przesyłanie danych za pomocą S2S, jedna instancja nazywana jest klientem, druga jest serwerem. Klient wysyła dane, serwer je odbiera. Dwa sposoby skonfigurowania przesyłania danych między nimi:
- Naciskać. Dane są wysyłane z instancji klienta za pomocą zdalnej grupy procesów (RPG). W instancji serwera dane są odbierane za pomocą portu wejściowego
- Ciągnąć. Serwer odbiera dane za pomocą RPG, klient wysyła za pomocą portu wyjściowego.
Przepływ walcowania jest przechowywany w rejestrze Apache.
Apache NiFi Registry to podprojekt Apache NiFi, który zapewnia narzędzie do przechowywania przepływu i wersjonowania. Coś w rodzaju GIT-a. Informacje na temat instalacji, konfiguracji i pracy z rejestrem można znaleźć w
Na początku, gdy N jest małą liczbą, przepływ jest dostarczany i aktualizowany ręcznie w rozsądnym czasie.
Ale wraz ze wzrostem N pojawia się więcej problemów:
- aktualizacja przepływu zajmuje więcej czasu. Musisz udać się na wszystkie serwery
- podczas aktualizacji szablonów występują błędy. Tutaj zaktualizowali, ale tutaj zapomnieli
- błąd ludzki podczas wykonywania dużej liczby podobnych operacji
Wszystko to prowadzi nas do faktu, że konieczna jest automatyzacja procesu. Próbowałem następujących sposobów rozwiązania tego problemu:
- Użyj MiNiFi zamiast NiFi
- Interfejs NiFi
- NiPyAPI
Korzystanie z MiNiFi
W rozwiązaniu tego problemu pomoże kolejny podprojekt, MiNiFi C2 Server. Produkt ten ma być centralnym punktem architektury wdrożeniowej. Jak skonfigurować środowisko - opisano w
Opcja opisana w powyższym artykule działa i nie jest trudna do wdrożenia, ale nie możemy zapominać o następujących kwestiach:
- minifi nie ma wszystkich procesorów firmy nifi
- Wersje procesorów w Minifi pozostają w tyle za wersjami procesorów w NiFi.
W chwili pisania tego tekstu najnowsza wersja NiFi to 1.9.2. Wersja procesora najnowszej wersji MiNiFi to 1.7.0. Procesory można dodawać do MiNiFi, ale ze względu na rozbieżności w wersjach między procesorami NiFi i MiNiFi może to nie działać.
Interfejs NiFi
Sądząc
Uruchom narzędzie
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Abyśmy mogli załadować niezbędny przepływ z rejestru, musimy znać identyfikatory koszyka (identyfikator Bucket) i samego przepływu (identyfikator przepływu). Dane te można uzyskać za pośrednictwem interfejsu CLI lub interfejsu internetowego rejestru NiFi. Interfejs sieciowy wygląda następująco:
Za pomocą CLI odbywa się to:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Uruchom import grupy procesów z rejestru:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Ważnym punktem jest to, że dowolną instancję nifi można określić jako host, na który przerzucimy grupę procesów.
Dodano grupę procesów z zatrzymanymi procesorami, należy je uruchomić
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Świetnie, procesory ruszyły. Jednak zgodnie z warunkami problemu potrzebujemy instancji NiFi, aby wysyłać dane do innych instancji. Załóżmy, że do przesłania danych na serwer wybrano metodę Push. Aby zorganizować przesyłanie danych, konieczne jest włączenie przesyłania danych (Włącz przesyłanie) na dodanej Grupie Zdalnych Procesów (RPG), która jest już uwzględniona w naszym przepływie.
W dokumentacji w CLI i innych źródłach nie znalazłem sposobu na umożliwienie przesyłania danych. Jeśli wiecie jak to zrobić, napiszcie w komentarzach.
Skoro mamy basha i jesteśmy gotowi do końca, znajdziemy wyjście! Aby rozwiązać ten problem, możesz użyć interfejsu API NiFi. Zastosujmy następującą metodę, bierzemy ID z powyższych przykładów (w naszym przypadku jest to 7f522a13-016e-1000-e504-d5b15587f2f3). Opis metod API NiFi
W treści musisz przekazać JSON w następującej formie:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametry jakie należy wypełnić aby „działało”:
były — status przesyłania danych. Dostępne TRANSMITING, aby umożliwić przesyłanie danych, ZATRZYMANE, aby wyłączyć
wersja - wersja procesora
wersja po utworzeniu domyślnie przyjmie wartość 0, ale parametry te można uzyskać za pomocą metody
Dla miłośników skryptów basha ta metoda może wydawać się odpowiednia, ale dla mnie jest trudna - skrypty bashowe nie są moimi ulubionymi. Następny sposób jest moim zdaniem ciekawszy i wygodniejszy.
NiPyAPI
NiPyAPI to biblioteka Pythona do interakcji z instancjami NiFi.
Nasz skrypt do wdrożenia konfiguracji to program w języku Python. Przejdźmy do kodowania.
Skonfiguruj konfiguracje do dalszej pracy. Będziemy potrzebować następujących parametrów:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Dalej wstawię nazwy metod tej biblioteki, które są opisane
Łączymy rejestr z instancją nifi za pomocą
nipyapi.versioning.create_registry_client
Na tym etapie możesz także dodać sprawdzenie czy rejestr został już dodany do instancji, możesz w tym celu skorzystać z metody
nipyapi.versioning.list_registry_clients
Znajdujemy wiadro, aby dalej szukać przepływu w koszu
nipyapi.versioning.get_registry_bucket
Według znalezionego wiadra szukamy przepływu
nipyapi.versioning.get_flow_in_bucket
Następnie ważne jest, aby zrozumieć, czy ta grupa procesów została już dodana. Grupa procesów jest umieszczana według współrzędnych i może wystąpić sytuacja, gdy druga zostanie nałożona na jedną. Sprawdziłem, może być 🙂 Aby uzyskać całą dodaną grupę procesów, użyj metody
nipyapi.canvas.list_all_process_groups
i wtedy możemy wyszukiwać np. po nazwie.
Nie będę opisywał procesu aktualizacji szablonu, powiem tylko, że jeśli w nowej wersji szablonu zostaną dodane procesory, to nie ma problemów z obecnością wiadomości w kolejkach. Ale jeśli procesory zostaną usunięte, mogą pojawić się problemy (nifi nie pozwala na usunięcie procesora, jeśli przed nim zgromadziła się kolejka komunikatów). Jeśli ciekawi Cię jak rozwiązałem ten problem - napisz do mnie, omówimy ten punkt. Kontakty na końcu artykułu. Przejdźmy do kroku dodawania grupy procesów.
Podczas debugowania skryptu natknąłem się na funkcję polegającą na tym, że nie zawsze pobierana jest najnowsza wersja flow, dlatego polecam najpierw wyjaśnić tę wersję:
nipyapi.versioning.get_latest_flow_ver
Wdróż grupę procesów:
nipyapi.versioning.deploy_flow_version
Uruchamiamy procesory:
nipyapi.canvas.schedule_process_group
W bloku o CLI napisano, że transmisja danych nie jest automatycznie włączana w zdalnej grupie procesów? Implementując skrypt, również napotkałem ten problem. Nie mogłem wówczas rozpocząć przesyłania danych za pomocą API i postanowiłem napisać do dewelopera biblioteki NiPyAPI z prośbą o poradę/pomoc. Deweloper mi odpowiedział, omówiliśmy problem i napisał, że potrzebuje czasu, żeby „coś sprawdzić”. A teraz, kilka dni później, przychodzi e-mail, w którym napisana jest funkcja Pythona, która rozwiązuje mój problem z uruchamianiem !!! Wersja NiPyAPI miała wówczas numer 0.13.3 i oczywiście nie było w niej nic takiego. Jednak w wersji 0.14.0, która została wydana całkiem niedawno, funkcja ta została już uwzględniona w bibliotece. Poznać
nipyapi.canvas.set_remote_process_group_transmission
Tak więc za pomocą biblioteki NiPyAPI podłączyliśmy rejestr, zwinęliśmy przepływ, a nawet uruchomiliśmy procesory i transfer danych. Potem można przeczesać kod, dodać wszelkiego rodzaju sprawdzenia, logowanie i tyle. Ale to zupełnie inna historia.
Spośród opcji automatyzacji, które rozważałem, ta ostatnia wydała mi się najbardziej wydajna. Po pierwsze, jest to nadal kod Pythona, w którym można osadzić kod programu pomocniczego i cieszyć się wszystkimi zaletami języka programowania. Po drugie, projekt NiPyAPI aktywnie się rozwija i w razie problemów możesz napisać do dewelopera. Po trzecie, NiPyAPI jest w dalszym ciągu bardziej elastycznym narzędziem do interakcji z NiFi w rozwiązywaniu złożonych problemów. Na przykład przy ustalaniu, czy kolejki komunikatów są aktualnie puste w przepływie i czy możliwa jest aktualizacja grupy procesów.
To wszystko. Opisałem 3 podejścia do automatyzacji dostarczania przepływu w NiFi, pułapki, na które może napotkać programista, oraz udostępniłem działający kod do automatyzacji dostarczania. Jeżeli interesuje Cię ten temat tak samo jak mnie -
Źródło: www.habr.com