Tunakuletea Debezium - CDC ya Apache Kafka

Tunakuletea Debezium - CDC ya Apache Kafka

Katika kazi yangu, mara nyingi mimi hukutana na suluhisho mpya za kiufundi/bidhaa za programu, habari ambayo ni adimu kwenye Mtandao wa lugha ya Kirusi. Na kifungu hiki nitajaribu kujaza pengo kama hilo na mfano kutoka kwa mazoezi yangu ya hivi majuzi, nilipohitaji kusanidi kutuma matukio ya CDC kutoka kwa DBMS mbili maarufu (PostgreSQL na MongoDB) hadi nguzo ya Kafka kwa kutumia Debezium. Natumaini kwamba makala hii ya ukaguzi, ambayo inaonekana kama matokeo ya kazi iliyofanywa, itakuwa na manufaa kwa wengine.

Debezium na CDC ni nini kwa ujumla?

Debezium - Mwakilishi wa kitengo cha programu ya CDC (Nasa mabadiliko ya data), au kwa usahihi zaidi, ni seti ya viunganishi vya DBMS mbalimbali ambavyo vinaoana na mfumo wa Apache Kafka Connect.

Ni mradi wa chanzo wazi, iliyopewa leseni chini ya Apache License v2.0 na kufadhiliwa na Red Hat. Maendeleo yamekuwa yakiendelea tangu 2016 na kwa sasa inatoa usaidizi rasmi kwa DBMS zifuatazo: MySQL, PostgreSQL, MongoDB, SQL Server. Pia kuna viunganishi vya Cassandra na Oracle, lakini kwa sasa viko katika hali ya "ufikiaji wa mapema", na matoleo mapya hayahakikishi utangamano wa nyuma.

Ikiwa tunalinganisha CDC na mbinu ya jadi (wakati maombi inasoma data kutoka kwa DBMS moja kwa moja), basi faida zake kuu ni pamoja na utekelezaji wa utiririshaji wa mabadiliko ya data kwenye kiwango cha safu na latency ya chini, kuegemea juu na upatikanaji. Alama mbili za mwisho zinapatikana kwa kutumia nguzo ya Kafka kama ghala la matukio ya CDC.

Pia, faida ni pamoja na ukweli kwamba mfano mmoja hutumiwa kuhifadhi matukio, hivyo maombi ya mwisho haipaswi kuwa na wasiwasi juu ya nuances ya uendeshaji wa DBMS tofauti.

Hatimaye, kutumia wakala wa ujumbe hufungua wigo wa kuongeza mlalo wa programu zinazofuatilia mabadiliko katika data. Wakati huo huo, athari kwenye chanzo cha data hupunguzwa, kwani data haipokewi moja kwa moja kutoka kwa DBMS, lakini kutoka kwa nguzo ya Kafka.

Kuhusu usanifu wa Debezium

Kutumia Debezium kunakuja kwa mpango huu rahisi:

DBMS (kama chanzo cha data) β†’ kiunganishi katika Kafka Unganisha β†’ Apache Kafka β†’ mtumiaji

Kama kielelezo, nitatoa mchoro kutoka kwa tovuti ya mradi:

Tunakuletea Debezium - CDC ya Apache Kafka

Hata hivyo, siipendi sana mpango huu, kwa sababu inaonekana kwamba tu matumizi ya kontakt ya kuzama inawezekana.

Kwa kweli, hali ni tofauti: kujaza Ziwa lako la Data (kiungo cha mwisho kwenye mchoro hapo juu) sio njia pekee ya kutumia Debezium. Matukio yanayotumwa kwa Apache Kafka yanaweza kutumiwa na programu zako kushughulikia hali mbalimbali. Kwa mfano:

  • kuondoa data isiyo na maana kutoka kwa kashe;
  • kutuma arifa;
  • sasisho za index za utafutaji;
  • aina fulani ya kumbukumbu za ukaguzi;
  • ...

Iwapo utakuwa na programu ya Java na hakuna haja/uwezekano wa kutumia nguzo ya Kafka, pia kuna uwezekano wa kufanya kazi kupitia kiunganishi kilichopachikwa. Faida dhahiri ni kwamba huondoa hitaji la miundombinu ya ziada (kwa namna ya kontakt na Kafka). Hata hivyo, suluhisho hili limeacha kutumika tangu toleo la 1.1 na halipendekezwi tena kutumika (uhimili wake unaweza kuondolewa katika matoleo yajayo).

Nakala hii itajadili usanifu uliopendekezwa na watengenezaji, ambao hutoa uvumilivu wa makosa na uzani.

Usanidi wa kiunganishi

Ili kuanza kufuatilia mabadiliko katika thamani muhimu zaidi - data - tunahitaji:

  1. chanzo cha data, ambacho kinaweza kuwa MySQL kuanzia toleo la 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (orodha kamili);
  2. Nguzo ya Apache Kafka;
  3. Mfano wa Kafka Connect (matoleo 1.x, 2.x);
  4. imesanidiwa kiunganishi cha Debezium.

Fanya kazi kwa pointi mbili za kwanza, i.e. mchakato wa kusakinisha DBMS na Apache Kafka ni zaidi ya upeo wa makala. Walakini, kwa wale ambao wanataka kupeleka kila kitu kwenye sanduku la mchanga, kuna iliyotengenezwa tayari kwenye hazina rasmi na mifano. docker-compose.yaml.

Tutakaa kwa undani zaidi juu ya vidokezo viwili vya mwisho.

0. Kafka Unganisha

Hapa na baadaye katika kifungu hicho, mifano yote ya usanidi inazingatiwa katika muktadha wa picha ya Docker iliyosambazwa na watengenezaji wa Debezium. Ina faili zote muhimu za Plugin (viunganisho) na hutoa usanidi wa Kafka Connect kwa kutumia vigezo vya mazingira.

Ikiwa unakusudia kutumia Kafka Connect kutoka kwa Confluent, utahitaji kuongeza programu-jalizi za viunganishi muhimu mwenyewe kwenye saraka iliyoainishwa ndani. plugin.path au kuweka kupitia mabadiliko ya mazingira CLASSPATH. Mipangilio ya mfanyakazi na viunganishi vya Kafka Connect hufafanuliwa kupitia faili za usanidi ambazo hupitishwa kama hoja kwa amri ya kuanza kwa mfanyakazi. Kwa maelezo tazama nyaraka.

Mchakato mzima wa kuanzisha Debeizum katika toleo la kiunganishi unafanywa katika hatua mbili. Hebu fikiria kila mmoja wao:

1. Kuweka mfumo wa Kafka Connect

Ili kutiririsha data kwa nguzo ya Apache Kafka, vigezo maalum vimewekwa katika mfumo wa Kafka Connect, kama vile:

  • vigezo vya kuunganisha kwenye nguzo,
  • majina ya mada ambayo usanidi wa kiunganishi yenyewe utahifadhiwa,
  • jina la kikundi ambacho kiunganishi kinaendesha (katika kesi ya kutumia hali iliyosambazwa).

Picha rasmi ya Docker ya mradi inasaidia usanidi kwa kutumia anuwai za mazingira - hii ndio tutatumia. Kwa hivyo, wacha tupakue picha:

docker pull debezium/connect

Seti ya chini ya anuwai ya mazingira inayohitajika kuendesha kiunganishi ni kama ifuatavyo.

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - orodha ya awali ya seva za nguzo za Kafka ili kupata orodha kamili ya washiriki wa nguzo;
  • OFFSET_STORAGE_TOPIC=connector-offsets - mada ya kuhifadhi nafasi ambapo kontakt iko sasa;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - mada ya kuhifadhi hali ya kontakt na kazi zake;
  • CONFIG_STORAGE_TOPIC=connector-config - mada ya kuhifadhi data ya usanidi wa kontakt na kazi zake;
  • GROUP_ID=1 - kitambulisho cha kikundi cha wafanyikazi ambacho kazi ya kiunganishi inaweza kutekelezwa; muhimu wakati wa kutumia kusambazwa (imesambazwa) utawala.

Tunaanza chombo na vigezo hivi:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Kumbuka kuhusu Avro

Kwa chaguo-msingi, Debezium huandika data katika umbizo la JSON, ambalo linakubalika kwa visanduku vya mchanga na kiasi kidogo cha data, lakini linaweza kuwa tatizo katika hifadhidata zilizopakiwa sana. Njia mbadala ya kigeuzi cha JSON ni kusasisha ujumbe kwa kutumia Avro kuwa umbizo la binary, ambalo hupunguza mzigo kwenye mfumo mdogo wa I/O katika Apache Kafka.

Ili kutumia Avro, unahitaji kupeleka tofauti schema-rejista (kwa kuhifadhi michoro). Vigezo vya kibadilishaji vitaonekana kama hii:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Maelezo juu ya kutumia Avro na kusanidi sajili yake yako nje ya upeo wa kifungu hiki - zaidi, kwa uwazi, tutatumia JSON.

2. Kuweka kontakt yenyewe

Sasa unaweza kwenda moja kwa moja kwenye usanidi wa kontakt yenyewe, ambayo itasoma data kutoka kwa chanzo.

Wacha tuangalie mfano wa viunganisho vya DBMS mbili: PostgreSQL na MongoDB, ambayo nina uzoefu na ambayo kuna tofauti (ingawa ndogo, lakini katika hali zingine ni muhimu!).

Usanidi umefafanuliwa katika nukuu ya JSON na kupakiwa kwa Kafka Connect kwa kutumia ombi la POST.

2.1. PostgreSQL

Mfano wa usanidi wa kiunganishi cha PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Kanuni ya uendeshaji wa kontakt baada ya usanidi huu ni rahisi sana:

  • Mwanzoni mwa kwanza, inaunganisha kwenye hifadhidata iliyoainishwa katika usanidi na huanza katika hali picha ya awali, kutuma kwa Kafka seti ya awali ya data iliyopatikana kwa kutumia masharti SELECT * FROM table_name.
  • Baada ya uanzishaji kukamilika, kontakt huingia katika hali ya kusoma mabadiliko kutoka kwa faili za WAL za PostgreSQL.

Kuhusu chaguzi zinazotumiwa:

  • name - jina la kontakt ambayo usanidi ulioelezwa hapo chini unatumiwa; katika siku zijazo, jina hili linatumiwa kufanya kazi na kontakt (yaani, tazama hali / kuanzisha upya / kusasisha usanidi) kupitia Kafka Connect REST API;
  • connector.class - Darasa la kiunganishi la DBMS ambalo litatumiwa na kiunganishi kilichosanidiwa;
  • plugin.name β€” jina la programu-jalizi kwa utatuzi wa kimantiki wa data kutoka kwa faili za WAL. Inapatikana kwa kuchagua wal2json, decoderbuffs ΠΈ pgoutput. Mbili za kwanza zinahitaji usakinishaji wa upanuzi unaofaa katika DBMS, na pgoutput kwa toleo la 10 la PostgreSQL na la juu zaidi hauhitaji ghiliba za ziada;
  • database.* β€” chaguzi za kuunganisha kwenye hifadhidata, wapi database.server.name - Jina la mfano la PostgreSQL linalotumiwa kuunda jina la mada katika nguzo ya Kafka;
  • table.include.list - orodha ya meza ambayo tunataka kufuatilia mabadiliko; iliyotolewa katika muundo schema.table_name; haiwezi kutumika pamoja table.exclude.list;
  • heartbeat.interval.ms - muda (katika milliseconds) ambayo kontakt hutuma ujumbe wa mapigo ya moyo kwa mada maalum;
  • heartbeat.action.query - ombi ambalo litatekelezwa wakati wa kutuma kila ujumbe wa moyo (chaguo limeonekana tangu toleo la 1.1);
  • slot.name - jina la slot ya kurudia ambayo itatumiwa na kontakt;
  • publication.name - Jina machapisho katika PostgreSQL, ambayo kiunganishi hutumia. Ikiwa haipo, Debezium itajaribu kuunda. Ikiwa mtumiaji ambaye uunganisho unafanywa hana haki za kutosha kwa hatua hii, kontakt itaisha kwa hitilafu;
  • transforms huamua jinsi ya kubadilisha jina la mada inayolengwa:
    • transforms.AddPrefix.type inaonyesha kwamba tutatumia misemo ya kawaida;
    • transforms.AddPrefix.regex - mask ambayo jina la mada inayolengwa hufafanuliwa upya;
    • transforms.AddPrefix.replacement - moja kwa moja kile tunachofafanua upya.

Zaidi kuhusu mapigo ya moyo na mabadiliko

Kwa chaguo-msingi, kiunganishi hutuma data kwa Kafka kwa kila shughuli iliyofanywa, na huandika LSN yake (Nambari ya Mlolongo wa Ingia) kwa mada ya huduma. offset. Lakini ni nini kinachotokea ikiwa kiunganishi kimeundwa kusoma sio hifadhidata nzima, lakini sehemu tu ya meza zake (ambazo data husasishwa mara kwa mara)?

  • Kiunganishi kitasoma faili za WAL na hakitambui shughuli za muamala ndani yao hadi kwenye jedwali zinazofuatilia.
  • Kwa hivyo, haitasasisha msimamo wake wa sasa ama katika mada au katika nafasi ya kurudia.
  • Hii, kwa upande wake, itasababisha faili za WAL "kukwama" kwenye diski na kuna uwezekano kuwa na nafasi ya diski.

Na hapa ndipo chaguzi zinakuja kuwaokoa. heartbeat.interval.ms ΠΈ heartbeat.action.query. Kutumia chaguo hizi kwa jozi hufanya iwezekanavyo kutekeleza ombi la kubadilisha data katika jedwali tofauti kila wakati ujumbe wa mapigo ya moyo unapotumwa. Kwa hivyo, LSN ambayo kontakt iko sasa (katika slot ya replication) inasasishwa mara kwa mara. Hii inaruhusu DBMS kuondoa faili za WAL ambazo hazihitajiki tena. Kwa habari zaidi juu ya jinsi chaguzi zinavyofanya kazi, ona nyaraka.

Chaguo jingine linalostahili tahadhari ya karibu ni transforms. Ingawa ni zaidi juu ya urahisi na uzuri ...

Kwa chaguo-msingi, Debezium huunda mada kwa kutumia sera ifuatayo ya kumtaja: serverName.schemaName.tableName. Hii inaweza isiwe rahisi kila wakati. Chaguo transforms Unaweza kutumia misemo ya kawaida kufafanua orodha ya majedwali, matukio ambayo yanahitaji kuelekezwa kwa mada yenye jina maalum.

Katika usanidi wetu, asante transforms yafuatayo hutokea: matukio yote ya CDC kutoka kwa hifadhidata inayofuatiliwa yataenda kwa mada yenye jina data.cdc.dbname. Vinginevyo (bila mipangilio hii), Debezium bila msingi ingeunda mada kwa kila jedwali kama: pg-dev.public.<table_name>.

Mapungufu ya kiunganishi

Mwisho wa maelezo ya usanidi wa kiunganishi cha PostgreSQL, inafaa kuzungumza juu ya huduma / mapungufu yafuatayo ya kazi yake:

  1. Utendaji wa kiunganishi kwa PostgreSQL hutegemea dhana ya usimbaji wa kimantiki. Kwa hiyo yeye haifuatilii maombi ya kubadilisha muundo wa hifadhidata (DDL) - ipasavyo, data hii haitakuwa kwenye mada.
  2. Kwa kuwa nafasi za kuiga hutumiwa, kuunganisha kontakt inawezekana tu kwa mfano unaoongoza wa DBMS.
  3. Ikiwa mtumiaji ambaye kiunganishi kinaunganishwa chini yake na hifadhidata amepewa haki za kusoma pekee, basi kabla ya uzinduzi wa kwanza utahitaji kuunda nafasi ya kurudia na kuchapisha kwenye hifadhidata.

Inaweka usanidi

Kwa hivyo wacha tupakie usanidi wetu kwenye kiunganishi:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Tunaangalia kama upakuaji ulifanikiwa na kiunganishi kilianza:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Kubwa: imeundwa na iko tayari kwenda. Sasa wacha tujifanye kuwa mtumiaji na tuunganishe Kafka, baada ya hapo tunaongeza na kubadilisha kiingilio kwenye jedwali:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Katika mada yetu itaonyeshwa kama ifuatavyo:

JSON ndefu sana na mabadiliko yetu

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Katika visa vyote viwili, rekodi zinajumuisha ufunguo (PK) wa rekodi ambayo ilibadilishwa, na kiini cha mabadiliko: rekodi ilikuwa nini hapo awali na nini ikawa baada yake.

  • Katika kesi ya INSERT: thamani kabla (before) sawa nullikifuatiwa na kamba iliyoingizwa.
  • Katika kesi ya UPDATE: katika payload.before hali ya awali ya mstari inaonyeshwa, na ndani payload.after - mpya na kiini cha mabadiliko.

2.2 MongoDB

Kiunganishi hiki hutumia utaratibu wa kawaida wa urudufishaji wa MongoDB, kusoma maelezo kutoka kwa oplog ya nodi ya msingi ya DBMS.

Vile vile kwa kiunganishi kilichoelezwa tayari kwa PgSQL, hapa, pia, mwanzoni mwa kwanza, snapshot ya msingi ya data inachukuliwa, baada ya hapo kiunganishi kinabadilisha hali ya kusoma ya oplog.

Mfano wa usanidi:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kama unaweza kuona, hakuna chaguzi mpya hapa ikilinganishwa na mfano uliopita, lakini ni idadi tu ya chaguo zinazohusika na kuunganisha kwenye hifadhidata na viambishi vyao vimepunguzwa.

Mipangilio transforms wakati huu wanafanya yafuatayo: geuza jina la mada inayolengwa kutoka kwa mpango <server_name>.<db_name>.<collection_name> Π² data.cdc.mongo_<db_name>.

uvumilivu wa makosa

Suala la uvumilivu wa makosa na upatikanaji wa juu katika wakati wetu ni kali zaidi kuliko hapo awali - hasa tunapozungumzia kuhusu data na shughuli, na ufuatiliaji wa mabadiliko ya data hausimami kando katika suala hili. Wacha tuangalie kile kinachoweza kwenda vibaya kwa kanuni na nini kitatokea kwa Debezium katika kila kesi.

Kuna chaguzi tatu za kutoka:

  1. Kafka Connect imeshindwa. Ikiwa Connect imesanidiwa kufanya kazi katika hali ya kusambazwa, hii inahitaji wafanyakazi wengi kuweka group.id sawa. Kisha, ikiwa mmoja wao atashindwa, kiunganishi kitaanzishwa upya kwa mfanyakazi mwingine na kuendelea kusoma kutoka nafasi ya mwisho ya kujitolea katika mada katika Kafka.
  2. Kupoteza muunganisho na nguzo ya Kafka. Kiunganishi kitaacha tu kusoma katika nafasi ambayo haikuweza kutuma kwa Kafka na mara kwa mara jaribu kuituma tena hadi jaribio lifaulu.
  3. Chanzo cha data hakipatikani. Kiunganishi kitajaribu kuunganisha tena kwenye chanzo kama kilivyosanidiwa. Chaguo-msingi ni majaribio 16 kutumia nyuma kielelezo. Baada ya jaribio la 16 lisilofanikiwa, kazi itawekwa alama kama alishindwa na itahitaji kuanzishwa upya kwa mikono kupitia kiolesura cha Kafka Connect REST.
    • Katika kesi ya PostgreSQL data haitapotea, kwa sababu kutumia nafasi za kurudia kutazuia ufutaji wa faili za WAL ambazo hazijasomwa na kiunganishi. Katika kesi hii, kuna upande wa chini: ikiwa uunganisho wa mtandao kati ya kontakt na DBMS umevunjika kwa muda mrefu, kuna nafasi ya kuwa nafasi ya disk itaisha, na hii inaweza kusababisha kushindwa kwa DBMS nzima.
    • Katika kesi ya MySQL faili za binlog zinaweza kuzungushwa na DBMS yenyewe kabla ya muunganisho kurejeshwa. Hii itasababisha kiunganishi kwenda katika hali imeshindwa, na itahitaji kuanzisha upya katika hali ya awali ya snapshot ili kuendelea kusoma kutoka kwa binlogs ili kurejesha uendeshaji wa kawaida.
    • juu ya MongoDB. Nyaraka zinasema: tabia ya kiunganishi ikiwa faili za logi/oplog zimefutwa na kontakt haiwezi kuendelea kusoma kutoka kwa nafasi ambayo iliacha ni sawa kwa DBMS zote. Iko katika ukweli kwamba kontakt itaingia katika hali alishindwa na itahitaji kuanzisha upya katika modi picha ya awali.

      Hata hivyo, kuna tofauti. Ikiwa kiunganishi kilikuwa katika hali ya kukatwa kwa muda mrefu (au haikuweza kufikia mfano wa MongoDB), na oplog ilizungushwa wakati huu, basi wakati unganisho umerejeshwa, kiunganishi kitaendelea kwa utulivu kusoma data kutoka kwa nafasi ya kwanza inayopatikana. , ndiyo maana baadhi ya data katika Kafka hakuna itapiga.

Hitimisho

Debezium ni uzoefu wangu wa kwanza na mifumo ya CDC na kwa ujumla ni chanya sana. Mradi ulishinda kwa msaada wake kwa DBMS kuu, urahisi wa usanidi, usaidizi wa nguzo, na jumuiya inayofanya kazi. Kwa wale wanaopenda mazoezi, ninapendekeza usome miongozo ya Kafka Unganisha ΠΈ Debezium.

Ikilinganishwa na kiunganishi cha JDBC cha Kafka Connect, faida kuu ya Debezium ni kwamba mabadiliko yanasomwa kutoka kwa kumbukumbu za DBMS, ambayo inaruhusu data kupokelewa kwa kuchelewa kidogo. Kiunganishi cha JDBC (kilichotolewa na Kafka Connect) huuliza jedwali linalofuatiliwa kwa muda maalum na (kwa sababu hiyo hiyo) haitoi ujumbe data inapofutwa (unawezaje kuuliza data ambayo haipo?).

Ili kutatua shida zinazofanana, unaweza kulipa kipaumbele kwa suluhisho zifuatazo (pamoja na Debezium):

PS

Soma pia kwenye blogi yetu:

Chanzo: mapenzi.com

Kuongeza maoni