Prezantimi i Debezium - CDC për Apache Kafka

Prezantimi i Debezium - CDC për Apache Kafka

Në punën time, shpesh ndeshem me zgjidhje të reja teknike / produkte softuerike, informacioni për të cilin është mjaft i pakët në internetin rusisht-folës. Me këtë artikull, do të përpiqem të plotësoj një boshllëk të tillë me një shembull nga praktika ime e fundit, kur më duhej të konfiguroja dërgimin e ngjarjeve CDC nga dy DBMS të njohura (PostgreSQL dhe MongoDB) në një grup Kafka duke përdorur Debezium. Shpresoj që ky artikull rishikues, i cili u shfaq si rezultat i punës së bërë, do të jetë i dobishëm për të tjerët.

ÇfarĂ« Ă«shtĂ« Debezium dhe CDC nĂ« pĂ«rgjithĂ«si?

Debezium - Përfaqësues i kategorisë së softuerit CDC (Regjistroni ndryshimin e të dhënave), ose më saktë, është një grup lidhësish për DBMS të ndryshme që janë në përputhje me kornizën Apache Kafka Connect.

Ajo projekt me kod të hapur, licencuar sipas licencës Apache v2.0 dhe sponsorizuar nga Red Hat. Zhvillimi ka nisur që nga viti 2016 dhe për momentin ofron mbështetje zyrtare për DBMS-të e mëposhtme: MySQL, PostgreSQL, MongoDB, SQL Server. Ekzistojnë gjithashtu lidhës për Cassandra dhe Oracle, por ato aktualisht janë në statusin e "qasjes së hershme" dhe lëshimet e reja nuk garantojnë pajtueshmëri të prapambetur.

Nëse krahasojmë CDC me qasjen tradicionale (kur aplikacioni lexon drejtpërdrejt të dhënat nga DBMS), atëherë avantazhet e tij kryesore përfshijnë zbatimin e transmetimit të ndryshimit të të dhënave në nivelin e rreshtit me vonesë të ulët, besueshmëri të lartë dhe disponueshmëri. Dy pikat e fundit arrihen duke përdorur një grup Kafka si një depo për ngjarjet e CDC.

Gjithashtu, avantazhet përfshijnë faktin se një model i vetëm përdoret për të ruajtur ngjarjet, kështu që aplikacioni përfundimtar nuk duhet të shqetësohet për nuancat e funksionimit të DBMS të ndryshme.

Më në fund, përdorimi i një ndërmjetësi mesazhesh hap hapësirën për shkallëzim horizontal të aplikacioneve që gjurmojnë ndryshimet në të dhëna. Në të njëjtën kohë, ndikimi në burimin e të dhënave minimizohet, pasi të dhënat nuk merren drejtpërdrejt nga DBMS, por nga grupi Kafka.

Rreth arkitekturës Debezium

Përdorimi i Debezium zbret në këtë skemë të thjeshtë:

DBMS (si burim i tĂ« dhĂ«nave) → lidhĂ«s nĂ« Kafka Connect → Apache Kafka → konsumator

Si ilustrim, unë do të jap një diagram nga faqja e internetit e projektit:

Prezantimi i Debezium - CDC për Apache Kafka

Sidoqoftë, nuk më pëlqen shumë kjo skemë, sepse duket se vetëm një lidhës lavaman është i mundur.

Në realitet, situata është ndryshe: mbushja e Liqenit tuaj të të Dhënave (lidhja e fundit në diagramin e mësipërm) nuk është mënyra e vetme për të përdorur Debezium. Ngjarjet e dërguara te Apache Kafka mund të përdoren nga aplikacionet tuaja për t'u marrë me situata të ndryshme. Për shembull:

  • heqja e tĂ« dhĂ«nave tĂ« parĂ«ndĂ«sishme nga cache;
  • dĂ«rgimi i njoftimeve;
  • pĂ«rditĂ«simet e indeksit tĂ« kĂ«rkimit;
  • njĂ« lloj regjistrash auditimi;
  • ...

Në rast se keni një aplikacion Java dhe nuk ka nevojë/mundësi për të përdorur një grup Kafka, ekziston gjithashtu mundësia për të punuar përmes lidhës i ngulitur. Plus i dukshëm është se me të mund të refuzoni infrastrukturën shtesë (në formën e një lidhësi dhe Kafka). Megjithatë, kjo zgjidhje është vjetëruar që nga versioni 1.1 dhe nuk rekomandohet më për përdorim (mund të hiqet në versionet e ardhshme).

Ky artikull do të diskutojë arkitekturën e rekomanduar nga zhvilluesit, e cila siguron tolerancën e gabimeve dhe shkallëzueshmërinë.

Konfigurimi i lidhësit

Për të filluar ndjekjen e ndryshimeve në vlerën më të rëndësishme - të dhënat - na duhen:

  1. burimi i të dhënave, i cili mund të jetë MySQL duke filluar nga versioni 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (Lista e plotë);
  2. Grup Apache Kafka
  3. Shembulli i Kafka Connect (versionet 1.x, 2.x);
  4. lidhës i konfiguruar Debezium.

Punoni në dy pikat e para, d.m.th. procesi i instalimit të një DBMS dhe Apache Kafka janë përtej qëllimit të artikullit. Sidoqoftë, për ata që duan të vendosin gjithçka në një sandbox, ekziston një i gatshëm në depon zyrtare me shembuj doker-kompozoj.yaml.

Ne do të përqendrohemi në dy pikat e fundit më në detaje.

0. Kafka Connect

Këtu dhe më vonë në artikull, të gjithë shembujt e konfigurimit konsiderohen në kontekstin e imazhit Docker të shpërndarë nga zhvilluesit e Debezium. Ai përmban të gjithë skedarët e nevojshëm të shtojcave (lidhësit) dhe siguron konfigurimin e Kafka Connect duke përdorur variabla të mjedisit.

Nëse keni ndërmend të përdorni Kafka Connect nga Confluent, do t'ju duhet të shtoni vetë shtojcat e lidhësve të nevojshëm në drejtorinë e specifikuar në plugin.path ose vendoset nëpërmjet një ndryshoreje mjedisi CLASSPATH. Cilësimet për punëtorin dhe lidhësit e Kafka Connect përcaktohen përmes skedarëve të konfigurimit që i kalohen si argumente komandës startuese të punëtorit. Për detaje, shih dokumentacionin.

I gjithë procesi i konfigurimit të Debeizum në versionin lidhës kryhet në dy faza. Le të shqyrtojmë secilën prej tyre:

1. Vendosja e kornizës Kafka Connect

Për të transmetuar të dhëna në një grupim Apache Kafka, parametrat specifikë vendosen në kornizën Kafka Connect, si p.sh.

  • cilĂ«simet e lidhjes sĂ« grupit,
  • emrat e temave nĂ« tĂ« cilat do tĂ« ruhet konfigurimi i vetĂ« lidhĂ«sit,
  • emri i grupit nĂ« tĂ« cilin lidhĂ«si funksionon (nĂ« rast tĂ« pĂ«rdorimit tĂ« modalitetit tĂ« shpĂ«rndarĂ«).

Imazhi zyrtar Docker i projektit mbështet konfigurimin duke përdorur variablat e mjedisit - kjo është ajo që ne do të përdorim. Pra, le të shkarkojmë imazhin:

docker pull debezium/connect

Grupi minimal i variablave të mjedisit që kërkohet për të ekzekutuar lidhësin është si më poshtë:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista fillestare e serverĂ«ve tĂ« grupimit Kafka pĂ«r tĂ« marrĂ« njĂ« listĂ« tĂ« plotĂ« tĂ« anĂ«tarĂ«ve tĂ« grupimit;
  • OFFSET_STORAGE_TOPIC=connector-offsets — njĂ« temĂ« pĂ«r ruajtjen e pozicioneve ku ndodhet aktualisht lidhĂ«si;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - njĂ« temĂ« pĂ«r ruajtjen e statusit tĂ« lidhĂ«sit dhe detyrave tĂ« tij;
  • CONFIG_STORAGE_TOPIC=connector-config - njĂ« temĂ« pĂ«r ruajtjen e tĂ« dhĂ«nave tĂ« konfigurimit tĂ« lidhĂ«sit dhe detyrat e tij;
  • GROUP_ID=1 — identifikuesi i grupit tĂ« punĂ«torĂ«ve nĂ« tĂ« cilin mund tĂ« ekzekutohet detyra lidhĂ«se; kĂ«rkohet gjatĂ« pĂ«rdorimit tĂ« shpĂ«rndarĂ« (shpĂ«rndarĂ«) regjimi.

Ne e fillojmë kontejnerin me këto variabla:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Shënim për Avro

Si parazgjedhje, Debezium shkruan të dhëna në formatin JSON, i cili është i pranueshëm për sandboxet dhe sasi të vogla të dhënash, por mund të jetë problem në bazat e të dhënave të ngarkuara shumë. Një alternativë ndaj konvertuesit JSON është serializimi i mesazheve duke përdorur Avro në një format binar, i cili redukton ngarkesën në nënsistemin I/O në Apache Kafka.

Për të përdorur Avro, duhet të vendosni një të veçantë skema-regjistri (për ruajtjen e skemave). Variablat për konvertuesin do të duken kështu:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detajet mbi përdorimin e Avro dhe vendosjen e një regjistri për të janë përtej qëllimit të artikullit - më tej, për qartësi, ne do të përdorim JSON.

2. Vendosja e vetë lidhësit

Tani mund të shkoni drejtpërdrejt në konfigurimin e vetë lidhësit, i cili do të lexojë të dhënat nga burimi.

Le të shohim shembullin e lidhësve për dy DBMS: PostgreSQL dhe MongoDB, për të cilat kam përvojë dhe për të cilat ka dallime (megjithëse të vogla, por në disa raste domethënëse!).

Konfigurimi përshkruhet në shënimin JSON dhe ngarkohet në Kafka Connect duke përdorur një kërkesë POST.

2.1. PostgreSQL

Shembull i konfigurimit të lidhësit për PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Parimi i funksionimit të lidhësit pas këtij konfigurimi është mjaft i thjeshtë:

  • NĂ« fillimin e parĂ«, ai lidhet me bazĂ«n e tĂ« dhĂ«nave tĂ« specifikuar nĂ« konfigurim dhe fillon nĂ« modalitetin fotografi fillestare, duke i dĂ«rguar KafkĂ«s grupin fillestar tĂ« tĂ« dhĂ«nave tĂ« marra me kusht SELECT * FROM table_name.
  • Pas pĂ«rfundimit tĂ« inicializimit, lidhĂ«si hyn nĂ« modalitetin e leximit tĂ« ndryshimeve nga skedarĂ«t PostgreSQL WAL.

Rreth opsioneve të përdorura:

  • name — emri i lidhĂ«sit pĂ«r tĂ« cilin pĂ«rdoret konfigurimi i pĂ«rshkruar mĂ« poshtĂ«; nĂ« tĂ« ardhmen, ky emĂ«r pĂ«rdoret pĂ«r tĂ« punuar me lidhĂ«sin (d.m.th. shikoni statusin / rinisni / pĂ«rditĂ«soni konfigurimin) pĂ«rmes Kafka Connect REST API;
  • connector.class — klasa e lidhĂ«sit DBMS qĂ« do tĂ« pĂ«rdoret nga lidhĂ«si i konfiguruar;
  • plugin.name Ă«shtĂ« emri i shtojcĂ«s pĂ«r dekodimin logjik tĂ« tĂ« dhĂ«nave nga skedarĂ«t WAL. NĂ« dispozicion pĂ«r tĂ« zgjedhur wal2json, decoderbuffs Đž pgoutput. Dy tĂ« parat kĂ«rkojnĂ« instalimin e shtesave tĂ« duhura nĂ« DBMS, dhe pgoutput pĂ«r PostgreSQL versioni 10 dhe mĂ« i lartĂ« nuk kĂ«rkon manipulime shtesĂ«;
  • database.* — opsionet pĂ«r t'u lidhur me bazĂ«n e tĂ« dhĂ«nave, ku database.server.name - emri i shembullit PostgreSQL i pĂ«rdorur pĂ«r tĂ« formuar emrin e temĂ«s nĂ« grupimin Kafka;
  • table.include.list - njĂ« listĂ« tabelash nĂ« tĂ« cilat duam tĂ« gjurmojmĂ« ndryshimet; dhĂ«nĂ« nĂ« format schema.table_name; nuk mund tĂ« pĂ«rdoret sĂ« bashku me table.exclude.list;
  • heartbeat.interval.ms — intervali (nĂ« milisekonda) me tĂ« cilin lidhĂ«si dĂ«rgon mesazhe tĂ« rrahjeve tĂ« zemrĂ«s nĂ« njĂ« temĂ« tĂ« veçantĂ«;
  • heartbeat.action.query - njĂ« kĂ«rkesĂ« qĂ« do tĂ« ekzekutohet gjatĂ« dĂ«rgimit tĂ« çdo mesazhi tĂ« rrahjeve tĂ« zemrĂ«s (opsioni Ă«shtĂ« shfaqur qĂ« nga versioni 1.1);
  • slot.name — emri i slotit tĂ« replikimit qĂ« do tĂ« pĂ«rdoret nga lidhĂ«si;
  • publication.name - Emri Publikim nĂ« PostgreSQL qĂ« pĂ«rdor lidhĂ«si. NĂ« rast se nuk ekziston, Debezium do tĂ« pĂ«rpiqet ta krijojĂ« atĂ«. NĂ«se pĂ«rdoruesi nĂ«n tĂ« cilin Ă«shtĂ« bĂ«rĂ« lidhja nuk ka tĂ« drejta tĂ« mjaftueshme pĂ«r kĂ«tĂ« veprim, lidhĂ«si do tĂ« dalĂ« me njĂ« gabim;
  • transforms pĂ«rcakton se si tĂ« ndryshohet saktĂ«sisht emri i temĂ«s sĂ« synuar:
    • transforms.AddPrefix.type tregon se do tĂ« pĂ«rdorim shprehje tĂ« rregullta;
    • transforms.AddPrefix.regex — maskĂ« me tĂ« cilĂ«n ripĂ«rcaktohet emri i temĂ«s sĂ« synuar;
    • transforms.AddPrefix.replacement - drejtpĂ«rdrejt atĂ« qĂ« ne ripĂ«rcaktojmĂ«.

Më shumë rreth rrahjeve të zemrës dhe transformimeve

Si parazgjedhje, lidhĂ«si i dĂ«rgon tĂ« dhĂ«na Kafka-s pĂ«r çdo transaksion tĂ« kryer dhe shkruan LSN-nĂ« ​​e tij (Numri i SekuencĂ«s sĂ« Regjistrit) nĂ« temĂ«n e shĂ«rbimit offset. Por çfarĂ« ndodh nĂ«se lidhĂ«si Ă«shtĂ« konfiguruar tĂ« lexojĂ« jo tĂ« gjithĂ« bazĂ«n e tĂ« dhĂ«nave, por vetĂ«m njĂ« pjesĂ« tĂ« tabelave tĂ« saj (nĂ« tĂ« cilat tĂ« dhĂ«nat pĂ«rditĂ«sohen rrallĂ«)?

  • LidhĂ«si do tĂ« lexojĂ« skedarĂ«t WAL dhe nuk do tĂ« zbulojĂ« kryerjet e transaksioneve nĂ« to nĂ« tabelat qĂ« monitoron.
  • Prandaj, ai nuk do tĂ« pĂ«rditĂ«sojĂ« pozicionin e tij aktual as nĂ« temĂ«, as nĂ« slotin e pĂ«rsĂ«ritjes.
  • Kjo, nga ana tjetĂ«r, do tĂ« bĂ«jĂ« qĂ« skedarĂ«t WAL tĂ« "ngecin" nĂ« disk dhe ka tĂ« ngjarĂ« tĂ« mbarojĂ« hapĂ«sira nĂ« disk.

Dhe këtu opsionet vijnë në shpëtim. heartbeat.interval.ms О heartbeat.action.query. Përdorimi i këtyre opsioneve në çift bën të mundur ekzekutimin e një kërkese për ndryshimin e të dhënave në një tabelë të veçantë sa herë që dërgohet një mesazh rrahje zemre. Kështu, LSN në të cilin lidhësi ndodhet aktualisht (në slotin e replikimit) përditësohet vazhdimisht. Kjo i lejon DBMS të heqë skedarët WAL që nuk nevojiten më. Për më shumë informacion se si funksionojnë opsionet, shihni dokumentacionin.

Një tjetër opsion që meriton vëmendje më të madhe është transforms. Edhe pse bëhet fjalë më shumë për komoditetin dhe bukurinë ...

Si parazgjedhje, Debezium krijon tema duke përdorur politikën e mëposhtme të emërtimit: serverName.schemaName.tableName. Kjo mund të mos jetë gjithmonë e përshtatshme. Opsione transforms duke përdorur shprehje të rregullta, mund të përcaktoni një listë tabelash, ngjarjet e të cilave duhet të drejtohen në një temë me një emër specifik.

Në konfigurimin tonë falë transforms ndodh si më poshtë: të gjitha ngjarjet e CDC nga baza e të dhënave të gjurmuara do të shkojnë te tema me emrin data.cdc.dbname. Përndryshe (pa këto cilësime), Debezium si parazgjedhje do të krijonte një temë për secilën tabelë të formularit: pg-dev.public.<table_name>.

Kufizimet e lidhësit

Në fund të përshkrimit të konfigurimit të lidhësit për PostgreSQL, ia vlen të flasim për veçoritë / kufizimet e mëposhtme të punës së tij:

  1. Funksionaliteti i lidhësit për PostgreSQL mbështetet në konceptin e dekodimit logjik. Prandaj ai nuk gjurmon kërkesat për të ndryshuar strukturën e bazës së të dhënave (DDL) - në përputhje me rrethanat, këto të dhëna nuk do të jenë në tema.
  2. Meqenëse përdoren lojëra elektronike të riprodhimit, lidhja e lidhësit është e mundur vetëm tek shembulli kryesor i DBMS.
  3. Nëse përdoruesi nën të cilin lidhësi lidhet me bazën e të dhënave ka të drejta vetëm për lexim, atëherë përpara nisjes së parë, do t'ju duhet të krijoni manualisht një vend të përsëritjes dhe ta publikoni në bazën e të dhënave.

Aplikimi i një konfigurimi

Pra, le të ngarkojmë konfigurimin tonë në lidhës:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Ne kontrollojmë që shkarkimi ishte i suksesshëm dhe lidhësi filloi:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

E shkëlqyeshme: është konfiguruar dhe gati për të shkuar. Tani le të pretendojmë të jemi konsumator dhe të lidhemi me Kafka, pas së cilës shtojmë dhe ndryshojmë një hyrje në tabelë:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Në temën tonë, kjo do të shfaqet si më poshtë:

JSON shumë i gjatë me ndryshimet tona

{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"int32",
        "optional":false,
        "field":"id"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Key"
  },
  "payload":{
    "id":1005
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"after"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"version"
          },
          {
            "type":"string",
            "optional":false,
            "field":"connector"
          },
          {
            "type":"string",
            "optional":false,
            "field":"name"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"ts_ms"
          },
          {
            "type":"string",
            "optional":true,
            "name":"io.debezium.data.Enum",
            "version":1,
            "parameters":{
              "allowed":"true,last,false"
            },
            "default":"false",
            "field":"snapshot"
          },
          {
            "type":"string",
            "optional":false,
            "field":"db"
          },
          {
            "type":"string",
            "optional":false,
            "field":"schema"
          },
          {
            "type":"string",
            "optional":false,
            "field":"table"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"txId"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"lsn"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"xmin"
          }
        ],
        "optional":false,
        "name":"io.debezium.connector.postgresql.Source",
        "field":"source"
      },
      {
        "type":"string",
        "optional":false,
        "field":"op"
      },
      {
        "type":"int64",
        "optional":true,
        "field":"ts_ms"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"id"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"total_order"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"data_collection_order"
          }
        ],
        "optional":true,
        "field":"transaction"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Envelope"
  },
  "payload":{
    "before":null,
    "after":{
      "id":1005,
      "first_name":"foo",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "source":{
      "version":"1.2.3.Final",
      "connector":"postgresql",
      "name":"dbserver1",
      "ts_ms":1600374991648,
      "snapshot":"false",
      "db":"postgres",
      "schema":"public",
      "table":"customers",
      "txId":602,
      "lsn":34088472,
      "xmin":null
    },
    "op":"c",
    "ts_ms":1600374991762,
    "transaction":null
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"int32",
        "optional":false,
        "field":"id"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Key"
  },
  "payload":{
    "id":1005
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"after"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"version"
          },
          {
            "type":"string",
            "optional":false,
            "field":"connector"
          },
          {
            "type":"string",
            "optional":false,
            "field":"name"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"ts_ms"
          },
          {
            "type":"string",
            "optional":true,
            "name":"io.debezium.data.Enum",
            "version":1,
            "parameters":{
              "allowed":"true,last,false"
            },
            "default":"false",
            "field":"snapshot"
          },
          {
            "type":"string",
            "optional":false,
            "field":"db"
          },
          {
            "type":"string",
            "optional":false,
            "field":"schema"
          },
          {
            "type":"string",
            "optional":false,
            "field":"table"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"txId"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"lsn"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"xmin"
          }
        ],
        "optional":false,
        "name":"io.debezium.connector.postgresql.Source",
        "field":"source"
      },
      {
        "type":"string",
        "optional":false,
        "field":"op"
      },
      {
        "type":"int64",
        "optional":true,
        "field":"ts_ms"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"id"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"total_order"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"data_collection_order"
          }
        ],
        "optional":true,
        "field":"transaction"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Envelope"
  },
  "payload":{
    "before":{
      "id":1005,
      "first_name":"foo",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "after":{
      "id":1005,
      "first_name":"egg",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "source":{
      "version":"1.2.3.Final",
      "connector":"postgresql",
      "name":"dbserver1",
      "ts_ms":1600375609365,
      "snapshot":"false",
      "db":"postgres",
      "schema":"public",
      "table":"customers",
      "txId":603,
      "lsn":34089688,
      "xmin":null
    },
    "op":"u",
    "ts_ms":1600375609778,
    "transaction":null
  }
}

Në të dyja rastet, të dhënat përbëhen nga çelësi (PK) i rekordit që u ndryshua, dhe nga vetë thelbi i ndryshimeve: çfarë ishte rekordi më parë dhe çfarë u bë më pas.

  • NĂ« rastin e INSERT: vlera para (before) barazohet nulle ndjekur nga vargu qĂ« u fut.
  • NĂ« rastin e UPDATE: nĂ« payload.before shfaqet gjendja e mĂ«parshme e rreshtit dhe nĂ« payload.after - e re me thelbin e ndryshimit.

2.2 MongoDB

Ky lidhës përdor mekanizmin standard të replikimit MongoDB, duke lexuar informacion nga oplog i nyjës primare DBMS.

Ngjashëm me lidhësin e përshkruar tashmë për PgSQL, edhe këtu, në fillimin e parë, merret fotografia primare e të dhënave, pas së cilës lidhësi kalon në modalitetin e leximit oplog.

Shembull i konfigurimit:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "tasks.max": "1",
        "mongodb.hosts": "MainRepSet/mongo:27017",
        "mongodb.name": "mongo",
        "mongodb.user": "debezium",
        "mongodb.password": "dbname",
        "database.whitelist": "db_1,db_2",
        "transforms": "AddPrefix",
        "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
        "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
        }
  }

Siç mund ta shihni, nuk ka opsione të reja në krahasim me shembullin e mëparshëm, por vetëm numri i opsioneve përgjegjëse për lidhjen me bazën e të dhënave dhe prefikset e tyre është zvogëluar.

CilĂ«simet transforms kĂ«tĂ« herĂ« ata bĂ«jnĂ« si mĂ« poshtĂ«: kthejnĂ« emrin e temĂ«s sĂ« synuar nga skema <server_name>.<db_name>.<collection_name> ĐČ data.cdc.mongo_<db_name>.

toleranca ndaj gabimeve

Çështja e tolerancĂ«s sĂ« gabimeve dhe disponueshmĂ«risĂ« sĂ« lartĂ« nĂ« kohĂ«n tonĂ« Ă«shtĂ« mĂ« e mprehtĂ« se kurrĂ« - veçanĂ«risht kur flasim pĂ«r tĂ« dhĂ«na dhe transaksione, dhe gjurmimi i ndryshimit tĂ« tĂ« dhĂ«nave nuk Ă«shtĂ« anash nĂ« kĂ«tĂ« çështje. Le tĂ« shohim se çfarĂ« mund tĂ« shkojĂ« keq nĂ« parim dhe çfarĂ« do tĂ« ndodhĂ« me Debezium nĂ« secilin rast.

Ekzistojnë tre opsione të tërheqjes:

  1. Dështimi i Kafka Connect. Nëse Connect është konfiguruar për të punuar në modalitetin e shpërndarë, kjo kërkon që shumë punëtorë të vendosin të njëjtin group.id. Pastaj, nëse njëri prej tyre dështon, lidhësi do të rindizet në punëtorin tjetër dhe do të vazhdojë të lexojë nga pozicioni i fundit i angazhuar në temën në Kafka.
  2. Humbja e lidhjes me grupin Kafka. Lidhësi thjesht do të ndalojë së lexuari në pozicionin që nuk arriti t'i dërgojë Kafkës dhe do të përpiqet periodikisht ta ridërgojë atë derisa përpjekja të ketë sukses.
  3. Burimi i të dhënave është i padisponueshëm. Lidhësi do të përpiqet të rilidhet me burimin sipas konfigurimit. Parazgjedhja është 16 përpjekje duke përdorur zmbrapsja eksponenciale. Pas përpjekjes së 16-të të dështuar, detyra do të shënohet si dështuar dhe do të duhet të riniset manualisht nëpërmjet ndërfaqes Kafka Connect REST.
    • NĂ« rastin e PostgreSQL tĂ« dhĂ«nat nuk do tĂ« humbasin, sepse pĂ«rdorimi i slloteve tĂ« riprodhimit do tĂ« parandalojĂ« fshirjen e skedarĂ«ve WAL qĂ« nuk lexohen nga lidhĂ«si. NĂ« kĂ«tĂ« rast, ka njĂ« dobĂ«si: nĂ«se lidhja e rrjetit midis lidhĂ«sit dhe DBMS-sĂ« ndĂ«rpritet pĂ«r njĂ« kohĂ« tĂ« gjatĂ«, ekziston mundĂ«sia qĂ« hapĂ«sira nĂ« disk tĂ« mbarojĂ« dhe kjo mund tĂ« çojĂ« nĂ« dĂ«shtimin e tĂ« gjithĂ« DBMS-sĂ«.
    • NĂ« rastin e MySQL skedarĂ«t binlog mund tĂ« rrotullohen nga vetĂ« DBMS pĂ«rpara se tĂ« rivendoset lidhja. Kjo do tĂ« bĂ«jĂ« qĂ« lidhĂ«si tĂ« shkojĂ« nĂ« gjendjen e dĂ«shtuar dhe do tĂ« duhet tĂ« riniset nĂ« modalitetin fillestar tĂ« fotografisĂ« pĂ«r tĂ« vazhduar leximin nga bilogĂ«t pĂ«r tĂ« rivendosur funksionimin normal.
    • nĂ« MongoDB. Dokumentacioni thotĂ«: sjellja e lidhĂ«sit nĂ« rast se skedarĂ«t log/oplog janĂ« fshirĂ« dhe lidhĂ«si nuk mund tĂ« vazhdojĂ« tĂ« lexojĂ« nga pozicioni ku e ka lĂ«nĂ« Ă«shtĂ« e njĂ«jtĂ« pĂ«r tĂ« gjitha DBMS. Ai qĂ«ndron nĂ« faktin se lidhĂ«si do tĂ« shkojĂ« nĂ« gjendje dĂ«shtuar dhe do tĂ« kĂ«rkojĂ« njĂ« rinisje nĂ« modalitet fotografi fillestare.

      Megjithatë, ka përjashtime. Nëse lidhësi ishte në një gjendje të shkëputur për një kohë të gjatë (ose nuk mund të arrinte shembullin MongoDB), dhe oplog u rrotullua gjatë kësaj kohe, atëherë kur lidhja të rivendoset, lidhësi do të vazhdojë me qetësi të lexojë të dhënat nga pozicioni i parë i disponueshëm , prandaj disa nga të dhënat në Kafka jo do të godasë.

Përfundim

Debezium është përvoja ime e parë me sistemet CDC dhe ka qenë shumë pozitive në përgjithësi. Projekti korruptoi mbështetjen e DBMS kryesore, lehtësinë e konfigurimit, mbështetjen për grupimin dhe një komunitet aktiv. Për ata që janë të interesuar në praktikë, ju rekomandoj të lexoni udhëzuesit për Kafka Connect О Debezium.

Krahasuar me lidhësin JDBC për Kafka Connect, përparësia kryesore e Debezium është se ndryshimet lexohen nga regjistrat e DBMS, gjë që lejon marrjen e të dhënave me vonesë minimale. Lidhësi JDBC (i ofruar nga Kafka Connect) kërkon tabelën e gjurmuar në një interval të caktuar dhe (për të njëjtën arsye) nuk gjeneron mesazhe kur të dhënat fshihen (si mund të kërkoni për të dhëna që nuk janë aty?).

Për të zgjidhur probleme të ngjashme, mund t'i kushtoni vëmendje zgjidhjeve të mëposhtme (përveç Debezium):

PS

Lexoni edhe në blogun tonë:

Burimi: www.habr.com