Introducing Debezium - CDC for Apache Kafka


In my work I often come across new technical solutions and software products about which there is little information on the Russian-speaking Internet. With this article I will try to fill one such gap, using an example from my recent practice: I needed to set up sending CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this review, which summarizes the results of that work, will be useful to others.

What are Debezium and CDC in general?

Debezium is a CDC (Change Data Capture) software product or, more precisely, a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.

It is an open source project, licensed under the Apache License v2.0 and sponsored by Red Hat. It has been in development since 2016 and currently provides official support for the following DBMSs: MySQL, PostgreSQL, MongoDB and SQL Server. There are also connectors for Cassandra and Oracle, but they are currently in "early access" status, and new releases do not guarantee backward compatibility.

Compared with the traditional approach (where an application reads data directly from the DBMS), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability and availability. The last two are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events, so the end application does not have to deal with the peculiarities of operating different DBMSs.

Finally, using a message broker opens up room for horizontal scaling of the applications that track data changes. At the same time, the load on the data source is minimized, since data is consumed not directly from the DBMS but from the Kafka cluster.

About the Debezium architecture

Using Debezium comes down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Diagram: Debezium architecture, from the project website]

However, I don't really like this diagram, because it gives the impression that only a sink connector can be used.

In reality, things are different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. The events sent to Apache Kafka can be consumed by your applications to handle a variety of scenarios. For example:

  • evicting stale data from a cache;
  • sending notifications;
  • updating search indexes;
  • some kind of audit log;
  • ...
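As an illustration of the first use case, here is a minimal sketch of a consumer-side handler that evicts a cache entry when a change event arrives. It assumes the JSON converter with schemas enabled (so every message has `schema` and `payload` parts, as in the output shown later), uses a plain dict as a stand-in for the cache, and the `customers:<id>` cache-key format is hypothetical. Reading the records from Kafka is left out.

```python
import json
from typing import Optional

def handle_change(key_json: str, value_json: Optional[str], cache: dict,
                  prefix: str = "customers") -> Optional[str]:
    """Evict the cached row referenced by a Debezium change event; return the evicted key."""
    # The record key carries the primary key, e.g. {"schema": ..., "payload": {"id": 1005}}
    pk = json.loads(key_json)["payload"]["id"]
    cache_key = f"{prefix}:{pk}"  # hypothetical cache-key convention
    if value_json is None:
        # A tombstone (null value) may follow a delete: evict as well.
        cache.pop(cache_key, None)
        return cache_key
    op = json.loads(value_json)["payload"]["op"]
    if op in ("u", "d"):
        # After an update or a delete the cached copy is stale.
        cache.pop(cache_key, None)
        return cache_key
    # "c" (insert) and "r" (snapshot read): nothing cached can be stale in this simple model.
    return None
```

In a real consumer the same function would be called for every record polled from the topic.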

If you have a Java application and there is no need for (or possibility of) a Kafka cluster, you can also work through the embedded connector. Its obvious advantage is that it lets you do without additional infrastructure (in the form of a connector and Kafka). However, this solution has been deprecated since version 1.1 and is no longer recommended (support for it may be removed in future releases).

This article discusses the architecture recommended by the developers, which provides fault tolerance.

Connector configuration

To start tracking changes to the most important asset, data, we need:

  1. a data source, which can be MySQL 5.7 or later, PostgreSQL 9.6+ or MongoDB 3.2+ (see the full list in the documentation);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

The first two points, i.e. installing the DBMS and Apache Kafka, are beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository with examples contains a ready-made docker-compose.yaml.

We will look at the last two points in more detail.

0. Kafka Connect

Here and throughout the rest of the article, all configuration examples assume the Docker image distributed by the Debezium developers. It contains all the required plugin files (connectors) and lets you configure Kafka Connect via environment variables.

If you intend to use Kafka Connect from Confluent, you will need to add the plugins for the required connectors yourself to the directory specified in plugin.path, or set it via the CLASSPATH environment variable. The settings of the Kafka Connect worker and of the connectors are defined in configuration files passed as arguments to the worker start command. For details, see the documentation.

The whole process of setting up Debezium as a connector is done in two stages. Let's look at each of them:

1. Setting up the Kafka Connect framework

To stream data to an Apache Kafka cluster, specific parameters are set in the Kafka Connect framework, such as:

  • the cluster connection settings,
  • the names of the topics in which the connector's own configuration will be stored,
  • the name of the group in which the connector runs (when distributed mode is used).

The project's Docker image supports configuration via environment variables, which is what we will use. So let's pull the image:

docker pull debezium/connect:1.2

The minimum set of environment variables required to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to discover the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - the topic for storing the positions the connector is currently at;
  • STATUS_STORAGE_TOPIC=connector-status - the topic for storing the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - the topic for storing the configuration of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the group of workers on which the connector's tasks can run; required in distributed mode.

We start the container with these variables (port 8083 is published so that the REST API can be reached at localhost:8083):

docker run -p 8083:8083 \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2
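The image translates these variables into Kafka Connect worker properties. The Python sketch below models that translation under assumptions worth checking against the image's entrypoint script: the shortcut variables map to the standard worker properties listed in `SHORTCUTS`, and any variable prefixed with `CONNECT_` has the prefix stripped, is lowercased, and has its underscores replaced with dots. Defaults the image may fill in are not modeled; this is for understanding the settings, not a copy of the real script.

```python
# Assumed mapping of the image's shortcut variables to worker properties.
SHORTCUTS = {
    "BOOTSTRAP_SERVERS": "bootstrap.servers",
    "GROUP_ID": "group.id",
    "CONFIG_STORAGE_TOPIC": "config.storage.topic",
    "OFFSET_STORAGE_TOPIC": "offset.storage.topic",
    "STATUS_STORAGE_TOPIC": "status.storage.topic",
    "KEY_CONVERTER": "key.converter",
    "VALUE_CONVERTER": "value.converter",
}

def worker_properties(env: dict) -> dict:
    """Model how container environment variables become worker properties."""
    props = {}
    for name, value in env.items():
        if name in SHORTCUTS:
            props[SHORTCUTS[name]] = value
        elif name.startswith("CONNECT_"):
            # CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL -> value.converter.schema.registry.url
            props[name[len("CONNECT_"):].lower().replace("_", ".")] = value
        # Anything else (PATH, HOME, ...) is not a Connect setting and is ignored.
    return props
```

One consequence of this naming scheme is that a property whose name genuinely contains an underscore cannot be expressed through a `CONNECT_` variable.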

A note on Avro

By default, Debezium writes data in JSON format, which is fine for sandboxes and small volumes of data but can become a problem in heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.

To use Avro, you need to deploy a separate schema-registry (for storing schemas). The converter variables will look like this:

- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter

Details on using Avro and setting up a registry for it are beyond the scope of this article; from here on, for clarity, we will use JSON.
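Part of the JSON overhead is easy to see in this article's own output: with the JSON converter and schemas enabled, every message repeats its schema. The sketch below measures what fraction of the compact JSON encoding of one message is schema rather than data, using the record key from the console-consumer output further down. It is only an illustration of the overhead; it does not quantify Avro's actual savings, which depend on the data and the registry setup.

```python
import json

def schema_share(message: dict) -> float:
    """Fraction of the compact JSON encoding taken up by the "schema" part."""
    compact = (",", ":")
    total = len(json.dumps(message, separators=compact))
    schema = len(json.dumps(message["schema"], separators=compact))
    return schema / total

# The record key from the console-consumer output later in this article.
key_message = {
    "schema": {
        "type": "struct",
        "fields": [{"type": "int32", "optional": False, "field": "id"}],
        "optional": False,
        "name": "data.cdc.dbname.Key",
    },
    "payload": {"id": 1005},
}
print(f"{schema_share(key_message):.0%} of the key message is schema")
```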

2. Configuring the connector itself

Now we can move on to configuring the connector itself, which will read data from the source.

Let's look at connectors for two DBMSs as examples: PostgreSQL and MongoDB, which I have experience with and which differ from each other (slightly, but in some cases significantly!).

The configuration is written in JSON and uploaded to Kafka Connect with a POST request.
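For readers who prefer to script this step rather than use curl, here is a minimal sketch with only the Python standard library. It builds the same POST request to the Kafka Connect REST API that is sent with curl later in the article; the base URL `http://localhost:8083` and the trimmed config are assumptions, and actually sending it (the commented `urlopen` lines) requires a running Connect worker.

```python
import json
import urllib.request

def connector_create_request(base_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST /connectors request that registers `connector` with Kafka Connect."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )

req = connector_create_request("http://localhost:8083", {
    "name": "pg-connector",
    "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"},
})
# with urllib.request.urlopen(req) as resp:  # needs a running Connect worker
#     print(resp.status, resp.read().decode())
```

If the script may be re-run, `PUT /connectors/<name>/config` (with only the inner `config` object as the body) is an alternative that creates the connector or updates an existing one.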

2.1. PostgreSQL

An example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

With this configuration the connector works quite simply:

  • On the first start, it connects to the database specified in the configuration and starts in initial snapshot mode, sending to Kafka the initial data set obtained with a conditional SELECT * FROM table_name.
  • Once initialization is complete, the connector switches to reading changes from the PostgreSQL WAL files.
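The two phases can be told apart in the events themselves. A minimal sketch, assuming the JSON converter with schemas enabled: snapshot records carry `op` = `"r"` (read) and a `source.snapshot` value of `"true"` or `"last"`, while records streamed from the WAL have `c`, `u` or `d` and `snapshot` = `"false"`. The `snapshot` field and its allowed values (`true,last,false`) can be seen in the schema of the sample output later in the article.

```python
import json

def phase(value_json: str) -> str:
    """Return "snapshot" or "streaming" for a Debezium change-event value."""
    payload = json.loads(value_json)["payload"]
    # "snapshot" is "true" for snapshot records, "last" for the final one, "false" afterwards.
    if payload["op"] == "r" or payload["source"].get("snapshot") in ("true", "last"):
        return "snapshot"
    return "streaming"
```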

About the options used:

  • name - the name of the connector to which the configuration below applies; this name is later used to work with the connector (i.e. view its status, restart it, update its configuration) via the Kafka Connect REST API;
  • connector.class - the DBMS connector class that will be used;
  • plugin.name - the name of the plugin for logical decoding of data from the WAL files. The choices are wal2json, decoderbufs and pgoutput. The first two require installing the corresponding plugins in the DBMS, while pgoutput needs no extra steps on PostgreSQL 10 and later;
  • database.* - options for connecting to the database, where database.server.name is the name of the PostgreSQL instance, used to form the topic names in the Kafka cluster;
  • table.include.list - the list of tables whose changes we want to track, in the format schema.table_name; cannot be used together with table.exclude.list;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query executed each time a heartbeat message is sent (available since version 1.1);
  • slot.name - the name of the replication slot the connector will use;
  • publication.name - the name of the PostgreSQL publication the connector uses. If it does not exist, Debezium will try to create it. If the user the connector connects as lacks sufficient privileges for this, the connector will exit with an error;
  • transforms - defines how exactly the target topic name is changed:
    • transforms.AddPrefix.type - indicates that we will use regular expressions;
    • transforms.AddPrefix.regex - the pattern by which the target topic name is rewritten;
    • transforms.AddPrefix.replacement - what exactly we rewrite it to.

More about heartbeat and transforms

By default, the connector sends data to Kafka for every committed transaction and writes its LSN (Log Sequence Number) to the offsets service topic. But what happens if the connector is configured to read not the whole database but only some of its tables (in which data is rarely updated)?

  • The connector will read the WAL files and find no transactions in them affecting the tables it monitors.
  • Therefore, it will not update its current position either in the topic or in the replication slot.
  • This, in turn, will cause WAL files to be "stuck" on disk, and the disk will likely run out of space.

This is where the heartbeat.interval.ms and heartbeat.action.query options come to the rescue. Used together, they make it possible to run a data-modifying query against a separate table every time a heartbeat message is sent. As a result, the LSN the connector is currently at (in the replication slot) is constantly updated, which allows the DBMS to remove WAL files that are no longer needed. For more on how these options work, see the documentation.
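The effect can be seen in a deliberately simplified toy model, which is not Debezium's actual offset logic: the slot's confirmed position only moves forward when the connector emits an event, so WAL written for unmonitored tables piles up behind it. A periodic write in the style of `heartbeat.action.query` to a table the connector does capture gives it something to emit and lets the position catch up. The table names and the "one WAL unit per record" accounting are assumptions for illustration.

```python
def retained_wal(tables, monitored, heartbeat_every=None):
    """Toy model: WAL units the replication slot still retains after all records are read.

    tables: the table touched by each WAL record, in order (one WAL unit per record).
    monitored: tables the connector captures; only their changes advance the slot.
    heartbeat_every: if set, after every N records a heartbeat write to a captured
        (hypothetical) heartbeat table adds one more record.
    """
    lsn = 0            # current end of the WAL
    confirmed_lsn = 0  # position confirmed by the slot; WAL after it cannot be removed
    for i, table in enumerate(tables, start=1):
        lsn += 1
        if table in monitored:
            confirmed_lsn = lsn  # change event sent to Kafka, position moves forward
        if heartbeat_every and i % heartbeat_every == 0:
            lsn += 1             # the heartbeat write itself
            confirmed_lsn = lsn  # ...is captured, so the position catches up
    return lsn - confirmed_lsn
```

With 105 writes to an unmonitored table, the model retains all 105 units without heartbeats, and only the 5 written after the last heartbeat with `heartbeat_every=10`.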

Another option worth attention is transforms. Although it is more about convenience and aesthetics...

By default, Debezium creates topics using the naming policy serverName.schemaName.tableName. This is not always convenient. With the transforms options and regular expressions, you can define a list of tables whose events should be routed to a topic with a specific name.

In our configuration, thanks to transforms, the following happens: all CDC events from the monitored database go to the topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a topic per table of the form pg-dev.public.<table_name>.
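The renaming can be checked offline. The sketch below mimics, as I understand it, how Kafka's `RegexRouter` treats a topic name: the regex must match the whole name, and on a match the name is replaced using `replacement`, where `$1`, `$2` and so on refer to capture groups (Java syntax). Python's `re` stands in for Java regexes here, which is fine for simple patterns like those in this article but not guaranteed in general.

```python
import re

def route(topic: str, regex: str, replacement: str) -> str:
    """Return the rewritten topic name, or the original if the regex does not match it fully."""
    m = re.fullmatch(regex, topic)
    if m is None:
        return topic  # non-matching topics keep their default name
    # Translate Java-style $1 group references into Python's \g<1> syntax.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return m.expand(py_replacement)
```

For example, `route("pg-dev.public.customers", "pg-dev.public.(.*)", "data.cdc.dbname")` gives `data.cdc.dbname`. Note that the unescaped dots in these patterns match any character; `pg-dev\.public\.(.*)` is the stricter form.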

Connector limitations

To wrap up the description of the PostgreSQL connector configuration, it is worth mentioning the following features/limitations of how it works:

  1. The PostgreSQL connector relies on logical decoding. As a consequence, it does not track changes to the database structure (DDL), so this data will not appear in the topics.
  2. Since replication slots are used, the connector can only connect to the primary DBMS instance.
  3. If the user the connector connects to the database as has read-only rights, then before the first launch you will need to manually create a replication slot and a publication in the database.
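For the third point, the objects can be prepared in advance by a privileged user. The sketch below derives the statements from the connector config; `CREATE PUBLICATION` and `pg_create_logical_replication_slot` are standard PostgreSQL (10+), while the table list passed in is an assumption (omitting it yields `FOR ALL TABLES`, which requires superuser rights). Identifiers are interpolated as-is, so this is only for simple, trusted names; review the generated SQL before running it.

```python
def bootstrap_sql(config: dict, tables=None) -> list:
    """Return SQL that pre-creates the publication and replication slot named in `config`."""
    slot = config["slot.name"]
    publication = config["publication.name"]
    plugin = config["plugin.name"]  # e.g. "pgoutput"
    target = "FOR ALL TABLES" if not tables else "FOR TABLE " + ", ".join(tables)
    return [
        f"CREATE PUBLICATION {publication} {target};",
        f"SELECT pg_create_logical_replication_slot('{slot}', '{plugin}');",
    ]
```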

Applying the configuration

So let's load our configuration into the connector:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json

We check that the upload succeeded and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
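In scripts, the same check can be done by parsing the response body. A small sketch that treats the connector as healthy only if the connector itself and every one of its tasks report `RUNNING`; the response shape is taken from the output above.

```python
import json

def is_healthy(status_json: str) -> bool:
    """True if the connector and all tasks in a /connectors/<name>/status response are RUNNING."""
    status = json.loads(status_json)
    states = [status["connector"]["state"]] + [t["state"] for t in status["tasks"]]
    # A connector with no tasks is not doing any work, so it is not counted as healthy.
    return bool(status["tasks"]) and all(s == "RUNNING" for s in states)
```

A task reported as `FAILED` can be restarted with `POST /connectors/<name>/tasks/<id>/restart`.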

Great: it is configured and ready to go. Now let's pretend to be a consumer and connect to Kafka, then add and modify a record in the table:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic, this will appear as follows:


{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the messages consist of the key (PK) of the changed record and the essence of the change itself: what the record was before and what it became afterwards.

  • For INSERT: the before value is null, and after contains the inserted row.
  • For UPDATE: payload.before holds the previous state of the row, and payload.after the new one with the actual change.
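Since both images of the row are present, a consumer can work out which columns actually changed. A minimal sketch over the `payload` part of the messages above, assuming the JSON converter with schemas enabled. How complete `before` is for PostgreSQL depends on the table's replica identity setting; the sketch treats a missing or null `before` as empty, so every column of `after` is then reported as changed.

```python
import json

def changed_columns(value_json: str) -> dict:
    """Map each changed column to its (old, new) pair for a Debezium change event."""
    payload = json.loads(value_json)["payload"]
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }
```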

2.2. MongoDB

This connector uses MongoDB's standard replication mechanism, reading data from the oplog of the DBMS primary node.

As with the PgSQL connector described above, an initial snapshot of the data is taken on the first start, after which the connector switches to reading the oplog.

Configuration example:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "MainRepSet/mongo:27017",
    "mongodb.name": "mongo",
    "mongodb.user": "debezium",
    "mongodb.password": "dbname",
    "database.whitelist": "db_1,db_2",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
    "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
  }
}

As you can see, there are no new options compared to the previous example; there are simply fewer options for connecting to the database, and they use a different prefix.

This time the transforms settings do the following: they rename the target topic from the <server_name>.<db_name>.<collection_name> scheme to data.cdc.mongo_<db_name>.

Fault tolerance

Fault tolerance and high availability matter more than ever nowadays, especially when it comes to data and transactions, and tracking data changes is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect is configured to run in distributed mode, several workers must be given the same group.id. Then, if one of them fails, the connector will be restarted on another worker and will continue reading from the last committed position in the topic in Kafka.
  2. Loss of connectivity with the Kafka cluster. The connector simply stops reading at the position it failed to send to Kafka and periodically tries to resend it until an attempt succeeds.
  3. The data source is unavailable. The connector will try to reconnect to the source according to its configuration. The default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed and has to be restarted manually via the Kafka Connect REST interface.
    • In the case of PostgreSQL, data will not be lost, because the replication slot prevents deletion of WAL files the connector has not read. There is a downside, though: if network connectivity between the connector and the DBMS is lost for a long time, the disk may run out of space, which can bring down the entire DBMS.
    • In the case of MySQL, the DBMS may rotate the binlog files before connectivity is restored. This will put the connector into a failed state, and to restore normal operation it will have to be restarted in initial snapshot mode to continue reading from the binlogs.
    • As for MongoDB, the documentation says: the connector's behavior when log/oplog files have been deleted and it cannot continue reading from where it left off is the same for all DBMSs. The connector goes into a failed state and must be restarted in initial snapshot mode.

      However, there are exceptions. If the connector was down for a long time (or could not reach the MongoDB instance), and the oplog was rotated in the meantime, then once the connection is restored the connector will quietly continue reading from the first available position, which is why some of the data will never make it into Kafka.
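To estimate how long "16 attempts with exponential backoff" lasts before the task gives up, here is a simple doubling-with-cap model. The values used (1 s initial delay, 120 s maximum delay, 16 attempts) are my assumption based on the MongoDB connector's `connect.backoff.initial.delay.ms`, `connect.backoff.max.delay.ms` and `connect.max.attempts` settings; check the documentation for your connector and version, and keep in mind that the real implementation may compute delays differently.

```python
def backoff_delays(attempts: int = 16, initial_ms: int = 1_000, max_ms: int = 120_000) -> list:
    """Assumed delay before each reconnection attempt: doubles every time, capped at max_ms."""
    return [min(initial_ms * 2 ** i, max_ms) for i in range(attempts)]

delays = backoff_delays()
print(f"gives up after about {sum(delays) / 60_000:.1f} minutes")
```

With these assumed values, the delays add up to 1,207,000 ms, i.e. roughly 20 minutes of source downtime before manual intervention is needed.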

Conclusion

Debezium is my first experience with CDC systems, and overall it has been very positive. The project won me over with its support for the major DBMSs, ease of configuration, clustering support and active community. For those interested in trying it in practice, I recommend reading the guides for Kafka Connect and Debezium.

Compared with the JDBC connector for Kafka Connect, Debezium's main advantage is that changes are read from the DBMS logs, which allows data to be received with minimal delay. The JDBC connector queries the monitored tables at a fixed interval and, for the same reason, does not produce messages when data is deleted (how can you query for data that isn't there?).
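The deleted-rows point is easy to demonstrate with a toy model of timestamp-based polling (a simplification, not the JDBC connector's code): each poll selects the rows whose `updated_at` is newer than the last value seen, so a row that has been removed simply never shows up again. The table contents and the `updated_at` column are hypothetical.

```python
def poll(table: dict, since: int) -> list:
    """Return ids of rows with updated_at > since, like an incremental polling query."""
    return sorted(row_id for row_id, row in table.items() if row["updated_at"] > since)

table = {1: {"updated_at": 10}, 2: {"updated_at": 11}}
first = poll(table, since=0)    # both rows are seen
del table[2]                    # the delete leaves nothing to select
table[1]["updated_at"] = 12     # an ordinary update
second = poll(table, since=11)  # only row 1 shows up; nothing signals that row 2 is gone
```

A log-based connector such as Debezium, by contrast, sees the delete in the WAL and emits an event with `op` = `"d"`.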

For similar problems, you can also look at the following solutions (besides Debezium):


source: www.habr.com
