Automatyzacja dostarczania przepływu w Apache NiFi

Witam wszystkich!

Automatyzacja dostarczania przepływu w Apache NiFi

Zadanie jest następujące - istnieje przepływ pokazany na powyższym obrazku, który należy wdrożyć na N serwerów Apache NiFi. Test przepływu - plik jest generowany i wysyłany do innej instancji NiFi. Przesyłanie danych odbywa się przy użyciu protokołu NiFi Site to Site.

NiFi Site to Site (S2S) to bezpieczny, wysoce konfigurowalny sposób przesyłania danych między instancjami NiFi. Zobacz jak działa S2S dokumentacja i ważne jest, aby pamiętać o skonfigurowaniu instancji NiFi, aby umożliwić S2S widzenie tutaj.

Jeśli chodzi o przesyłanie danych za pomocą S2S, jedna instancja nazywana jest klientem, druga jest serwerem. Klient wysyła dane, serwer je odbiera. Dwa sposoby skonfigurowania przesyłania danych między nimi:

  1. Naciskać. Dane są wysyłane z instancji klienta za pomocą zdalnej grupy procesów (RPG). W instancji serwera dane są odbierane za pomocą portu wejściowego
  2. Ciągnąć. Serwer odbiera dane za pomocą RPG, klient wysyła za pomocą portu wyjściowego.


Przepływ walcowania jest przechowywany w rejestrze Apache.

Apache NiFi Registry to podprojekt Apache NiFi, który zapewnia narzędzie do przechowywania przepływu i wersjonowania. Coś w rodzaju GIT-a. Informacje na temat instalacji, konfiguracji i pracy z rejestrem można znaleźć w oficjalna dokumentacja. Przepływ do przechowywania jest łączony w grupę procesów i w tej formie przechowywany w rejestrze. Wrócimy do tego w dalszej części artykułu.

Na początku, gdy N jest małą liczbą, przepływ jest dostarczany i aktualizowany ręcznie w rozsądnym czasie.

Ale wraz ze wzrostem N pojawia się więcej problemów:

  1. aktualizacja przepływu zajmuje więcej czasu. Musisz udać się na wszystkie serwery
  2. podczas aktualizacji szablonów występują błędy. Tutaj zaktualizowali, ale tutaj zapomnieli
  3. błąd ludzki podczas wykonywania dużej liczby podobnych operacji

Wszystko to prowadzi nas do faktu, że konieczna jest automatyzacja procesu. Próbowałem następujących sposobów rozwiązania tego problemu:

  1. Użyj MiNiFi zamiast NiFi
  2. Interfejs NiFi
  3. NiPyAPI

Korzystanie z MiNiFi

ApacheMiNify jest podprojektem Apache NiFi. MiNiFy to kompaktowy agent wykorzystujący te same procesory co NiFi, pozwalający na stworzenie takiego samego przepływu jak w NiFi. Lekkość agenta została osiągnięta między innymi dzięki temu, że MiNiFy nie posiada interfejsu graficznego umożliwiającego konfigurację przepływu. Brak interfejsu graficznego MiNiFy powoduje, że konieczne jest rozwiązanie problemu dostarczania przepływu w minifi. Ponieważ MiNiFy jest aktywnie wykorzystywane w IOT, istnieje wiele komponentów, a proces dostarczania przepływu do końcowych instancji minifi musi być zautomatyzowany. Znane zadanie, prawda?

W rozwiązaniu tego problemu pomoże kolejny podprojekt, MiNiFi C2 Server. Produkt ten ma być centralnym punktem architektury wdrożeniowej. Jak skonfigurować środowisko - opisano w ten artykuł w sprawie Habré i informacje te wystarczą do rozwiązania problemu. MiNiFi w połączeniu z serwerem C2 automatycznie aktualizuje swoją konfigurację. Jedyną wadą tego podejścia jest to, że trzeba tworzyć szablony na serwerze C2, proste zatwierdzenie rejestru nie wystarczy.

Opcja opisana w powyższym artykule działa i nie jest trudna do wdrożenia, ale nie możemy zapominać o następujących kwestiach:

  1. minifi nie ma wszystkich procesorów firmy nifi
  2. Wersje procesorów w Minifi pozostają w tyle za wersjami procesorów w NiFi.

W chwili pisania tego tekstu najnowsza wersja NiFi to 1.9.2. Wersja procesora najnowszej wersji MiNiFi to 1.7.0. Procesory można dodawać do MiNiFi, ale ze względu na rozbieżności w wersjach między procesorami NiFi i MiNiFi może to nie działać.

Interfejs NiFi

Sądząc opis narzędzie na oficjalnej stronie internetowej, jest to narzędzie do automatyzacji interakcji pomiędzy NiFI a Rejestrem NiFi w zakresie dostarczania przepływów lub zarządzania procesami. Pobierz to narzędzie, aby rozpocząć. stąd.

Uruchom narzędzie

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Abyśmy mogli załadować niezbędny przepływ z rejestru, musimy znać identyfikatory koszyka (identyfikator Bucket) i samego przepływu (identyfikator przepływu). Dane te można uzyskać za pośrednictwem interfejsu CLI lub interfejsu internetowego rejestru NiFi. Interfejs sieciowy wygląda następująco:

Automatyzacja dostarczania przepływu w Apache NiFi

Za pomocą CLI odbywa się to:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Uruchom import grupy procesów z rejestru:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Ważnym punktem jest to, że dowolną instancję nifi można określić jako host, na który przerzucimy grupę procesów.

Dodano grupę procesów z zatrzymanymi procesorami, należy je uruchomić

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Świetnie, procesory ruszyły. Jednak zgodnie z warunkami problemu potrzebujemy instancji NiFi, aby wysyłać dane do innych instancji. Załóżmy, że do przesłania danych na serwer wybrano metodę Push. Aby zorganizować przesyłanie danych, konieczne jest włączenie przesyłania danych (Włącz przesyłanie) na dodanej Grupie Zdalnych Procesów (RPG), która jest już uwzględniona w naszym przepływie.

Automatyzacja dostarczania przepływu w Apache NiFi

W dokumentacji w CLI i innych źródłach nie znalazłem sposobu na umożliwienie przesyłania danych. Jeśli wiecie jak to zrobić, napiszcie w komentarzach.

Skoro mamy basha i jesteśmy gotowi do końca, znajdziemy wyjście! Aby rozwiązać ten problem, możesz użyć interfejsu API NiFi. Zastosujmy następującą metodę, bierzemy ID z powyższych przykładów (w naszym przypadku jest to 7f522a13-016e-1000-e504-d5b15587f2f3). Opis metod API NiFi tutaj.

Automatyzacja dostarczania przepływu w Apache NiFi
W treści musisz przekazać JSON w następującej formie:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametry jakie należy wypełnić aby „działało”:
były — status przesyłania danych. Dostępne TRANSMITING, aby umożliwić przesyłanie danych, ZATRZYMANE, aby wyłączyć
wersja - wersja procesora

wersja po utworzeniu domyślnie przyjmie wartość 0, ale parametry te można uzyskać za pomocą metody

Automatyzacja dostarczania przepływu w Apache NiFi

Dla miłośników skryptów basha ta metoda może wydawać się odpowiednia, ale dla mnie jest trudna - skrypty bashowe nie są moimi ulubionymi. Następny sposób jest moim zdaniem ciekawszy i wygodniejszy.

NiPyAPI

NiPyAPI to biblioteka Pythona do interakcji z instancjami NiFi. Strona z dokumentacją zawiera informacje niezbędne do współpracy z biblioteką. Szybki start opisano w projekt na githubie.

Nasz skrypt do wdrożenia konfiguracji to program w języku Python. Przejdźmy do kodowania.
Skonfiguruj konfiguracje do dalszej pracy. Będziemy potrzebować następujących parametrów:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Dalej wstawię nazwy metod tej biblioteki, które są opisane tutaj.

Łączymy rejestr z instancją nifi za pomocą

nipyapi.versioning.create_registry_client

Na tym etapie możesz także dodać sprawdzenie czy rejestr został już dodany do instancji, możesz w tym celu skorzystać z metody

nipyapi.versioning.list_registry_clients

Znajdujemy wiadro, aby dalej szukać przepływu w koszu

nipyapi.versioning.get_registry_bucket

Według znalezionego wiadra szukamy przepływu

nipyapi.versioning.get_flow_in_bucket

Następnie ważne jest, aby zrozumieć, czy ta grupa procesów została już dodana. Grupa procesów jest umieszczana według współrzędnych i może wystąpić sytuacja, gdy druga zostanie nałożona na jedną. Sprawdziłem, może być 🙂 Aby uzyskać całą dodaną grupę procesów, użyj metody

nipyapi.canvas.list_all_process_groups

i wtedy możemy wyszukiwać np. po nazwie.

Nie będę opisywał procesu aktualizacji szablonu, powiem tylko, że jeśli w nowej wersji szablonu zostaną dodane procesory, to nie ma problemów z obecnością wiadomości w kolejkach. Ale jeśli procesory zostaną usunięte, mogą pojawić się problemy (nifi nie pozwala na usunięcie procesora, jeśli przed nim zgromadziła się kolejka komunikatów). Jeśli ciekawi Cię jak rozwiązałem ten problem - napisz do mnie, omówimy ten punkt. Kontakty na końcu artykułu. Przejdźmy do kroku dodawania grupy procesów.

Podczas debugowania skryptu natknąłem się na funkcję polegającą na tym, że nie zawsze pobierana jest najnowsza wersja flow, dlatego polecam najpierw wyjaśnić tę wersję:

nipyapi.versioning.get_latest_flow_ver

Wdróż grupę procesów:

nipyapi.versioning.deploy_flow_version

Uruchamiamy procesory:

nipyapi.canvas.schedule_process_group

W bloku o CLI napisano, że transmisja danych nie jest automatycznie włączana w zdalnej grupie procesów? Implementując skrypt, również napotkałem ten problem. Nie mogłem wówczas rozpocząć przesyłania danych za pomocą API i postanowiłem napisać do dewelopera biblioteki NiPyAPI z prośbą o poradę/pomoc. Deweloper mi odpowiedział, omówiliśmy problem i napisał, że potrzebuje czasu, żeby „coś sprawdzić”. A teraz, kilka dni później, przychodzi e-mail, w którym napisana jest funkcja Pythona, która rozwiązuje mój problem z uruchamianiem !!! Wersja NiPyAPI miała wówczas numer 0.13.3 i oczywiście nie było w niej nic takiego. Jednak w wersji 0.14.0, która została wydana całkiem niedawno, funkcja ta została już uwzględniona w bibliotece. Poznać

nipyapi.canvas.set_remote_process_group_transmission

Tak więc za pomocą biblioteki NiPyAPI podłączyliśmy rejestr, zwinęliśmy przepływ, a nawet uruchomiliśmy procesory i transfer danych. Potem można przeczesać kod, dodać wszelkiego rodzaju sprawdzenia, logowanie i tyle. Ale to zupełnie inna historia.

Spośród opcji automatyzacji, które rozważałem, ta ostatnia wydała mi się najbardziej wydajna. Po pierwsze, jest to nadal kod Pythona, w którym można osadzić kod programu pomocniczego i cieszyć się wszystkimi zaletami języka programowania. Po drugie, projekt NiPyAPI aktywnie się rozwija i w razie problemów możesz napisać do dewelopera. Po trzecie, NiPyAPI jest w dalszym ciągu bardziej elastycznym narzędziem do interakcji z NiFi w rozwiązywaniu złożonych problemów. Na przykład przy ustalaniu, czy kolejki komunikatów są aktualnie puste w przepływie i czy możliwa jest aktualizacja grupy procesów.

To wszystko. Opisałem 3 podejścia do automatyzacji dostarczania przepływu w NiFi, pułapki, na które może napotkać programista, oraz udostępniłem działający kod do automatyzacji dostarczania. Jeżeli interesuje Cię ten temat tak samo jak mnie - pisać!

Źródło: www.habr.com

Dodaj komentarz