Iepazīstinām ar Debezium — CDC, kas paredzēts Apache Kafka

Iepazīstinām ar Debezium — CDC, kas paredzēts Apache Kafka

Savā darbā bieži sastopos ar jauniem tehniskiem risinājumiem/programmatūras produktiem, par kuriem krievvalodīgajā internetā ir visai maz informācijas. Ar šo rakstu es mēģināšu aizpildīt vienu šādu robu ar piemēru no manas nesenās prakses, kad man vajadzēja iestatīt CDC notikumu sūtīšanu no divām populārām DBVS (PostgreSQL un MongoDB) uz Kafka klasteru, izmantojot Debezium. Ceru, ka šis pārskata raksts, kas parādījās paveiktā darba rezultātā, noderēs citiem.

Kas ir Debezium un CDC kopumā?

Debezium - CDC programmatūras kategorijas pārstāvis (Tvert datu izmaiņas), vai precīzāk, tas ir savienotāju komplekts dažādām DBVS, kas ir saderīgi ar Apache Kafka Connect ietvaru.

atvērtā koda projekts, licencēta saskaņā ar Apache licenci v2.0 un sponsorēta Red Hat. Izstrāde notiek kopš 2016. gada, un šobrīd tā nodrošina oficiālu atbalstu šādām DBVS: MySQL, PostgreSQL, MongoDB, SQL Server. Ir arī savienotāji Cassandra un Oracle, taču tie pašlaik ir "agrīnas piekļuves" statusā, un jaunie izlaidumi negarantē atpakaļejošu saderību.

Ja salīdzinām CDC ar tradicionālo pieeju (kad lietojumprogramma tieši nolasa datus no DBVS), tad tās galvenās priekšrocības ietver datu izmaiņu straumēšanas ieviešanu rindu līmenī ar zemu latentumu, augstu uzticamību un pieejamību. Pēdējie divi punkti tiek sasniegti, izmantojot Kafka klasteru kā CDC notikumu krātuvi.

Tāpat starp priekšrocībām var minēt to, ka notikumu glabāšanai tiek izmantots viens modelis, līdz ar to gala lietojumprogrammai nav jāuztraucas par dažādu DBVS darbības niansēm.

Visbeidzot, izmantojot ziņojumu brokeri, tiek atvērtas mērogošanas lietojumprogrammas, kas izseko datu izmaiņas. Tajā pašā laikā tiek samazināta ietekme uz datu avotu, jo dati tiek saņemti nevis tieši no DBVS, bet gan no Kafka klastera.

Par Debezium arhitektūru

Izmantojot Debezium, tiek izmantota šāda vienkārša shēma:

DBVS (kā datu avots) → savienotājs programmā Kafka Connect → Apache Kafka → patērētājs

Kā ilustrāciju es sniegšu diagrammu no projekta vietnes:

Iepazīstinām ar Debezium — CDC, kas paredzēts Apache Kafka

Taču man šī shēma ne visai patīk, jo šķiet, ka ir iespējams tikai izlietnes savienotājs.

Patiesībā situācija ir atšķirīga: aizpildiet savu Data Lake (pēdējā saite diagrammā iepriekš) nav vienīgais Debezium lietošanas veids. Uz Apache Kafka nosūtītos notikumus jūsu lietojumprogrammas var izmantot, lai risinātu dažādas situācijas. Piemēram:

  • neatbilstošu datu noņemšana no kešatmiņas;
  • paziņojumu sūtīšana;
  • meklēt indeksa atjauninājumus;
  • sava veida audita žurnāli;
  • ...

Ja jums ir Java lietojumprogramma un nav vajadzības/iespējas izmantot Kafka klasteru, ir arī iespēja strādāt ar iegultais savienotājs. Acīmredzamais plus ir tas, ka ar to jūs varat atteikties no papildu infrastruktūras (savienotāja un Kafka veidā). Tomēr šis risinājums ir novecojis kopš versijas 1.1 un vairs nav ieteicams lietošanai (nākamajos laidienos tas var tikt noņemts).

Šajā rakstā tiks apspriesta izstrādātāju ieteiktā arhitektūra, kas nodrošina kļūdu toleranci un mērogojamību.

Savienotāja konfigurācija

Lai sāktu izsekot izmaiņām vissvarīgākajā vērtībā - datos - mums ir nepieciešams:

  1. datu avots, kas var būt MySQL, sākot no versijas 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (pilns saraksts);
  2. Apache Kafka klasteris
  3. Kafka Connect instance (versijas 1.x, 2.x);
  4. konfigurēts Debezium savienotājs.

Strādājiet pie pirmajiem diviem punktiem, t.i. DBVS un Apache Kafka instalēšanas process ir ārpus raksta darbības jomas. Savukārt tiem, kas vēlas visu izvietot smilšu kastē, oficiālajā repozitorijā ir jau gatava ar piemēriem docker-compose.yaml.

Sīkāk pievērsīsimies pēdējiem diviem punktiem.

0. Kafka Connect

Šeit un vēlāk rakstā visi konfigurācijas piemēri ir aplūkoti Debezium izstrādātāju izplatītā Docker attēla kontekstā. Tas satur visus nepieciešamos spraudņu failus (savienotājus) un nodrošina Kafka Connect konfigurāciju, izmantojot vides mainīgos.

Ja plānojat izmantot Kafka Connect no Confluent, jums pašam būs jāpievieno nepieciešamo savienotāju spraudņi direktorijā, kas norādīts plugin.path vai iestatīt, izmantojot vides mainīgo CLASSPATH. Kafka Connect darbinieka un savienotāju iestatījumi tiek definēti, izmantojot konfigurācijas failus, kas tiek nodoti kā argumenti darbinieka starta komandai. Sīkāku informāciju sk dokumentācija.

Viss Debeizum iestatīšanas process savienotāja versijā tiek veikts divos posmos. Apskatīsim katru no tiem:

1. Kafka Connect ietvara iestatīšana

Lai straumētu datus Apache Kafka klasterī, Kafka Connect sistēmā ir iestatīti konkrēti parametri, piemēram:

  • klastera savienojuma iestatījumi,
  • tēmu nosaukumi, kuros tiks saglabāta paša savienotāja konfigurācija,
  • tās grupas nosaukums, kurā darbojas savienotājs (ja tiek izmantots sadalītais režīms).

Projekta oficiālais Docker attēls atbalsta konfigurāciju, izmantojot vides mainīgos - to mēs izmantosim. Tātad, lejupielādēsim attēlu:

docker pull debezium/connect

Minimālais vides mainīgo kopums, kas nepieciešams savienotāja palaišanai, ir šāds:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - sākotnējais Kafka klasteru serveru saraksts, lai iegūtu pilnu klastera dalībnieku sarakstu;
  • OFFSET_STORAGE_TOPIC=connector-offsets — tēma pozīciju saglabāšanai, kur pašlaik atrodas savienotājs;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - tēma savienotāja statusa un tā uzdevumu saglabāšanai;
  • CONFIG_STORAGE_TOPIC=connector-config - tēma savienotāja konfigurācijas datu glabāšanai un tās uzdevumi;
  • GROUP_ID=1 — tās darbinieku grupas identifikators, kurai var izpildīt savienojuma uzdevumu; nepieciešams, izmantojot izplatītu (izplatīts) režīms.

Mēs sākam konteineru ar šādiem mainīgajiem:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Piezīme par Avro

Pēc noklusējuma Debezium ieraksta datus JSON formātā, kas ir pieņemams smilšu kastēm un nelielam datu apjomam, taču var būt problēma ļoti noslogotās datu bāzēs. Alternatīva JSON pārveidotājam ir serializēt ziņojumus, izmantojot Avro uz bināro formātu, kas samazina Apache Kafka I/O apakšsistēmas slodzi.

Lai izmantotu Avro, ir jāizvieto atsevišķs shēma-reģistrs (shēmu glabāšanai). Pārveidotāja mainīgie izskatīsies šādi:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Sīkāka informācija par Avro lietošanu un reģistra iestatīšanu tam nav ietverta rakstā — turpmāk skaidrības labad mēs izmantosim JSON.

2. Paša savienotāja iestatīšana

Tagad varat doties tieši uz paša savienotāja konfigurāciju, kas nolasīs datus no avota.

Apskatīsim divu DBVS savienotāju piemēru: PostgreSQL un MongoDB, ar kuriem man ir pieredze un attiecībā uz kuriem ir atšķirības (kaut arī nelielas, bet dažos gadījumos būtiskas!).

Konfigurācija ir aprakstīta JSON apzīmējumā un augšupielādēta Kafka Connect, izmantojot POST pieprasījumu.

2.1. PostgreSQL

PostgreSQL savienotāja konfigurācijas piemērs:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Savienotāja darbības princips pēc šīs konfigurācijas ir diezgan vienkāršs:

  • Pirmajā palaišanas reizē tas savienojas ar konfigurācijā norādīto datu bāzi un sākas režīmā sākotnējais momentuzņēmums, nosūtot Kafkai sākotnējo datu kopu, kas saņemta ar nosacījumu SELECT * FROM table_name.
  • Kad inicializācija ir pabeigta, savienotājs pāriet uz režīmu, kurā tiek nolasītas izmaiņas no PostgreSQL WAL failiem.

Par izmantotajām opcijām:

  • name — tā savienotāja nosaukums, kuram izmanto turpmāk aprakstīto konfigurāciju; nākotnē šis nosaukums tiks izmantots darbam ar savienotāju (t.i., lai skatītu statusu / restartētu / atjauninātu konfigurāciju), izmantojot Kafka Connect REST API;
  • connector.class — DBVS savienotāja klase, ko izmantos konfigurētais savienotājs;
  • plugin.name ir spraudņa nosaukums datu loģiskai dekodēšanai no WAL failiem. Pieejams no kuriem izvēlēties wal2json, decoderbuffs и pgoutput. Pirmajiem diviem ir jāinstalē atbilstoši paplašinājumi DBVS un pgoutput PostgreSQL versijai 10 un jaunākai nav nepieciešamas papildu manipulācijas;
  • database.* — iespējas pieslēgties datu bāzei, kur database.server.name - PostgreSQL instances nosaukums, ko izmanto, lai izveidotu tēmas nosaukumu Kafka klasterī;
  • table.include.list - tabulu saraksts, kurās vēlamies izsekot izmaiņām; norādīts formātā schema.table_name; nevar lietot kopā ar table.exclude.list;
  • heartbeat.interval.ms — intervāls (milisekundēs), ar kādu savienotājs nosūta sirdsdarbības ziņojumus uz īpašu tēmu;
  • heartbeat.action.query - pieprasījums, kas tiks izpildīts, nosūtot katru sirdsdarbības ziņojumu (opcija parādījās kopš versijas 1.1);
  • slot.name — tā replikācijas slota nosaukums, kuru izmantos savienotājs;
  • publication.name - Vārds Publicēšana programmā PostgreSQL, ko savienotājs izmanto. Ja tas neeksistē, Debezium mēģinās to izveidot. Ja lietotājam, ar kuru tiek izveidots savienojums, nav pietiekamu tiesību šai darbībai, savienotājs tiks aizvērts ar kļūdu;
  • transforms nosaka, kā precīzi mainīt mērķa tēmas nosaukumu:
    • transforms.AddPrefix.type norāda, ka izmantosim regulārās izteiksmes;
    • transforms.AddPrefix.regex — maska, ar kuru tiek no jauna definēts mērķa tēmas nosaukums;
    • transforms.AddPrefix.replacement - tieši tas, ko mēs pārdefinējam.

Vairāk par sirdspukstiem un transformācijām

Pēc noklusējuma savienotājs nosūta datus Kafka par katru veikto darījumu un ieraksta savu LSN (žurnāla kārtas numuru) pakalpojuma tēmai. offset. Bet kas notiek, ja savienotājs ir konfigurēts tā, lai lasītu nevis visu datu bāzi, bet tikai daļu no tās tabulām (kurās dati tiek atjaunināti reti)?

  • Savienotājs nolasīs WAL failus un nenoteiks tajos transakciju saistības tabulās, kuras tas pārrauga.
  • Tāpēc tas neatjauninās savu pašreizējo pozīciju ne tēmā, ne replikācijas slotā.
  • Tas savukārt izraisīs WAL failu "iestrēgšanu" diskā un, iespējams, tiem pietrūks vietas diskā.

Un šeit palīgā nāk iespējas. heartbeat.interval.ms и heartbeat.action.query. Izmantojot šīs opcijas pa pāriem, katru reizi, kad tiek nosūtīts sirdsdarbības ziņojums, ir iespējams izpildīt pieprasījumu mainīt datus atsevišķā tabulā. Tādējādi LSN, kurā pašlaik atrodas savienotājs (replikācijas slotā), tiek pastāvīgi atjaunināts. Tas ļauj DBVS noņemt WAL failus, kas vairs nav vajadzīgi. Papildinformāciju par opciju darbību skatiet sadaļā dokumentācija.

Vēl viena iespēja, kurai jāpievērš lielāka uzmanība, ir transforms. Lai gan tas vairāk attiecas uz ērtībām un skaistumu ...

Pēc noklusējuma Debezium veido tēmas, izmantojot šādu nosaukšanas politiku: serverName.schemaName.tableName. Tas ne vienmēr var būt ērti. Iespējas transforms izmantojot regulārās izteiksmes, varat definēt to tabulu sarakstu, kuru notikumi jānovirza uz tēmu ar noteiktu nosaukumu.

Mūsu konfigurācijā, pateicoties transforms notiek šādi: visi CDC notikumi no izsekotās datu bāzes tiks novirzīti uz tēmu ar nosaukumu data.cdc.dbname. Pretējā gadījumā (bez šiem iestatījumiem) Debezium pēc noklusējuma izveido tēmu katrai veidlapas tabulai: pg-dev.public.<table_name>.

Savienotāju ierobežojumi

PostgreSQL savienotāja konfigurācijas apraksta beigās ir vērts runāt par šādām tā darba funkcijām / ierobežojumiem:

  1. PostgreSQL savienotāja funkcionalitāte balstās uz loģiskās dekodēšanas koncepciju. Tāpēc viņš neizseko pieprasījumus mainīt datu bāzes struktūru (DDL) - attiecīgi šie dati tēmās nebūs.
  2. Tā kā tiek izmantoti replikācijas sloti, savienotāja pieslēgšana ir iespējama tikai uz galveno DBVS gadījumu.
  3. Ja lietotājam, ar kuru savienotājs izveido savienojumu ar datu bāzi, ir tikai lasīšanas tiesības, tad pirms pirmās palaišanas jums būs manuāli jāizveido replikācijas slots un jāpublicē datu bāzē.

Konfigurācijas pielietošana

Tātad, ielādēsim mūsu konfigurāciju savienotājā:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Mēs pārbaudām, vai lejupielāde bija veiksmīga un savienotājs tika palaists:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Lieliski: tas ir iestatīts un gatavs darbam. Tagad izliksimies par patērētājiem un izveidosim savienojumu ar Kafka, pēc tam pievienojam un mainām ierakstu tabulā:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Mūsu tēmā tas tiks parādīts šādi:

Ļoti garš JSON ar mūsu izmaiņām

{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"int32",
        "optional":false,
        "field":"id"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Key"
  },
  "payload":{
    "id":1005
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"after"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"version"
          },
          {
            "type":"string",
            "optional":false,
            "field":"connector"
          },
          {
            "type":"string",
            "optional":false,
            "field":"name"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"ts_ms"
          },
          {
            "type":"string",
            "optional":true,
            "name":"io.debezium.data.Enum",
            "version":1,
            "parameters":{
              "allowed":"true,last,false"
            },
            "default":"false",
            "field":"snapshot"
          },
          {
            "type":"string",
            "optional":false,
            "field":"db"
          },
          {
            "type":"string",
            "optional":false,
            "field":"schema"
          },
          {
            "type":"string",
            "optional":false,
            "field":"table"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"txId"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"lsn"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"xmin"
          }
        ],
        "optional":false,
        "name":"io.debezium.connector.postgresql.Source",
        "field":"source"
      },
      {
        "type":"string",
        "optional":false,
        "field":"op"
      },
      {
        "type":"int64",
        "optional":true,
        "field":"ts_ms"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"id"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"total_order"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"data_collection_order"
          }
        ],
        "optional":true,
        "field":"transaction"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Envelope"
  },
  "payload":{
    "before":null,
    "after":{
      "id":1005,
      "first_name":"foo",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "source":{
      "version":"1.2.3.Final",
      "connector":"postgresql",
      "name":"dbserver1",
      "ts_ms":1600374991648,
      "snapshot":"false",
      "db":"postgres",
      "schema":"public",
      "table":"customers",
      "txId":602,
      "lsn":34088472,
      "xmin":null
    },
    "op":"c",
    "ts_ms":1600374991762,
    "transaction":null
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"int32",
        "optional":false,
        "field":"id"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Key"
  },
  "payload":{
    "id":1005
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"after"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"version"
          },
          {
            "type":"string",
            "optional":false,
            "field":"connector"
          },
          {
            "type":"string",
            "optional":false,
            "field":"name"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"ts_ms"
          },
          {
            "type":"string",
            "optional":true,
            "name":"io.debezium.data.Enum",
            "version":1,
            "parameters":{
              "allowed":"true,last,false"
            },
            "default":"false",
            "field":"snapshot"
          },
          {
            "type":"string",
            "optional":false,
            "field":"db"
          },
          {
            "type":"string",
            "optional":false,
            "field":"schema"
          },
          {
            "type":"string",
            "optional":false,
            "field":"table"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"txId"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"lsn"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"xmin"
          }
        ],
        "optional":false,
        "name":"io.debezium.connector.postgresql.Source",
        "field":"source"
      },
      {
        "type":"string",
        "optional":false,
        "field":"op"
      },
      {
        "type":"int64",
        "optional":true,
        "field":"ts_ms"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"id"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"total_order"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"data_collection_order"
          }
        ],
        "optional":true,
        "field":"transaction"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Envelope"
  },
  "payload":{
    "before":{
      "id":1005,
      "first_name":"foo",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "after":{
      "id":1005,
      "first_name":"egg",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "source":{
      "version":"1.2.3.Final",
      "connector":"postgresql",
      "name":"dbserver1",
      "ts_ms":1600375609365,
      "snapshot":"false",
      "db":"postgres",
      "schema":"public",
      "table":"customers",
      "txId":603,
      "lsn":34089688,
      "xmin":null
    },
    "op":"u",
    "ts_ms":1600375609778,
    "transaction":null
  }
}

Abos gadījumos ieraksti sastāv no mainītā ieraksta atslēgas (PK) un pašas izmaiņu būtības: kāds ieraksts bija pirms un kāds kļuva pēc tam.

  • Gadījumā, ja INSERT: vērtība pirms (before) vienāds nullkam seko ievietotā virkne.
  • Gadījumā, ja UPDATE: plkst payload.before tiek parādīts rindas iepriekšējais stāvoklis, un payload.after - jauns ar izmaiņu būtību.

2.2 MongoDB

Šis savienotājs izmanto standarta MongoDB replikācijas mehānismu, nolasot informāciju no DBVS primārā mezgla oplog.

Līdzīgi kā jau aprakstītajā PgSQL savienotājā, arī šeit pirmajā startā tiek uzņemts primāro datu momentuzņēmums, pēc kura savienotājs pārslēdzas uz oplog lasīšanas režīmu.

Konfigurācijas piemērs:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "tasks.max": "1",
        "mongodb.hosts": "MainRepSet/mongo:27017",
        "mongodb.name": "mongo",
        "mongodb.user": "debezium",
        "mongodb.password": "dbname",
        "database.whitelist": "db_1,db_2",
        "transforms": "AddPrefix",
        "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
        "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
        }
  }

Kā redzat, salīdzinājumā ar iepriekšējo piemēru nav jaunu opciju, bet ir samazināts tikai opciju skaits, kas ir atbildīgas par savienojumu ar datu bāzi, un to prefiksi.

iestatījumi transforms šoreiz viņi rīkojas šādi: pagriež no shēmas mērķa tēmas nosaukumu <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

kļūdu tolerance

Problēma par kļūdu toleranci un augstu pieejamību mūsdienās ir aktuālāka nekā jebkad agrāk – it īpaši, ja mēs runājam par datiem un darījumiem, un datu izmaiņu izsekošana šajā jautājumā nav malā. Apskatīsim, kas principā var noiet greizi un kas notiks ar Debezium katrā gadījumā.

Ir trīs atteikšanās iespējas:

  1. Kafka Connect kļūme. Ja savienojums ir konfigurēts darbam sadalītā režīmā, vairākiem darbiniekiem ir jāiestata viens un tas pats group.id. Pēc tam, ja viens no tiem neizdodas, savienotājs tiks restartēts otram darbiniekam un turpinās lasīt no pēdējās noteiktās pozīcijas tēmā Kafka.
  2. Savienojuma zudums ar Kafka kopu. Savienotājs vienkārši pārtrauks nolasīšanu vietā, kuru tas neizdevās nosūtīt Kafkai, un periodiski mēģinās to nosūtīt atkārtoti, līdz mēģinājums būs veiksmīgs.
  3. Datu avots nav pieejams. Savienotājs mēģinās atkārtoti izveidot savienojumu ar avotu atbilstoši konfigurācijai. Noklusējums ir 16 mēģinājumi izmantot eksponenciāla atkāpšanās. Pēc 16. neveiksmīgā mēģinājuma uzdevums tiks atzīmēts kā neizdevās un tas būs manuāli jārestartē, izmantojot Kafka Connect REST interfeisu.
    • Gadījumā, ja PostgreSQL dati netiks zaudēti, jo izmantojot replikācijas slotus, tiks novērsta to WAL failu dzēšana, kurus savienotājs nav nolasījis. Šajā gadījumā ir mīnuss: ja tīkla savienojamība starp savienotāju un DBVS tiek traucēta ilgu laiku, pastāv iespēja, ka diska vieta beigsies, un tas var izraisīt visas DBVS kļūmi.
    • Gadījumā, ja MySQL binlog failus var pagriezt pati DBVS, pirms tiek atjaunots savienojums. Tādējādi savienotājs pāries neveiksmīgā stāvoklī, un tas būs jārestartē sākotnējā momentuzņēmuma režīmā, lai turpinātu lasīt no binlogs un atjaunotu normālu darbību.
    • uz MongoDB. Dokumentācijā teikts, ka savienotāja darbība gadījumā, ja žurnāla/oplog faili ir izdzēsti un savienotājs nevar turpināt lasīšanu no vietas, kur tas tika pārtraukts, ir vienāda visām DBVS. Tas slēpjas faktā, ka savienotājs nonāks stāvoklī neizdevās un būs nepieciešama restartēšana režīmā sākotnējais momentuzņēmums.

      Tomēr ir izņēmumi. Ja savienotājs ilgu laiku bija atvienotā stāvoklī (vai nevarēja sasniegt MongoDB gadījumu) un oplog šajā laikā tika pagriezts, tad, kad savienojums tiks atjaunots, savienotājs mierīgi turpinās lasīt datus no pirmās pieejamās pozīcijas. , tāpēc daži Kafkas dati sitīs.

Secinājums

Debezium ir mana pirmā pieredze ar CDC sistēmām, un kopumā tā ir bijusi ļoti pozitīva. Projekts uzpirka galvenās DBVS atbalstu, vieglu konfigurāciju, atbalstu klasterēšanai un aktīvu kopienu. Tiem, kurus interesē prakse, iesaku izlasīt ceļvežus Kafka Connect и Debezium.

Salīdzinot ar Kafka Connect JDBC savienotāju, Debezium galvenā priekšrocība ir tā, ka izmaiņas tiek nolasītas no DBVS žurnāliem, kas ļauj saņemt datus ar minimālu aizkavi. JDBC savienotājs (nodrošina Kafka Connect) vaicā izsekoto tabulu ar noteiktu intervālu un (tā paša iemesla dēļ) neģenerē ziņojumus, kad dati tiek dzēsti (kā jūs varat pieprasīt datus, kas tur nav?).

Lai atrisinātu līdzīgas problēmas, varat pievērst uzmanību šādiem risinājumiem (papildus Debezium):

PS

Lasi arī mūsu emuārā:

Avots: www.habr.com