Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Εισαγωγή

Έτυχε ότι στον τρέχοντα τόπο εργασίας μου έπρεπε να εξοικειωθώ με αυτήν την τεχνολογία. Θα ξεκινήσω με ένα μικρό υπόβαθρο. Στην επόμενη συνάντηση, η ομάδα μας είπαν ότι έπρεπε να δημιουργήσουμε ολοκλήρωση με γνωστό σύστημα. Με τον όρο ενσωμάτωση εννοούνταν ότι αυτό το γνωστό σύστημα θα μας έστελνε αιτήματα μέσω HTTP σε ένα συγκεκριμένο τελικό σημείο και εμείς, παραδόξως, θα στέλναμε απαντήσεις με τη μορφή μηνύματος SOAP. Όλα φαίνονται απλά και τετριμμένα. Από αυτό προκύπτει ότι χρειάζεστε...

Έργο

Δημιουργήστε 3 υπηρεσίες. Το πρώτο από αυτά είναι η υπηρεσία ενημέρωσης βάσης δεδομένων. Αυτή η υπηρεσία, όταν έρχονται νέα δεδομένα από ένα σύστημα τρίτου μέρους, ενημερώνει τα δεδομένα στη βάση δεδομένων και δημιουργεί ένα αρχείο σε μορφή CSV για να τα μεταφέρει στο επόμενο σύστημα. Το τελικό σημείο της δεύτερης υπηρεσίας ονομάζεται - η υπηρεσία μεταφοράς FTP, η οποία λαμβάνει το μεταφερόμενο αρχείο, το επικυρώνει και το τοποθετεί σε αποθήκευση αρχείων μέσω FTP. Η τρίτη υπηρεσία, η υπηρεσία μεταφοράς δεδομένων καταναλωτή, λειτουργεί ασύγχρονα με τις δύο πρώτες. Λαμβάνει ένα αίτημα από ένα εξωτερικό σύστημα τρίτου κατασκευαστή για να λάβει το αρχείο που συζητήθηκε παραπάνω, λαμβάνει το έτοιμο αρχείο απάντησης, το τροποποιεί (ενημερώνει τα πεδία αναγνωριστικού, περιγραφής, linkToFile) και στέλνει την απάντηση με τη μορφή μηνύματος SOAP. Δηλαδή, η συνολική εικόνα είναι η εξής: οι δύο πρώτες υπηρεσίες ξεκινούν τη δουλειά τους μόνο όταν φτάσουν τα δεδομένα για ενημέρωση. Η τρίτη υπηρεσία λειτουργεί συνεχώς γιατί υπάρχουν πολλοί καταναλωτές πληροφοριών, περίπου 1000 αιτήματα για δεδομένα ανά λεπτό. Οι υπηρεσίες είναι συνεχώς διαθέσιμες και οι παρουσίες τους βρίσκονται σε διαφορετικά περιβάλλοντα, όπως δοκιμή, επίδειξη, προπαραγωγή και παραγωγή. Ακολουθεί ένα διάγραμμα του τρόπου λειτουργίας αυτών των υπηρεσιών. Επιτρέψτε μου να διευκρινίσω αμέσως ότι ορισμένες λεπτομέρειες έχουν απλοποιηθεί για να αποφευχθεί η περιττή πολυπλοκότητα.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Τεχνική Εμβάθυνση

Όταν σχεδιάζαμε μια λύση στο πρόβλημα, αποφασίσαμε αρχικά να κάνουμε εφαρμογές σε Java χρησιμοποιώντας το Spring Framework, Nginx balancer, Postgres βάση δεδομένων και άλλα τεχνικά και όχι τόσο τεχνικά πράγματα. Δεδομένου ότι ο χρόνος ανάπτυξης μιας τεχνικής λύσης μας επέτρεψε να εξετάσουμε άλλες προσεγγίσεις για την επίλυση αυτού του προβλήματος, το βλέμμα μας έπεσε στην τεχνολογία Apache NIFI, η οποία είναι της μόδας σε ορισμένους κύκλους. Θα πω αμέσως ότι αυτή η τεχνολογία μας επέτρεψε να παρατηρήσουμε αυτές τις 3 υπηρεσίες. Αυτό το άρθρο θα περιγράψει την ανάπτυξη μιας υπηρεσίας μεταφοράς αρχείων και μιας υπηρεσίας μεταφοράς δεδομένων στον καταναλωτή, αλλά εάν το άρθρο είναι χρήσιμο, θα γράψω για την υπηρεσία ενημέρωσης δεδομένων στη βάση δεδομένων.

Τι είναι αυτό;

Το NIFI είναι μια κατανεμημένη αρχιτεκτονική για γρήγορη παράλληλη φόρτωση και επεξεργασία δεδομένων, μεγάλος αριθμός πρόσθετων για πηγές και μετασχηματισμούς, έκδοση εκδόσεων διαμορφώσεων και πολλά άλλα. Ένα ωραίο μπόνους είναι ότι είναι πολύ εύκολο στη χρήση. Οι ασήμαντες διεργασίες όπως το getFile, το sendHttpRequest και άλλες μπορούν να αναπαρασταθούν ως τετράγωνα. Κάθε τετράγωνο αντιπροσωπεύει μια διαδικασία, η αλληλεπίδραση της οποίας φαίνεται στο παρακάτω σχήμα. Έχει γραφτεί πιο λεπτομερής τεκμηρίωση σχετικά με τις αλληλεπιδράσεις της ρύθμισης της διαδικασίας εδώ , για όσους μιλούν ρωσικά - εδώ. Η τεκμηρίωση περιγράφει τέλεια τον τρόπο αποσυσκευασίας και εκτέλεσης του NIFI, καθώς και τον τρόπο δημιουργίας διεργασιών, γνωστών και ως τετραγώνων
Η ιδέα να γράψω ένα άρθρο γεννήθηκε μετά από μια μακρά αναζήτηση και τη δόμηση των πληροφοριών που λαμβάνονται σε κάτι συνειδητό, καθώς και την επιθυμία να κάνουμε τη ζωή λίγο πιο εύκολη για τους μελλοντικούς προγραμματιστές.

Παράδειγμα

Εξετάζεται ένα παράδειγμα του τρόπου με τον οποίο τα τετράγωνα αλληλεπιδρούν μεταξύ τους. Το γενικό σχήμα είναι αρκετά απλό: Λαμβάνουμε ένα αίτημα HTTP (θεωρητικά, με ένα αρχείο στο σώμα του αιτήματος. Για να δείξουμε τις δυνατότητες του NIFI, σε αυτό το παράδειγμα το αίτημα ξεκινά τη διαδικασία λήψης ενός αρχείου από την τοπική αποθήκευση αρχείων ), στη συνέχεια στέλνουμε απάντηση ότι το αίτημα έχει ληφθεί, παράλληλα η διαδικασία λήψης ενός αρχείου από το FH και στη συνέχεια η διαδικασία μετακίνησης του μέσω FTP στο FH. Αξίζει να διευκρινιστεί ότι οι διεργασίες αλληλεπιδρούν μεταξύ τους μέσω του λεγόμενου flowFile. Αυτή είναι η βασική οντότητα στο NIFI που αποθηκεύει χαρακτηριστικά και περιεχόμενο. Το περιεχόμενο είναι τα δεδομένα που αντιπροσωπεύονται από το αρχείο ροής. Δηλαδή, χοντρικά, αν λάβετε ένα αρχείο από ένα τετράγωνο και το μεταφέρετε σε άλλο, το περιεχόμενο θα είναι το αρχείο σας.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Όπως μπορείτε να δείτε, αυτή η εικόνα δείχνει τη γενική διαδικασία. HandleHttpRequest - δέχεται αιτήματα, ReplaceText - δημιουργεί ένα σώμα απόκρισης, HandleHttpResponse - στέλνει μια απάντηση. FetchFile - λαμβάνει ένα αρχείο από μια αποθήκευση αρχείων, το μεταφέρει στο τετράγωνο PutSftp - τοποθετεί αυτό το αρχείο στο FTP, στην καθορισμένη διεύθυνση. Τώρα περισσότερα για αυτή τη διαδικασία.

Σε αυτή την περίπτωση, το αίτημα είναι η αρχή των πάντων. Ας δούμε τις παραμέτρους διαμόρφωσής του.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Όλα εδώ είναι αρκετά ασήμαντα με εξαίρεση το StandardHttpContextMap - αυτό είναι ένα είδος υπηρεσίας που σας επιτρέπει να στέλνετε και να λαμβάνετε αιτήματα. Πιο αναλυτικά και ακόμη και με παραδείγματα, μπορείτε να δείτε - εδώ

Στη συνέχεια, ας δούμε τις παραμέτρους διαμόρφωσης ReplaceText του τετραγώνου. Αξίζει να δώσετε προσοχή στο ReplacementValue - αυτό θα επιστραφεί στον χρήστη με τη μορφή απάντησης. Στις ρυθμίσεις μπορείτε να προσαρμόσετε το επίπεδο καταγραφής, μπορείτε να δείτε τα αρχεία καταγραφής {where you unpacked nifi}/nifi-1.9.2/logs, υπάρχουν επίσης παράμετροι αποτυχίας/επιτυχίας - με βάση αυτές τις παραμέτρους μπορείτε να ρυθμίσετε τη διαδικασία ως σύνολο . Δηλαδή, σε περίπτωση επιτυχούς επεξεργασίας κειμένου, θα καλείται η διαδικασία αποστολής απάντησης στον χρήστη και σε άλλη περίπτωση απλά θα καταγράψουμε την ανεπιτυχή διαδικασία.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Δεν υπάρχει τίποτα ιδιαίτερα ενδιαφέρον στις ιδιότητες HandleHttpResponse εκτός από την κατάσταση όταν μια απάντηση δημιουργείται με επιτυχία.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Τακτοποιήσαμε το αίτημα και την απάντηση - ας προχωρήσουμε στη λήψη του αρχείου και την τοποθέτησή του στον διακομιστή FTP. FetchFile - λαμβάνει ένα αρχείο στη διαδρομή που καθορίζεται στις ρυθμίσεις και το μεταβιβάζει στην επόμενη διαδικασία.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Και μετά το τετράγωνο PutSftp - τοποθετεί το αρχείο στον χώρο αποθήκευσης αρχείων. Μπορούμε να δούμε τις παραμέτρους διαμόρφωσης παρακάτω.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Αξίζει να δοθεί προσοχή στο γεγονός ότι κάθε τετράγωνο είναι μια ξεχωριστή διαδικασία που πρέπει να δρομολογηθεί. Εξετάσαμε το απλούστερο παράδειγμα που δεν απαιτεί περίπλοκη προσαρμογή. Στη συνέχεια, θα δούμε τη διαδικασία λίγο πιο περίπλοκη, όπου θα γράψουμε λίγο στα αυλάκια.

Πιο περίπλοκο παράδειγμα

Η υπηρεσία μεταφοράς δεδομένων στον καταναλωτή αποδείχθηκε λίγο πιο περίπλοκη λόγω της διαδικασίας τροποποίησης του μηνύματος SOAP. Η γενική διαδικασία φαίνεται στο παρακάτω σχήμα.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Εδώ η ιδέα δεν είναι επίσης ιδιαίτερα περίπλοκη: λάβαμε ένα αίτημα από τον καταναλωτή ότι χρειαζόταν δεδομένα, στείλαμε μια απάντηση ότι έλαβε ένα μήνυμα, ξεκινήσαμε τη διαδικασία λήψης του αρχείου απάντησης, στη συνέχεια το επεξεργαστήκαμε με μια συγκεκριμένη λογική και στη συνέχεια μετέφερε το αρχείο στον καταναλωτή με τη μορφή μηνύματος SOAP στον διακομιστή.

Νομίζω ότι δεν χρειάζεται να περιγράψουμε ξανά εκείνα τα τετράγωνα που είδαμε παραπάνω - ας προχωρήσουμε κατευθείαν στα νέα. Εάν χρειάζεται να επεξεργαστείτε οποιοδήποτε αρχείο και τα συνηθισμένα τετράγωνα τύπου ReplaceText δεν είναι κατάλληλα, θα πρέπει να γράψετε το δικό σας σενάριο. Αυτό μπορεί να γίνει χρησιμοποιώντας το τετράγωνο ExecuteGroogyScript. Οι ρυθμίσεις του παρουσιάζονται παρακάτω.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Υπάρχουν δύο επιλογές για τη φόρτωση του σεναρίου σε αυτό το τετράγωνο. Το πρώτο είναι με τη λήψη ενός αρχείου με ένα σενάριο. Το δεύτερο είναι με την εισαγωγή ενός σεναρίου στο scriptBody. Από όσο γνωρίζω, το τετράγωνο executeScript υποστηρίζει πολλές γλώσσες - μία από αυτές είναι groovy. Θα απογοητεύσω τους προγραμματιστές java - δεν μπορείτε να γράψετε σενάρια σε java σε τέτοια τετράγωνα. Για όσους το θέλουν πραγματικά, πρέπει να δημιουργήσετε το δικό σας προσαρμοσμένο τετράγωνο και να το προσθέσετε στο σύστημα NIFI. Όλη αυτή η επέμβαση συνοδεύεται από αρκετά μεγάλο χορό με ντέφι, με τον οποίο δεν θα ασχοληθούμε σε αυτό το άρθρο. Διάλεξα τη γλώσσα groovy. Παρακάτω είναι ένα δοκιμαστικό σενάριο που απλώς ενημερώνει σταδιακά το αναγνωριστικό σε ένα μήνυμα SOAP. Είναι σημαντικό να σημειωθεί. Παίρνετε το αρχείο από το flowFile και το ενημερώνετε, μην ξεχνάτε ότι πρέπει να το ξαναβάλετε εκεί, ενημερωμένο. Αξίζει επίσης να σημειωθεί ότι δεν περιλαμβάνονται όλες οι βιβλιοθήκες. Μπορεί να συμβεί ότι πρέπει ακόμα να εισαγάγετε ένα από τα lib. Ένα άλλο μειονέκτημα είναι ότι το σενάριο σε αυτό το τετράγωνο είναι αρκετά δύσκολο να διορθωθεί. Υπάρχει τρόπος να συνδεθείτε στο NIFI JVM και να ξεκινήσετε τη διαδικασία εντοπισμού σφαλμάτων. Προσωπικά, ξεκίνησα μια τοπική εφαρμογή και προσομοίωσα τη λήψη ενός αρχείου από τη συνεδρία. Έκανα επίσης εντοπισμό σφαλμάτων τοπικά. Τα σφάλματα που εμφανίζονται κατά τη φόρτωση ενός σεναρίου είναι αρκετά εύκολα στην Google και γράφονται από το ίδιο το NIFI στο αρχείο καταγραφής.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

Στην πραγματικότητα, εδώ τελειώνει η προσαρμογή του τετραγώνου. Στη συνέχεια, το ενημερωμένο αρχείο μεταφέρεται στο τετράγωνο, το οποίο είναι υπεύθυνο για την αποστολή του αρχείου στον διακομιστή. Παρακάτω είναι οι ρυθμίσεις για αυτό το τετράγωνο.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Περιγράφουμε τη μέθοδο με την οποία θα μεταδοθεί ένα μήνυμα SOAP. Γράφουμε πού. Στη συνέχεια, πρέπει να υποδείξετε ότι αυτό είναι SOAP.

Apache NIFI - Μια σύντομη επισκόπηση των ευκαιριών στην πράξη

Προσθέστε πολλές ιδιότητες όπως host και action (soapAction). Αποθηκεύουμε και ελέγχουμε. Μπορείτε να δείτε περισσότερες λεπτομέρειες σχετικά με τον τρόπο αποστολής αιτημάτων SOAP εδώ

Εξετάσαμε διάφορες επιλογές για τη χρήση διαδικασιών NIFI. Πώς αλληλεπιδρούν και ποιο είναι το πραγματικό τους όφελος; Τα παραδείγματα που εξετάζονται είναι δοκιμαστικά και είναι ελαφρώς διαφορετικά από αυτό που συμβαίνει στην πραγματικότητα στη μάχη. Ελπίζω ότι αυτό το άρθρο θα είναι λίγο χρήσιμο για τους προγραμματιστές. Σας ευχαριστώ για την προσοχή σας. Αν έχετε ερωτήσεις, γράψτε. Θα προσπαθήσω να απαντήσω.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο