Automazione dell'erogazione del flusso in Apache NiFi

Ciao a tutti!

Automazione dell'erogazione del flusso in Apache NiFi

Il compito è il seguente: c'è un flusso, presentato nell'immagine sopra, che deve essere implementato su N server con ApacheNiFi. Test di flusso: un file viene generato e inviato a un'altra istanza NiFi. Il trasferimento dei dati avviene utilizzando il protocollo NiFi Site to Site.

NiFi Site to Site (S2S) è un modo sicuro e facilmente configurabile per trasferire dati tra istanze NiFi. Come funziona S2S, vedi documentazione ed è importante non dimenticare di configurare l'istanza NiFi per consentire S2S, vedi qui.

Nei casi in cui si tratta di trasferimento dati tramite S2S, un'istanza si chiama client, la seconda server. Il client invia i dati, il server riceve. Due modi per configurare il trasferimento dei dati tra di loro:

  1. Spingi. Dall'istanza client, i dati vengono inviati utilizzando un Remote Process Group (RPG). Nell'istanza del server, i dati vengono ricevuti utilizzando la porta di input
  2. Tirare. Il server riceve i dati utilizzando RPG, il client li invia utilizzando la porta di output.


Il flusso per l'implementazione è archiviato nel registro Apache.

Apache NiFi Registry è un sottoprogetto di Apache NiFi che fornisce uno strumento per l'archiviazione del flusso e il controllo della versione. Una sorta di GIT. Le informazioni sull'installazione, la configurazione e l'utilizzo del registro sono reperibili in documentazione ufficiale. Il flusso per l'archiviazione viene combinato in un gruppo di processi e archiviato in questo modulo nel registro. Torneremo su questo più avanti nell'articolo.

All'inizio, quando N è un numero piccolo, il flusso viene erogato e aggiornato manualmente in un tempo accettabile.

Ma man mano che N cresce, i problemi diventano più numerosi:

  1. ci vuole più tempo per aggiornare il flusso. È necessario accedere a tutti i server
  2. Si verificano errori di aggiornamento del modello. Qui l'hanno aggiornato, ma qui si sono dimenticati
  3. errori umani durante l'esecuzione di un gran numero di operazioni simili

Tutto ciò ci porta al fatto che dobbiamo automatizzare il processo. Ho provato i seguenti modi per risolvere questo problema:

  1. Usa MiNiFi invece di NiFi
  2. NiFi CLI
  3. NiPyAPI

Utilizzando MiNiFi

Apache MiNiFy - sottoprogetto di Apache NiFi. MiNiFy è un agente compatto che utilizza gli stessi processori di NiFi, consentendoti di creare gli stessi flussi di NiFi. La leggerezza dell'agente è dovuta, tra l'altro, al fatto che MiNiFy non dispone di un'interfaccia grafica per la configurazione del flusso. La mancanza di un'interfaccia grafica in MiNiFy significa che è necessario risolvere il problema della fornitura del flusso a minifi. Poiché MiNiFy viene utilizzato attivamente nell'IOT, sono presenti molti componenti e il processo di distribuzione del flusso alle istanze minifi finali deve essere automatizzato. Un compito familiare, vero?

Un altro sottoprogetto aiuterà a risolvere questo problema: MiNiFi C2 Server. Questo prodotto è destinato ad essere il punto centrale nell'architettura di implementazione della configurazione. Come configurare l'ambiente - descritto in questo articolo Ci sono abbastanza informazioni su Habré per risolvere il problema. MiNiFi, insieme al server C2, aggiorna automaticamente la sua configurazione. L'unico inconveniente di questo approccio è che è necessario creare modelli su C2 Server; un semplice commit nel registro non è sufficiente.

L'opzione descritta nell'articolo sopra funziona e non è difficile da implementare, ma non dobbiamo dimenticare quanto segue:

  1. Minifi non ha tutti i processori di nifi
  2. Le versioni del processore Minifi sono in ritardo rispetto alle versioni del processore NiFi.

Al momento in cui scrivo, l'ultima versione di NiFi è la 1.9.2. L'ultima versione del processore MiNiFi è 1.7.0. È possibile aggiungere processori a MiNiFi, ma a causa delle discrepanze di versione tra i processori NiFi e MiNiFi, ciò potrebbe non funzionare.

NiFi CLI

A giudicare dalle descrizione tool sul sito ufficiale, si tratta di uno strumento per automatizzare l'interazione tra NiFI e NiFi Registry nel campo dell'erogazione del flusso o della gestione dei processi. Per iniziare, è necessario scaricare questo strumento. quindi.

Avvia l'utilità

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Per poter caricare il flusso richiesto dal registro, dobbiamo conoscere gli identificatori del bucket (bucket identifier) ​​e il flusso stesso (flow identifier). Questi dati possono essere ottenuti tramite la CLI o nell'interfaccia web del registro NiFi. Nell'interfaccia web appare così:

Automazione dell'erogazione del flusso in Apache NiFi

Utilizzando la CLI questo viene fatto:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Iniziamo a importare il gruppo di processi dal registro:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Un punto importante è che qualsiasi istanza nifi può essere specificata come host su cui eseguire il rolling del gruppo di processi.

Gruppo di processi aggiunto con processori arrestati, è necessario avviarli

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Ottimo, i processori sono iniziati. Tuttavia, secondo i termini dell'attività, abbiamo bisogno che le istanze NiFi inviino dati ad altre istanze. Supponiamo che tu abbia scelto il metodo Push per trasferire i dati al server. Per organizzare il trasferimento dei dati, è necessario abilitare il trasferimento dei dati sul Remote Process Group (RPG) aggiunto, che è già incluso nel nostro flusso.

Automazione dell'erogazione del flusso in Apache NiFi

Nella documentazione della CLI e in altre fonti, non ho trovato un modo per abilitare il trasferimento dei dati. Se sai come farlo, scrivi nei commenti.

Dato che abbiamo Bash e siamo pronti per arrivare alla fine, troveremo una via d'uscita! Puoi utilizzare l'API NiFi per risolvere questo problema. Usiamo il seguente metodo, prendiamo l'ID dagli esempi sopra (nel nostro caso è 7f522a13-016e-1000-e504-d5b15587f2f3). Descrizione dei metodi API NiFi qui.

Automazione dell'erogazione del flusso in Apache NiFi
Nel corpo devi passare JSON, in questo modo:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametri che devono essere compilati affinché funzioni:
stato — stato del trasferimento dei dati. Disponibile: IN TRASMISSIONE per abilitare il trasferimento dati, ARRESTATO per disabilitarlo
versione - versione del processore

La versione verrà impostata su 0 per impostazione predefinita al momento della creazione, ma questi parametri possono essere ottenuti utilizzando il metodo

Automazione dell'erogazione del flusso in Apache NiFi

Per gli appassionati degli script bash, questo metodo può sembrare adatto, ma per me è un po' difficile: gli script bash non sono i miei preferiti. Il metodo successivo è più interessante e conveniente secondo me.

NiPyAPI

NiPyAPI è una libreria Python per interagire con le istanze NiFi. Pagina della documentazione contiene le informazioni necessarie per lavorare con la libreria. L'avvio rapido è descritto in progetto su github.

Il nostro script per implementare la configurazione è un programma in Python. Passiamo alla codifica.
Impostiamo le configurazioni per ulteriori lavori. Avremo bisogno dei seguenti parametri:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Successivamente inserirò i nomi dei metodi di questa libreria, che sono descritti qui.

Connetti il ​​registro all'istanza nifi utilizzando

nipyapi.versioning.create_registry_client

In questo passaggio puoi anche aggiungere la verifica che il registro sia già stato aggiunto all'istanza; per questo puoi utilizzare il metodo

nipyapi.versioning.list_registry_clients

Troviamo il secchio per un'ulteriore ricerca del flusso nel cestino

nipyapi.versioning.get_registry_bucket

Usando il secchio trovato, cerchiamo il flusso

nipyapi.versioning.get_flow_in_bucket

Successivamente, è importante capire se questo gruppo di processi è già stato aggiunto. Il gruppo Processo è posizionato in base alle coordinate e può verificarsi una situazione in cui un secondo componente si sovrappone all'altro. Ho controllato, questo può succedere :) Per ottenere tutti i gruppi di processi aggiunti utilizziamo il metodo

nipyapi.canvas.list_all_process_groups

Possiamo effettuare ulteriori ricerche, ad esempio, per nome.

Non descriverò il processo di aggiornamento del modello, dirò solo che se i processori vengono aggiunti nella nuova versione del modello, non ci saranno problemi con la presenza di messaggi nelle code. Ma se i processori vengono rimossi, potrebbero sorgere problemi (nifi non consente di rimuovere un processore se davanti ad esso si è accumulata una coda di messaggi). Se sei interessato a come ho risolto questo problema, scrivimi e ne discuteremo. Contatti a fine articolo. Passiamo alla fase di aggiunta di un gruppo di processi.

Durante il debug dello script, mi sono imbattuto in una particolarità secondo cui l'ultima versione di flow non viene sempre richiamata, quindi consiglio di controllare prima questa versione:

nipyapi.versioning.get_latest_flow_ver

Gruppo di processi di distribuzione:

nipyapi.versioning.deploy_flow_version

Avviamo i processori:

nipyapi.canvas.schedule_process_group

Nel blocco relativo alla CLI è stato scritto che il trasferimento dei dati non è abilitato automaticamente nel gruppo di processi remoti? Durante l'implementazione dello script, ho riscontrato anch'io questo problema. In quel momento non ero in grado di avviare il trasferimento dati utilizzando l'API e ho deciso di scrivere allo sviluppatore della libreria NiPyAPI e chiedere consiglio/aiuto. Lo sviluppatore mi ha risposto, abbiamo discusso del problema e ha scritto che aveva bisogno di tempo per “controllare qualcosa”. E poi, un paio di giorni dopo, arriva una lettera in cui è scritta una funzione in Python che risolve il mio problema di lancio!!! A quel tempo, la versione NiPyAPI era la 0.13.3 e, ovviamente, non esisteva nulla del genere. Ma nella versione 0.14.0, rilasciata di recente, questa funzione era già inclusa nella libreria. Incontrare,

nipyapi.canvas.set_remote_process_group_transmission

Quindi, utilizzando la libreria NiPyAPI, abbiamo collegato il registro, implementato il flusso e persino avviato i processori e il trasferimento dei dati. Quindi puoi pettinare il codice, aggiungere tutti i tipi di controlli, registrazioni e questo è tutto. Ma questa è una storia completamente diversa.

Tra le opzioni di automazione che ho considerato, l'ultima mi è sembrata la più efficiente. Innanzitutto, questo è ancora codice Python, nel quale puoi incorporare il codice del programma ausiliario e sfruttare tutti i vantaggi del linguaggio di programmazione. In secondo luogo, il progetto NiPyAPI si sta sviluppando attivamente e in caso di problemi puoi scrivere allo sviluppatore. In terzo luogo, NiPyAPI è ancora uno strumento più flessibile per interagire con NiFi nella risoluzione di problemi complessi. Ad esempio, nel determinare se le code dei messaggi sono ora vuote nel flusso e se il gruppo di processi può essere aggiornato.

È tutto. Ho descritto 3 approcci per automatizzare la distribuzione del flusso in NiFi, le insidie ​​che uno sviluppatore potrebbe incontrare e ho fornito codice funzionante per automatizzare la distribuzione. Se sei interessato a questo argomento quanto me: scrivere!

Fonte: habr.com

Aggiungi un commento