IepazÄ«stinām ar Debezium ā€” CDC, kas paredzēts Apache Kafka

IepazÄ«stinām ar Debezium ā€” CDC, kas paredzēts Apache Kafka

Savā darbā bieži sastopos ar jauniem tehniskiem risinājumiem/programmatÅ«ras produktiem, par kuriem krievvalodÄ«gajā internetā ir visai maz informācijas. Ar Å”o rakstu es mēģināŔu aizpildÄ«t vienu Ŕādu robu ar piemēru no manas nesenās prakses, kad man vajadzēja iestatÄ«t CDC notikumu sÅ«tÄ«Å”anu no divām populārām DBVS (PostgreSQL un MongoDB) uz Kafka klasteru, izmantojot Debezium. Ceru, ka Å”is pārskata raksts, kas parādÄ«jās paveiktā darba rezultātā, noderēs citiem.

Kas ir Debezium un CDC kopumā?

Debezium - CDC programmatūras kategorijas pārstāvis (Tvert datu izmaiņas), vai precīzāk, tas ir savienotāju komplekts dažādām DBVS, kas ir saderīgi ar Apache Kafka Connect ietvaru.

Tā atvērtā koda projekts, licencēta saskaņā ar Apache licenci v2.0 un sponsorēta Red Hat. Izstrāde notiek kopÅ” 2016. gada, un Å”obrÄ«d tā nodroÅ”ina oficiālu atbalstu Ŕādām DBVS: MySQL, PostgreSQL, MongoDB, SQL Server. Ir arÄ« savienotāji Cassandra un Oracle, taču tie paÅ”laik ir "agrÄ«nas piekļuves" statusā, un jaunie izlaidumi negarantē atpakaļejoÅ”u saderÄ«bu.

Ja salÄ«dzinām CDC ar tradicionālo pieeju (kad lietojumprogramma tieÅ”i nolasa datus no DBVS), tad tās galvenās priekÅ”rocÄ«bas ietver datu izmaiņu straumÄ“Å”anas ievieÅ”anu rindu lÄ«menÄ« ar zemu latentumu, augstu uzticamÄ«bu un pieejamÄ«bu. Pēdējie divi punkti tiek sasniegti, izmantojot Kafka klasteru kā CDC notikumu krātuvi.

Tāpat starp priekÅ”rocÄ«bām var minēt to, ka notikumu glabāŔanai tiek izmantots viens modelis, lÄ«dz ar to gala lietojumprogrammai nav jāuztraucas par dažādu DBVS darbÄ«bas niansēm.

Visbeidzot, izmantojot ziņojumu brokeri, tiek atvērtas mērogoÅ”anas lietojumprogrammas, kas izseko datu izmaiņas. Tajā paŔā laikā tiek samazināta ietekme uz datu avotu, jo dati tiek saņemti nevis tieÅ”i no DBVS, bet gan no Kafka klastera.

Par Debezium arhitektūru

Izmantojot Debezium, tiek izmantota Ŕāda vienkārÅ”a shēma:

DBVS (kā datu avots) ā†’ savienotājs programmā Kafka Connect ā†’ Apache Kafka ā†’ patērētājs

Kā ilustrāciju es sniegŔu diagrammu no projekta vietnes:

IepazÄ«stinām ar Debezium ā€” CDC, kas paredzēts Apache Kafka

Taču man Ŕī shēma ne visai patÄ«k, jo Ŕķiet, ka ir iespējams tikai izlietnes savienotājs.

PatiesÄ«bā situācija ir atŔķirÄ«ga: aizpildiet savu Data Lake (pēdējā saite diagrammā iepriekÅ”) nav vienÄ«gais Debezium lietoÅ”anas veids. Uz Apache Kafka nosÅ«tÄ«tos notikumus jÅ«su lietojumprogrammas var izmantot, lai risinātu dažādas situācijas. Piemēram:

  • neatbilstoÅ”u datu noņemÅ”ana no keÅ”atmiņas;
  • paziņojumu sÅ«tÄ«Å”ana;
  • meklēt indeksa atjauninājumus;
  • sava veida audita žurnāli;
  • ...

Ja jums ir Java lietojumprogramma un nav vajadzÄ«bas/iespējas izmantot Kafka klasteru, ir arÄ« iespēja strādāt ar iegultais savienotājs. AcÄ«mredzamais plus ir tas, ka ar to jÅ«s varat atteikties no papildu infrastruktÅ«ras (savienotāja un Kafka veidā). Tomēr Å”is risinājums ir novecojis kopÅ” versijas 1.1 un vairs nav ieteicams lietoÅ”anai (nākamajos laidienos tas var tikt noņemts).

Å ajā rakstā tiks apspriesta izstrādātāju ieteiktā arhitektÅ«ra, kas nodroÅ”ina kļūdu toleranci un mērogojamÄ«bu.

Savienotāja konfigurācija

Lai sāktu izsekot izmaiņām vissvarÄ«gākajā vērtÄ«bā - datos - mums ir nepiecieÅ”ams:

  1. datu avots, kas var būt MySQL, sākot no versijas 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (pilns saraksts);
  2. Apache Kafka klasteris
  3. Kafka Connect instance (versijas 1.x, 2.x);
  4. konfigurēts Debezium savienotājs.

Strādājiet pie pirmajiem diviem punktiem, t.i. DBVS un Apache Kafka instalÄ“Å”anas process ir ārpus raksta darbÄ«bas jomas. Savukārt tiem, kas vēlas visu izvietot smilÅ”u kastē, oficiālajā repozitorijā ir jau gatava ar piemēriem docker-compose.yaml.

Sīkāk pievērsīsimies pēdējiem diviem punktiem.

0. Kafka Connect

Å eit un vēlāk rakstā visi konfigurācijas piemēri ir aplÅ«koti Debezium izstrādātāju izplatÄ«tā Docker attēla kontekstā. Tas satur visus nepiecieÅ”amos spraudņu failus (savienotājus) un nodroÅ”ina Kafka Connect konfigurāciju, izmantojot vides mainÄ«gos.

Ja plānojat izmantot Kafka Connect no Confluent, jums paÅ”am bÅ«s jāpievieno nepiecieÅ”amo savienotāju spraudņi direktorijā, kas norādÄ«ts plugin.path vai iestatÄ«t, izmantojot vides mainÄ«go CLASSPATH. Kafka Connect darbinieka un savienotāju iestatÄ«jumi tiek definēti, izmantojot konfigurācijas failus, kas tiek nodoti kā argumenti darbinieka starta komandai. SÄ«kāku informāciju sk dokumentācija.

Viss Debeizum iestatīŔanas process savienotāja versijā tiek veikts divos posmos. Apskatīsim katru no tiem:

1. Kafka Connect ietvara iestatīŔana

Lai straumētu datus Apache Kafka klasterī, Kafka Connect sistēmā ir iestatīti konkrēti parametri, piemēram:

  • klastera savienojuma iestatÄ«jumi,
  • tēmu nosaukumi, kuros tiks saglabāta paÅ”a savienotāja konfigurācija,
  • tās grupas nosaukums, kurā darbojas savienotājs (ja tiek izmantots sadalÄ«tais režīms).

Projekta oficiālais Docker attēls atbalsta konfigurāciju, izmantojot vides mainīgos - to mēs izmantosim. Tātad, lejupielādēsim attēlu:

docker pull debezium/connect

Minimālais vides mainīgo kopums, kas nepiecieŔams savienotāja palaiŔanai, ir Ŕāds:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - sākotnējais Kafka klasteru serveru saraksts, lai iegÅ«tu pilnu klastera dalÄ«bnieku sarakstu;
  • OFFSET_STORAGE_TOPIC=connector-offsets ā€” tēma pozÄ«ciju saglabāŔanai, kur paÅ”laik atrodas savienotājs;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - tēma savienotāja statusa un tā uzdevumu saglabāŔanai;
  • CONFIG_STORAGE_TOPIC=connector-config - tēma savienotāja konfigurācijas datu glabāŔanai un tās uzdevumi;
  • GROUP_ID=1 ā€” tās darbinieku grupas identifikators, kurai var izpildÄ«t savienojuma uzdevumu; nepiecieÅ”ams, izmantojot izplatÄ«tu (izplatÄ«ts) režīms.

Mēs sākam konteineru ar Ŕādiem mainÄ«gajiem:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Piezīme par Avro

Pēc noklusējuma Debezium ieraksta datus JSON formātā, kas ir pieņemams smilÅ”u kastēm un nelielam datu apjomam, taču var bÅ«t problēma ļoti noslogotās datu bāzēs. AlternatÄ«va JSON pārveidotājam ir serializēt ziņojumus, izmantojot Avro uz bināro formātu, kas samazina Apache Kafka I/O apakÅ”sistēmas slodzi.

Lai izmantotu Avro, ir jāizvieto atseviŔķs shēma-reÄ£istrs (shēmu glabāŔanai). Pārveidotāja mainÄ«gie izskatÄ«sies Ŕādi:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

SÄ«kāka informācija par Avro lietoÅ”anu un reÄ£istra iestatÄ«Å”anu tam nav ietverta rakstā ā€” turpmāk skaidrÄ«bas labad mēs izmantosim JSON.

2. PaŔa savienotāja iestatīŔana

Tagad varat doties tieŔi uz paŔa savienotāja konfigurāciju, kas nolasīs datus no avota.

ApskatÄ«sim divu DBVS savienotāju piemēru: PostgreSQL un MongoDB, ar kuriem man ir pieredze un attiecÄ«bā uz kuriem ir atŔķirÄ«bas (kaut arÄ« nelielas, bet dažos gadÄ«jumos bÅ«tiskas!).

Konfigurācija ir aprakstÄ«ta JSON apzÄ«mējumā un augÅ”upielādēta Kafka Connect, izmantojot POST pieprasÄ«jumu.

2.1. PostgreSQL

PostgreSQL savienotāja konfigurācijas piemērs:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Savienotāja darbÄ«bas princips pēc Ŕīs konfigurācijas ir diezgan vienkārÅ”s:

  • Pirmajā palaiÅ”anas reizē tas savienojas ar konfigurācijā norādÄ«to datu bāzi un sākas režīmā sākotnējais momentuzņēmums, nosÅ«tot Kafkai sākotnējo datu kopu, kas saņemta ar nosacÄ«jumu SELECT * FROM table_name.
  • Kad inicializācija ir pabeigta, savienotājs pāriet uz režīmu, kurā tiek nolasÄ«tas izmaiņas no PostgreSQL WAL failiem.

Par izmantotajām opcijām:

  • name ā€” tā savienotāja nosaukums, kuram izmanto turpmāk aprakstÄ«to konfigurāciju; nākotnē Å”is nosaukums tiks izmantots darbam ar savienotāju (t.i., lai skatÄ«tu statusu / restartētu / atjauninātu konfigurāciju), izmantojot Kafka Connect REST API;
  • connector.class ā€” DBVS savienotāja klase, ko izmantos konfigurētais savienotājs;
  • plugin.name ir spraudņa nosaukums datu loÄ£iskai dekodÄ“Å”anai no WAL failiem. Pieejams no kuriem izvēlēties wal2json, decoderbuffs Šø pgoutput. Pirmajiem diviem ir jāinstalē atbilstoÅ”i paplaÅ”inājumi DBVS un pgoutput PostgreSQL versijai 10 un jaunākai nav nepiecieÅ”amas papildu manipulācijas;
  • database.* ā€” iespējas pieslēgties datu bāzei, kur database.server.name - PostgreSQL instances nosaukums, ko izmanto, lai izveidotu tēmas nosaukumu Kafka klasterÄ«;
  • table.include.list - tabulu saraksts, kurās vēlamies izsekot izmaiņām; norādÄ«ts formātā schema.table_name; nevar lietot kopā ar table.exclude.list;
  • heartbeat.interval.ms ā€” intervāls (milisekundēs), ar kādu savienotājs nosÅ«ta sirdsdarbÄ«bas ziņojumus uz Ä«paÅ”u tēmu;
  • heartbeat.action.query - pieprasÄ«jums, kas tiks izpildÄ«ts, nosÅ«tot katru sirdsdarbÄ«bas ziņojumu (opcija parādÄ«jās kopÅ” versijas 1.1);
  • slot.name ā€” tā replikācijas slota nosaukums, kuru izmantos savienotājs;
  • publication.name - Vārds PublicÄ“Å”ana programmā PostgreSQL, ko savienotājs izmanto. Ja tas neeksistē, Debezium mēģinās to izveidot. Ja lietotājam, ar kuru tiek izveidots savienojums, nav pietiekamu tiesÄ«bu Å”ai darbÄ«bai, savienotājs tiks aizvērts ar kļūdu;
  • transforms nosaka, kā precÄ«zi mainÄ«t mērÄ·a tēmas nosaukumu:
    • transforms.AddPrefix.type norāda, ka izmantosim regulārās izteiksmes;
    • transforms.AddPrefix.regex ā€” maska, ar kuru tiek no jauna definēts mērÄ·a tēmas nosaukums;
    • transforms.AddPrefix.replacement - tieÅ”i tas, ko mēs pārdefinējam.

Vairāk par sirdspukstiem un transformācijām

Pēc noklusējuma savienotājs nosūta datus Kafka par katru veikto darījumu un ieraksta savu LSN (žurnāla kārtas numuru) pakalpojuma tēmai. offset. Bet kas notiek, ja savienotājs ir konfigurēts tā, lai lasītu nevis visu datu bāzi, bet tikai daļu no tās tabulām (kurās dati tiek atjaunināti reti)?

  • Savienotājs nolasÄ«s WAL failus un nenoteiks tajos transakciju saistÄ«bas tabulās, kuras tas pārrauga.
  • Tāpēc tas neatjauninās savu paÅ”reizējo pozÄ«ciju ne tēmā, ne replikācijas slotā.
  • Tas savukārt izraisÄ«s WAL failu "iestrēgÅ”anu" diskā un, iespējams, tiem pietrÅ«ks vietas diskā.

Un Å”eit palÄ«gā nāk iespējas. heartbeat.interval.ms Šø heartbeat.action.query. Izmantojot Ŕīs opcijas pa pāriem, katru reizi, kad tiek nosÅ«tÄ«ts sirdsdarbÄ«bas ziņojums, ir iespējams izpildÄ«t pieprasÄ«jumu mainÄ«t datus atseviŔķā tabulā. Tādējādi LSN, kurā paÅ”laik atrodas savienotājs (replikācijas slotā), tiek pastāvÄ«gi atjaunināts. Tas ļauj DBVS noņemt WAL failus, kas vairs nav vajadzÄ«gi. Papildinformāciju par opciju darbÄ«bu skatiet sadaļā dokumentācija.

Vēl viena iespēja, kurai jāpievērÅ” lielāka uzmanÄ«ba, ir transforms. Lai gan tas vairāk attiecas uz ērtÄ«bām un skaistumu ...

Pēc noklusējuma Debezium veido tēmas, izmantojot Ŕādu nosaukÅ”anas politiku: serverName.schemaName.tableName. Tas ne vienmēr var bÅ«t ērti. Iespējas transforms izmantojot regulārās izteiksmes, varat definēt to tabulu sarakstu, kuru notikumi jānovirza uz tēmu ar noteiktu nosaukumu.

MÅ«su konfigurācijā, pateicoties transforms notiek Ŕādi: visi CDC notikumi no izsekotās datu bāzes tiks novirzÄ«ti uz tēmu ar nosaukumu data.cdc.dbname. Pretējā gadÄ«jumā (bez Å”iem iestatÄ«jumiem) Debezium pēc noklusējuma izveido tēmu katrai veidlapas tabulai: pg-dev.public.<table_name>.

Savienotāju ierobežojumi

PostgreSQL savienotāja konfigurācijas apraksta beigās ir vērts runāt par Ŕādām tā darba funkcijām / ierobežojumiem:

  1. PostgreSQL savienotāja funkcionalitāte balstās uz loÄ£iskās dekodÄ“Å”anas koncepciju. Tāpēc viņŔ neizseko pieprasÄ«jumus mainÄ«t datu bāzes struktÅ«ru (DDL) - attiecÄ«gi Å”ie dati tēmās nebÅ«s.
  2. Tā kā tiek izmantoti replikācijas sloti, savienotāja pieslēgÅ”ana ir iespējama tikai uz galveno DBVS gadÄ«jumu.
  3. Ja lietotājam, ar kuru savienotājs izveido savienojumu ar datu bāzi, ir tikai lasÄ«Å”anas tiesÄ«bas, tad pirms pirmās palaiÅ”anas jums bÅ«s manuāli jāizveido replikācijas slots un jāpublicē datu bāzē.

Konfigurācijas pielietoŔana

Tātad, ielādēsim mūsu konfigurāciju savienotājā:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Mēs pārbaudām, vai lejupielāde bija veiksmīga un savienotājs tika palaists:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Lieliski: tas ir iestatīts un gatavs darbam. Tagad izliksimies par patērētājiem un izveidosim savienojumu ar Kafka, pēc tam pievienojam un mainām ierakstu tabulā:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

MÅ«su tēmā tas tiks parādÄ«ts Ŕādi:

Ä»oti garÅ” JSON ar mÅ«su izmaiņām

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Abos gadÄ«jumos ieraksti sastāv no mainÄ«tā ieraksta atslēgas (PK) un paÅ”as izmaiņu bÅ«tÄ«bas: kāds ieraksts bija pirms un kāds kļuva pēc tam.

  • GadÄ«jumā, ja INSERT: vērtÄ«ba pirms (before) vienāds nullkam seko ievietotā virkne.
  • GadÄ«jumā, ja UPDATE: plkst payload.before tiek parādÄ«ts rindas iepriekŔējais stāvoklis, un payload.after - jauns ar izmaiņu bÅ«tÄ«bu.

2.2 MongoDB

Šis savienotājs izmanto standarta MongoDB replikācijas mehānismu, nolasot informāciju no DBVS primārā mezgla oplog.

LÄ«dzÄ«gi kā jau aprakstÄ«tajā PgSQL savienotājā, arÄ« Å”eit pirmajā startā tiek uzņemts primāro datu momentuzņēmums, pēc kura savienotājs pārslēdzas uz oplog lasÄ«Å”anas režīmu.

Konfigurācijas piemērs:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kā redzat, salÄ«dzinājumā ar iepriekŔējo piemēru nav jaunu opciju, bet ir samazināts tikai opciju skaits, kas ir atbildÄ«gas par savienojumu ar datu bāzi, un to prefiksi.

iestatÄ«jumi transforms Å”oreiz viņi rÄ«kojas Ŕādi: pagriež no shēmas mērÄ·a tēmas nosaukumu <server_name>.<db_name>.<collection_name> Š² data.cdc.mongo_<db_name>.

kļūdu tolerance

Problēma par kļūdu toleranci un augstu pieejamÄ«bu mÅ«sdienās ir aktuālāka nekā jebkad agrāk ā€“ it Ä«paÅ”i, ja mēs runājam par datiem un darÄ«jumiem, un datu izmaiņu izsekoÅ”ana Å”ajā jautājumā nav malā. ApskatÄ«sim, kas principā var noiet greizi un kas notiks ar Debezium katrā gadÄ«jumā.

Ir trÄ«s atteikÅ”anās iespējas:

  1. Kafka Connect kļūme. Ja savienojums ir konfigurēts darbam sadalītā režīmā, vairākiem darbiniekiem ir jāiestata viens un tas pats group.id. Pēc tam, ja viens no tiem neizdodas, savienotājs tiks restartēts otram darbiniekam un turpinās lasīt no pēdējās noteiktās pozīcijas tēmā Kafka.
  2. Savienojuma zudums ar Kafka kopu. Savienotājs vienkārÅ”i pārtrauks nolasÄ«Å”anu vietā, kuru tas neizdevās nosÅ«tÄ«t Kafkai, un periodiski mēģinās to nosÅ«tÄ«t atkārtoti, lÄ«dz mēģinājums bÅ«s veiksmÄ«gs.
  3. Datu avots nav pieejams. Savienotājs mēģinās atkārtoti izveidot savienojumu ar avotu atbilstoÅ”i konfigurācijai. Noklusējums ir 16 mēģinājumi izmantot eksponenciāla atkāpÅ”anās. Pēc 16. neveiksmÄ«gā mēģinājuma uzdevums tiks atzÄ«mēts kā neizdevās un tas bÅ«s manuāli jārestartē, izmantojot Kafka Connect REST interfeisu.
    • GadÄ«jumā, ja PostgreSQL dati netiks zaudēti, jo izmantojot replikācijas slotus, tiks novērsta to WAL failu dzÄ“Å”ana, kurus savienotājs nav nolasÄ«jis. Å ajā gadÄ«jumā ir mÄ«nuss: ja tÄ«kla savienojamÄ«ba starp savienotāju un DBVS tiek traucēta ilgu laiku, pastāv iespēja, ka diska vieta beigsies, un tas var izraisÄ«t visas DBVS kļūmi.
    • GadÄ«jumā, ja MySQL binlog failus var pagriezt pati DBVS, pirms tiek atjaunots savienojums. Tādējādi savienotājs pāries neveiksmÄ«gā stāvoklÄ«, un tas bÅ«s jārestartē sākotnējā momentuzņēmuma režīmā, lai turpinātu lasÄ«t no binlogs un atjaunotu normālu darbÄ«bu.
    • uz MongoDB. Dokumentācijā teikts, ka savienotāja darbÄ«ba gadÄ«jumā, ja žurnāla/oplog faili ir izdzēsti un savienotājs nevar turpināt lasÄ«Å”anu no vietas, kur tas tika pārtraukts, ir vienāda visām DBVS. Tas slēpjas faktā, ka savienotājs nonāks stāvoklÄ« neizdevās un bÅ«s nepiecieÅ”ama restartÄ“Å”ana režīmā sākotnējais momentuzņēmums.

      Tomēr ir izņēmumi. Ja savienotājs ilgu laiku bija atvienotā stāvoklÄ« (vai nevarēja sasniegt MongoDB gadÄ«jumu) un oplog Å”ajā laikā tika pagriezts, tad, kad savienojums tiks atjaunots, savienotājs mierÄ«gi turpinās lasÄ«t datus no pirmās pieejamās pozÄ«cijas. , tāpēc daži Kafkas dati nē sitÄ«s.

Secinājums

Debezium ir mana pirmā pieredze ar CDC sistēmām, un kopumā tā ir bijusi ļoti pozitÄ«va. Projekts uzpirka galvenās DBVS atbalstu, vieglu konfigurāciju, atbalstu klasterÄ“Å”anai un aktÄ«vu kopienu. Tiem, kurus interesē prakse, iesaku izlasÄ«t ceļvežus Kafka Connect Šø Debezium.

SalÄ«dzinot ar Kafka Connect JDBC savienotāju, Debezium galvenā priekÅ”rocÄ«ba ir tā, ka izmaiņas tiek nolasÄ«tas no DBVS žurnāliem, kas ļauj saņemt datus ar minimālu aizkavi. JDBC savienotājs (nodroÅ”ina Kafka Connect) vaicā izsekoto tabulu ar noteiktu intervālu un (tā paÅ”a iemesla dēļ) neÄ£enerē ziņojumus, kad dati tiek dzēsti (kā jÅ«s varat pieprasÄ«t datus, kas tur nav?).

Lai atrisinātu lÄ«dzÄ«gas problēmas, varat pievērst uzmanÄ«bu Ŕādiem risinājumiem (papildus Debezium):

PS

Lasi arī mūsu emuārā:

Avots: www.habr.com

Pievieno komentāru