Yntroduksje fan Debezium - CDC foar Apache Kafka

Yntroduksje fan Debezium - CDC foar Apache Kafka

Yn myn wurk kom ik faak nije technyske oplossingen/softwareprodukten tsjin, dêr't ynformaasje oer op it Russysktalige ynternet nochal min is. Mei dit artikel sil ik besykje ien sa'n gat te foljen mei in foarbyld út myn resinte praktyk, doe't ik it ferstjoeren fan CDC-eveneminten moast konfigurearje fan twa populêre DBMS's (PostgreSQL en MongoDB) nei in Kafka-kluster mei Debezium. Ik hoopje dat dit resinsje-artikel, dat ferskynt as gefolch fan it dien wurk, nuttich sil wêze foar oaren.

Wat is Debezium en CDC yn 't algemien?

Debezium - fertsjintwurdiger fan 'e kategory CDC software (Capture Data Change), of krekter, it is in set fan Anschlüsse foar ferskate DBMS's kompatibel mei it Apache Kafka Connect-ramt.

dizze Open Source projekt, lisinsje ûnder de Apache License v2.0 en sponsore troch Red Hat. Untwikkeling is oanhâldend sûnt 2016 en op it stuit biedt it offisjele stipe foar de folgjende DBMS's: MySQL, PostgreSQL, MongoDB, SQL Server. D'r binne ek ferbiningen foar Cassandra en Oracle, mar op it stuit binne se yn 'e status fan' iere tagong ', en nije releases garandearje gjin efterkompatibiliteit.

As wy CDC fergelykje mei de tradisjonele oanpak (as de applikaasje direkt gegevens fan 'e DBMS lêst), omfetsje har wichtichste foardielen de ymplemintaasje fan streaming fan gegevensferoarings op it rigelnivo mei lege latency, hege betrouberens en beskikberens. De lêste twa punten wurde berikt troch it brûken fan in Kafka-kluster as repository foar CDC-eveneminten.

In oar foardiel is it feit dat ien model wurdt brûkt om eveneminten op te slaan, sadat de einapplikaasje gjin soargen hoecht te meitsjen oer de nuânses fan it operearjen fan ferskate DBMS's.

As lêste, mei it brûken fan in berjochtmakelaar kinne applikaasjes dy't wizigingen yn gegevens kontrolearje, horizontaal skaalje. Tagelyk wurdt de ynfloed op 'e gegevensboarne minimalisearre, om't de gegevens net direkt fan' e DBMS krije, mar fan 'e Kafka-kluster.

Oer de Debezium-arsjitektuer

It brûken fan Debezium komt del op dit ienfâldige skema:

DBMS (as gegevensboarne) → ferbiner yn Kafka Connect → Apache Kafka → konsumint

As yllustraasje is hjir in diagram fan 'e projektwebside:

Yntroduksje fan Debezium - CDC foar Apache Kafka

Ik hâld lykwols net echt fan dit skema, om't it liket dat allinich it gebrûk fan in sinkferbining mooglik is.

Yn werklikheid is de situaasje oars: jo Data Lake ynfolje (lêste keppeling yn it diagram hjirboppe) Dit is net de ienige manier om Debezium te brûken. Eveneminten stjoerd nei Apache Kafka kinne wurde brûkt troch jo applikaasjes om in ferskaat oan situaasjes te behanneljen. Bygelyks:

  • fuortsmite irrelevante gegevens út de cache;
  • ferstjoeren fan notifikaasjes;
  • sykje yndeks updates;
  • in soarte fan kontrôle logs;
  • ...

As jo ​​​​in Java-applikaasje hawwe en d'r gjin need/mooglikheid is om in Kafka-kluster te brûken, is d'r ek de mooglikheid om troch te wurkjen ynbêde-ferbining. De foar de hân lizzende foardiel is dat it elimineert de needsaak foar ekstra ynfrastruktuer (yn 'e foarm fan in ferbining en Kafka). Dizze oplossing is lykwols ôfkard sûnt ferzje 1.1 en wurdt net mear oanrikkemandearre foar gebrûk (stipe dêrfoar kin yn takomstige releases fuortsmiten wurde).

Dit artikel sil beprate de arsjitektuer oanrikkemandearre troch ûntwikkelders, dy't soarget foar fout tolerânsje en scalability.

Connector konfiguraasje

Om wizigingen te folgjen yn 'e wichtichste wearde - gegevens - hawwe wy nedich:

  1. gegevensboarne, dy't MySQL kin wêze fanôf ferzje 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (folsleine list);
  2. Apache Kafka kluster;
  3. Kafka Connect eksimplaar (ferzjes 1.x, 2.x);
  4. ynstelde Debezium ferbiner.

Wurkje oan de earste twa punten, d.w.s. It ynstallaasjeproses fan 'e DBMS en Apache Kafka is bûten it berik fan it artikel. Foar dyjingen dy't lykwols alles yn 'e sânbak wolle ynsette, hat it offisjele repository mei foarbylden in ready-made docker-compose.yaml.

Op de lêste twa punten sille wy yn mear detail yngean.

0. Kafka Ferbine

Hjir en fierder yn it artikel wurde alle konfiguraasjefoarbylden besprutsen yn 'e kontekst fan' e Docker-ôfbylding ferspraat troch de Debezium-ûntwikkelders. It befettet alle nedige plugin-bestannen (ferbinings) en biedt konfiguraasje fan Kafka Connect mei help fan omjouwingsfariabelen.

As jo ​​​​fan doel binne Kafka Connect te brûken fan Confluent, moatte jo de plugins fan 'e nedige ferbiningen selsstannich tafoegje oan 'e map spesifisearre yn plugin.path of ynsteld fia in omjouwingsfariabele CLASSPATH. Ynstellings foar de Kafka Connect-arbeider en ferbinings wurde bepaald troch konfiguraasjetriemmen dy't as arguminten trochjûn wurde oan it kommando fan 'e arbeidersstart. Foar mear details, sjoch dokumintaasje.

It hiele proses fan it ynstellen fan Debeizum yn 'e connectorferzje wurdt útfierd yn twa stadia. Litte wy nei elk fan har sjen:

1. It ynstellen fan it Kafka Connect-ramt

Om gegevens te streamen nei it Apache Kafka-kluster, wurde spesifike parameters ynsteld yn it Kafka Connect-ramt, lykas:

  • parameters foar ferbining mei it kluster,
  • nammen fan ûnderwerpen wêryn de konfiguraasje fan 'e ferbining sels direkt sil wurde opslein,
  • de namme fan 'e groep wêryn de ferbining rint (as ferspraat modus wurdt brûkt).

De offisjele Docker-ôfbylding fan it projekt stipet de konfiguraasje mei omjouwingsfariabelen - dit is wat wy sille brûke. Dat, download de ôfbylding:

docker pull debezium/connect

De minimale set fan omjouwingsfariabelen dy't nedich binne om de ferbining út te fieren is as folget:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - inisjele list fan Kafka-klusterservers om in folsleine list fan klusterleden te krijen;
  • OFFSET_STORAGE_TOPIC=connector-offsets - in ûnderwerp foar it opslaan fan posysjes wêr't de ferbining op it stuit leit;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - ûnderwerp foar it opslaan fan de status fan 'e ferbining en syn taken;
  • CONFIG_STORAGE_TOPIC=connector-config - ûnderwerp foar it opslaan fan konfiguraasjegegevens fan connectoren en har taken;
  • GROUP_ID=1 - identifier fan 'e groep arbeiders wêrop de ferbiningstaak kin wurde útfierd; nedich by it brûken fan ferspraat (ferdield) regime.

Wy lansearje de kontener mei dizze fariabelen:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Opmerking oer Avro

Standert skriuwt Debezium gegevens yn JSON-formaat, wat akseptabel is foar sânbakken en lytse hoemannichten gegevens, mar kin in probleem wurde yn tige laden databases. In alternatyf foar in JSON-konverter is om berjochten te serialisearjen mei Avro yn in binêre opmaak, dat ferleget de lading op de I / O subsysteem yn Apache Kafka.

Om Avro te brûken moatte jo in apart ynsette skema-registraasje (foar it opslaan fan diagrammen). De fariabelen foar de converter sille der sa útsjen:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Details oer it brûken fan Avro en it ynstellen fan it register dêrfoar lizze bûten it berik fan dit artikel - fierderop, foar dúdlikens, sille wy JSON brûke.

2. It konfigurearjen fan de ferbining sels

No kinne jo direkt nei de konfiguraasje fan 'e ferbining sels gean, dy't gegevens fan' e boarne lêze sil.

Litte wy nei it foarbyld sjen fan ferbiningen foar twa DBMS's: PostgreSQL en MongoDB, wêryn ik ûnderfining haw en wêryn ferskillen binne (hoewol lyts, mar yn guon gefallen signifikant!).

De konfiguraasje wurdt beskreaun yn JSON-notaasje en upload nei Kafka Connect mei in POST-fersyk.

2.1. PostgreSQL

Foarbyld konfiguraasje fan ferbining foar PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

It prinsipe fan wurking fan de ferbining nei dizze opset is frij simpel:

  • As it foar it earst lansearre wurdt, ferbynt it mei de databank spesifisearre yn 'e konfiguraasje en begjint yn modus earste momintopname, ferstjoeren nei Kafka de earste set fan gegevens krigen mei de betingst SELECT * FROM table_name.
  • Nei inisjalisaasje is foltôge, giet de ferbining yn modus om wizigingen te lêzen fan PostgreSQL WAL-bestannen.

Oer de brûkte opsjes:

  • name - de namme fan 'e ferbining wêrfoar de hjirûnder beskreaune konfiguraasje wurdt brûkt; yn 'e takomst, dizze namme wurdt brûkt om te wurkjen mei de ferbining (dat wol sizze, besjoch de status / opnij starte / bywurkje de konfiguraasje) fia de Kafka Connect REST API;
  • connector.class - DBMS-ferbiningsklasse dy't sil wurde brûkt troch de konfigureare ferbining;
  • plugin.name - de namme fan it plugin foar logyske dekodearring fan gegevens út WAL-bestannen. Beskikber om út te kiezen wal2json, decoderbuffs и pgoutput. De earste twa fereaskje ynstallaasje fan de passende útwreidings yn de DBMS, en pgoutput foar PostgreSQL ferzje 10 en heger fereasket gjin ekstra manipulaasjes;
  • database.* - opsjes foar ferbining mei de databank, wêr database.server.name - PostgreSQL-eksimplaarnamme brûkt om de ûnderwerpnamme te foarmjen yn it Kafka-kluster;
  • table.include.list - in list mei tabellen wêryn wy wizigingen wolle folgje; oantsjutte yn it formaat schema.table_name; kin net brûkt wurde tegearre mei table.exclude.list;
  • heartbeat.interval.ms - ynterval (yn millisekonden) wêrmei't de ferbining heartbeat-berjochten stjoert nei in spesjaal ûnderwerp;
  • heartbeat.action.query - in fersyk dat sil wurde útfierd by it ferstjoeren fan elk hertslachberjocht (de opsje ferskynde yn ferzje 1.1);
  • slot.name - de namme fan it replikaasjeslot dat sil wurde brûkt troch de ferbining;
  • publication.name - Namme publikaasjes yn PostgreSQL, dy't de ferbining brûkt. As it net bestiet, sil Debezium besykje it te meitsjen. As de brûker ûnder wa't de ferbining makke is net genôch rjochten hat foar dizze aksje, sil de ferbining mei in flater beëinigje;
  • transforms bepaalt krekt hoe't jo de namme fan it doelûnderwerp wizigje:
    • transforms.AddPrefix.type jout oan dat wy reguliere útdrukkingen sille brûke;
    • transforms.AddPrefix.regex - in masker dat de namme fan it doelûnderwerp opnij definiearret;
    • transforms.AddPrefix.replacement - direkt wat wy opnij definiearje.

Mear oer hertslach en transformaasjes

Standert stjoert de ferbining gegevens nei Kafka foar elke tawijd transaksje, en har LSN (Log Sequence Number) wurdt opnommen yn it tsjinstûnderwerp offset. Mar wat bart der as de ferbining is konfigurearre om net de hiele databank te lêzen, mar mar in diel fan syn tabellen (wêryn gegevensupdates net faak foarkomme)?

  • De ferbining sil WAL-bestannen lêze en sil gjin transaksje-ferplichtingen detektearje foar de tabellen dy't it kontrolearret.
  • Dêrom sil it har hjoeddeistige posysje net bywurkje yn it ûnderwerp of yn 'e replikaasjeslot.
  • Dit sil op syn beurt resultearje yn WAL-bestannen dy't op skiif hâlden wurde en wierskynlik sûnder skiifromte rinne.

En dit is wêr't opsjes ta de rêding komme. heartbeat.interval.ms и heartbeat.action.query. It brûken fan dizze opsjes yn pearen makket it mooglik om in fersyk út te fieren om gegevens yn in aparte tabel te feroarjen elke kear as in hertslachberjocht ferstjoerd wurdt. Sa wurdt de LSN dêr't de ferbining op it stuit leit (yn 'e replikaasjeslot) konstant bywurke. Hjirmei kin de DBMS WAL-bestannen fuortsmite dy't net mear nedich binne. Jo kinne mear leare oer hoe't de opsjes yn wurkje dokumintaasje.

In oare opsje weardich fan tichterby omtinken is transforms. Hoewol it mear giet oer gemak en skientme ...

Standert makket Debezium ûnderwerpen mei it folgjende nammebelied: serverName.schemaName.tableName. Dit kin net altyd handich wêze. Opsjes transforms Jo kinne reguliere útdrukkingen brûke om in list mei tabellen te definiearjen, eveneminten wêrfan't moatte wurde trochstjoerd nei in ûnderwerp mei in spesifike namme.

Yn ús konfiguraasje tank transforms it folgjende bart: alle CDC-eveneminten fan 'e kontroleare databank sille nei in ûnderwerp gean mei de namme data.cdc.dbname. Oars (sûnder dizze ynstellings), soe Debezium standert in ûnderwerp foar elke tabel meitsje lykas: pg-dev.public.<table_name>.

Connector Beheinings

Om de beskriuwing fan 'e ferbiningskonfiguraasje foar PostgreSQL te sluten, is it wurdich te praten oer de folgjende funksjes / beheiningen fan har wurking:

  1. De funksjonaliteit fan 'e ferbining foar PostgreSQL is basearre op it konsept fan logyske dekodearring. Dêrom hy folget gjin fersiken om de databankstruktuer te feroarjen (DDL) - dêrom sille dizze gegevens net yn 'e ûnderwerpen stean.
  2. Sûnt replikaasje-slots wurde brûkt, is it ferbinen fan in ferbining mooglik allinnich nei de liedende DBMS-eksimplaar.
  3. As de brûker ûnder wa't de ferbining mei de databank ferbynt, allinich lêsrjochten wurdt takend, dan moatte jo foar de earste lansearring manuell in replikaasjeslot oanmeitsje en publisearje nei de databank.

It tapassen fan de konfiguraasje

Dat, litte wy ús konfiguraasje laden yn 'e ferbining:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Wy kontrolearje dat de ynlaad suksesfol wie en de ferbining begon:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Geweldich: it is ynsteld en klear om te gean. Litte wy no pretendearje as in konsumint en ferbine mei Kafka, wêrnei't wy in yngong yn 'e tabel tafoegje en feroarje:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Yn ús ûnderwerp sil it as folgjend wurde werjûn:

Hiel lang JSON mei ús feroarings

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Yn beide gefallen, records besteane út de kaai (PK) fan it record dat waard feroare, en de essinsje fan de feroarings: wat it record wie foar en wat it waard nei.

  • Yn it gefal fan INSERT: wearde foar (before) is gelyk oan null, en nei - de line dy't waard ynfoege.
  • Yn it gefal fan UPDATE: yn payload.before de foarige steat fan de line wurdt werjûn, en yn payload.after - nij mei de essinsje fan feroarings.

2.2 MongoDB

Dizze ferbining brûkt it standert MongoDB-replikaasjemeganisme, it lêzen fan ynformaasje fan 'e oplog fan' e primêre DBMS-knooppunt.

Fergelykber mei de al beskreaune ferbining foar PgSQL, ek hjir, by de earste start, wurdt de primêre data-snapshot makke, wêrnei't de ferbining oerstapt nei oplog-lêsmodus.

Foarbyld fan konfiguraasje:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Sa't jo sjen kinne, binne d'r hjir gjin nije opsjes yn ferliking mei it foarige foarbyld, mar allinich it oantal opsjes ferantwurdlik foar ferbining mei de databank en har foarheaksels is fermindere.

Ynstellings transforms dizze kear dogge se it folgjende: se transformearje de namme fan it doelûnderwerp út it skema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

marzje foar flaters

De kwestje fan fouttolerânsje en hege beskikberens yn ús tiid is akuter dan ea - foaral as wy it hawwe oer gegevens en transaksjes, en it folgjen fan gegevenswizigingen stiet net oan 'e kant yn dizze kwestje. Litte wy sjen nei wat der yn prinsipe mis gean kin en wat der yn elk gefal mei Debezium barre sil.

D'r binne trije opsjes foar opt-out:

  1. Kafka Connect mislearring. As Ferbine is konfigurearre om te wurkjen yn ferspraat modus, dit fereasket meardere arbeiders te setten deselde group.id. Dan, as ien fan har mislearret, sil de ferbining opnij starte op in oare arbeider en trochgean mei lêzen fan 'e lêste tawijde posysje yn it ûnderwerp yn Kafka.
  2. Ferlies fan ferbining mei it Kafka-kluster. De ferbining sil gewoan stopje mei lêzen op 'e posysje dy't net slagge om te stjoeren nei Kafka, en sil periodyk besykje it opnij te ferstjoeren oant de poging slagget.
  3. Unbeskikberens fan gegevensboarne. De ferbining sil besykje om opnij te ferbinen mei de boarne lykas ynsteld. De standert is 16 besykjen te brûken eksponinsjele backoff. Nei de 16e mislearre poging sil de taak wurde markearre as mislearre en jo moatte it manuell opnij starte fia de Kafka Connect REST-ynterface.
    • Yn it gefal fan PostgreSQL de gegevens sille net ferlern gean, omdat It brûken fan replikaasje-slots sil foarkomme dat jo WAL-bestannen wiskje dy't net wurde lêzen troch de ferbining. Yn dit gefal is d'r ek in neidiel oan 'e munt: as de netwurkferbining tusken de ferbining en de DBMS foar in lange tiid fersteurd is, is der in mooglikheid dat de skiifromte opfalt, en dit kin liede ta in mislearring fan de hiele DBMS.
    • Yn it gefal fan MySQL binlog-bestannen kinne wurde rotearre troch de DBMS sels foardat de ferbining wersteld wurdt. Dit sil feroarsaakje dat de ferbining yn 'e mislearre steat giet, en om normale wurking te herstellen, moatte jo opnij starte yn' e earste snapshotmodus om troch te gean mei it lêzen fan binlogs.
    • op MongoDB. De dokumintaasje stelt: it gedrach fan 'e ferbining yn it gefal dat log-/oplog-bestannen binne wiske en de ferbining kin net trochgean mei lêzen fanôf de posysje wêr't it ophâlde is itselde foar alle DBMS's. It betsjut dat de ferbining yn 'e steat sil gean mislearre en sil opnij starte yn modus earste momintopname.

      D'r binne lykwols útsûnderingen. As de ferbining foar in lange tiid is loskeppele (of koe de MongoDB-eksimplaar net berikke), en de oplog gie yn dizze tiid troch rotaasje, as de ferbining wersteld is, sil de ferbining rêstich trochgean mei it lêzen fan gegevens fan 'e earste beskikbere posysje, dat is wêrom guon fan 'e gegevens yn Kafka net sil slaan.

konklúzje

Debezium is myn earste ûnderfining mei CDC-systemen en oer it algemien heul posityf. It projekt wûn oer mei syn stipe foar grutte DBMS's, gemak fan konfiguraasje, clustering-stipe en aktive mienskip. Foar dyjingen dy't ynteressearre binne yn 'e praktyk, ried ik oan dat jo de gidsen lêze foar Kafka Connect и Debezium.

Yn ferliking mei de JDBC-ferbining foar Kafka Connect is it wichtichste foardiel fan Debezium dat wizigingen wurde lêzen fan 'e DBMS-logs, wêrtroch gegevens mei minimale latency kinne wurde ûntfongen. De JDBC Connector (fan Kafka Connect) freget de kontroleare tabel op in fêst ynterval en genereart (om deselde reden) gjin berjochten as gegevens wurde wiske (hoe kinne jo gegevens opfreegje dy't net bestean?).

Om ferlykbere problemen op te lossen, kinne jo omtinken jaan oan de folgjende oplossingen (neist Debezium):

PS

Lês ek op ús blog:

Boarne: www.habr.com

Add a comment