Ciao a tutti!
Il compito è il seguente: c'è un flusso, presentato nell'immagine sopra, che deve essere implementato su N server con
NiFi Site to Site (S2S) è un modo sicuro e facilmente configurabile per trasferire dati tra istanze NiFi. Come funziona S2S, vedi
Nei casi in cui si tratta di trasferimento dati tramite S2S, un'istanza si chiama client, la seconda server. Il client invia i dati, il server riceve. Due modi per configurare il trasferimento dei dati tra di loro:
- Spingi. Dall'istanza client, i dati vengono inviati utilizzando un Remote Process Group (RPG). Nell'istanza del server, i dati vengono ricevuti utilizzando la porta di input
- Tirare. Il server riceve i dati utilizzando RPG, il client li invia utilizzando la porta di output.
Il flusso per l'implementazione è archiviato nel registro Apache.
Apache NiFi Registry è un sottoprogetto di Apache NiFi che fornisce uno strumento per l'archiviazione del flusso e il controllo della versione. Una sorta di GIT. Le informazioni sull'installazione, la configurazione e l'utilizzo del registro sono reperibili in
All'inizio, quando N è un numero piccolo, il flusso viene erogato e aggiornato manualmente in un tempo accettabile.
Ma man mano che N cresce, i problemi diventano più numerosi:
- ci vuole più tempo per aggiornare il flusso. È necessario accedere a tutti i server
- Si verificano errori di aggiornamento del modello. Qui l'hanno aggiornato, ma qui si sono dimenticati
- errori umani durante l'esecuzione di un gran numero di operazioni simili
Tutto ciò ci porta al fatto che dobbiamo automatizzare il processo. Ho provato i seguenti modi per risolvere questo problema:
- Usa MiNiFi invece di NiFi
- NiFi CLI
- NiPyAPI
Utilizzando MiNiFi
Un altro sottoprogetto aiuterà a risolvere questo problema: MiNiFi C2 Server. Questo prodotto è destinato ad essere il punto centrale nell'architettura di implementazione della configurazione. Come configurare l'ambiente - descritto in
L'opzione descritta nell'articolo sopra funziona e non è difficile da implementare, ma non dobbiamo dimenticare quanto segue:
- Minifi non ha tutti i processori di nifi
- Le versioni del processore Minifi sono in ritardo rispetto alle versioni del processore NiFi.
Al momento in cui scrivo, l'ultima versione di NiFi è la 1.9.2. L'ultima versione del processore MiNiFi è 1.7.0. È possibile aggiungere processori a MiNiFi, ma a causa delle discrepanze di versione tra i processori NiFi e MiNiFi, ciò potrebbe non funzionare.
NiFi CLI
A giudicare dalle
Avvia l'utilità
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Per poter caricare il flusso richiesto dal registro, dobbiamo conoscere gli identificatori del bucket (bucket identifier) e il flusso stesso (flow identifier). Questi dati possono essere ottenuti tramite la CLI o nell'interfaccia web del registro NiFi. Nell'interfaccia web appare così:
Utilizzando la CLI questo viene fatto:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Iniziamo a importare il gruppo di processi dal registro:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Un punto importante è che qualsiasi istanza nifi può essere specificata come host su cui eseguire il rolling del gruppo di processi.
Gruppo di processi aggiunto con processori arrestati, è necessario avviarli
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Ottimo, i processori sono iniziati. Tuttavia, secondo i termini dell'attività, abbiamo bisogno che le istanze NiFi inviino dati ad altre istanze. Supponiamo che tu abbia scelto il metodo Push per trasferire i dati al server. Per organizzare il trasferimento dei dati, è necessario abilitare il trasferimento dei dati sul Remote Process Group (RPG) aggiunto, che è già incluso nel nostro flusso.
Nella documentazione della CLI e in altre fonti, non ho trovato un modo per abilitare il trasferimento dei dati. Se sai come farlo, scrivi nei commenti.
Dato che abbiamo Bash e siamo pronti per arrivare alla fine, troveremo una via d'uscita! Puoi utilizzare l'API NiFi per risolvere questo problema. Usiamo il seguente metodo, prendiamo l'ID dagli esempi sopra (nel nostro caso è 7f522a13-016e-1000-e504-d5b15587f2f3). Descrizione dei metodi API NiFi
Nel corpo devi passare JSON, in questo modo:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametri che devono essere compilati affinché funzioni:
stato — stato del trasferimento dei dati. Disponibile: IN TRASMISSIONE per abilitare il trasferimento dati, ARRESTATO per disabilitarlo
versione - versione del processore
La versione verrà impostata su 0 per impostazione predefinita al momento della creazione, ma questi parametri possono essere ottenuti utilizzando il metodo
Per gli appassionati degli script bash, questo metodo può sembrare adatto, ma per me è un po' difficile: gli script bash non sono i miei preferiti. Il metodo successivo è più interessante e conveniente secondo me.
NiPyAPI
NiPyAPI è una libreria Python per interagire con le istanze NiFi.
Il nostro script per implementare la configurazione è un programma in Python. Passiamo alla codifica.
Impostiamo le configurazioni per ulteriori lavori. Avremo bisogno dei seguenti parametri:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Successivamente inserirò i nomi dei metodi di questa libreria, che sono descritti
Connetti il registro all'istanza nifi utilizzando
nipyapi.versioning.create_registry_client
In questo passaggio puoi anche aggiungere la verifica che il registro sia già stato aggiunto all'istanza; per questo puoi utilizzare il metodo
nipyapi.versioning.list_registry_clients
Troviamo il secchio per un'ulteriore ricerca del flusso nel cestino
nipyapi.versioning.get_registry_bucket
Usando il secchio trovato, cerchiamo il flusso
nipyapi.versioning.get_flow_in_bucket
Successivamente, è importante capire se questo gruppo di processi è già stato aggiunto. Il gruppo Processo è posizionato in base alle coordinate e può verificarsi una situazione in cui un secondo componente si sovrappone all'altro. Ho controllato, questo può succedere :) Per ottenere tutti i gruppi di processi aggiunti utilizziamo il metodo
nipyapi.canvas.list_all_process_groups
Possiamo effettuare ulteriori ricerche, ad esempio, per nome.
Non descriverò il processo di aggiornamento del modello, dirò solo che se i processori vengono aggiunti nella nuova versione del modello, non ci saranno problemi con la presenza di messaggi nelle code. Ma se i processori vengono rimossi, potrebbero sorgere problemi (nifi non consente di rimuovere un processore se davanti ad esso si è accumulata una coda di messaggi). Se sei interessato a come ho risolto questo problema, scrivimi e ne discuteremo. Contatti a fine articolo. Passiamo alla fase di aggiunta di un gruppo di processi.
Durante il debug dello script, mi sono imbattuto in una particolarità secondo cui l'ultima versione di flow non viene sempre richiamata, quindi consiglio di controllare prima questa versione:
nipyapi.versioning.get_latest_flow_ver
Gruppo di processi di distribuzione:
nipyapi.versioning.deploy_flow_version
Avviamo i processori:
nipyapi.canvas.schedule_process_group
Nel blocco relativo alla CLI è stato scritto che il trasferimento dei dati non è abilitato automaticamente nel gruppo di processi remoti? Durante l'implementazione dello script, ho riscontrato anch'io questo problema. In quel momento non ero in grado di avviare il trasferimento dati utilizzando l'API e ho deciso di scrivere allo sviluppatore della libreria NiPyAPI e chiedere consiglio/aiuto. Lo sviluppatore mi ha risposto, abbiamo discusso del problema e ha scritto che aveva bisogno di tempo per “controllare qualcosa”. E poi, un paio di giorni dopo, arriva una lettera in cui è scritta una funzione in Python che risolve il mio problema di lancio!!! A quel tempo, la versione NiPyAPI era la 0.13.3 e, ovviamente, non esisteva nulla del genere. Ma nella versione 0.14.0, rilasciata di recente, questa funzione era già inclusa nella libreria. Incontrare,
nipyapi.canvas.set_remote_process_group_transmission
Quindi, utilizzando la libreria NiPyAPI, abbiamo collegato il registro, implementato il flusso e persino avviato i processori e il trasferimento dei dati. Quindi puoi pettinare il codice, aggiungere tutti i tipi di controlli, registrazioni e questo è tutto. Ma questa è una storia completamente diversa.
Tra le opzioni di automazione che ho considerato, l'ultima mi è sembrata la più efficiente. Innanzitutto, questo è ancora codice Python, nel quale puoi incorporare il codice del programma ausiliario e sfruttare tutti i vantaggi del linguaggio di programmazione. In secondo luogo, il progetto NiPyAPI si sta sviluppando attivamente e in caso di problemi puoi scrivere allo sviluppatore. In terzo luogo, NiPyAPI è ancora uno strumento più flessibile per interagire con NiFi nella risoluzione di problemi complessi. Ad esempio, nel determinare se le code dei messaggi sono ora vuote nel flusso e se il gruppo di processi può essere aggiornato.
È tutto. Ho descritto 3 approcci per automatizzare la distribuzione del flusso in NiFi, le insidie che uno sviluppatore potrebbe incontrare e ho fornito codice funzionante per automatizzare la distribuzione. Se sei interessato a questo argomento quanto me:
Fonte: habr.com