Prezantimi i Debezium - CDC për Apache Kafka

Prezantimi i Debezium - CDC për Apache Kafka

Në punën time, shpesh ndeshem me zgjidhje të reja teknike / produkte softuerike, informacioni për të cilin është mjaft i pakët në internetin rusisht-folës. Me këtë artikull, do të përpiqem të plotësoj një boshllëk të tillë me një shembull nga praktika ime e fundit, kur më duhej të konfiguroja dërgimin e ngjarjeve CDC nga dy DBMS të njohura (PostgreSQL dhe MongoDB) në një grup Kafka duke përdorur Debezium. Shpresoj që ky artikull rishikues, i cili u shfaq si rezultat i punës së bërë, do të jetë i dobishëm për të tjerët.

Çfarë është Debezium dhe CDC në përgjithësi?

Debezium - Përfaqësues i kategorisë së softuerit CDC (Regjistroni ndryshimin e të dhënave), ose më saktë, është një grup lidhësish për DBMS të ndryshme që janë në përputhje me kornizën Apache Kafka Connect.

Ajo projekt me kod të hapur, licencuar sipas licencës Apache v2.0 dhe sponsorizuar nga Red Hat. Zhvillimi ka nisur që nga viti 2016 dhe për momentin ofron mbështetje zyrtare për DBMS-të e mëposhtme: MySQL, PostgreSQL, MongoDB, SQL Server. Ekzistojnë gjithashtu lidhës për Cassandra dhe Oracle, por ato aktualisht janë në statusin e "qasjes së hershme" dhe lëshimet e reja nuk garantojnë pajtueshmëri të prapambetur.

Nëse krahasojmë CDC me qasjen tradicionale (kur aplikacioni lexon drejtpërdrejt të dhënat nga DBMS), atëherë avantazhet e tij kryesore përfshijnë zbatimin e transmetimit të ndryshimit të të dhënave në nivelin e rreshtit me vonesë të ulët, besueshmëri të lartë dhe disponueshmëri. Dy pikat e fundit arrihen duke përdorur një grup Kafka si një depo për ngjarjet e CDC.

Gjithashtu, avantazhet përfshijnë faktin se një model i vetëm përdoret për të ruajtur ngjarjet, kështu që aplikacioni përfundimtar nuk duhet të shqetësohet për nuancat e funksionimit të DBMS të ndryshme.

Më në fund, përdorimi i një ndërmjetësi mesazhesh hap hapësirën për shkallëzim horizontal të aplikacioneve që gjurmojnë ndryshimet në të dhëna. Në të njëjtën kohë, ndikimi në burimin e të dhënave minimizohet, pasi të dhënat nuk merren drejtpërdrejt nga DBMS, por nga grupi Kafka.

Rreth arkitekturës Debezium

Përdorimi i Debezium zbret në këtë skemë të thjeshtë:

DBMS (si burim i të dhënave) → lidhës në Kafka Connect → Apache Kafka → konsumator

Si ilustrim, unë do të jap një diagram nga faqja e internetit e projektit:

Prezantimi i Debezium - CDC për Apache Kafka

Sidoqoftë, nuk më pëlqen shumë kjo skemë, sepse duket se vetëm një lidhës lavaman është i mundur.

Në realitet, situata është ndryshe: mbushja e Liqenit tuaj të të Dhënave (lidhja e fundit në diagramin e mësipërm) nuk është mënyra e vetme për të përdorur Debezium. Ngjarjet e dërguara te Apache Kafka mund të përdoren nga aplikacionet tuaja për t'u marrë me situata të ndryshme. Për shembull:

  • heqja e të dhënave të parëndësishme nga cache;
  • dërgimi i njoftimeve;
  • përditësimet e indeksit të kërkimit;
  • një lloj regjistrash auditimi;
  • ...

Në rast se keni një aplikacion Java dhe nuk ka nevojë/mundësi për të përdorur një grup Kafka, ekziston gjithashtu mundësia për të punuar përmes lidhës i ngulitur. Plus i dukshëm është se me të mund të refuzoni infrastrukturën shtesë (në formën e një lidhësi dhe Kafka). Megjithatë, kjo zgjidhje është vjetëruar që nga versioni 1.1 dhe nuk rekomandohet më për përdorim (mund të hiqet në versionet e ardhshme).

Ky artikull do të diskutojë arkitekturën e rekomanduar nga zhvilluesit, e cila siguron tolerancën e gabimeve dhe shkallëzueshmërinë.

Konfigurimi i lidhësit

Për të filluar ndjekjen e ndryshimeve në vlerën më të rëndësishme - të dhënat - na duhen:

  1. burimi i të dhënave, i cili mund të jetë MySQL duke filluar nga versioni 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (Lista e plotë);
  2. Grup Apache Kafka
  3. Shembulli i Kafka Connect (versionet 1.x, 2.x);
  4. lidhës i konfiguruar Debezium.

Punoni në dy pikat e para, d.m.th. procesi i instalimit të një DBMS dhe Apache Kafka janë përtej qëllimit të artikullit. Sidoqoftë, për ata që duan të vendosin gjithçka në një sandbox, ekziston një i gatshëm në depon zyrtare me shembuj doker-kompozoj.yaml.

Ne do të përqendrohemi në dy pikat e fundit më në detaje.

0. Kafka Connect

Këtu dhe më vonë në artikull, të gjithë shembujt e konfigurimit konsiderohen në kontekstin e imazhit Docker të shpërndarë nga zhvilluesit e Debezium. Ai përmban të gjithë skedarët e nevojshëm të shtojcave (lidhësit) dhe siguron konfigurimin e Kafka Connect duke përdorur variabla të mjedisit.

Nëse keni ndërmend të përdorni Kafka Connect nga Confluent, do t'ju duhet të shtoni vetë shtojcat e lidhësve të nevojshëm në drejtorinë e specifikuar në plugin.path ose vendoset nëpërmjet një ndryshoreje mjedisi CLASSPATH. Cilësimet për punëtorin dhe lidhësit e Kafka Connect përcaktohen përmes skedarëve të konfigurimit që i kalohen si argumente komandës startuese të punëtorit. Për detaje, shih dokumentacionin.

I gjithë procesi i konfigurimit të Debeizum në versionin lidhës kryhet në dy faza. Le të shqyrtojmë secilën prej tyre:

1. Vendosja e kornizës Kafka Connect

Për të transmetuar të dhëna në një grupim Apache Kafka, parametrat specifikë vendosen në kornizën Kafka Connect, si p.sh.

  • cilësimet e lidhjes së grupit,
  • emrat e temave në të cilat do të ruhet konfigurimi i vetë lidhësit,
  • emri i grupit në të cilin lidhësi funksionon (në rast të përdorimit të modalitetit të shpërndarë).

Imazhi zyrtar Docker i projektit mbështet konfigurimin duke përdorur variablat e mjedisit - kjo është ajo që ne do të përdorim. Pra, le të shkarkojmë imazhin:

docker pull debezium/connect

Grupi minimal i variablave të mjedisit që kërkohet për të ekzekutuar lidhësin është si më poshtë:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista fillestare e serverëve të grupimit Kafka për të marrë një listë të plotë të anëtarëve të grupimit;
  • OFFSET_STORAGE_TOPIC=connector-offsets — një temë për ruajtjen e pozicioneve ku ndodhet aktualisht lidhësi;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - një temë për ruajtjen e statusit të lidhësit dhe detyrave të tij;
  • CONFIG_STORAGE_TOPIC=connector-config - një temë për ruajtjen e të dhënave të konfigurimit të lidhësit dhe detyrat e tij;
  • GROUP_ID=1 — identifikuesi i grupit të punëtorëve në të cilin mund të ekzekutohet detyra lidhëse; kërkohet gjatë përdorimit të shpërndarë (shpërndarë) regjimi.

Ne e fillojmë kontejnerin me këto variabla:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Shënim për Avro

Si parazgjedhje, Debezium shkruan të dhëna në formatin JSON, i cili është i pranueshëm për sandboxet dhe sasi të vogla të dhënash, por mund të jetë problem në bazat e të dhënave të ngarkuara shumë. Një alternativë ndaj konvertuesit JSON është serializimi i mesazheve duke përdorur Avro në një format binar, i cili redukton ngarkesën në nënsistemin I/O në Apache Kafka.

Për të përdorur Avro, duhet të vendosni një të veçantë skema-regjistri (për ruajtjen e skemave). Variablat për konvertuesin do të duken kështu:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detajet mbi përdorimin e Avro dhe vendosjen e një regjistri për të janë përtej qëllimit të artikullit - më tej, për qartësi, ne do të përdorim JSON.

2. Vendosja e vetë lidhësit

Tani mund të shkoni drejtpërdrejt në konfigurimin e vetë lidhësit, i cili do të lexojë të dhënat nga burimi.

Le të shohim shembullin e lidhësve për dy DBMS: PostgreSQL dhe MongoDB, për të cilat kam përvojë dhe për të cilat ka dallime (megjithëse të vogla, por në disa raste domethënëse!).

Konfigurimi përshkruhet në shënimin JSON dhe ngarkohet në Kafka Connect duke përdorur një kërkesë POST.

2.1. PostgreSQL

Shembull i konfigurimit të lidhësit për PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Parimi i funksionimit të lidhësit pas këtij konfigurimi është mjaft i thjeshtë:

  • Në fillimin e parë, ai lidhet me bazën e të dhënave të specifikuar në konfigurim dhe fillon në modalitetin fotografi fillestare, duke i dërguar Kafkës grupin fillestar të të dhënave të marra me kusht SELECT * FROM table_name.
  • Pas përfundimit të inicializimit, lidhësi hyn në modalitetin e leximit të ndryshimeve nga skedarët PostgreSQL WAL.

Rreth opsioneve të përdorura:

  • name — emri i lidhësit për të cilin përdoret konfigurimi i përshkruar më poshtë; në të ardhmen, ky emër përdoret për të punuar me lidhësin (d.m.th. shikoni statusin / rinisni / përditësoni konfigurimin) përmes Kafka Connect REST API;
  • connector.class — klasa e lidhësit DBMS që do të përdoret nga lidhësi i konfiguruar;
  • plugin.name është emri i shtojcës për dekodimin logjik të të dhënave nga skedarët WAL. Në dispozicion për të zgjedhur wal2json, decoderbuffs и pgoutput. Dy të parat kërkojnë instalimin e shtesave të duhura në DBMS, dhe pgoutput për PostgreSQL versioni 10 dhe më i lartë nuk kërkon manipulime shtesë;
  • database.* — opsionet për t'u lidhur me bazën e të dhënave, ku database.server.name - emri i shembullit PostgreSQL i përdorur për të formuar emrin e temës në grupimin Kafka;
  • table.include.list - një listë tabelash në të cilat duam të gjurmojmë ndryshimet; dhënë në format schema.table_name; nuk mund të përdoret së bashku me table.exclude.list;
  • heartbeat.interval.ms — intervali (në milisekonda) me të cilin lidhësi dërgon mesazhe të rrahjeve të zemrës në një temë të veçantë;
  • heartbeat.action.query - një kërkesë që do të ekzekutohet gjatë dërgimit të çdo mesazhi të rrahjeve të zemrës (opsioni është shfaqur që nga versioni 1.1);
  • slot.name — emri i slotit të replikimit që do të përdoret nga lidhësi;
  • publication.name - Emri Publikim në PostgreSQL që përdor lidhësi. Në rast se nuk ekziston, Debezium do të përpiqet ta krijojë atë. Nëse përdoruesi nën të cilin është bërë lidhja nuk ka të drejta të mjaftueshme për këtë veprim, lidhësi do të dalë me një gabim;
  • transforms përcakton se si të ndryshohet saktësisht emri i temës së synuar:
    • transforms.AddPrefix.type tregon se do të përdorim shprehje të rregullta;
    • transforms.AddPrefix.regex — maskë me të cilën ripërcaktohet emri i temës së synuar;
    • transforms.AddPrefix.replacement - drejtpërdrejt atë që ne ripërcaktojmë.

Më shumë rreth rrahjeve të zemrës dhe transformimeve

Si parazgjedhje, lidhësi i dërgon të dhëna Kafka-s për çdo transaksion të kryer dhe shkruan LSN-në ​​e tij (Numri i Sekuencës së Regjistrit) në temën e shërbimit offset. Por çfarë ndodh nëse lidhësi është konfiguruar të lexojë jo të gjithë bazën e të dhënave, por vetëm një pjesë të tabelave të saj (në të cilat të dhënat përditësohen rrallë)?

  • Lidhësi do të lexojë skedarët WAL dhe nuk do të zbulojë kryerjet e transaksioneve në to në tabelat që monitoron.
  • Prandaj, ai nuk do të përditësojë pozicionin e tij aktual as në temë, as në slotin e përsëritjes.
  • Kjo, nga ana tjetër, do të bëjë që skedarët WAL të "ngecin" në disk dhe ka të ngjarë të mbarojë hapësira në disk.

Dhe këtu opsionet vijnë në shpëtim. heartbeat.interval.ms и heartbeat.action.query. Përdorimi i këtyre opsioneve në çift bën të mundur ekzekutimin e një kërkese për ndryshimin e të dhënave në një tabelë të veçantë sa herë që dërgohet një mesazh rrahje zemre. Kështu, LSN në të cilin lidhësi ndodhet aktualisht (në slotin e replikimit) përditësohet vazhdimisht. Kjo i lejon DBMS të heqë skedarët WAL që nuk nevojiten më. Për më shumë informacion se si funksionojnë opsionet, shihni dokumentacionin.

Një tjetër opsion që meriton vëmendje më të madhe është transforms. Edhe pse bëhet fjalë më shumë për komoditetin dhe bukurinë ...

Si parazgjedhje, Debezium krijon tema duke përdorur politikën e mëposhtme të emërtimit: serverName.schemaName.tableName. Kjo mund të mos jetë gjithmonë e përshtatshme. Opsione transforms duke përdorur shprehje të rregullta, mund të përcaktoni një listë tabelash, ngjarjet e të cilave duhet të drejtohen në një temë me një emër specifik.

Në konfigurimin tonë falë transforms ndodh si më poshtë: të gjitha ngjarjet e CDC nga baza e të dhënave të gjurmuara do të shkojnë te tema me emrin data.cdc.dbname. Përndryshe (pa këto cilësime), Debezium si parazgjedhje do të krijonte një temë për secilën tabelë të formularit: pg-dev.public.<table_name>.

Kufizimet e lidhësit

Në fund të përshkrimit të konfigurimit të lidhësit për PostgreSQL, ia vlen të flasim për veçoritë / kufizimet e mëposhtme të punës së tij:

  1. Funksionaliteti i lidhësit për PostgreSQL mbështetet në konceptin e dekodimit logjik. Prandaj ai nuk gjurmon kërkesat për të ndryshuar strukturën e bazës së të dhënave (DDL) - në përputhje me rrethanat, këto të dhëna nuk do të jenë në tema.
  2. Meqenëse përdoren lojëra elektronike të riprodhimit, lidhja e lidhësit është e mundur vetëm tek shembulli kryesor i DBMS.
  3. Nëse përdoruesi nën të cilin lidhësi lidhet me bazën e të dhënave ka të drejta vetëm për lexim, atëherë përpara nisjes së parë, do t'ju duhet të krijoni manualisht një vend të përsëritjes dhe ta publikoni në bazën e të dhënave.

Aplikimi i një konfigurimi

Pra, le të ngarkojmë konfigurimin tonë në lidhës:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Ne kontrollojmë që shkarkimi ishte i suksesshëm dhe lidhësi filloi:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

E shkëlqyeshme: është konfiguruar dhe gati për të shkuar. Tani le të pretendojmë të jemi konsumator dhe të lidhemi me Kafka, pas së cilës shtojmë dhe ndryshojmë një hyrje në tabelë:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Në temën tonë, kjo do të shfaqet si më poshtë:

JSON shumë i gjatë me ndryshimet tona

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Në të dyja rastet, të dhënat përbëhen nga çelësi (PK) i rekordit që u ndryshua, dhe nga vetë thelbi i ndryshimeve: çfarë ishte rekordi më parë dhe çfarë u bë më pas.

  • Në rastin e INSERT: vlera para (before) barazohet nulle ndjekur nga vargu që u fut.
  • Në rastin e UPDATE: në payload.before shfaqet gjendja e mëparshme e rreshtit dhe në payload.after - e re me thelbin e ndryshimit.

2.2 MongoDB

Ky lidhës përdor mekanizmin standard të replikimit MongoDB, duke lexuar informacion nga oplog i nyjës primare DBMS.

Ngjashëm me lidhësin e përshkruar tashmë për PgSQL, edhe këtu, në fillimin e parë, merret fotografia primare e të dhënave, pas së cilës lidhësi kalon në modalitetin e leximit oplog.

Shembull i konfigurimit:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Siç mund ta shihni, nuk ka opsione të reja në krahasim me shembullin e mëparshëm, por vetëm numri i opsioneve përgjegjëse për lidhjen me bazën e të dhënave dhe prefikset e tyre është zvogëluar.

Cilësimet transforms këtë herë ata bëjnë si më poshtë: kthejnë emrin e temës së synuar nga skema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

toleranca ndaj gabimeve

Çështja e tolerancës së gabimeve dhe disponueshmërisë së lartë në kohën tonë është më e mprehtë se kurrë - veçanërisht kur flasim për të dhëna dhe transaksione, dhe gjurmimi i ndryshimit të të dhënave nuk është anash në këtë çështje. Le të shohim se çfarë mund të shkojë keq në parim dhe çfarë do të ndodhë me Debezium në secilin rast.

Ekzistojnë tre opsione të tërheqjes:

  1. Dështimi i Kafka Connect. Nëse Connect është konfiguruar për të punuar në modalitetin e shpërndarë, kjo kërkon që shumë punëtorë të vendosin të njëjtin group.id. Pastaj, nëse njëri prej tyre dështon, lidhësi do të rindizet në punëtorin tjetër dhe do të vazhdojë të lexojë nga pozicioni i fundit i angazhuar në temën në Kafka.
  2. Humbja e lidhjes me grupin Kafka. Lidhësi thjesht do të ndalojë së lexuari në pozicionin që nuk arriti t'i dërgojë Kafkës dhe do të përpiqet periodikisht ta ridërgojë atë derisa përpjekja të ketë sukses.
  3. Burimi i të dhënave është i padisponueshëm. Lidhësi do të përpiqet të rilidhet me burimin sipas konfigurimit. Parazgjedhja është 16 përpjekje duke përdorur zmbrapsja eksponenciale. Pas përpjekjes së 16-të të dështuar, detyra do të shënohet si dështuar dhe do të duhet të riniset manualisht nëpërmjet ndërfaqes Kafka Connect REST.
    • Në rastin e PostgreSQL të dhënat nuk do të humbasin, sepse përdorimi i slloteve të riprodhimit do të parandalojë fshirjen e skedarëve WAL që nuk lexohen nga lidhësi. Në këtë rast, ka një dobësi: nëse lidhja e rrjetit midis lidhësit dhe DBMS-së ndërpritet për një kohë të gjatë, ekziston mundësia që hapësira në disk të mbarojë dhe kjo mund të çojë në dështimin e të gjithë DBMS-së.
    • Në rastin e MySQL skedarët binlog mund të rrotullohen nga vetë DBMS përpara se të rivendoset lidhja. Kjo do të bëjë që lidhësi të shkojë në gjendjen e dështuar dhe do të duhet të riniset në modalitetin fillestar të fotografisë për të vazhduar leximin nga bilogët për të rivendosur funksionimin normal.
    • MongoDB. Dokumentacioni thotë: sjellja e lidhësit në rast se skedarët log/oplog janë fshirë dhe lidhësi nuk mund të vazhdojë të lexojë nga pozicioni ku e ka lënë është e njëjtë për të gjitha DBMS. Ai qëndron në faktin se lidhësi do të shkojë në gjendje dështuar dhe do të kërkojë një rinisje në modalitet fotografi fillestare.

      Megjithatë, ka përjashtime. Nëse lidhësi ishte në një gjendje të shkëputur për një kohë të gjatë (ose nuk mund të arrinte shembullin MongoDB), dhe oplog u rrotullua gjatë kësaj kohe, atëherë kur lidhja të rivendoset, lidhësi do të vazhdojë me qetësi të lexojë të dhënat nga pozicioni i parë i disponueshëm , prandaj disa nga të dhënat në Kafka jo do të godasë.

Përfundim

Debezium është përvoja ime e parë me sistemet CDC dhe ka qenë shumë pozitive në përgjithësi. Projekti korruptoi mbështetjen e DBMS kryesore, lehtësinë e konfigurimit, mbështetjen për grupimin dhe një komunitet aktiv. Për ata që janë të interesuar në praktikë, ju rekomandoj të lexoni udhëzuesit për Kafka Connect и Debezium.

Krahasuar me lidhësin JDBC për Kafka Connect, përparësia kryesore e Debezium është se ndryshimet lexohen nga regjistrat e DBMS, gjë që lejon marrjen e të dhënave me vonesë minimale. Lidhësi JDBC (i ofruar nga Kafka Connect) kërkon tabelën e gjurmuar në një interval të caktuar dhe (për të njëjtën arsye) nuk gjeneron mesazhe kur të dhënat fshihen (si mund të kërkoni për të dhëna që nuk janë aty?).

Për të zgjidhur probleme të ngjashme, mund t'i kushtoni vëmendje zgjidhjeve të mëposhtme (përveç Debezium):

PS

Lexoni edhe në blogun tonë:

Burimi: www.habr.com

Shto një koment