Αυτοματισμός παροχής ροής σε Apache NiFi

Γεια σε όλους!

Αυτοματισμός παροχής ροής σε Apache NiFi

Η εργασία είναι η εξής - υπάρχει μια ροή που φαίνεται στην παραπάνω εικόνα, η οποία πρέπει να αναπτυχθεί σε N διακομιστές με Apache NiFi. Δοκιμή ροής - ένα αρχείο δημιουργείται και αποστέλλεται σε άλλη παρουσία NiFi. Η μεταφορά δεδομένων πραγματοποιείται χρησιμοποιώντας το πρωτόκολλο NiFi Site to Site.

Το NiFi Site to Site (S2S) είναι ένας ασφαλής, εξαιρετικά προσαρμόσιμος τρόπος μεταφοράς δεδομένων μεταξύ παρουσιών NiFi. Δείτε πώς λειτουργεί το S2S τεκμηρίωση και είναι σημαντικό να θυμάστε να ρυθμίσετε το στιγμιότυπο NiFi για να επιτρέψετε στο S2S να βλέπει εδώ.

Όταν πρόκειται για μεταφορά δεδομένων με χρήση S2S, η μία περίπτωση ονομάζεται πελάτης, η δεύτερη είναι διακομιστής. Ο πελάτης στέλνει δεδομένα, ο διακομιστής τα λαμβάνει. Δύο τρόποι για να ρυθμίσετε τη μεταφορά δεδομένων μεταξύ τους:

  1. Σπρώξτε. Τα δεδομένα αποστέλλονται από την παρουσία πελάτη χρησιμοποιώντας μια ομάδα απομακρυσμένων διεργασιών (RPG). Στην περίπτωση του διακομιστή, τα δεδομένα λαμβάνονται χρησιμοποιώντας τη θύρα εισόδου
  2. Τραβήξτε. Ο διακομιστής λαμβάνει δεδομένα χρησιμοποιώντας το RPG, ο πελάτης στέλνει χρησιμοποιώντας τη θύρα εξόδου.


Η ροή για κύλιση αποθηκεύεται στο μητρώο Apache.

Το Apache NiFi Registry είναι ένα υποέργο του Apache NiFi που παρέχει ένα εργαλείο αποθήκευσης ροής και έκδοσης. Ένα είδος GIT. Πληροφορίες σχετικά με την εγκατάσταση, τη διαμόρφωση και την εργασία με το μητρώο μπορείτε να βρείτε στο επίσημη τεκμηρίωση. Η ροή για αποθήκευση συνδυάζεται σε μια ομάδα διεργασιών και αποθηκεύεται στο μητρώο με αυτήν τη μορφή. Θα επανέλθουμε σε αυτό αργότερα στο άρθρο.

Στην αρχή, όταν το N είναι μικρός αριθμός, η ροή παραδίδεται και ενημερώνεται χειροκίνητα σε εύλογο χρόνο.

Αλλά καθώς το Ν αυξάνεται, υπάρχουν περισσότερα προβλήματα:

  1. χρειάζεται περισσότερος χρόνος για την ενημέρωση της ροής. Πρέπει να πάτε σε όλους τους διακομιστές
  2. υπάρχουν σφάλματα κατά την ενημέρωση των προτύπων. Εδώ ενημερώθηκαν, αλλά εδώ ξέχασαν
  3. ανθρώπινο λάθος κατά την εκτέλεση μεγάλου αριθμού παρόμοιων λειτουργιών

Όλα αυτά μας οδηγούν στο γεγονός ότι είναι απαραίτητο να αυτοματοποιηθεί η διαδικασία. Έχω δοκιμάσει τους παρακάτω τρόπους για να λύσω αυτό το πρόβλημα:

  1. Χρησιμοποιήστε το MiNiFi αντί για το NiFi
  2. NiFi CLI
  3. NiPyAPI

Χρησιμοποιώντας το MiniFi

ApacheMiNify είναι ένα υποέργο του Apache NiFi. Το MiNiFy είναι ένας συμπαγής παράγοντας που χρησιμοποιεί τους ίδιους επεξεργαστές με το NiFi, επιτρέποντάς σας να δημιουργήσετε την ίδια ροή όπως στο NiFi. Η ελαφρότητα του πράκτορα επιτυγχάνεται, μεταξύ άλλων, λόγω του γεγονότος ότι το MiNiFy δεν διαθέτει γραφική διεπαφή για τη διαμόρφωση ροής. Η έλλειψη γραφικής διεπαφής του MiNiFy σημαίνει ότι είναι απαραίτητο να λυθεί το πρόβλημα της παροχής ροής στο minifi. Δεδομένου ότι το MiNiFy χρησιμοποιείται ενεργά στο IOT, υπάρχουν πολλά στοιχεία και η διαδικασία παροχής ροής σε τελικές παρουσίες minifi πρέπει να αυτοματοποιηθεί. Γνωστό καθήκον, σωστά;

Ένα άλλο υποέργο, το MiNiFi C2 Server, θα βοηθήσει στην επίλυση αυτού του προβλήματος. Αυτό το προϊόν προορίζεται να είναι το κεντρικό σημείο στην αρχιτεκτονική ανάπτυξης. Πώς να διαμορφώσετε το περιβάλλον - περιγράφεται στο Αυτό το άρθρο στο Habré και οι πληροφορίες είναι αρκετές για να λυθεί το πρόβλημα. Το MiniFi σε συνδυασμό με τον διακομιστή C2 ενημερώνει αυτόματα τη διαμόρφωσή του. Το μόνο μειονέκτημα αυτής της προσέγγισης είναι ότι πρέπει να δημιουργήσετε πρότυπα στον διακομιστή C2, μια απλή δέσμευση στο μητρώο δεν αρκεί.

Η επιλογή που περιγράφεται στο παραπάνω άρθρο λειτουργεί και δεν είναι δύσκολο να εφαρμοστεί, αλλά δεν πρέπει να ξεχνάμε τα εξής:

  1. Το minifi δεν έχει όλους τους επεξεργαστές από το nifi
  2. Οι εκδόσεις CPU στο Minifi υστερούν σε σχέση με τις εκδόσεις CPU στο NiFi.

Τη στιγμή της γραφής, η τελευταία έκδοση του NiFi είναι η 1.9.2. Η έκδοση επεξεργαστή της τελευταίας έκδοσης MiniFi είναι 1.7.0. Επεξεργαστές μπορούν να προστεθούν στο MiNiFi, αλλά λόγω διαφορών έκδοσης μεταξύ των επεξεργαστών NiFi και MiNiFi, αυτό μπορεί να μην λειτουργεί.

NiFi CLI

Κρίνοντας από περιγραφή εργαλείο στον επίσημο ιστότοπο, αυτό είναι ένα εργαλείο για την αυτοματοποίηση της αλληλεπίδρασης μεταξύ NiFI και Μητρώου NiFi στον τομέα της παράδοσης ροής ή της διαχείρισης διαδικασιών. Κατεβάστε αυτό το εργαλείο για να ξεκινήσετε. ως εκ τούτου,.

Εκτελέστε το βοηθητικό πρόγραμμα

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Για να φορτώσουμε την απαραίτητη ροή από το μητρώο, πρέπει να γνωρίζουμε τα αναγνωριστικά του καλαθιού (αναγνωριστικό κάδου) και την ίδια τη ροή (αναγνωριστικό ροής). Αυτά τα δεδομένα μπορούν να ληφθούν είτε μέσω του cli είτε μέσω της διεπαφής web του μητρώου NiFi. Η διεπαφή ιστού μοιάζει με αυτό:

Αυτοματισμός παροχής ροής σε Apache NiFi

Χρησιμοποιώντας το CLI, κάνετε αυτό:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Εκτελέστε την ομάδα διεργασιών εισαγωγής από το μητρώο:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Ένα σημαντικό σημείο είναι ότι οποιοδήποτε στιγμιότυπο nifi μπορεί να καθοριστεί ως ο κεντρικός υπολογιστής στον οποίο εισάγουμε την ομάδα διεργασιών.

Η ομάδα διεργασιών προστέθηκε με τερματισμένους επεξεργαστές, πρέπει να ξεκινήσουν

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Μπράβο, ξεκίνησαν οι επεξεργαστές. Ωστόσο, σύμφωνα με τις συνθήκες του προβλήματος, χρειαζόμαστε περιπτώσεις NiFi για να στείλουμε δεδομένα σε άλλες περιπτώσεις. Ας υποθέσουμε ότι η μέθοδος Push επιλέχθηκε για τη μεταφορά δεδομένων στον διακομιστή. Για να οργανώσετε τη μεταφορά δεδομένων, είναι απαραίτητο να ενεργοποιήσετε τη μεταφορά δεδομένων (Ενεργοποίηση μετάδοσης) στην προστιθέμενη Ομάδα Απομακρυσμένων Διαδικασιών (RPG), η οποία περιλαμβάνεται ήδη στη ροή μας.

Αυτοματισμός παροχής ροής σε Apache NiFi

Στην τεκμηρίωση στο CLI και σε άλλες πηγές, δεν βρήκα τρόπο να ενεργοποιήσω τη μεταφορά δεδομένων. Εάν ξέρετε πώς να το κάνετε αυτό, γράψτε στα σχόλια.

Αφού έχουμε bash και είμαστε έτοιμοι να πάμε μέχρι το τέλος, θα βρούμε διέξοδο! Μπορείτε να χρησιμοποιήσετε το NiFi API για να λύσετε αυτό το πρόβλημα. Ας χρησιμοποιήσουμε την ακόλουθη μέθοδο, παίρνουμε το αναγνωριστικό από τα παραπάνω παραδείγματα (στην περίπτωσή μας είναι 7f522a13-016e-1000-e504-d5b15587f2f3). Περιγραφή των μεθόδων NiFi API εδώ.

Αυτοματισμός παροχής ροής σε Apache NiFi
Στο σώμα, πρέπει να περάσετε JSON, της ακόλουθης μορφής:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Παράμετροι που πρέπει να συμπληρωθούν για να «λειτουργήσει»:
κατάσταση — κατάσταση μεταφοράς δεδομένων. Διαθέσιμο TRANSMITTING για ενεργοποίηση της μεταφοράς δεδομένων, STOPPED για απενεργοποίηση
εκδοχή - έκδοση επεξεργαστή

Η έκδοση θα είναι προεπιλεγμένη σε 0 όταν δημιουργηθεί, αλλά αυτές οι παράμετροι μπορούν να ληφθούν χρησιμοποιώντας τη μέθοδο

Αυτοματισμός παροχής ροής σε Apache NiFi

Για τους λάτρεις των σεναρίων bash, αυτή η μέθοδος μπορεί να φαίνεται κατάλληλη, αλλά είναι δύσκολο για μένα - τα σενάρια bash δεν είναι τα αγαπημένα μου. Ο επόμενος τρόπος είναι πιο ενδιαφέρον και πιο βολικός κατά τη γνώμη μου.

NiPyAPI

Το NiPyAPI είναι μια βιβλιοθήκη Python για αλληλεπίδραση με παρουσίες NiFi. Σελίδα τεκμηρίωσης περιέχει τις απαραίτητες πληροφορίες για την εργασία με τη βιβλιοθήκη. Η γρήγορη εκκίνηση περιγράφεται στο σχέδιο στο github.

Το σενάριό μας για την ανάπτυξη της διαμόρφωσης είναι ένα πρόγραμμα Python. Ας προχωρήσουμε στην κωδικοποίηση.
Ρυθμίστε τις ρυθμίσεις παραμέτρων για περαιτέρω εργασία. Θα χρειαστούμε τις παρακάτω παραμέτρους:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Περαιτέρω θα εισάγω τα ονόματα των μεθόδων αυτής της βιβλιοθήκης, τα οποία περιγράφονται εδώ.

Συνδέουμε το μητρώο με την παρουσία nifi χρησιμοποιώντας

nipyapi.versioning.create_registry_client

Σε αυτό το βήμα, μπορείτε επίσης να προσθέσετε έναν έλεγχο ότι το μητρώο έχει ήδη προστεθεί στην παρουσία, για αυτό μπορείτε να χρησιμοποιήσετε τη μέθοδο

nipyapi.versioning.list_registry_clients

Βρίσκουμε τον κάδο για να αναζητήσουμε περαιτέρω τη ροή στο καλάθι

nipyapi.versioning.get_registry_bucket

Σύμφωνα με τον κουβά που βρέθηκε, ψάχνουμε για ροή

nipyapi.versioning.get_flow_in_bucket

Στη συνέχεια, είναι σημαντικό να κατανοήσουμε εάν αυτή η ομάδα διεργασιών έχει ήδη προστεθεί. Η ομάδα διεργασιών τοποθετείται με συντεταγμένες και μπορεί να προκύψει μια κατάσταση όταν μια δεύτερη υπερτίθεται πάνω από μία. Έλεγξα, μπορεί να είναι 🙂 Για να λάβετε όλη την προστιθέμενη ομάδα διαδικασιών, χρησιμοποιήστε τη μέθοδο

nipyapi.canvas.list_all_process_groups

και μετά μπορούμε να κάνουμε αναζήτηση, για παράδειγμα, με το όνομα.

Δεν θα περιγράψω τη διαδικασία ενημέρωσης του προτύπου, θα πω μόνο ότι εάν προστεθούν επεξεργαστές στη νέα έκδοση του προτύπου, τότε δεν υπάρχουν προβλήματα με την παρουσία μηνυμάτων στις ουρές. Εάν όμως αφαιρεθούν οι επεξεργαστές, τότε μπορεί να προκύψουν προβλήματα (το nifi δεν επιτρέπει την αφαίρεση του επεξεργαστή εάν έχει συσσωρευτεί μια ουρά μηνυμάτων μπροστά του). Εάν ενδιαφέρεστε για το πώς έλυσα αυτό το πρόβλημα - γράψτε μου, παρακαλώ, θα συζητήσουμε αυτό το σημείο. Επαφές στο τέλος του άρθρου. Ας προχωρήσουμε στο βήμα της προσθήκης μιας ομάδας διεργασιών.

Κατά τον εντοπισμό σφαλμάτων του σεναρίου, συνάντησα μια δυνατότητα ότι η πιο πρόσφατη έκδοση της ροής δεν εμφανίζεται πάντα, γι' αυτό σας συνιστώ να διευκρινίσετε πρώτα αυτήν την έκδοση:

nipyapi.versioning.get_latest_flow_ver

Ομάδα διαδικασιών ανάπτυξης:

nipyapi.versioning.deploy_flow_version

Ξεκινάμε τους επεξεργαστές:

nipyapi.canvas.schedule_process_group

Στο μπλοκ για το CLI, γράφτηκε ότι η μεταφορά δεδομένων δεν ενεργοποιείται αυτόματα στην ομάδα απομακρυσμένης διαδικασίας; Κατά την υλοποίηση του σεναρίου, αντιμετώπισα κι εγώ αυτό το πρόβλημα. Εκείνη την εποχή, δεν μπορούσα να ξεκινήσω τη μεταφορά δεδομένων χρησιμοποιώντας το API και αποφάσισα να γράψω στον προγραμματιστή της βιβλιοθήκης NiPyAPI και να ζητήσω συμβουλές / βοήθεια. Ο προγραμματιστής μου απάντησε, συζητήσαμε το πρόβλημα και έγραψε ότι χρειαζόταν χρόνο για να "ελέγχει κάτι". Και τώρα, μερικές μέρες αργότερα, έρχεται ένα email στο οποίο είναι γραμμένη μια συνάρτηση Python που λύνει το πρόβλημα εκκίνησης μου !!! Εκείνη την εποχή, η έκδοση του NiPyAPI ήταν 0.13.3 και, φυσικά, δεν υπήρχε τίποτα τέτοιο σε αυτήν. Αλλά στην έκδοση 0.14.0, η οποία κυκλοφόρησε αρκετά πρόσφατα, αυτή η λειτουργία έχει ήδη συμπεριληφθεί στη βιβλιοθήκη. Συναντώ

nipyapi.canvas.set_remote_process_group_transmission

Έτσι, με τη βοήθεια της βιβλιοθήκης NiPyAPI, συνδέσαμε το μητρώο, κυκλοφόρησαν τη ροή και ξεκινήσαμε ακόμη και τους επεξεργαστές και τη μεταφορά δεδομένων. Στη συνέχεια, μπορείτε να χτενίσετε τον κωδικό, να προσθέσετε όλα τα είδη επιταγών, καταγραφής, και αυτό είναι. Αλλά αυτό είναι μια εντελώς διαφορετική ιστορία.

Από τις επιλογές αυτοματισμού που εξέτασα, η τελευταία μου φάνηκε η πιο αποτελεσματική. Πρώτον, αυτός είναι ακόμα κώδικας python, στον οποίο μπορείτε να ενσωματώσετε βοηθητικό κώδικα προγράμματος και να απολαύσετε όλα τα πλεονεκτήματα μιας γλώσσας προγραμματισμού. Δεύτερον, το έργο NiPyAPI αναπτύσσεται ενεργά και σε περίπτωση προβλημάτων μπορείτε να γράψετε στον προγραμματιστή. Τρίτον, το NiPyAPI εξακολουθεί να είναι ένα πιο ευέλικτο εργαλείο για την αλληλεπίδραση με το NiFi για την επίλυση πολύπλοκων προβλημάτων. Για παράδειγμα, για τον καθορισμό του εάν οι ουρές μηνυμάτων είναι επί του παρόντος κενές στη ροή και εάν είναι δυνατή η ενημέρωση της ομάδας διεργασιών.

Αυτό είναι όλο. Περιέγραψα 3 προσεγγίσεις για την αυτοματοποίηση της παροχής ροής στο NiFi, τις παγίδες που μπορεί να συναντήσει ένας προγραμματιστής και παρείχα έναν κώδικα εργασίας για την αυτοματοποίηση της παράδοσης. Εάν ενδιαφέρεστε για αυτό το θέμα όπως και εγώ - γράφω!

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο