Prezentante Debezium - CDC por Apache Kafka

Prezentante Debezium - CDC por Apache Kafka

En mia laboro, mi ofte renkontas novajn teknikajn solvojn/softvarajn produktojn, informoj pri kiuj estas sufiĉe malabundaj en la ruslingva Interreto. Kun ĉi tiu artikolo mi provos plenigi tian mankon per ekzemplo de mia lastatempa praktiko, kiam mi bezonis agordi sendon de CDC-eventoj de du popularaj DBMS-oj (PostgreSQL kaj MongoDB) al Kafka-grupo uzante Debezium. Mi esperas, ke ĉi tiu recenza artikolo, kiu aperas kiel rezulto de la farita laboro, estos utila al aliaj.

Kio estas Debezium kaj CDC ĝenerale?

Debezium - reprezentanto de la CDC-programa kategorio (Kapti Datuman Ŝanĝon), aŭ pli precize, ĝi estas aro da konektiloj por diversaj DBMS-oj kongruaj kun la kadro Apache Kafka Connect.

ĉi Malfermfonta projekto, licencita sub la Apache License v2.0 kaj sponsorita de Red Hat. Evoluo daŭras ekde 2016 kaj nuntempe ĝi provizas oficialan subtenon por la sekvaj DBMSoj: MySQL, PostgreSQL, MongoDB, SQL Server. Ekzistas ankaŭ konektiloj por Cassandra kaj Oracle, sed nuntempe ili estas en "frua aliro" statuso, kaj novaj eldonoj ne garantias malantaŭan kongruon.

Se ni komparas CDC kun la tradicia aliro (kiam la aplikaĵo legas datumojn de la DBMS rekte), ĝiaj ĉefaj avantaĝoj inkluzivas la efektivigon de datuma ŝanĝo fluanta ĉe la vica nivelo kun malalta latencia, alta fidindeco kaj havebleco. La lastaj du punktoj estas atingitaj uzante Kafka-areton kiel deponejon por CDC-okazaĵoj.

Alia avantaĝo estas la fakto, ke ununura modelo estas uzata por stoki eventojn, do la fina aplikaĵo ne devas zorgi pri la nuancoj funkciigante malsamajn DBMSojn.

Fine, uzado de mesaĝmakleristo permesas al aplikaĵoj, kiuj kontrolas ŝanĝojn en datumoj, malgrandigi horizontale. Samtempe, la efiko sur la datumfonto estas minimumigita, ĉar la datumoj estas akiritaj ne rekte de la DBMS, sed de la Kafka areto.

Pri la Debezium-arkitekturo

Uzado de Debezium estas ĉi tiu simpla skemo:

DBMS (kiel datumfonto) → konektilo en Kafka Connect → Apache Kafka → konsumanto

Kiel ilustraĵo, jen diagramo de la retejo de la projekto:

Prezentante Debezium - CDC por Apache Kafka

Tamen, mi ne tre ŝatas ĉi tiun skemon, ĉar ŝajnas, ke nur la uzo de lavujo-konektilo eblas.

En realeco, la situacio estas malsama: plenigi vian Data Lake (lasta ligo en la supra diagramo) Ĉi tio ne estas la sola maniero uzi Debezium. Eventoj senditaj al Apache Kafka povas esti uzataj de viaj aplikaĵoj por trakti diversajn situaciojn. Ekzemple:

  • forigi palajn datumojn el la kaŝmemoro;
  • sendo de sciigoj;
  • serĉaj indeksaj ĝisdatigoj;
  • iaj reviziaj protokoloj;
  • ...

Se vi havas Java-aplikaĵon kaj ne estas bezono/ebleco uzi Kafka-grupon, ekzistas ankaŭ la ebleco labori tra enigita-konektilo. La evidenta avantaĝo estas, ke ĝi forigas la bezonon de plia infrastrukturo (en formo de konektilo kaj Kafka). Tamen, ĉi tiu solvo estas malrekomendita ekde versio 1.1 kaj ne plu estas rekomendita por uzo (subteno por ĝi eble estos forigita en estontaj eldonoj).

Ĉi tiu artikolo diskutos pri la arkitekturo rekomendita de programistoj, kiu provizas misfunkciadon kaj skaleblon.

Konektilo agordo

Por komenci spuri ŝanĝojn en la plej grava valoro - datumoj - ni bezonas:

  1. datumfonto, kiu povas esti MySQL ekde versio 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (kompleta listo);
  2. Apache Kafka areto;
  3. Kafka Connect-instanco (versioj 1.x, 2.x);
  4. agordita Debezium-konektilo.

Laboru pri la unuaj du punktoj, t.e. La instala procezo de la DBMS kaj Apache Kafka estas preter la amplekso de la artikolo. Tamen, por tiuj, kiuj volas disfaldi ĉion en la sablokesto, la oficiala deponejo kun ekzemploj havas pretan docker-compose.yaml.

Ni pli detale detale pri la lastaj du punktoj.

0. Kafka Konekti

Ĉi tie kaj plu en la artikolo, ĉiuj agordaj ekzemploj estas diskutataj en la kunteksto de la bildo de Docker distribuita de la programistoj de Debezium. Ĝi enhavas ĉiujn necesajn kromprogramojn (konektilojn) kaj provizas agordon de Kafka Connect per mediovariabloj.

Se vi intencas uzi Kafka Connect de Confluent, vi devos sendepende aldoni la kromaĵojn de la necesaj konektiloj al la dosierujo specifita en plugin.path aŭ agordita per mediovariablo CLASSPATH. Agordoj por la Kafka Connect-laboristo kaj konektiloj estas determinitaj per agordaj dosieroj, kiuj estas pasigitaj kiel argumentoj al la laborista lanĉa komando. Por pliaj detaloj, vidu dokumentado.

La tuta procezo de agordo de Debeizum en la konektilo-versio estas efektivigita en du stadioj. Ni rigardu ĉiun el ili:

1. Agordi la kadron Kafka Connect

Por flui datumojn al la Apache Kafka areto, specifaj parametroj estas fiksitaj en la Kafka Connect kadro, kiel ekzemple:

  • parametroj por konektiĝi al la areto,
  • nomoj de temoj en kiuj la agordo de la konektilo mem estos stokita rekte,
  • la nomo de la grupo en kiu la konektilo funkcias (se distribuita reĝimo estas uzata).

La oficiala Docker-bildo de la projekto subtenas la agordon uzante mediajn variablojn - jen kion ni uzos. Do, elŝutu la bildon:

docker pull debezium/connect

La minimuma aro de mediaj variabloj necesaj por funkcii la konektilon estas kiel sekvas:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — komenca listo de Kafka-grupo-serviloj por akiri kompletan liston de aretmembroj;
  • OFFSET_STORAGE_TOPIC=connector-offsets — temo por konservi poziciojn, kie la konektilo nuntempe troviĝas;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — temo por konservi la staton de la konektilo kaj ĝiaj taskoj;
  • CONFIG_STORAGE_TOPIC=connector-config — temo por konservi datumojn de agordo de konektiloj kaj ĝiaj taskoj;
  • GROUP_ID=1 — identigilo de la grupo de laboristoj sur kiu la konektilo tasko povas esti efektivigita; necesa kiam oni uzas distribuita (disdonita) reĝimo.

Ni lanĉas la ujon kun ĉi tiuj variabloj:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Noto pri Avro

Defaŭlte, Debezium skribas datumojn en JSON-formato, kio estas akceptebla por sablokestoj kaj malgrandaj kvantoj da datumoj, sed povas fariĝi problemo en tre ŝarĝitaj datumbazoj. Alternativo al JSON-konvertilo estas serigi mesaĝojn uzante Avro en binaran formaton, kiu reduktas la ŝarĝon sur la I/O-subsistemo en Apache Kafka.

Por uzi Avro vi devas deploji apartan skemo-registro (por konservi diagramojn). La variabloj por la konvertilo aspektos jene:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detaloj pri uzado de Avro kaj agordo de la registro por ĝi estas ekster la amplekso de ĉi tiu artikolo - plu, por klareco, ni uzos JSON.

2. Agordante la konektilon mem

Nun vi povas iri rekte al la agordo de la konektilo mem, kiu legos datumojn de la fonto.

Ni rigardu la ekzemplon de konektiloj por du DBMS-oj: PostgreSQL kaj MongoDB, en kiuj mi havas sperton kaj en kiuj estas diferencoj (kvankam malgrandaj, sed en kelkaj kazoj signifaj!).

La agordo estas priskribita en JSON-notacio kaj alŝutita al Kafka Connect uzante POST-peton.

2.1. PostgreSQL

Ekzempla konektilo-agordo por PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

La principo de funkciado de la konektilo post ĉi tiu agordo estas sufiĉe simpla:

  • Kiam ĝi estas lanĉita por la unua fojo, ĝi konektas al la datumbazo specifita en la agordo kaj komenciĝas en reĝimo komenca momentfoto, sendante al Kafka la komencan aron de datumoj akiritaj uzante la kondiĉon SELECT * FROM table_name.
  • Post komencado estas kompleta, la konektilo iras en reĝimon por legi ŝanĝojn de PostgreSQL WAL-dosieroj.

Pri la uzataj opcioj:

  • name — la nomo de la konektilo por kiu la agordo priskribita sube estas uzata; estonte, ĉi tiu nomo estas uzata por labori kun la konektilo (t.e., vidi la staton/rekomenci/ĝisdatigi la agordon) per la Kafka Connect REST API;
  • connector.class — DBMS-konektilo klaso, kiu estos uzata de la agordita konektilo;
  • plugin.name — la nomo de la kromprogramo por logika malkodado de datumoj de WAL-dosieroj. Disponebla por elekti wal2json, decoderbuffs и pgoutput. La unuaj du postulas instaladon de la taŭgaj etendaĵoj en la DBMS, kaj pgoutput por PostgreSQL-versio 10 kaj pli alta ne postulas pliajn manipuladojn;
  • database.* — opcioj por konektiĝi al la datumbazo, kie database.server.name — PostgreSQL-instanconomo uzata por formi la temnomon en la Kafka-grupo;
  • table.include.list — listo de tabeloj en kiuj ni volas spuri ŝanĝojn; specifita en la formato schema.table_name; ne povas esti uzata kune kun table.exclude.list;
  • heartbeat.interval.ms — intervalo (en milisekundoj) kun kiu la konektilo sendas korbatmesaĝojn al speciala temo;
  • heartbeat.action.query — peto, kiu estos plenumita dum la sendo de ĉiu korbatmesaĝo (la opcio aperis en versio 1.1);
  • slot.name — la nomo de la replika fendo, kiu estos uzata de la konektilo;
  • publication.name - Nomo publikigadoj en PostgreSQL, kiun la konektilo uzas. Se ĝi ne ekzistas, Debezium provos krei ĝin. Se la uzanto sub kiu la konekto estas farita ne havas sufiĉajn rajtojn por ĉi tiu ago, la konektilo finiĝos kun eraro;
  • transforms determinas precize kiel ŝanĝi la nomon de la cela temo:
    • transforms.AddPrefix.type indikas ke ni uzos regulajn esprimojn;
    • transforms.AddPrefix.regex — masko, kiu redifinas la nomon de la cela temo;
    • transforms.AddPrefix.replacement - rekte kion ni redifinas.

Pli pri korbato kaj transformoj

Defaŭlte, la konektilo sendas datumojn al Kafka por ĉiu farita transakcio, kaj ĝia LSN (Log Sequence Number) estas registrita en la servotemo offset. Sed kio okazas se la konektilo estas agordita por legi ne la tutan datumbazon, sed nur parton de ĝiaj tabeloj (en kiuj datumaj ĝisdatigoj ne okazas ofte)?

  • La konektilo legos WAL-dosierojn kaj ne detektos ajnajn transakciajn transakciojn al la tabloj kiujn ĝi kontrolas.
  • Tial, ĝi ne ĝisdatigos sian nunan pozicion aŭ en la temo aŭ en la reproduktadfendeto.
  • Ĉi tio, siavice, rezultos, ke WAL-dosieroj estas tenitaj sur disko kaj verŝajne elĉerpiĝos de diskospaco.

Kaj ĉi tie estas kie elektoj venas al la savo. heartbeat.interval.ms и heartbeat.action.query. Uzado de ĉi tiuj opcioj duope ebligas plenumi peton ŝanĝi datumojn en aparta tabelo ĉiufoje kiam korbatmesaĝo estas sendita. Tiel, la LSN sur kiu la konektilo nuntempe situas (en la reproduktadfendeto) estas konstante ĝisdatigita. Ĉi tio permesas al la DBMS forigi WAL-dosierojn, kiuj ne plu bezonas. Vi povas lerni pli pri kiel funkcias la opcioj dokumentado.

Alia eblo inda je pli proksima atento estas transforms. Kvankam temas pli pri komforto kaj beleco...

Defaŭlte, Debezium kreas temojn uzante la sekvan nompolitikon: serverName.schemaName.tableName. Ĉi tio eble ne ĉiam estas oportuna. Opcioj transforms Vi povas uzi regulajn esprimojn por difini liston de tabeloj, el kiuj eventoj devas esti direktitaj al temo kun specifa nomo.

En nia agordo dankon transforms la sekvanta okazas: ĉiuj CDC-okazaĵoj de la monitorita datumbazo iros al temo kun la nomo data.cdc.dbname. Alie (sen ĉi tiuj agordoj), Debezium defaŭlte kreus temon por ĉiu tabelo kiel: pg-dev.public.<table_name>.

Konektilaj Limoj

Por fini la priskribon de la konektilo-agordo por PostgreSQL, indas paroli pri la sekvaj trajtoj/limigoj de ĝia funkciado:

  1. La funkcieco de la konektilo por PostgreSQL dependas de la koncepto de logika malkodado. Tial li ne spuras petojn por ŝanĝi la datumbazan strukturon (DDL) - sekve, ĉi tiuj datumoj ne estos en la temoj.
  2. Ĉar reproduktadfendetoj estas uzitaj, ligado de konektilo estas ebla nur al la gvida DBMS-instanco.
  3. Se la uzanto sub kiu la konektilo konektas al la datumbazo havas nurlegeblajn rajtojn, tiam antaŭ la unua lanĉo vi devos permane krei reproduktan fendon kaj publikigi al la datumbazo.

Aplikante la agordon

Do, ni ŝarĝu nian agordon en la konektilon:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Ni kontrolas, ke la elŝuto sukcesis kaj la konektilo komenciĝis:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Bonege: ĝi estas instalita kaj preta por iri. Nun ni ŝajnigu esti konsumanto kaj konektiĝu al Kafka, post kio ni aldonos kaj ŝanĝos eniron en la tabelo:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

En nia temo ĝi estos montrata jene:

Tre longa JSON kun niaj ŝanĝoj

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

En ambaŭ kazoj, rekordoj konsistas el la ŝlosilo (PK) de la rekordo kiu estis ŝanĝita, kaj la esenco mem de la ŝanĝoj: kio la rekordo estis antaŭe kaj kio ĝi iĝis poste.

  • En la kazo de INSERT: valoro antaŭ (before) egalas null, kaj post - la linio kiu estis enigita.
  • En la kazo de UPDATE: en payload.before la antaŭa stato de la linio estas montrata, kaj en payload.after — nova kun la esenco de ŝanĝoj.

2.2 MongoDB

Ĉi tiu konektilo uzas la norman reproduktan mekanismon de MongoDB, legante informojn de la oplog de la primara DBMS-nodo.

Simile al la jam priskribita konektilo por PgSQL, ankaŭ ĉi tie, ĉe la unua komenco, la primara datuma momentfoto estas prenita, post kiu la konektilo ŝanĝas al oplog-lega reĝimo.

Ekzemplo de agordo:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kiel vi povas vidi, ne estas novaj opcioj ĉi tie kompare kun la antaŭa ekzemplo, sed nur la nombro da opcioj respondecaj por konektiĝi al la datumbazo kaj iliaj prefiksoj estis reduktita.

Agordoj transforms ĉi-foje ili faras la jenon: ili transformas la nomon de la cela temo el la skemo <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

kulpo toleremo

La temo de misfunkciado kaj alta havebleco en nia tempo estas pli akra ol iam ajn - precipe kiam ni parolas pri datumoj kaj transakcioj, kaj spurado de datumoj ŝanĝoj ne flankenlasas en ĉi tiu afero. Ni rigardu kio povas misfunkcii principe kaj kio okazos al Debezium en ĉiu kazo.

Estas tri elektebloj:

  1. Fiasko de Kafka Connect. Se Konekti estas agordita por funkcii en distribuita reĝimo, tio postulas plurajn laboristojn agordi la saman group.id. Tiam, se unu el ili malsukcesos, la konektilo estos rekomencita ĉe alia laboristo kaj daŭre legos de la lasta engaĝita pozicio en la temo en Kafka.
  2. Perdo de konektebleco kun la Kafka areto. La konektilo simple ĉesos legi ĉe la pozicio, kiu malsukcesis sendi al Kafka, kaj periode provos resendi ĝin ĝis la provo sukcesos.
  3. Nehavebleco de datumfonto. La konektilo provos rekonekti al la fonto kiel agordita. La defaŭlta estas 16 provoj uzante eksponenta retroiro. Post la 16-a malsukcesa provo, la tasko estos markita kiel malsukcesis kaj vi devos permane rekomenci ĝin per la Kafka Connect REST-interfaco.
    • En la kazo de PostgreSQL la datumoj ne perdiĝos, ĉar Uzado de replikaj fendoj malhelpos vin forigi WAL-dosierojn, kiuj ne estas legitaj de la konektilo. En ĉi tiu kazo, estas ankaŭ malavantaĝo al la monero: se la reto-konekto inter la konektilo kaj la DBMS estas interrompita dum longa tempo, ekzistas ebleco, ke la diskospaco elĉerpiĝos, kaj ĉi tio povas konduki al fiasko de la tuta DBMS.
    • En la kazo de MySQL binlog-dosieroj povas esti turnitaj de la DBMS mem antaŭ ol konektebleco estas restarigita. Ĉi tio kaŭzos la konektilon en la malsukcesan staton, kaj por restarigi normalan funkciadon, vi devos rekomenci en komenca momentfoto reĝimo por daŭrigi legi el binlogs.
    • pri MongoDB. La dokumentaro deklaras: la konduto de la konektilo en la okazo ke protokolaj/oplog dosieroj estis forigitaj kaj la konektilo ne povas daŭrigi legi de la pozicio kie ĝi ĉesis estas la sama por ĉiuj DBMSoj. Ĝi signifas, ke la konektilo iros en la ŝtaton malsukcesis kaj postulos rekomencon en reĝimo komenca momentfoto.

      Tamen, estas esceptoj. Se la konektilo estis malkonektita dum longa tempo (aŭ ne povis atingi la MongoDB-instancon), kaj la oplog trapasis rotacion dum ĉi tiu tempo, tiam kiam la konekto estas restarigita, la konektilo trankvile daŭrigos legi datumojn de la unua disponebla pozicio, tial kelkaj el la datumoj en Kafka ne trafos.

konkludo

Debezium estas mia unua sperto kun CDC-sistemoj kaj ĝenerale tre pozitiva. La projekto venkis per sia subteno por gravaj DBMSoj, facileco de agordo, grupiga subteno kaj aktiva komunumo. Por tiuj, kiuj interesiĝas pri praktiko, mi rekomendas, ke vi legu la gvidojn por Kafka Konekti и Debezium.

Kompare kun la JDBC-konektilo por Kafka Connect, la ĉefa avantaĝo de Debezium estas, ke ŝanĝoj estas legitaj de la DBMS-protokoloj, kio permesas datumojn ricevi kun minimuma latenco. La JDBC-Konektilo (de Kafka Connect) demandas la monitoritan tabelon je fiksa intervalo kaj (pro la sama kialo) ne generas mesaĝojn kiam datumoj estas forigitaj (kiel vi povas pridemandi datumojn, kiuj ne ekzistas?).

Por solvi similajn problemojn, vi povas atenti la jenajn solvojn (krom Debezium):

PS

Legu ankaŭ en nia blogo:

fonto: www.habr.com

Aldoni komenton