In u mo travagliu, sò spessu scontru novi suluzioni tecniche / prudutti di software, infurmazione nantu à quale hè piuttostu scarsa in Internet di lingua russa. Cù questu articulu, pruvaraghju à cumpiendu una tale lacuna cù un esempiu da a mo pratica recente, quandu avia bisognu di stabilisce l'inviu di l'avvenimenti CDC da dui DBMS populari (PostgreSQL è MongoDB) à un cluster Kafka cù Debezium. Spergu chì questu articulu di rivista, chì hè apparsu per u risultatu di u travagliu fattu, serà utile à l'altri.
Chì ghjè Debezium è CDC in generale?
Debezium - Rappresentante di a categuria di software CDC (Capture cambiamenti di dati), o più precisamente, hè un inseme di connettori per diversi DBMS chì sò cumpatibili cù u framework Apache Kafka Connect.
issu prughjettu open source, licenziatu sottu a Licenza Apache v2.0 è sponsorizatu da Red Hat. U sviluppu hè in corso da 2016 è à u mumentu furnisce supportu ufficiale per i seguenti DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Ci sò ancu connettori per Cassandra è Oracle, ma sò attualmente in u statu di "accessu anticipatu", è e novi versioni ùn guarantisci micca a cumpatibilità retroattiva.
Se paragunemu CDC cù l'approcciu tradiziunale (quandu l'applicazione leghje direttamente i dati da u DBMS), allora i so vantaghji principali includenu l'implementazione di u cambiamentu di dati in streaming à u livellu di fila cù bassa latenza, alta affidabilità è dispunibilità. L'ultimi dui punti sò ottenuti utilizendu un cluster Kafka cum'è repository per l'avvenimenti CDC.
Inoltre, i vantaghji includenu u fattu chì un unicu mudellu hè utilizatu per almacenà l'avvenimenti, cusì l'applicazione finale ùn deve micca preoccupatu di i sfumaturi di uperà diversi DBMS.
Infine, l'usu di un broker di messagiu apre u scopu per a scala horizontale di l'applicazioni chì seguitanu i cambiamenti di dati. À u listessu tempu, l'impattu nantu à a fonte di dati hè minimizatu, postu chì i dati ùn sò micca ricevuti direttamente da u DBMS, ma da u cluster Kafka.
À propositu di l'architettura Debezium
Utilizà Debezium si riduce à questu schema simplice:
DBMS (cum'è fonte di dati) → connettore in Kafka Connect → Apache Kafka → cunsumadore
Comu illustrazione, daraghju un diagramma da u situ web di u prugettu:
In ogni casu, ùn mi piace micca veramente stu schema, perchè pare chì solu un connettore di lavabo hè pussibule.
In realità, a situazione hè diversa: riempia u vostru Data Lake (ultimu ligame in u diagramma sopra) Ùn hè micca l'unicu modu per aduprà Debezium. Avvenimenti mandati à Apache Kafka ponu esse utilizati da e vostre applicazioni per trattà cù diverse situazioni. Per esempiu:
eliminazione di dati irrilevanti da a cache;
invià notificazioni;
l'aghjurnamenti di l'indici di ricerca;
qualchì tipu di logs di auditu;
...
In casu chì avete una applicazione Java è ùn ci hè micca bisognu / pussibilità di utilizà un cluster Kafka, ci hè ancu a pussibilità di travaglià. cunnessu integratu. U plus evidenti hè chì cun ellu pudete ricusà infrastruttura supplementaria (in a forma di un connector è Kafka). Tuttavia, sta suluzione hè stata deprecata da a versione 1.1 è ùn hè più cunsigliatu per l'usu (pò esse sguassatu in versioni future).
Questu articulu discuterà l'architettura cunsigliata da i sviluppatori, chì furnisce a tolleranza di difetti è scalabilità.
Cunfigurazione di u cunnessu
Per cumincià à seguità i cambiamenti in u valore più impurtante - dati - avemu bisognu:
fonte di dati, chì pò esse MySQL à partesi da a versione 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lista completa);
cluster Apache Kafka
Kafka Connect instance (versioni 1.x, 2.x);
cunfiguratu cunnessu Debezium.
U travagliu nantu à i primi dui punti, i.e. u prucessu di stallà un DBMS è Apache Kafka sò fora di u scopu di l'articulu. Tuttavia, per quelli chì volenu implementà tuttu in un sandbox, ci hè un ready-made in u repositoriu ufficiale cù esempi. docker-compose.yaml.
Fighjemu nantu à l'ultimi dui punti in più detail.
0. Kafka Cunnette
Quì è dopu in l'articulu, tutti l'esempii di cunfigurazione sò cunsiderati in u cuntestu di l'imaghjini Docker distribuitu da i sviluppatori Debezium. Contene tutti i fugliali plugin necessarii (connettori) è furnisce a cunfigurazione di Kafka Connect utilizendu variabili di l'ambiente.
Se avete intenzione di utilizà Kafka Connect da Confluent, avete bisognu di aghjunghje i plugins di i connettori necessarii stessu à u repertoriu specificatu in plugin.path o stabilitu via una variabile d'ambiente CLASSPATH. I paràmetri per u travagliu Kafka Connect è i connettori sò definiti per i schedarii di cunfigurazione chì sò passati cum'è argumenti à u cumandamentu di l'iniziu di u travagliu. Per i dettagli, vede ducumentazione.
Tuttu u prucessu di stallà Debeizum in a versione di cunnessu hè realizatu in duie tappe. Pensemu à ognunu di elli:
1. Stallà u framework Kafka Connect
Per trasmette dati à un cluster Apache Kafka, paràmetri specifichi sò stabiliti in u framework Kafka Connect, cum'è:
paràmetri di cunnessione di cluster,
nomi di temi in quale a cunfigurazione di u cunnessu stessu serà guardatu,
u nome di u gruppu in quale u connettore hè in esecuzione (in casu di utilizà u modu distribuitu).
L'imaghjini ufficiali di Docker di u prugettu sustene a cunfigurazione cù variabili di l'ambiente - questu hè ciò chì useremu. Allora scarichemu l'imaghjini:
docker pull debezium/connect
U settore minimu di variabili ambientali necessarii per eseguisce u connettore hè u seguitu:
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista iniziale di i servitori di cluster Kafka per ottene una lista completa di i membri di u cluster;
OFFSET_STORAGE_TOPIC=connector-offsets - un tema per almacenà e pusizioni induve u connettore hè attualmente situatu;
CONNECT_STATUS_STORAGE_TOPIC=connector-status - un tema per almacenà u statutu di u connettore è e so funzioni;
CONFIG_STORAGE_TOPIC=connector-config - un tema per almacenà e dati di cunfigurazione di u connettore è e so funzioni;
GROUP_ID=1 - identificatore di u gruppu di travagliadori nantu à quale u compitu di cunnessione pò esse eseguitu; necessariu quandu si usa distribuitu (distribuitu) regime.
Per automaticamente, Debezium scrive dati in formatu JSON, chì hè accettatu per sandboxes è picculi quantità di dati, ma pò esse un prublema in basa di dati assai caricati. Un'alternativa à u cunvertitore JSON hè di serializà i missaghji usendu Avro à un furmatu binariu, chì riduce a carica nantu à u subsistema I / O in Apache Kafka.
Per utilizà Avro, avete bisognu di implementà un separatu schema-registru (per almacenà schemi). I variàbili per u cunvertitore pareranu cusì:
I dettagli nantu à l'usu di Avro è a creazione di un registru per questu sò fora di u scopu di l'articulu - in più, per a chiarezza, useremu JSON.
2. Stabbilimentu di u cunnessu stessu
Avà pudete andà direttamente à a cunfigurazione di u cunnessu stessu, chì leghje e dati da a fonte.
Fighjemu l'esempiu di connettori per dui DBMS: PostgreSQL è MongoDB, per quale aghju sperienza è per quale ci sò diffirenzii (anche chjuchi, ma in certi casi significativu!).
A cunfigurazione hè descritta in notazione JSON è caricata à Kafka Connect utilizendu una dumanda POST.
2.1. PostgreSQL
Esempiu di cunfigurazione di cunnessu per PostgreSQL:
U principiu di funziunamentu di u connector dopu sta cunfigurazione hè abbastanza sèmplice:
À u primu principiu, si cunnetta à a basa di dati specificata in a cunfigurazione è principia in u modu snapshot iniziale, mandendu à Kafka l'inseme iniziale di dati ricivutu cù a cundizzioni SELECT * FROM table_name.
Dopu chì l'inizializazione hè finita, u connettore entra in u modu di leghje i cambiamenti da i schedarii WAL PostgreSQL.
Circa l'opzioni aduprate:
name - u nome di u connettore per quale hè aduprata a cunfigurazione descritta quì sottu; in u futuru, stu nome hè adupratu per travaglià cù u connector (vale à dì vede u statu / riavvia / aghjurnà a cunfigurazione) attraversu l'API REST Kafka Connect;
connector.class - a classe di cunnessu DBMS chì serà utilizatu da u connettore cunfiguratu;
plugin.name hè u nome di u plugin per a decodificazione logica di dati da i schedari WAL. Disponibile à sceglie wal2json, decoderbuffs и pgoutput. I primi dui necessitanu a stallazione di l'estensioni appropritate in u DBMS, è pgoutput per a versione PostgreSQL 10 è superiore ùn hè micca bisognu di manipulazioni supplementari;
database.* - opzioni per cunnette à a basa di dati, induve database.server.name - u nome di l'istanza PostgreSQL utilizatu per furmà u nome di u tema in u cluster Kafka;
table.include.list - una lista di tavule in quale vulemu seguità i cambiamenti; datu in u furmatu schema.table_name; ùn pò esse usatu inseme cù table.exclude.list;
heartbeat.interval.ms - intervallu (in millisecondi) cù quale u connettore manda messagi di u battutu à un tema speciale;
heartbeat.action.query - una dumanda chì serà eseguita quandu invià ogni missaghju di battitu di cori (l'opzione hè apparsa da a versione 1.1);
slot.name - u nome di u slot di replicazione chì serà utilizatu da u connettore;
publication.name - Nome publicazioni in PostgreSQL chì u connettore usa. In casu ùn esiste micca, Debezium pruverà à creà. Se l'utilizatore sottu à quale a cunnessione hè fatta ùn hà micca abbastanza diritti per questa azione, u connettore esce cun un errore;
transforms determina esattamente cumu cambià u nome di u tema di destinazione:
transforms.AddPrefix.regex - maschera da quale u nome di u tema di destinazione hè ridefinitu;
transforms.AddPrefix.replacement - direttamente ciò chì avemu ridefinitu.
Più nantu à u battitu di u core è e trasformazioni
Per automaticamente, u connettore manda dati à Kafka per ogni transazzione impegnata, è scrive u so LSN (Log Sequence Number) à u tema di serviziu. offset. Ma chì succede se u connettore hè cunfiguratu per leghje micca tutta a basa di dati, ma solu una parte di e so tavule (in quale a data hè aghjurnata pocu frequente)?
U connettore leghje i fugliali WAL è ùn rileva micca transazzione cumminciate in elli à e tavule chì monitoreghja.
Dunque, ùn aghjurnà micca a so pusizione attuale nè in u tema nè in u slot di replicazione.
Questu, à u turnu, pruvucarà i schedari WAL per esse "stuck" in u discu è prubabilmente escerà u spaziu di discu.
E quì l'opzioni venenu in salvezza. heartbeat.interval.ms и heartbeat.action.query. L'usu di queste opzioni in coppie permette di eseguisce una dumanda di cambià dati in una tavola separata ogni volta chì un missaghju di battitu di cori hè mandatu. Cusì, u LSN nantu à quale u connettore hè attualmente situatu (in u slot di replicazione) hè constantemente aghjurnatu. Questu permette à u DBMS di sguassà i schedari WAL chì ùn sò più necessarii. Per più infurmazione nantu à cumu funziona l'opzioni, vede ducumentazione.
Un'altra opzione chì merita più attenzione hè transforms. Ancu s'ellu si tratta più di comodità è bellezza ...
Per automaticamente, Debezium crea temi utilizendu a seguente pulitica di nomi: serverName.schemaName.tableName. Questu pò micca sempre esse convenientu. Opzioni transforms usendu l'espressioni regulare, pudete definisce una lista di tavule chì l'avvenimenti anu da esse instradati à un tema cun un nome specificu.
In a nostra cunfigurazione grazia à transforms succede u seguente: tutti l'avvenimenti CDC da a basa di dati tracciati andaranu à u tema cù u nome data.cdc.dbname. Altrimenti (senza sti paràmetri), Debezium crea per automaticamente un tema per ogni tavula di a forma: pg-dev.public.<table_name>.
Limitazioni di u cunnessu
À a fine di a descrizzione di a cunfigurazione di u connector per PostgreSQL, vale a pena parlà di e seguenti caratteristiche / limitazioni di u so travagliu:
A funziunalità di u connector per PostgreSQL si basa in u cuncettu di decodificazione logica. Dunque ellu ùn traccia micca e dumande per cambià a struttura di a basa di dati (DDL) - per quessa, sta dati ùn saranu micca in i temi.
Siccomu i slot di replicazione sò usati, a cunnessione di u connettore hè pussibule solu à l'istanza DBMS maestru.
Se l'utilizatore sottu quale u cunnessu si cunnetta à a basa di dati hà diritti di sola lettura, allora prima di u primu lanciu, avete bisognu di creà manualmente un slot di replicazione è publicà à a basa di dati.
Applicà una cunfigurazione
Allora carchemu a nostra cunfigurazione in u connettore:
In i dui casi, i registri sò custituiti da a chjave (PK) di u record chì hè stata cambiata, è l'essenza stessa di i cambiamenti: ciò chì u record era prima è ciò chì hè diventatu dopu.
In u casu di INSERT: valore prima (before) uguali nullseguita da a stringa chì hè stata inserita.
In u casu di UPDATE: à payload.before u statu precedente di a fila hè visualizatu, è in payload.after - novu cù l'essenza di u cambiamentu.
2.2 MongoDB
Stu connettore usa u mecanismu di replicazione standard di MongoDB, leghjendu l'infurmazioni da l'oplog di u nodu primariu DBMS.
In modu simile à u connettore digià descrittu per PgSQL, ancu quì, à u primu iniziu, l'istantanea di dati primariu hè presa, dopu chì u connettore cambia à u modu di lettura oplog.
Comu pudete vede, ùn ci sò micca novi opzioni cumparatu cù l'esempiu precedente, ma solu u numeru di l'opzioni rispunsevuli di cunnette à a basa di dati è i so prefissi hè stata ridutta.
Settings transforms sta volta facenu i seguenti: turnà u nome di u tema di destinazione da u schema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.
tolleranza à i difetti
U prublema di a tolleranza di difetti è l'alta dispunibilità in u nostru tempu hè più agutu chè mai - soprattuttu quandu parlemu di dati è transazzione, è u seguimentu di u cambiamentu di dati ùn hè micca in latu in questa materia. Fighjemu ciò chì pò sbaglià in principiu è ciò chì succede à Debezium in ogni casu.
Ci sò trè opzioni di opt-out:
Kafka Connect fallimentu. Se Connect hè cunfiguratu per travaglià in modu distribuitu, questu richiede parechji travagliadori per stabilisce u stessu group.id. Allora, se unu di elli falla, u connettore serà riavviatu nantu à l'altru travagliadore è cuntinueghja a leghje da l'ultima pusizioni impegnata in u tema in Kafka.
Perdita di cunnessione cù u cluster Kafka. U connettore smetterà solu di leghje à a pusizione chì hà fiascatu di mandà à Kafka è pruvate periodicamente di rinviallu finu à chì u tentativu riesce.
A fonte di dati ùn hè micca dispunibule. U connettore pruverà à ricunniscendu à a fonte secondu a cunfigurazione. U predefinitu hè 16 tentativi di usu backoff esponenziale. Dopu à u 16th tentativu fallutu, u compitu serà marcatu cum'è hà fiascatu è deve esse riavviatu manualmente via l'interfaccia REST Kafka Connect.
In u casu di PostgreSQL dati ùn sarà persu, perchè L'usu di slot di replicazione impedisce l'eliminazione di i fugliali WAL chì ùn sò micca letti da u connettore. In questu casu, ci hè un inconveniente: se a cunnessione di a rete trà u cunnessu è u DBMS hè disturbata per un bellu pezzu, ci hè una chance chì u spaziu di discu si esce, è questu pò purtà à u fallimentu di tuttu u DBMS.
In u casu di MySQL i schedari binlog ponu esse rotati da u DBMS stessu prima chì a cunnessione hè restaurata. Questu pruvucarà u connettore per andà in u statu fallutu, è hà da esse riavviatu in u modu di snapshot iniziale per cuntinuà a leghje da i binlogs per restaurà u funziunamentu normale.
nantu MongoDB. A documentazione dice: u cumpurtamentu di u connector in casu chì i schedarii di log / oplog sò stati sguassati è u connector ùn pò micca cuntinuà à leghje da a pusizione induve l'abbandunò hè u listessu per tutti i DBMS. Si trova in u fattu chì u connector andarà in u statu hà fiascatu è averà bisognu di un riavviu in u modu snapshot iniziale.
Tuttavia, ci sò eccezzioni. Se u connettore era in un statu disconnected per un bellu pezzu (o ùn pudia micca ghjunghje à l'istanza MongoDB), è oplog hè stata rotata durante stu tempu, allora quandu a cunnessione hè restaurata, u connettore cuntinuerà tranquillamente à leghje e dati da a prima pusizioni dispunibule. , chì hè per quessa alcuni di i dati in Kafka ùn batterà.
cunchiusioni
Debezium hè a mo prima sperienza cù i sistemi CDC è hè statu assai pusitivu in generale. U prughjettu bribed u sustegnu di u DBMS principale, facilità di cunfigurazione, supportu per clustering è una cumunità attiva. Per quelli chì anu interessatu in a pratica, vi cunsigliu di leghje e guide per Kafka Connect и Debezium.
Comparatu à u connector JDBC per Kafka Connect, u vantaghju principale di Debezium hè chì i cambiamenti sò leghjiti da i logs DBMS, chì permette à e dati per esse ricivutu cù ritardu minimu. U Connettore JDBC (furnitu da Kafka Connect) interrogà a tavula tracciata à un intervallu fissu è (per a listessa ragione) ùn genera micca missaghji quandu i dati sò sguassati (cumu pudete dumandà a dati chì ùn sò micca quì?).
Per risolve prublemi simili, pudete attentu à e seguenti suluzioni (in più di Debezium):