introduction
Il se trouve que sur mon lieu de travail actuel, j'ai dû me familiariser avec cette technologie. Je vais commencer par un petit historique. Lors de la réunion suivante, notre équipe a été informée que nous devions créer une intégration avec système connu. Par intégration, cela signifiait que ce système bien connu nous enverrait des requêtes via HTTP à un point de terminaison spécifique et que, curieusement, nous renverrions des réponses sous la forme d'un message SOAP. Tout semble simple et trivial. Il s'ensuit que vous avez besoin...
Tâche
Créez 3 services. Le premier d'entre eux est le service de mise à jour de base de données. Ce service, lorsque de nouvelles données arrivent d'un système tiers, met à jour les données de la base de données et génère un fichier au format CSV pour le transférer vers le système suivant. Le point final du deuxième service est appelé - le service de transport FTP, qui reçoit le fichier transféré, le valide et le place dans le stockage de fichiers via FTP. Le troisième service, le service de transfert de données grand public, fonctionne de manière asynchrone avec les deux premiers. Il reçoit une demande d'un système externe tiers pour recevoir le fichier évoqué ci-dessus, prend le fichier de réponse prêt, le modifie (met à jour les champs id, description, linkToFile) et envoie la réponse sous la forme d'un message SOAP. Autrement dit, le tableau d'ensemble est le suivant : les deux premiers services ne commencent leur travail que lorsque les données à mettre à jour sont arrivées. Le troisième service fonctionne en permanence car il y a de nombreux consommateurs d'informations, environ 1000 XNUMX requêtes de données par minute. Les services sont disponibles en permanence et leurs instances sont situées dans différents environnements, tels que test, démo, pré-production et prod. Vous trouverez ci-dessous un schéma du fonctionnement de ces services. Permettez-moi de préciser tout de suite que certains détails ont été simplifiés pour éviter une complexité inutile.
Approfondissement technique
Lors de la planification d'une solution au problème, nous avons d'abord décidé de créer des applications en Java en utilisant le framework Spring, l'équilibreur Nginx, la base de données Postgres et d'autres éléments techniques et moins techniques. Puisque le temps de développer une solution technique nous a permis d'envisager d'autres approches pour résoudre ce problème, notre regard s'est porté sur la technologie Apache NIFI, à la mode dans certains milieux. Je dirai tout de suite que cette technologie nous a permis de remarquer ces 3 services. Cet article décrira le développement d'un service de transport de fichiers et d'un service de transfert de données vers le consommateur, mais si l'article est utile, j'écrirai sur le service de mise à jour des données dans la base de données.
Qu'est-ce
NIFI est une architecture distribuée pour un chargement et un traitement parallèles rapides des données, un grand nombre de plugins pour les sources et les transformations, le versionnage des configurations et bien plus encore. Un avantage appréciable est qu’il est très simple à utiliser. Les processus triviaux tels que getFile, sendHttpRequest et autres peuvent être représentés sous forme de carrés. Chaque carré représente un processus dont l’interaction est visible dans la figure ci-dessous. Une documentation plus détaillée sur les interactions de configuration du processus a été rédigée
L'idée d'écrire un article est née après une longue recherche et une structuration des informations reçues en quelque chose de conscient, ainsi que du désir de faciliter un peu la vie des futurs développeurs.
Exemple
Un exemple de la façon dont les carrés interagissent les uns avec les autres est considéré. Le schéma général est assez simple : Nous recevons une requête HTTP (en théorie, avec un fichier dans le corps de la requête. Pour démontrer les capacités de NIFI, dans cet exemple la requête démarre le processus de réception d'un fichier depuis le stockage de fichiers local ), puis nous renvoyons une réponse indiquant que la demande a été reçue, en parallèle le processus de réception d'un fichier de FH puis le processus de déplacement via FTP vers FH. Il convient de préciser que les processus interagissent les uns avec les autres via ce qu'on appelle flowFile. Il s'agit de l'entité de base de NIFI qui stocke les attributs et le contenu. Le contenu correspond aux données représentées par le fichier de flux. Autrement dit, si vous recevez un fichier d'un carré et que vous le transférez vers un autre, le contenu sera votre fichier.
Comme vous pouvez le voir, cette image montre le processus général. HandleHttpRequest - accepte les demandes, ReplaceText - génère un corps de réponse, HandleHttpResponse - envoie une réponse. FetchFile - reçoit un fichier d'un stockage de fichiers, le transfère sur le carré PutSftp - met ce fichier sur FTP, à l'adresse indiquée. Maintenant plus sur ce processus.
Dans ce cas, la demande est le début de tout. Regardons ses paramètres de configuration.
Tout ici est assez trivial, à l'exception de StandardHttpContextMap - c'est une sorte de service qui vous permet d'envoyer et de recevoir des demandes. Plus en détail et même avec des exemples, vous pouvez voir -
Examinons ensuite les paramètres de configuration ReplaceText du carré. Il convient de prêter attention à ReplacementValue - c'est ce qui sera renvoyé à l'utilisateur sous la forme d'une réponse. Dans les paramètres, vous pouvez ajuster le niveau de journalisation, vous pouvez voir les journaux {où vous avez décompressé nifi}/nifi-1.9.2/logs, il y a aussi des paramètres d'échec/succès - sur la base de ces paramètres, vous pouvez réguler le processus dans son ensemble . Autrement dit, dans le cas d'un traitement de texte réussi, le processus d'envoi d'une réponse à l'utilisateur sera appelé et, dans un autre cas, nous enregistrerons simplement le processus infructueux.
Il n'y a rien de particulièrement intéressant dans les propriétés HandleHttpResponse, à l'exception du statut lorsqu'une réponse est créée avec succès.
Nous avons trié la demande et la réponse - passons à la réception du fichier et à son placement sur le serveur FTP. FetchFile - reçoit un fichier au chemin spécifié dans les paramètres et le transmet au processus suivant.
Et puis le carré PutSftp - place le fichier dans le stockage de fichiers. Nous pouvons voir les paramètres de configuration ci-dessous.
Il convient de prêter attention au fait que chaque carré est un processus distinct qui doit être lancé. Nous avons examiné l'exemple le plus simple qui ne nécessite aucune personnalisation complexe. Ensuite, nous examinerons le processus un peu plus compliqué, où nous écrirons un peu sur les grooves.
Exemple plus complexe
Le service de transfert de données au consommateur s'est avéré un peu plus compliqué en raison du processus de modification du message SOAP. Le processus général est illustré dans la figure ci-dessous.
Ici, l'idée n'est pas non plus particulièrement compliquée : nous avons reçu une demande du consommateur indiquant qu'il avait besoin de données, envoyé une réponse indiquant qu'il avait reçu un message, lancé le processus de réception du fichier de réponse, puis l'avons édité avec une certaine logique, puis transféré le fichier au consommateur sous la forme d'un message SOAP au serveur.
Je pense qu'il n'est pas nécessaire de décrire à nouveau les carrés que nous avons vus ci-dessus - passons directement aux nouveaux. Si vous devez modifier un fichier et que les carrés de type ReplaceText ordinaires ne conviennent pas, vous devrez écrire votre propre script. Cela peut être fait en utilisant le carré ExecuteGroogyScript. Ses paramètres sont présentés ci-dessous.
Il existe deux options pour charger le script dans ce carré. La première consiste à télécharger un fichier avec un script. La seconde consiste à insérer un script dans scriptBody. Autant que je sache, le carréexecuteScript prend en charge plusieurs langages - l'un d'eux est groovy. Je vais décevoir les développeurs Java - vous ne pouvez pas écrire de scripts en Java dans de tels carrés. Pour ceux qui le souhaitent vraiment, vous devez créer votre propre carré personnalisé et l’ajouter au système NIFI. Toute cette opération s'accompagne d'une assez longue danse au tambourin, dont nous ne parlerons pas dans cet article. J'ai choisi le langage groovy. Vous trouverez ci-dessous un script de test qui met simplement à jour de manière incrémentielle l'identifiant dans un message SOAP. Il est important de noter. Vous prenez le fichier depuis flowFile et le mettez à jour, n'oubliez pas que vous devez le remettre là, mis à jour. Il convient également de noter que toutes les bibliothèques ne sont pas incluses. Il peut arriver que vous deviez quand même importer l'une des bibliothèques. Un autre inconvénient est que le script de ce carré est assez difficile à déboguer. Il existe un moyen de se connecter à la JVM NIFI et de démarrer le processus de débogage. Personnellement, j'ai lancé une application locale et simulé la réception d'un fichier de la session. J'ai également fait du débogage localement. Les erreurs qui apparaissent lors du chargement d'un script sont assez faciles à détecter par Google et sont écrites par NIFI lui-même dans le journal.
import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder
def flowFile = session.get()
if (!flowFile) return
try {
flowFile = session.write(flowFile, { inputStream, outputStream ->
String result = IOUtils.toString(inputStream, "UTF-8");
def recordIn = new XmlSlurper().parseText(result)
def element = recordIn.depthFirst().find {
it.name() == 'id'
}
def newId = Integer.parseInt(element.toString()) + 1
def recordOut = new XmlSlurper().parseText(result)
recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId
def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
log.error("Error during processing of validate.groovy", e)
session.transfer(flowFile, REL_FAILURE)
}
En fait, c’est là que se termine la personnalisation du carré. Ensuite, le fichier mis à jour est transféré au carré, qui se charge d'envoyer le fichier au serveur. Vous trouverez ci-dessous les paramètres de ce carré.
Nous décrivons la méthode par laquelle un message SOAP sera transmis. Nous écrivons où. Ensuite, vous devez indiquer qu'il s'agit de SOAP.
Ajoutez plusieurs propriétés telles que l'hôte et l'action (soapAction). Nous sauvegardons et vérifions. Vous pouvez voir plus de détails sur la façon d'envoyer des requêtes SOAP
Nous avons examiné plusieurs options pour utiliser les processus NIFI. Comment interagissent-ils et quel est leur véritable bénéfice ? Les exemples considérés sont des tests et sont légèrement différents de ce qui se passe réellement au combat. J'espère que cet article sera un peu utile aux développeurs. Merci pour votre attention. Si vous avez des questions, écrivez. Je vais essayer de répondre.
Source: habr.com