Παρουσιάζουμε το Debezium - CDC για τον Apache Kafka

Παρουσιάζουμε το Debezium - CDC για τον Apache Kafka

Στη δουλειά μου, συναντώ συχνά νέες τεχνικές λύσεις / προϊόντα λογισμικού, οι πληροφορίες για τις οποίες είναι μάλλον σπάνιες στο ρωσόφωνο Διαδίκτυο. Με αυτό το άρθρο, θα προσπαθήσω να καλύψω ένα τέτοιο κενό με ένα παράδειγμα από την πρόσφατη πρακτική μου, όταν χρειάστηκε να ρυθμίσω την αποστολή συμβάντων CDC από δύο δημοφιλή DBMS (PostgreSQL και MongoDB) σε ένα σύμπλεγμα Kafka χρησιμοποιώντας το Debezium. Ελπίζω ότι αυτό το άρθρο ανασκόπησης, το οποίο εμφανίστηκε ως αποτέλεσμα της δουλειάς που έγινε, θα είναι χρήσιμο σε άλλους.

Τι είναι το Debezium και το CDC γενικά;

Debezium - Εκπρόσωπος της κατηγορίας λογισμικού CDC (Λήψη αλλαγής δεδομένων), ή πιο συγκεκριμένα, είναι ένα σύνολο συνδέσεων για διάφορα DBMS που είναι συμβατά με το πλαίσιο Apache Kafka Connect.

Το έργο ανοιχτού κώδικα, με άδεια χρήσης Apache License v2.0 και χορηγείται από την Red Hat. Η ανάπτυξη βρίσκεται σε εξέλιξη από το 2016 και επί του παρόντος παρέχει επίσημη υποστήριξη για τα ακόλουθα DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Υπάρχουν επίσης σύνδεσμοι για την Cassandra και την Oracle, αλλά αυτή τη στιγμή βρίσκονται σε κατάσταση "πρώιμης πρόσβασης" και οι νέες εκδόσεις δεν εγγυώνται συμβατότητα προς τα πίσω.

Εάν συγκρίνουμε το CDC με την παραδοσιακή προσέγγιση (όταν η εφαρμογή διαβάζει απευθείας δεδομένα από το DBMS), τα κύρια πλεονεκτήματά του περιλαμβάνουν την εφαρμογή ροής αλλαγής δεδομένων σε επίπεδο γραμμής με χαμηλή καθυστέρηση, υψηλή αξιοπιστία και διαθεσιμότητα. Τα δύο τελευταία σημεία επιτυγχάνονται χρησιμοποιώντας ένα σύμπλεγμα Kafka ως χώρο αποθήκευσης για συμβάντα CDC.

Επίσης, τα πλεονεκτήματα περιλαμβάνουν το γεγονός ότι χρησιμοποιείται ένα μεμονωμένο μοντέλο για την αποθήκευση συμβάντων, επομένως η τελική εφαρμογή δεν χρειάζεται να ανησυχεί για τις αποχρώσεις της λειτουργίας διαφορετικών DBMS.

Τέλος, η χρήση ενός διαμεσολαβητή μηνυμάτων ανοίγει το περιθώριο για εφαρμογές κλιμάκωσης που παρακολουθούν αλλαγές στα δεδομένα. Ταυτόχρονα, ο αντίκτυπος στην πηγή δεδομένων ελαχιστοποιείται, καθώς τα δεδομένα λαμβάνονται όχι απευθείας από το DBMS, αλλά από το σύμπλεγμα Kafka.

Σχετικά με την αρχιτεκτονική Debezium

Η χρήση του Debezium καταλήγει σε αυτό το απλό σχήμα:

DBMS (ως πηγή δεδομένων) → σύνδεσμος στο Kafka Connect → Apache Kafka → καταναλωτής

Ενδεικτικά, θα δώσω ένα διάγραμμα από τον ιστότοπο του έργου:

Παρουσιάζουμε το Debezium - CDC για τον Apache Kafka

Ωστόσο, δεν μου αρέσει πολύ αυτό το σχέδιο, επειδή φαίνεται ότι είναι δυνατή μόνο η χρήση ενός συνδετήρα νεροχύτη.

Στην πραγματικότητα, η κατάσταση είναι διαφορετική: γεμίζοντας τη λίμνη δεδομένων σας (τελευταίος σύνδεσμος στο παραπάνω διάγραμμα) δεν είναι ο μόνος τρόπος χρήσης του Debezium. Τα συμβάντα που αποστέλλονται στον Apache Kafka μπορούν να χρησιμοποιηθούν από τις εφαρμογές σας για την αντιμετώπιση διαφόρων καταστάσεων. Για παράδειγμα:

  • αφαίρεση άσχετων δεδομένων από την κρυφή μνήμη.
  • αποστολή ειδοποιήσεων·
  • ενημερώσεις ευρετηρίου αναζήτησης.
  • κάποιου είδους αρχεία καταγραφής ελέγχου.
  • ...

Σε περίπτωση που διαθέτετε εφαρμογή Java και δεν υπάρχει ανάγκη/δυνατότητα να χρησιμοποιήσετε ένα σύμπλεγμα Kafka, υπάρχει επίσης η δυνατότητα εργασίας μέσω ενσωματωμένος σύνδεσμος. Το προφανές πλεονέκτημα είναι ότι με αυτό μπορείτε να αρνηθείτε πρόσθετη υποδομή (με τη μορφή συνδετήρα και Κάφκα). Ωστόσο, αυτή η λύση έχει καταργηθεί από την έκδοση 1.1 και δεν συνιστάται πλέον για χρήση (μπορεί να αφαιρεθεί σε μελλοντικές εκδόσεις).

Αυτό το άρθρο θα συζητήσει την αρχιτεκτονική που προτείνουν οι προγραμματιστές, η οποία παρέχει ανοχή σφαλμάτων και επεκτασιμότητα.

Διαμόρφωση συνδετήρα

Για να ξεκινήσουμε την παρακολούθηση αλλαγών στην πιο σημαντική τιμή - δεδομένα - χρειαζόμαστε:

  1. πηγή δεδομένων, η οποία μπορεί να είναι MySQL ξεκινώντας από την έκδοση 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (πλήρης κατάλογος);
  2. Συστάδα Απάτσι Κάφκα
  3. Περίπτωση Kafka Connect (εκδόσεις 1.x, 2.x).
  4. διαμορφωμένη υποδοχή Debezium.

Εργαστείτε στα δύο πρώτα σημεία, δηλ. Η διαδικασία εγκατάστασης του DBMS και του Apache Kafka ξεφεύγει από το πεδίο εφαρμογής του άρθρου. Ωστόσο, για όσους θέλουν να αναπτύξουν τα πάντα σε ένα sandbox, υπάρχει ένα έτοιμο στο επίσημο αποθετήριο με παραδείγματα docker-compose.yaml.

Θα σταθούμε αναλυτικότερα στα δύο τελευταία σημεία.

0. Kafka Connect

Εδώ και αργότερα στο άρθρο, όλα τα παραδείγματα διαμόρφωσης εξετάζονται στο πλαίσιο της εικόνας Docker που διανέμεται από τους προγραμματιστές του Debezium. Περιέχει όλα τα απαραίτητα αρχεία πρόσθετων (συνδέσεις) και παρέχει διαμόρφωση του Kafka Connect χρησιμοποιώντας μεταβλητές περιβάλλοντος.

Εάν σκοπεύετε να χρησιμοποιήσετε το Kafka Connect από το Confluent, θα χρειαστεί να προσθέσετε μόνοι σας τα πρόσθετα των απαραίτητων συνδέσεων στον κατάλογο που καθορίζεται στο plugin.path ή ρυθμίστε μέσω μιας μεταβλητής περιβάλλοντος CLASSPATH. Οι ρυθμίσεις για τον εργαζόμενο και τις συνδέσεις Kafka Connect καθορίζονται μέσω αρχείων διαμόρφωσης που μεταβιβάζονται ως ορίσματα στην εντολή έναρξης εργασίας. Για λεπτομέρειες βλ τεκμηρίωση.

Η όλη διαδικασία ρύθμισης του Debeizum στην έκδοση σύνδεσης πραγματοποιείται σε δύο στάδια. Ας δούμε καθένα από αυτά:

1. Ρύθμιση του πλαισίου Kafka Connect

Για τη ροή δεδομένων στο σύμπλεγμα Apache Kafka, ορίζονται συγκεκριμένες παράμετροι στο πλαίσιο Kafka Connect, όπως:

  • παραμέτρους για τη σύνδεση στο σύμπλεγμα,
  • ονόματα θεμάτων στα οποία θα αποθηκευτεί η διαμόρφωση της ίδιας της εφαρμογής σύνδεσης,
  • το όνομα της ομάδας στην οποία εκτελείται η εφαρμογή σύνδεσης (εάν χρησιμοποιείται η κατανεμημένη λειτουργία).

Η επίσημη εικόνα Docker του έργου υποστηρίζει διαμόρφωση με χρήση μεταβλητών περιβάλλοντος - αυτό θα χρησιμοποιήσουμε. Ας κατεβάσουμε λοιπόν την εικόνα:

docker pull debezium/connect

Το ελάχιστο σύνολο μεταβλητών περιβάλλοντος που απαιτούνται για την εκτέλεση του βύσματος είναι το εξής:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - αρχική λίστα διακομιστών συμπλέγματος Kafka για να λάβετε μια πλήρη λίστα μελών συμπλέγματος.
  • OFFSET_STORAGE_TOPIC=connector-offsets — ένα θέμα για την αποθήκευση θέσεων όπου βρίσκεται αυτή τη στιγμή ο σύνδεσμος·
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - ένα θέμα για την αποθήκευση της κατάστασης του συνδέσμου και των εργασιών του.
  • CONFIG_STORAGE_TOPIC=connector-config - ένα θέμα για την αποθήκευση δεδομένων διαμόρφωσης εφαρμογής σύνδεσης και των εργασιών του.
  • GROUP_ID=1 — αναγνωριστικό της ομάδας εργαζομένων στην οποία μπορεί να εκτελεστεί η εργασία σύνδεσης· απαιτείται κατά τη χρήση διανεμημένων (διανέμονται) τρόπος.

Ξεκινάμε το κοντέινερ με αυτές τις μεταβλητές:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Σημείωση για τον Avro

Από προεπιλογή, το Debezium εγγράφει δεδομένα σε μορφή JSON, η οποία είναι αποδεκτή για sandbox και μικρές ποσότητες δεδομένων, αλλά μπορεί να είναι πρόβλημα σε πολύ φορτωμένες βάσεις δεδομένων. Μια εναλλακτική λύση στον μετατροπέα JSON είναι η σειριοποίηση μηνυμάτων χρησιμοποιώντας Avro σε δυαδική μορφή, η οποία μειώνει το φορτίο στο υποσύστημα I/O στον Apache Kafka.

Για να χρησιμοποιήσετε το Avro, πρέπει να αναπτύξετε ένα ξεχωριστό σχήμα-μητρώο (για την αποθήκευση διαγραμμάτων). Οι μεταβλητές για τον μετατροπέα θα μοιάζουν με αυτό:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Λεπτομέρειες σχετικά με τη χρήση του Avro και τη δημιουργία μητρώου για αυτό δεν εμπίπτουν στο πεδίο εφαρμογής του άρθρου - επιπλέον, για λόγους σαφήνειας, θα χρησιμοποιήσουμε το JSON.

2. Ρύθμιση του ίδιου του βύσματος

Τώρα μπορείτε να μεταβείτε απευθείας στη διαμόρφωση της ίδιας της υποδοχής, η οποία θα διαβάζει δεδομένα από την πηγή.

Ας δούμε το παράδειγμα των συνδέσεων για δύο DBMS: PostgreSQL και MongoDB, στα οποία έχω εμπειρία και στα οποία υπάρχουν διαφορές (αν και μικρές, αλλά σε ορισμένες περιπτώσεις σημαντικές!).

Η διαμόρφωση περιγράφεται σε σημειογραφία JSON και μεταφορτώνεται στο Kafka Connect χρησιμοποιώντας ένα αίτημα POST.

2.1. PostgreSQL

Παράδειγμα διαμόρφωσης εφαρμογής σύνδεσης για PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Η αρχή λειτουργίας του συνδετήρα μετά από αυτή τη διαμόρφωση είναι αρκετά απλή:

  • Όταν εκκινείται για πρώτη φορά, συνδέεται με τη βάση δεδομένων που καθορίζεται στη διαμόρφωση και ξεκινά στη λειτουργία αρχικό στιγμιότυπο, στέλνοντας στον Κάφκα το αρχικό σύνολο δεδομένων που ελήφθησαν χρησιμοποιώντας την υπό όρους SELECT * FROM table_name.
  • Αφού ολοκληρωθεί η προετοιμασία, ο σύνδεσμος εισέρχεται στη λειτουργία ανάγνωσης αλλαγών από τα αρχεία PostgreSQL WAL.

Σχετικά με τις επιλογές που χρησιμοποιούνται:

  • name — το όνομα του συνδετήρα για τον οποίο χρησιμοποιείται η διαμόρφωση που περιγράφεται παρακάτω· στο μέλλον, αυτό το όνομα χρησιμοποιείται για να λειτουργεί με την εφαρμογή σύνδεσης (δηλαδή προβολή της κατάστασης / επανεκκίνηση / ενημέρωση της διαμόρφωσης) μέσω του Kafka Connect REST API.
  • connector.class — Κατηγορία σύνδεσης DBMS που θα χρησιμοποιηθεί από τη διαμορφωμένη υποδοχή σύνδεσης.
  • plugin.name είναι το όνομα του πρόσθετου για λογική αποκωδικοποίηση δεδομένων από αρχεία WAL. Διαθέσιμο για επιλογή wal2json, decoderbuffs и pgoutput. Τα δύο πρώτα απαιτούν την εγκατάσταση των κατάλληλων επεκτάσεων στο DBMS και pgoutput για PostgreSQL έκδοση 10 και νεότερη δεν απαιτεί πρόσθετους χειρισμούς.
  • database.* — επιλογές για σύνδεση στη βάση δεδομένων, όπου database.server.name - το όνομα της παρουσίας PostgreSQL που χρησιμοποιείται για να σχηματίσει το όνομα του θέματος στο σύμπλεγμα Kafka.
  • table.include.list - μια λίστα πινάκων στους οποίους θέλουμε να παρακολουθούμε τις αλλαγές. δίνεται στη μορφή schema.table_name; δεν μπορεί να χρησιμοποιηθεί μαζί με table.exclude.list;
  • heartbeat.interval.ms — διάστημα (σε χιλιοστά του δευτερολέπτου) με το οποίο ο σύνδεσμος στέλνει μηνύματα καρδιακού παλμού σε ένα ειδικό θέμα.
  • heartbeat.action.query - ένα αίτημα που θα εκτελείται κατά την αποστολή κάθε μηνύματος καρδιακού παλμού (η επιλογή έχει εμφανιστεί από την έκδοση 1.1).
  • slot.name — το όνομα της υποδοχής αναπαραγωγής που θα χρησιμοποιηθεί από τον σύνδεσμο·
  • publication.name - Ονομα Δημοσίευση σε PostgreSQL που χρησιμοποιεί η εφαρμογή σύνδεσης. Σε περίπτωση που δεν υπάρχει, το Debezium θα προσπαθήσει να το δημιουργήσει. Εάν ο χρήστης με τον οποίο πραγματοποιείται η σύνδεση δεν έχει αρκετά δικαιώματα για αυτήν την ενέργεια, η εφαρμογή σύνδεσης θα εξέλθει με σφάλμα.
  • transforms καθορίζει πώς ακριβώς να αλλάξετε το όνομα του θέματος στόχου:
    • transforms.AddPrefix.type υποδεικνύει ότι θα χρησιμοποιήσουμε κανονικές εκφράσεις.
    • transforms.AddPrefix.regex — μάσκα με την οποία επαναπροσδιορίζεται το όνομα του θέματος-στόχου·
    • transforms.AddPrefix.replacement - άμεσα τι επαναπροσδιορίζουμε.

Περισσότερα για τον καρδιακό παλμό και τις μεταμορφώσεις

Από προεπιλογή, η εφαρμογή σύνδεσης στέλνει δεδομένα στον Kafka για κάθε δεσμευμένη συναλλαγή και εγγράφει το LSN (Αριθμός ακολουθίας καταγραφής) στο θέμα της υπηρεσίας offset. Τι συμβαίνει όμως εάν η εφαρμογή σύνδεσης έχει ρυθμιστεί να διαβάζει όχι ολόκληρη τη βάση δεδομένων, αλλά μόνο μέρος των πινάκων της (στους οποίους τα δεδομένα ενημερώνονται σπάνια);

  • Ο σύνδεσμος θα διαβάζει αρχεία WAL και δεν θα ανιχνεύει δεσμεύσεις συναλλαγών σε αυτά στους πίνακες που παρακολουθεί.
  • Επομένως, δεν θα ενημερώσει την τρέχουσα θέση του ούτε στο θέμα ούτε στην υποδοχή αναπαραγωγής.
  • Αυτό, με τη σειρά του, θα έχει ως αποτέλεσμα τα αρχεία WAL να «κολλήσουν» στο δίσκο και πιθανότατα θα εξαντληθεί ο χώρος στο δίσκο.

Και εδώ οι επιλογές έρχονται στη διάσωση. heartbeat.interval.ms и heartbeat.action.query. Η χρήση αυτών των επιλογών ανά ζεύγη καθιστά δυνατή την εκτέλεση ενός αιτήματος για αλλαγή δεδομένων σε ξεχωριστό πίνακα κάθε φορά που αποστέλλεται ένα μήνυμα καρδιακού παλμού. Έτσι, το LSN στο οποίο βρίσκεται ο σύνδεσμος (στην υποδοχή αναπαραγωγής) ενημερώνεται συνεχώς. Αυτό επιτρέπει στο DBMS να αφαιρεί αρχεία WAL που δεν χρειάζονται πλέον. Για περισσότερες πληροφορίες σχετικά με τον τρόπο λειτουργίας των επιλογών, βλ τεκμηρίωση.

Μια άλλη επιλογή που αξίζει περισσότερη προσοχή είναι transforms. Αν και αφορά περισσότερο την ευκολία και την ομορφιά...

Από προεπιλογή, το Debezium δημιουργεί θέματα χρησιμοποιώντας την ακόλουθη πολιτική ονομασίας: serverName.schemaName.tableName. Αυτό μπορεί να μην είναι πάντα βολικό. Επιλογές transforms χρησιμοποιώντας κανονικές εκφράσεις, μπορείτε να ορίσετε μια λίστα πινάκων των οποίων τα συμβάντα πρέπει να δρομολογηθούν σε ένα θέμα με συγκεκριμένο όνομα.

Στη διαμόρφωσή μας χάρη σε transforms συμβαίνει το εξής: όλα τα συμβάντα CDC από την παρακολουθούμενη βάση δεδομένων θα μεταβούν στο θέμα με το όνομα data.cdc.dbname. Διαφορετικά (χωρίς αυτές τις ρυθμίσεις), το Debezium θα δημιουργούσε από προεπιλογή ένα θέμα για κάθε πίνακα της φόρμας: pg-dev.public.<table_name>.

Περιορισμοί σύνδεσης

Στο τέλος της περιγραφής της διαμόρφωσης σύνδεσης για το PostgreSQL, αξίζει να μιλήσουμε για τα ακόλουθα χαρακτηριστικά / περιορισμούς της εργασίας του:

  1. Η λειτουργία σύνδεσης για το PostgreSQL βασίζεται στην έννοια της λογικής αποκωδικοποίησης. Επομένως αυτός δεν παρακολουθεί αιτήματα για αλλαγή της δομής της βάσης δεδομένων (DDL) - κατά συνέπεια, αυτά τα δεδομένα δεν θα είναι στα θέματα.
  2. Εφόσον χρησιμοποιούνται υποδοχές αναπαραγωγής, η σύνδεση του βύσματος είναι δυνατή μόνο στην κύρια παρουσία DBMS.
  3. Εάν ο χρήστης από τον οποίο συνδέεται η εφαρμογή σύνδεσης στη βάση δεδομένων έχει δικαιώματα μόνο για ανάγνωση, τότε πριν από την πρώτη εκκίνηση θα χρειαστεί να δημιουργήσετε με μη αυτόματο τρόπο μια υποδοχή αναπαραγωγής και να δημοσιεύσετε στη βάση δεδομένων.

Εφαρμογή διαμόρφωσης

Ας φορτώσουμε λοιπόν τη διαμόρφωσή μας στον σύνδεσμο:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Ελέγχουμε ότι η λήψη ήταν επιτυχής και ότι η εφαρμογή σύνδεσης ξεκίνησε:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Υπέροχο: έχει ρυθμιστεί και είναι έτοιμο. Τώρα ας προσποιηθούμε ότι είμαστε καταναλωτής και ας συνδεθούμε με τον Κάφκα, μετά από τον οποίο προσθέτουμε και αλλάζουμε μια καταχώρηση στον πίνακα:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Στο θέμα μας θα εμφανίζεται ως εξής:

Πολύ μεγάλο JSON με τις αλλαγές μας

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Και στις δύο περιπτώσεις, οι εγγραφές αποτελούνται από το κλειδί (PK) της εγγραφής που άλλαξε, και την ίδια την ουσία των αλλαγών: τι ήταν η εγγραφή πριν και τι έγινε μετά.

  • Στην περίπτωση του INSERT: αξία πριν (before) ισοδυναμεί nullακολουθούμενη από τη συμβολοσειρά που εισήχθη.
  • Στην περίπτωση του UPDATE: στο payload.before εμφανίζεται η προηγούμενη κατάσταση της γραμμής και μέσα payload.after — νέο με την ουσία των αλλαγών.

2.2 MongoDB

Αυτή η σύνδεση χρησιμοποιεί τον τυπικό μηχανισμό αναπαραγωγής MongoDB, διαβάζοντας πληροφορίες από το oplog του κύριου κόμβου DBMS.

Παρόμοια με την ήδη περιγραφείσα σύνδεση για το PgSQL, και εδώ, κατά την πρώτη εκκίνηση, λαμβάνεται το πρωτεύον στιγμιότυπο δεδομένων, μετά το οποίο ο σύνδεσμος μεταβαίνει σε λειτουργία ανάγνωσης αποσύνδεσης.

Παράδειγμα διαμόρφωσης:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Όπως μπορείτε να δείτε, δεν υπάρχουν νέες επιλογές σε σύγκριση με το προηγούμενο παράδειγμα, αλλά μόνο ο αριθμός των επιλογών που είναι υπεύθυνες για τη σύνδεση στη βάση δεδομένων και τα προθέματά τους έχει μειωθεί.

Ρυθμίσεις transforms αυτή τη φορά κάνουν τα εξής: γυρίστε το όνομα του θέματος στόχου από το σχήμα <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

ανοχή σε σφάλματα

Το ζήτημα της ανοχής σφαλμάτων και της υψηλής διαθεσιμότητας στην εποχή μας είναι πιο οξύ από ποτέ - ειδικά όταν μιλάμε για δεδομένα και συναλλαγές και η παρακολούθηση αλλαγών δεδομένων δεν είναι στο περιθώριο σε αυτό το θέμα. Ας δούμε τι μπορεί να πάει στραβά καταρχήν και τι θα συμβεί με το Debezium σε κάθε περίπτωση.

Υπάρχουν τρεις επιλογές εξαίρεσης:

  1. Αποτυχία Kafka Connect. Εάν το Connect έχει ρυθμιστεί ώστε να λειτουργεί σε κατανεμημένη λειτουργία, αυτό απαιτεί από πολλούς εργαζόμενους να ορίσουν το ίδιο group.id. Στη συνέχεια, εάν αποτύχει ένα από αυτά, ο σύνδεσμος θα επανεκκινηθεί στον άλλο εργαζόμενο και θα συνεχίσει την ανάγνωση από την τελευταία δεσμευμένη θέση στο θέμα στον Κάφκα.
  2. Απώλεια συνδεσιμότητας με το σύμπλεγμα Κάφκα. Ο σύνδεσμος απλώς θα σταματήσει να διαβάζει στη θέση που απέτυχε να στείλει στον Κάφκα και περιοδικά θα προσπαθήσει να το στείλει ξανά μέχρι να πετύχει η προσπάθεια.
  3. Η πηγή δεδομένων δεν είναι διαθέσιμη. Ο σύνδεσμος θα προσπαθήσει να επανασυνδεθεί στην πηγή σύμφωνα με τη διαμόρφωση. Η προεπιλογή είναι 16 προσπάθειες χρήσης εκθετική υποχώρηση. Μετά την 16η αποτυχημένη προσπάθεια, η εργασία θα επισημανθεί ως απέτυχε και θα χρειαστεί να γίνει μη αυτόματη επανεκκίνηση μέσω της διεπαφής Kafka Connect REST.
    • Στην περίπτωση του PostgreSQL τα δεδομένα δεν θα χαθούν, γιατί Η χρήση υποδοχών αναπαραγωγής θα σας εμποδίσει να διαγράψετε αρχεία WAL που δεν διαβάζονται από την εφαρμογή σύνδεσης. Σε αυτήν την περίπτωση, υπάρχει επίσης ένα μειονέκτημα του νομίσματος: εάν η συνδεσιμότητα δικτύου μεταξύ της υποδοχής και του DBMS διακοπεί για μεγάλο χρονικό διάστημα, υπάρχει πιθανότητα να εξαντληθεί ο χώρος στο δίσκο και αυτό μπορεί να οδηγήσει σε αστοχία ολόκληρο το DBMS.
    • Στην περίπτωση του MySQL Τα αρχεία binlog μπορούν να περιστραφούν από το ίδιο το DBMS πριν αποκατασταθεί η συνδεσιμότητα. Αυτό θα έχει ως αποτέλεσμα η σύνδεση να μεταβεί στην αποτυχημένη κατάσταση και για να επαναφέρετε την κανονική λειτουργία, θα χρειαστεί να κάνετε επανεκκίνηση στην αρχική λειτουργία στιγμιότυπου για να συνεχίσετε την ανάγνωση από τα binlogs.
    • επί MongoDB. Η τεκμηρίωση λέει: η συμπεριφορά της εφαρμογής σύνδεσης σε περίπτωση που τα αρχεία καταγραφής/oplog έχουν διαγραφεί και η εφαρμογή σύνδεσης δεν μπορεί να συνεχίσει την ανάγνωση από τη θέση όπου σταμάτησε είναι η ίδια για όλα τα DBMS. Σημαίνει ότι ο σύνδεσμος θα μεταβεί στην κατάσταση απέτυχε και θα απαιτήσει επανεκκίνηση στη λειτουργία αρχικό στιγμιότυπο.

      Ωστόσο, υπάρχουν και εξαιρέσεις. Εάν ο σύνδεσμος αποσυνδέθηκε για μεγάλο χρονικό διάστημα (ή δεν μπορούσε να φτάσει στην παρουσία του MongoDB) και το oplog πέρασε από περιστροφή κατά τη διάρκεια αυτού του χρόνου, τότε όταν αποκατασταθεί η σύνδεση, ο σύνδεσμος θα συνεχίσει ήρεμα να διαβάζει δεδομένα από την πρώτη διαθέσιμη θέση, γι' αυτό και μερικά από τα δεδομένα στον Κάφκα όχι θα χτυπήσει.

Συμπέρασμα

Το Debezium είναι η πρώτη μου εμπειρία με συστήματα CDC και ήταν πολύ θετικό συνολικά. Το έργο δωροδοκούσε την υποστήριξη του κύριου DBMS, την ευκολία διαμόρφωσης, την υποστήριξη για ομαδοποίηση και μια ενεργή κοινότητα. Για όσους ενδιαφέρονται για πρακτική, συνιστώ να διαβάσουν τους οδηγούς για Kafka Connect и Debezium.

Σε σύγκριση με την υποδοχή JDBC για το Kafka Connect, το κύριο πλεονέκτημα του Debezium είναι ότι οι αλλαγές διαβάζονται από τα αρχεία καταγραφής του DBMS, γεγονός που επιτρέπει τη λήψη δεδομένων με ελάχιστο λανθάνοντα χρόνο. Η εφαρμογή σύνδεσης JDBC (από το Kafka Connect) υποβάλλει ερωτήματα στον πίνακα παρακολούθησης σε ένα σταθερό διάστημα και (για τον ίδιο λόγο) δεν δημιουργεί μηνύματα όταν διαγράφονται τα δεδομένα (πώς μπορείτε να ρωτήσετε δεδομένα που δεν υπάρχουν;).

Για να λύσετε παρόμοια προβλήματα, μπορείτε να δώσετε προσοχή στις ακόλουθες λύσεις (εκτός από το Debezium):

PS

Διαβάστε επίσης στο blog μας:

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο