Bekendstelling van Debezium - CDC vir Apache Kafka

Bekendstelling van Debezium - CDC vir Apache Kafka

In my werk kom ek gereeld nuwe tegniese oplossings/sagtewareprodukte teë, waaroor inligting nogal skaars is op die Russiessprekende internet. Met hierdie artikel sal ek probeer om een ​​so 'n leemte te vul met 'n voorbeeld uit my onlangse praktyk, toe ek die stuur van CDC-gebeure vanaf twee gewilde DBMS'e (PostgreSQL en MongoDB) na 'n Kafka-kluster met Debezium moes opstel. Ek hoop dat hierdie oorsigartikel, wat verskyn het as gevolg van die werk wat gedoen is, vir ander nuttig sal wees.

Wat is Debezium en CDC in die algemeen?

Debezium - Verteenwoordiger van die CDC sagteware kategorie (Vang dataverandering vas), of meer presies, dit is 'n stel verbindings vir verskeie DBBS'e wat versoenbaar is met die Apache Kafka Connect-raamwerk.

Dit oopbron projek, gelisensieer onder die Apache-lisensie v2.0 en geborg deur Red Hat. Ontwikkeling is sedert 2016 aan die gang en bied tans amptelike ondersteuning vir die volgende DBBS: MySQL, PostgreSQL, MongoDB, SQL Server. Daar is ook verbindings vir Cassandra en Oracle, maar hulle is tans in "vroeë toegang" status, en nuwe vrystellings waarborg nie terugwaartse versoenbaarheid nie.

As ons CDC vergelyk met die tradisionele benadering (wanneer die toepassing data direk vanaf die DBBS lees), sluit die belangrikste voordele daarvan die implementering van dataveranderingstroming op die ryvlak in met lae latensie, hoë betroubaarheid en beskikbaarheid. Die laaste twee punte word bereik deur 'n Kafka-kluster as 'n bewaarplek vir CDC-gebeure te gebruik.

Die voordele sluit ook die feit in dat 'n enkele model gebruik word om gebeurtenisse te stoor, sodat die finale toepassing nie bekommerd hoef te wees oor die nuanses van die bedryf van verskillende DBBS nie.

Ten slotte, die gebruik van 'n boodskap makelaar maak ruimte oop vir horisontale skaal van toepassings wat veranderinge in data opspoor. Terselfdertyd word die impak op die databron geminimaliseer, aangesien data nie direk vanaf die DBBS ontvang word nie, maar van die Kafka-kluster.

Oor die Debezium-argitektuur

Die gebruik van Debezium kom neer op hierdie eenvoudige skema:

DBMS (as databron) → koppelaar in Kafka Connect → Apache Kafka → verbruiker

Ter illustrasie gee ek 'n diagram vanaf die projekwebwerf:

Bekendstelling van Debezium - CDC vir Apache Kafka

Ek hou egter nie regtig van hierdie skema nie, want dit blyk dat slegs 'n wasbakaansluiting moontlik is.

In werklikheid is die situasie anders: vul jou Data Lake (laaste skakel in die diagram hierbo) is nie die enigste manier om Debezium te gebruik nie. Gebeurtenisse wat na Apache Kafka gestuur word, kan deur jou toepassings gebruik word om verskeie situasies op te los. Byvoorbeeld:

  • verwydering van irrelevante data uit die kas;
  • die stuur van kennisgewings;
  • soek indeksopdaterings;
  • 'n soort ouditlogboeke;
  • ...

As jy 'n Java-toepassing het en daar geen behoefte/moontlikheid is om 'n Kafka-kluster te gebruik nie, is daar ook die moontlikheid om deur te werk ingeboude verbinding. Die ooglopende pluspunt is dat u daarmee addisionele infrastruktuur (in die vorm van 'n koppelaar en Kafka) kan weier. Hierdie oplossing is egter opgeskort sedert weergawe 1.1 en word nie meer vir gebruik aanbeveel nie (dit kan in toekomstige uitgawes verwyder word).

Hierdie artikel sal die argitektuur bespreek wat deur ontwikkelaars aanbeveel word, wat fouttoleransie en skaalbaarheid bied.

Connector konfigurasie

Om veranderinge in die belangrikste waarde - data - te begin dophou, benodig ons:

  1. databron, wat MySQL kan wees vanaf weergawe 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (volledige lys);
  2. Apache Kafka-kluster
  3. Kafka Connect-instansie (weergawes 1.x, 2.x);
  4. gekonfigureerde Debezium-verbinding.

Werk aan die eerste twee punte, m.a.w. die proses van die installering van 'n DBMS en Apache Kafka is buite die bestek van die artikel. Vir diegene wat egter alles in 'n sandbox wil ontplooi, is daar 'n klaargemaakte een in die amptelike bewaarplek met voorbeelde docker-compose.yaml.

Ons sal in meer besonderhede op die laaste twee punte fokus.

0. Kafka Connect

Hier en later in die artikel word alle konfigurasievoorbeelde beskou in die konteks van die Docker-beeld wat deur die Debezium-ontwikkelaars versprei word. Dit bevat al die nodige inproplêers (verbindings) en verskaf Kafka Connect-konfigurasie deur omgewingsveranderlikes te gebruik.

As jy van plan is om Kafka Connect van Confluent te gebruik, sal jy self die inproppe van die nodige verbindings moet byvoeg by die gids gespesifiseer in plugin.path of stel via 'n omgewingsveranderlike CLASSPATH. Die instellings vir die Kafka Connect-werker en verbindings word gedefinieer deur konfigurasielêers wat as argumente aan die werker se begin-opdrag deurgegee word. Vir besonderhede sien dokumentasie.

Die hele proses om Debeizum in die koppelweergawe op te stel, word in twee fases uitgevoer. Kom ons kyk na elkeen van hulle:

1. Die opstel van die Kafka Connect-raamwerk

Om data na 'n Apache Kafka-kluster te stroom, word spesifieke parameters in die Kafka Connect-raamwerk gestel, soos:

  • cluster verbinding instellings,
  • name van onderwerpe waarin die konfigurasie van die verbinding self gestoor sal word,
  • die naam van die groep waarin die koppelaar loop (in die geval van die gebruik van verspreide modus).

Die amptelike Docker-beeld van die projek ondersteun konfigurasie deur omgewingsveranderlikes te gebruik - dit is wat ons sal gebruik. So kom ons laai die prent af:

docker pull debezium/connect

Die minimum stel omgewingsveranderlikes wat nodig is om die verbinding te laat loop, is soos volg:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - aanvanklike lys van Kafka-klusterbedieners om 'n volledige lys van groeplede te kry;
  • OFFSET_STORAGE_TOPIC=connector-offsets — 'n onderwerp vir die stoor van posisies waar die verbinding tans geleë is;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - 'n onderwerp vir die stoor van die status van die verbinding en sy take;
  • CONFIG_STORAGE_TOPIC=connector-config - 'n onderwerp vir die stoor van verbindingskonfigurasiedata en sy take;
  • GROUP_ID=1 — identifiseerder van die groep werkers waarop die verbindingstaak uitgevoer kan word; benodig wanneer verspreide gebruik word (verspreid) regime.

Ons begin die houer met hierdie veranderlikes:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota oor Avro

By verstek skryf Debezium data in JSON-formaat, wat aanvaarbaar is vir sandkaste en klein hoeveelhede data, maar kan 'n probleem wees in swaar gelaaide databasisse. 'n Alternatief vir die JSON-omskakelaar is om boodskappe te serialiseer deur gebruik te maak van Avro na 'n binêre formaat, wat die las op die I / O-substelsel in Apache Kafka verminder.

Om Avro te gebruik, moet jy 'n aparte ontplooi skema-register (vir die stoor van skemas). Die veranderlikes vir die omskakelaar sal soos volg lyk:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Besonderhede oor die gebruik van Avro en die opstel van 'n register daarvoor is buite die bestek van die artikel - verder, vir duidelikheid, sal ons JSON gebruik.

2. Die opstel van die verbinding self

Nou kan jy direk na die konfigurasie van die aansluiting self gaan, wat data van die bron sal lees.

Kom ons kyk na die voorbeeld van verbindings vir twee DBMS: PostgreSQL en MongoDB, waarvoor ek ondervinding het en waarvoor daar verskille is (hoewel klein, maar in sommige gevalle betekenisvol!).

Die konfigurasie word beskryf in JSON-notasie en opgelaai na Kafka Connect met 'n POST-versoek.

2.1. PostgreSQL

Voorbeeld konneksie konfigurasie vir PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Die beginsel van werking van die verbinding na hierdie konfigurasie is redelik eenvoudig:

  • By die eerste begin word dit gekoppel aan die databasis wat in die konfigurasie gespesifiseer is en begin in die modus aanvanklike momentopname, stuur aan Kafka die aanvanklike stel data wat ontvang is met die voorwaardelike SELECT * FROM table_name.
  • Nadat die inisialisering voltooi is, gaan die koppelaar die modus van leesveranderings van PostgreSQL WAL-lêers in.

Oor die opsies wat gebruik word:

  • name — die naam van die koppelaar waarvoor die konfigurasie wat hieronder beskryf word, gebruik word; in die toekoms word hierdie naam gebruik om met die koppelaar te werk (d.w.s. sien die status / herbegin / werk die konfigurasie op) deur die Kafka Connect REST API;
  • connector.class — die DBMS-koppelaarklas wat deur die gekonfigureerde koppelaar gebruik sal word;
  • plugin.name is die naam van die inprop vir logiese dekodering van data van WAL-lêers. Beskikbaar om van te kies wal2json, decoderbuffs и pgoutput. Die eerste twee vereis die installering van die toepaslike uitbreidings in die DBBS, en pgoutput vir PostgreSQL weergawe 10 en hoër vereis nie bykomende manipulasies nie;
  • database.* - opsies om aan die databasis te koppel, waar database.server.name - die naam van die PostgreSQL-instansie wat gebruik word om die naam van die onderwerp in die Kafka-groepering te vorm;
  • table.include.list - 'n lys van tabelle waarin ons veranderinge wil opspoor; gegee in die formaat schema.table_name; kan nie saam gebruik word nie table.exclude.list;
  • heartbeat.interval.ms — interval (in millisekondes) waarmee die koppelaar hartklopboodskappe na 'n spesiale onderwerp stuur;
  • heartbeat.action.query - 'n versoek wat uitgevoer sal word wanneer elke hartklopboodskap gestuur word (die opsie het sedert weergawe 1.1 verskyn);
  • slot.name — die naam van die replikasiegleuf wat deur die koppelaar gebruik sal word;
  • publication.name - Naam Publication in PostgreSQL wat die koppelaar gebruik. As dit nie bestaan ​​nie, sal Debezium probeer om dit te skep. As die gebruiker waaronder die verbinding gemaak word, nie genoeg regte vir hierdie aksie het nie, sal die verbinding met 'n fout verlaat;
  • transforms bepaal hoe presies om die naam van die teikenonderwerp te verander:
    • transforms.AddPrefix.type dui aan dat ons gereelde uitdrukkings sal gebruik;
    • transforms.AddPrefix.regex — masker waarmee die naam van die teikenonderwerp herdefinieer word;
    • transforms.AddPrefix.replacement - direk wat ons herdefinieer.

Meer oor hartklop en transformasies

By verstek stuur die koppelaar data na Kafka vir elke toegewyde transaksie, en skryf sy LSN (Log Sequence Number) na die diensonderwerp offset. Maar wat gebeur as die koppelaar opgestel is om nie die hele databasis te lees nie, maar slegs 'n deel van sy tabelle (waarin data selde opgedateer word)?

  • Die koppelaar sal WAL-lêers lees en nie transaksieverpligtinge daarin opspoor na die tabelle wat dit monitor nie.
  • Daarom sal dit nie sy huidige posisie in die onderwerp of in die replikasiegleuf opdateer nie.
  • Dit sal op sy beurt veroorsaak dat die WAL-lêers "vas" op die skyf sit en sal waarskynlik nie meer skyfspasie hê nie.

En hier kom opsies tot die redding. heartbeat.interval.ms и heartbeat.action.query. Deur hierdie opsies in pare te gebruik, maak dit moontlik om 'n versoek uit te voer om data in 'n aparte tabel te verander elke keer as 'n hartklopboodskap gestuur word. Dus, die LSN waarop die verbinding tans geleë is (in die replikasiegleuf) word voortdurend opgedateer. Dit laat die DBMS toe om WAL-lêers te verwyder wat nie meer nodig is nie. Vir meer inligting oor hoe opsies werk, sien dokumentasie.

Nog 'n opsie wat noukeuriger aandag verdien, is transforms. Alhoewel dit meer gaan oor gerief en skoonheid ...

By verstek skep Debezium onderwerpe deur die volgende naambeleid te gebruik: serverName.schemaName.tableName. Dit is dalk nie altyd gerieflik nie. Opsies transforms deur gebruik te maak van gereelde uitdrukkings, kan jy 'n lys tabelle definieer waarvan die gebeurtenisse na 'n onderwerp met 'n spesifieke naam herlei moet word.

In ons opset te danke aan transforms die volgende gebeur: alle CDC-gebeurtenisse vanaf die nagespoorde databasis sal na die onderwerp met die naam gaan data.cdc.dbname. Andersins (sonder hierdie instellings), sal Debezium by verstek 'n onderwerp vir elke tabel van die vorm skep: pg-dev.public.<table_name>.

Connector beperkings

Aan die einde van die beskrywing van die verbindingskonfigurasie vir PostgreSQL, is dit die moeite werd om te praat oor die volgende kenmerke / beperkings van sy werk:

  1. Die verbindingsfunksie vir PostgreSQL maak staat op die konsep van logiese dekodering. Daarom het hy spoor nie versoeke na om die struktuur van die databasis te verander nie (DDL) - dienooreenkomstig sal hierdie data nie in die onderwerpe wees nie.
  2. Aangesien replikasiegleuwe gebruik word, is die verbinding van die koppelaar moontlik slegs na die meester DBMS-instansie.
  3. As die gebruiker waaronder die koppelaar aan die databasis koppel, leesalleenregte het, sal jy voor die eerste bekendstelling handmatig 'n replikasiegleuf moet skep en na die databasis publiseer.

Pas 'n konfigurasie toe

So kom ons laai ons konfigurasie in die connector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Ons kyk of die aflaai suksesvol was en dat die verbinding begin het:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Fantasties: dit is opgestel en gereed om te gaan. Laat ons nou voorgee dat ons 'n verbruiker is en koppel aan Kafka, waarna ons 'n inskrywing in die tabel byvoeg en verander:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In ons onderwerp sal dit soos volg vertoon word:

Baie lank JSON met ons veranderinge

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In beide gevalle bestaan ​​die rekords uit die sleutel (PK) van die rekord wat verander is, en die essensie van die veranderinge: wat die rekord voor was en wat dit daarna geword het.

  • In die geval van INSERT: waarde voor (before) gelyk is nullgevolg deur die tou wat ingevoeg is.
  • In die geval van UPDATE: in payload.before die vorige toestand van die ry word vertoon, en in payload.after - nuut met die essensie van verandering.

2.2 MongoDB

Hierdie koppelaar gebruik die standaard MongoDB replikasie meganisme, lees inligting van die oplog van die DBMS primêre nodus.

Soortgelyk aan die reeds beskryfde koppelstuk vir PgSQL, word ook hier, by die eerste aanvang, die primêre datafoto geneem, waarna die koppelaar oorskakel na oplog-leesmodus.

Konfigurasie voorbeeld:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Soos u kan sien, is daar geen nuwe opsies in vergelyking met die vorige voorbeeld nie, maar slegs die aantal opsies wat verantwoordelik is om aan die databasis te koppel en hul voorvoegsels is verminder.

Stellings transforms hierdie keer doen hulle die volgende: draai die naam van die teikenonderwerp uit die skema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

fout verdraagsaamheid

Die kwessie van foutverdraagsaamheid en hoë beskikbaarheid in ons tyd is meer akuut as ooit tevore – veral wanneer ons oor data en transaksies praat, en dataveranderingnasporing is nie op die kantlyn in hierdie saak nie. Kom ons kyk na wat in beginsel verkeerd kan loop en wat in elke geval met Debezium gaan gebeur.

Daar is drie opt-out opsies:

  1. Kafka Connect mislukking. As Connect opgestel is om in verspreide modus te werk, vereis dit dat verskeie werkers dieselfde group.id stel. Dan, as een van hulle misluk, sal die koppelaar weer op die ander werker begin word en voortgaan om te lees vanaf die laaste toegewyde posisie in die onderwerp in Kafka.
  2. Verlies van konneksie met Kafka-groepering. Die koppelaar sal eenvoudig ophou lees by die posisie wat dit nie na Kafka kon stuur nie en periodiek probeer om dit weer te stuur totdat die poging slaag.
  3. Databron nie beskikbaar nie. Die koppelaar sal probeer om weer aan die bron te koppel volgens die konfigurasie. Die verstek is 16 pogings gebruik eksponensiële terugslag. Na die 16de mislukte poging, sal die taak as gemerk word misluk en dit sal met die hand herbegin moet word via die Kafka Connect REST-koppelvlak.
    • In die geval van PostgreSQL data sal nie verlore gaan nie, want die gebruik van replikasiegleuwe sal die uitvee van WAL-lêers wat nie deur die verbinding gelees word nie, verhoed. In hierdie geval is daar 'n nadeel: as die netwerkverbinding tussen die verbinding en die DBBS vir 'n lang tyd ontwrig word, is daar 'n kans dat die skyfspasie opraak, en dit kan lei tot die mislukking van die hele DBBS.
    • In die geval van MySQL binlog-lêers kan deur die DBMS self geroteer word voordat konneksie herstel word. Dit sal veroorsaak dat die koppelaar in die mislukte toestand gaan, en dit sal in die aanvanklike momentopname-modus moet herbegin om voort te gaan met lees van binlogs om normale werking te herstel.
    • op MongoDB. Die dokumentasie sê: die gedrag van die koppelaar ingeval die log-/oplog-lêers uitgevee is en die koppelaar nie kan voortgaan om te lees vanaf die posisie waar dit opgehou het nie, is dieselfde vir alle DBMS. Dit lê in die feit dat die koppelaar in die staat sal gaan misluk en sal 'n herbegin in die modus vereis aanvanklike momentopname.

      Daar is egter uitsonderings. As die verbinding vir 'n lang tyd in 'n ontkoppelde toestand was (of nie die MongoDB-instansie kon bereik nie), en oplog is gedurende hierdie tyd geroteer, wanneer die verbinding herstel is, sal die koppelaar rustig voortgaan om data vanaf die eerste beskikbare posisie te lees , en daarom is sommige van die data in Kafka geen sal tref.

Gevolgtrekking

Debezium is my eerste ervaring met CDC-stelsels en was oor die algemeen baie positief. Die projek het die ondersteuning van die hoof-DBBS, gemak van konfigurasie, ondersteuning vir groepering en 'n aktiewe gemeenskap omgekoop. Vir diegene wat belangstel in die praktyk, ek beveel aan dat jy lees die gidse vir Kafka Connect и Debezium.

In vergelyking met die JDBC-koppelaar vir Kafka Connect, is die grootste voordeel van Debezium dat veranderinge uit die DBMS-logboeke gelees word, wat toelaat dat data met minimale vertraging ontvang word. Die JDBC Connector (verskaf deur Kafka Connect) bevraagteken die nagespoorde tabel met 'n vaste interval en genereer (om dieselfde rede) nie boodskappe wanneer data uitgevee word nie (hoe kan jy navraag doen vir data wat nie daar is nie?).

Om soortgelyke probleme op te los, kan jy aandag gee aan die volgende oplossings (bykomend tot Debezium):

PS

Lees ook op ons blog:

Bron: will.com

Voeg 'n opmerking