Automatisation de la livraison des flux dans Apache NiFi

Bonjour à tous!

Automatisation de la livraison des flux dans Apache NiFi

La tâche est la suivante : il existe un flux, présenté dans l'image ci-dessus, qui doit être déployé sur N serveurs avec Apache NiFi. Test de flux : un fichier est généré et envoyé à une autre instance NiFi. Le transfert de données s'effectue à l'aide du protocole NiFi Site to Site.

NiFi Site to Site (S2S) est un moyen sécurisé et facilement configurable de transférer des données entre des instances NiFi. Comment fonctionne S2S, voir documentation et il est important de ne pas oublier de configurer l'instance NiFi pour autoriser S2S, voir ici.

Dans les cas où nous parlons de transfert de données via S2S, une instance est appelée client, la deuxième serveur. Le client envoie des données, le serveur les reçoit. Deux façons de configurer le transfert de données entre eux :

  1. Push. À partir de l'instance client, les données sont envoyées à l'aide d'un groupe de processus distant (RPG). Sur l'instance de serveur, les données sont reçues à l'aide du port d'entrée
  2. Pull. Le serveur reçoit les données via RPG, le client les envoie via le port de sortie.


Le flux de déploiement est stocké dans le registre Apache.

Apache NiFi Registry est un sous-projet d'Apache NiFi qui fournit un outil de stockage de flux et de contrôle de version. Une sorte de GIT. Des informations sur l'installation, la configuration et l'utilisation du registre peuvent être trouvées dans documents officiels. Le flux de stockage est combiné dans un groupe de processus et stocké sous cette forme dans le registre. Nous y reviendrons plus tard dans l'article.

Au début, lorsque N est un petit nombre, le flux est délivré et mis à jour manuellement dans un délai acceptable.

Mais à mesure que N grandit, les problèmes deviennent plus nombreux :

  1. la mise à jour du flux prend plus de temps. Vous devez vous connecter à tous les serveurs
  2. Des erreurs de mise à jour du modèle se produisent. Ici, ils l'ont mis à jour, mais ici ils ont oublié
  3. erreurs humaines lors de l'exécution d'un grand nombre d'opérations similaires

Tout cela nous amène au fait que nous devons automatiser le processus. J'ai essayé les méthodes suivantes pour résoudre ce problème :

  1. Utilisez MiNiFi au lieu de NiFi
  2. Ni-Fi CLI
  3. NiPyAPI

Utiliser le MiNiFi

Apache MiNiFy - sous-projet d'Apache NiFi. MiNiFy est un agent compact qui utilise les mêmes processeurs que NiFi, permettant de créer les mêmes flux que dans NiFi. La légèreté de l'agent est obtenue, entre autres, par le fait que MiNiFy ne dispose pas d'interface graphique pour la configuration des flux. L'absence d'interface graphique dans MiNiFy signifie qu'il est nécessaire de résoudre le problème de la fourniture du flux à minifi. Étant donné que MiNiFy est activement utilisé dans l'IOT, il existe de nombreux composants et le processus de fourniture du flux aux instances minifi finales doit être automatisé. Une tâche familière, non ?

Un autre sous-projet aidera à résoudre ce problème : le serveur MiNiFi C2. Ce produit est destiné à être le point central de l'architecture de déploiement de la configuration. Comment configurer l'environnement - décrit dans cet article Il existe suffisamment d’informations sur Habré pour résoudre le problème. MiNiFi, en conjonction avec le serveur C2, met automatiquement à jour sa configuration. Le seul inconvénient de cette approche est que vous devez créer des modèles sur le serveur C2 ; un simple commit dans le registre ne suffit pas.

L'option décrite dans l'article ci-dessus fonctionne et n'est pas difficile à mettre en œuvre, mais il ne faut pas oublier ce qui suit :

  1. Minifi n'a pas tous les processeurs de nifi
  2. Les versions du processeur Minifi sont en retard sur les versions du processeur NiFi.

Au moment de la rédaction de cet article, la dernière version de NiFi est la 1.9.2. La dernière version du processeur MiNiFi est la 1.7.0. Des processeurs peuvent être ajoutés à MiNiFi, mais en raison des différences de version entre les processeurs NiFi et MiNiFi, cela peut ne pas fonctionner.

Ni-Fi CLI

A en juger par la description sur le site officiel, il s'agit d'un outil permettant d'automatiser l'interaction entre NiFI et NiFi Registry dans le domaine de la livraison de flux ou de la gestion de processus. Pour commencer, vous devez télécharger cet outil. par conséquent,.

Lancez l'utilitaire

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Afin que nous puissions charger le flux requis à partir du registre, nous devons connaître les identifiants du bucket (identifiant du bucket) ​​et du flux lui-même (identifiant du flux). Ces données peuvent être obtenues soit via le cli, soit dans l'interface Web du registre NiFi. Dans l'interface Web, cela ressemble à ceci :

Automatisation de la livraison des flux dans Apache NiFi

En utilisant la CLI, cela se fait :

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Nous commençons à importer le groupe de processus depuis le registre :

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Un point important est que n'importe quelle instance nifi peut être spécifiée comme hôte sur lequel nous déployons le groupe de processus.

Groupe de processus ajouté avec des processeurs arrêtés, ils doivent être démarrés

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Super, les processeurs ont démarré. Cependant, selon les termes de la tâche, nous avons besoin d'instances NiFi pour envoyer des données à d'autres instances. Supposons que vous ayez choisi la méthode Push pour transférer les données vers le serveur. Afin d'organiser le transfert de données, vous devez activer le transfert de données sur le groupe de processus distant (RPG) ajouté, qui est déjà inclus dans notre flux.

Automatisation de la livraison des flux dans Apache NiFi

Dans la documentation de la CLI et d'autres sources, je n'ai pas trouvé de moyen d'activer le transfert de données. Si vous savez comment faire cela, écrivez-nous dans les commentaires.

Puisque nous avons bash et que nous sommes prêts à aller jusqu'au bout, nous trouverons une issue ! Vous pouvez utiliser l'API NiFi pour résoudre ce problème. Utilisons la méthode suivante, prenons l'ID des exemples ci-dessus (dans notre cas, il s'agit de 7f522a13-016e-1000-e504-d5b15587f2f3). Description des méthodes de l'API NiFi ici.

Automatisation de la livraison des flux dans Apache NiFi
Dans le corps, vous devez passer du JSON, comme ceci :

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Paramètres à renseigner pour que cela fonctionne :
Etat — l'état du transfert de données. Disponible : TRANSMITTING pour activer le transfert de données, STOPPED pour désactiver
version - version du processeur

la version sera par défaut à 0 lors de sa création, mais ces paramètres peuvent être obtenus en utilisant la méthode

Automatisation de la livraison des flux dans Apache NiFi

Pour les fans de scripts bash, cette méthode peut sembler appropriée, mais c'est un peu difficile pour moi - les scripts bash ne sont pas mes préférés. La méthode suivante est à mon avis plus intéressante et plus pratique.

NiPyAPI

NiPyAPI est une bibliothèque Python permettant d'interagir avec les instances NiFi. Page de documentation contient les informations nécessaires pour travailler avec la bibliothèque. Le démarrage rapide est décrit dans projet sur github.

Notre script de déploiement de la configuration est un programme en Python. Passons au codage.
Nous mettons en place des configurations pour des travaux ultérieurs. Nous aurons besoin des paramètres suivants :

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Ensuite j'insérerai les noms des méthodes de cette bibliothèque, qui sont décrites ici.

Connectez le registre à l'instance nifi en utilisant

nipyapi.versioning.create_registry_client

A cette étape, vous pouvez également ajouter une vérification que le registre a déjà été ajouté à l'instance ; pour cela, vous pouvez utiliser la méthode

nipyapi.versioning.list_registry_clients

Nous trouvons le seau pour une recherche plus approfondie du flux dans le panier

nipyapi.versioning.get_registry_bucket

À l'aide du bucket trouvé, nous recherchons le flux

nipyapi.versioning.get_flow_in_bucket

Ensuite, il est important de comprendre si ce groupe de processus a déjà été ajouté. Le groupe Processus est placé en fonction des coordonnées et une situation peut survenir lorsqu'un deuxième composant est superposé à un autre. J'ai vérifié, cela peut arriver :) Pour obtenir tous les groupes de processus ajoutés, nous utilisons la méthode

nipyapi.canvas.list_all_process_groups

Nous pouvons effectuer une recherche plus approfondie, par exemple, par nom.

Je ne décrirai pas le processus de mise à jour du modèle, je dirai seulement que si des processeurs sont ajoutés dans la nouvelle version du modèle, il n'y aura aucun problème avec la présence de messages dans les files d'attente. Mais si les processeurs sont supprimés, des problèmes peuvent survenir (nifi ne permet pas de supprimer un processeur si une file d'attente de messages s'est accumulée devant lui). Si vous souhaitez savoir comment j'ai résolu ce problème, écrivez-moi et nous discuterons de ce problème. Contacts en fin d'article. Passons à l'étape d'ajout d'un groupe de processus.

Lors du débogage du script, je suis tombé sur une particularité selon laquelle la dernière version de flow n'est pas toujours extraite, je recommande donc de vérifier d'abord cette version :

nipyapi.versioning.get_latest_flow_ver

Groupe de processus de déploiement :

nipyapi.versioning.deploy_flow_version

On démarre les processeurs :

nipyapi.canvas.schedule_process_group

Dans le bloc sur la CLI, il a été écrit que le transfert de données n'est pas automatiquement activé dans le groupe de processus distant ? Lors de l'implémentation du script, j'ai également rencontré ce problème. À ce moment-là, je n'étais pas en mesure de démarrer le transfert de données à l'aide de l'API et j'ai décidé d'écrire au développeur de la bibliothèque NiPyAPI et de lui demander conseil/aide. Le développeur m'a répondu, nous avons discuté du problème et il a écrit qu'il avait besoin de temps pour « vérifier quelque chose ». Et puis, quelques jours plus tard, une lettre arrive dans laquelle une fonction est écrite en Python qui résout mon problème de lancement !!! À cette époque, la version de NiPyAPI était la 0.13.3 et, bien sûr, rien de tel. Mais dans la version 0.14.0, sortie assez récemment, cette fonction était déjà incluse dans la bibliothèque. Rencontrer,

nipyapi.canvas.set_remote_process_group_transmission

Ainsi, à l'aide de la bibliothèque NiPyAPI, nous avons connecté le registre, déployé le flux et même démarré les processeurs et le transfert de données. Ensuite, vous pouvez passer au peigne fin le code, ajouter toutes sortes de vérifications, journaliser, et c'est tout. Mais c'est une histoire complètement différente.

Parmi les options d'automatisation que j'ai envisagées, la dernière m'a semblé la plus efficace. Premièrement, il s'agit toujours de code Python, dans lequel vous pouvez intégrer du code de programme auxiliaire et profiter de tous les avantages du langage de programmation. Deuxièmement, le projet NiPyAPI se développe activement et en cas de problème, vous pouvez écrire au développeur. Troisièmement, NiPyAPI reste un outil plus flexible pour interagir avec NiFi afin de résoudre des problèmes complexes. Par exemple, pour déterminer si les files d'attente de messages sont désormais vides dans le flux et si le groupe de processus peut être mis à jour.

C'est tout. J'ai décrit 3 approches pour automatiser la livraison de flux dans NiFi, les pièges qu'un développeur peut rencontrer, et fourni un code fonctionnel pour automatiser la livraison. Si ce sujet vous intéresse autant que moi - oui !

Source: habr.com

Ajouter un commentaire