Esittelyssä Debezium - CDC Apache Kafkalle

Esittelyssä Debezium - CDC Apache Kafkalle

Työssäni törmään usein uusiin teknisiin ratkaisuihin/ohjelmistotuotteisiin, joista venäjänkielisessä Internetissä on varsin niukasti tietoa. Tällä artikkelilla yritän täyttää yhden tällaisen aukon esimerkillä viimeaikaisesta käytännöstäni, kun minun piti määrittää CDC-tapahtumien lähettäminen kahdesta suositusta DBMS:stä (PostgreSQL ja MongoDB) Kafka-klusteriin Debeziumin avulla. Toivon, että tämä katsausartikkeli, joka ilmestyy tehdyn työn tuloksena, on hyödyllinen muille.

Mikä on Debezium ja CDC yleensä?

Debezium — edustaa CDC-ohjelmistoluokkaa (Tallenna tietojen muutos), tai tarkemmin sanottuna, se on joukko liittimiä erilaisille DBMS-järjestelmille, jotka ovat yhteensopivia Apache Kafka Connect -kehyksen kanssa.

Se Avoimen lähdekoodin projekti, lisensoitu Apache License v2.0 -lisenssillä ja sponsoroi Red Hat. Kehitys on ollut käynnissä vuodesta 2016 ja tällä hetkellä se tarjoaa virallista tukea seuraaville tietokantajärjestelmille: MySQL, PostgreSQL, MongoDB, SQL Server. Myös Cassandralle ja Oraclelle on olemassa liittimet, mutta ne ovat tällä hetkellä "ennakkokäyttötilassa", eivätkä uudet julkaisut takaa yhteensopivuutta taaksepäin.

Jos vertaamme CDC:tä perinteiseen lähestymistapaan (kun sovellus lukee dataa suoraan DBMS:stä), sen tärkeimpiä etuja ovat tiedonmuutossuoratoiston toteuttaminen rivitasolla alhaisella latenssilla, korkealla luotettavuudella ja käytettävyydellä. Kaksi viimeistä kohtaa saavutetaan käyttämällä Kafka-klusteria CDC-tapahtumien arkistona.

Toinen etu on se, että tapahtumien tallentamiseen käytetään yhtä mallia, joten loppusovelluksen ei tarvitse huolehtia erilaisten DBMS-järjestelmien käytön vivahteista.

Lopuksi sanomavälittäjän käyttö mahdollistaa tietojen muutoksia valvovien sovellusten skaalauksen vaakasuunnassa. Samalla vaikutus tietolähteeseen on minimoitu, koska dataa ei saada suoraan DBMS:stä, vaan Kafka-klusterista.

Tietoja Debezium-arkkitehtuurista

Debeziumin käyttö perustuu tähän yksinkertaiseen järjestelmään:

DBMS (tietolähteenä) → Kafka Connectin liitin → Apache Kafka → kuluttaja

Tässä on esimerkkinä kaavio projektin verkkosivustolta:

Esittelyssä Debezium - CDC Apache Kafkalle

En kuitenkaan pidä tästä mallista, koska näyttää siltä, ​​​​että vain pesuallasliittimen käyttö on mahdollista.

Todellisuudessa tilanne on erilainen: Data Laken täyttäminen (viimeinen linkki yllä olevassa kaaviossa) Tämä ei ole ainoa tapa käyttää Debeziumia. Sovelluksesi voivat käyttää Apache Kafkalle lähetettyjä tapahtumia erilaisiin tilanteisiin. Esimerkiksi:

  • epäolennaisten tietojen poistaminen välimuistista;
  • ilmoitusten lähettäminen;
  • hakuindeksipäivitykset;
  • jonkinlaiset tarkastuslokit;
  • ...

Jos sinulla on Java-sovellus, eikä sinulla ole tarvetta/mahdollisuutta käyttää Kafka-klusteria, on myös mahdollisuus työskennellä upotettu liitin. Ilmeinen etu on, että se eliminoi lisäinfrastruktuurin tarpeen (liittimen ja Kafkan muodossa). Tämä ratkaisu on kuitenkin vanhentunut versiosta 1.1 lähtien, eikä sitä enää suositella käytettäväksi (sen tuki saatetaan poistaa tulevissa julkaisuissa).

Tässä artikkelissa käsitellään kehittäjien suosittelemaa arkkitehtuuria, joka tarjoaa vikasietoisuuden ja skaalautuvuuden.

Liittimen kokoonpano

Jotta voimme alkaa seurata muutoksia tärkeimmässä arvossa - tiedoissa - tarvitsemme:

  1. tietolähde, joka voi olla MySQL versiosta 5.7 alkaen, PostgreSQL 9.6+, MongoDB 3.2+ (täydellinen luettelo);
  2. Apache Kafka -klusteri;
  3. Kafka Connect -esiintymä (versiot 1.x, 2.x);
  4. määritetty Debezium-liitin.

Työskentele kahdessa ensimmäisessä kohdassa, ts. DBMS:n ja Apache Kafkan asennusprosessi ei kuulu artikkelin soveltamisalaan. Kuitenkin niille, jotka haluavat ottaa kaiken käyttöön hiekkalaatikossa, virallisessa arkistossa esimerkkien kanssa on valmis docker-compose.yaml.

Pysähdymme tarkemmin kahteen viimeiseen kohtaan.

0. Kafka Connect

Tässä ja myöhemmin artikkelissa kaikkia kokoonpanoesimerkkejä käsitellään Debeziumin kehittäjien jakaman Docker-kuvan yhteydessä. Se sisältää kaikki tarvittavat laajennustiedostot (liittimet) ja tarjoaa Kafka Connectin konfiguroinnin ympäristömuuttujien avulla.

Jos aiot käyttää Kafka Connectia Confluentista, sinun on erikseen lisättävä tarvittavien liittimien laajennukset kohdassa määritettyyn hakemistoon. plugin.path tai asettaa ympäristömuuttujan kautta CLASSPATH. Kafka Connect -työntekijän ja liittimien asetukset määritetään määritystiedostojen avulla, jotka välitetään argumenteina työntekijän käynnistyskomennolle. Katso lisätietoja dokumentointi.

Koko Debeizumin asennusprosessi liitinversiossa suoritetaan kahdessa vaiheessa. Katsotaanpa jokaista niistä:

1. Kafka Connect -kehyksen määrittäminen

Tietojen suoratoistoa varten Apache Kafka -klusteriin Kafka Connect -kehyksessä asetetaan tietyt parametrit, kuten:

  • klusteriin yhdistämisen parametrit,
  • aiheiden nimet, joihin itse liittimen asetukset tallennetaan suoraan,
  • sen ryhmän nimi, jossa liitin toimii (jos hajautettua tilaa käytetään).

Projektin virallinen Docker-kuva tukee konfigurointia ympäristömuuttujien avulla - tätä aiomme käyttää. Lataa siis kuva:

docker pull debezium/connect

Liittimen suorittamiseen vaadittava ympäristömuuttujien vähimmäisjoukko on seuraava:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — alkuperäinen luettelo Kafka-klusteripalvelimista, jotta saadaan täydellinen luettelo klusterin jäsenistä;
  • OFFSET_STORAGE_TOPIC=connector-offsets — aihe paikkojen tallentamiseksi, missä liitin tällä hetkellä sijaitsee;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — aihe liittimen tilan ja sen tehtävien tallentamiseksi;
  • CONFIG_STORAGE_TOPIC=connector-config — aihe liittimen konfigurointitietojen ja sen tehtävien tallentamisesta;
  • GROUP_ID=1 — sen työntekijäryhmän tunniste, jolle liitintehtävä voidaan suorittaa; tarvitaan hajautettua käyttöä käytettäessä (hajautettu) järjestelmä.

Käynnistämme kontin näillä muuttujilla:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Huomautus Avrosta

Oletuksena Debezium kirjoittaa tiedot JSON-muodossa, mikä on hyväksyttävää hiekkalaatikoille ja pienille tietomäärille, mutta voi muodostua ongelmaksi paljon kuormitetuissa tietokannoissa. Vaihtoehto JSON-muuntimelle on viestien sarjoittaminen käyttämällä Avro binäärimuotoon, mikä vähentää I/O-alijärjestelmän kuormitusta Apache Kafkassa.

Jotta voit käyttää Avroa, sinun on otettava käyttöön erillinen schema-rekisteri (kaavioiden tallentamiseen). Muuntimen muuttujat näyttävät tältä:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Yksityiskohdat Avron käytöstä ja sen rekisterin määrittämisestä eivät kuulu tämän artikkelin piiriin – selvyyden vuoksi käytämme jatkossa JSON-soittimia.

2. Itse liittimen konfigurointi

Nyt voit siirtyä suoraan itse liittimen kokoonpanoon, joka lukee tiedot lähteestä.

Katsotaanpa esimerkkiä kahden DBMS:n liittimistä: PostgreSQL ja MongoDB, joista minulla on kokemusta ja joissa on eroja (tosin pieniä, mutta joissain tapauksissa merkittäviä!).

Kokoonpano kuvataan JSON-merkinnällä ja ladataan Kafka Connectiin POST-pyynnön avulla.

2.1. PostgreSQL

Esimerkki liittimen kokoonpanosta PostgreSQL:lle:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Liittimen toimintaperiaate tämän asennuksen jälkeen on melko yksinkertainen:

  • Kun se käynnistetään ensimmäisen kerran, se muodostaa yhteyden kokoonpanossa määritettyyn tietokantaan ja käynnistyy tilassa ensimmäinen tilannekuva, lähettämällä Kafkalle ehtolausekkeella saadun alkutietojoukon SELECT * FROM table_name.
  • Kun alustus on valmis, liitin siirtyy tilaan lukemaan muutoksia PostgreSQL WAL -tiedostoista.

Tietoja käytetyistä vaihtoehdoista:

  • name — sen liittimen nimi, johon jäljempänä kuvattua kokoonpanoa käytetään; tulevaisuudessa tätä nimeä käytetään liittimen kanssa toimimiseen (eli tilan tarkastelemiseen/uudelleenkäynnistykseen/päivityskokoonpanoon) Kafka Connect REST API:n kautta;
  • connector.class — DBMS-liitinluokka, jota määritetty liitin käyttää;
  • plugin.name — WAL-tiedostojen tietojen loogisen purkamisen laajennuksen nimi. Valittavissa wal2json, decoderbuffs и pgoutput. Kaksi ensimmäistä edellyttävät asianmukaisten laajennusten asentamista DBMS:ään ja pgoutput PostgreSQL-versio 10 tai uudempi ei vaadi lisäkäsittelyjä;
  • database.* — tietokantaan yhdistämisvaihtoehdot, missä database.server.name — PostgreSQL-ilmentymän nimi, jota käytetään aiheen nimen muodostamiseen Kafka-klusterissa;
  • table.include.list — luettelo taulukoista, joissa haluamme seurata muutoksia; määritetyssä muodossa schema.table_name; ei voida käyttää yhdessä table.exclude.list;
  • heartbeat.interval.ms — aikaväli (millisekunteina), jolla liitin lähettää sykeviestit erityisaiheeseen;
  • heartbeat.action.query — pyyntö, joka suoritetaan lähetettäessä jokaista sykeviestiä (vaihtoehto esiintyi versiossa 1.1);
  • slot.name — sen replikointipaikan nimi, jota liitin käyttää;
  • publication.name - Nimi Julkaisu PostgreSQL:ssä, jota liitin käyttää. Jos sitä ei ole olemassa, Debezium yrittää luoda sen. Jos käyttäjällä, jolla yhteys muodostetaan, ei ole tarpeeksi oikeuksia tähän toimintoon, liitin päättyy virheen vuoksi;
  • transforms määrittää tarkalleen, kuinka kohdeaiheen nimi vaihdetaan:
    • transforms.AddPrefix.type osoittaa, että käytämme säännöllisiä lausekkeita;
    • transforms.AddPrefix.regex — maski, joka määrittää uudelleen kohdeaiheen nimen;
    • transforms.AddPrefix.replacement - suoraan mitä määrittelemme uudelleen.

Lisää sydämenlyönnistä ja muutoksista

Oletusarvoisesti liitin lähettää tiedot Kafkalle jokaisesta sitoutuneesta tapahtumasta, ja sen LSN (lokijärjestysnumero) tallennetaan palveluaiheeseen offset. Mutta mitä tapahtuu, jos liitin ei ole määritetty lukemaan koko tietokantaa, vaan vain osaa sen taulukoista (joissa datapäivityksiä ei tapahdu usein)?

  • Liitin lukee WAL-tiedostoja eikä havaitse mitään tapahtumasitoumuksia valvomiinsa taulukoihin.
  • Siksi se ei päivitä nykyistä sijaintiaan aiheessa tai replikointipaikassa.
  • Tämä puolestaan ​​johtaa siihen, että WAL-tiedostoja pidetään levyllä ja levytila ​​todennäköisesti loppuu.

Ja tässä vaihtoehdot tulevat apuun. heartbeat.interval.ms и heartbeat.action.query. Näiden vaihtoehtojen käyttäminen pareittain mahdollistaa tietojen muuttamispyynnön suorittamisen erillisessä taulukossa aina, kun sykeviesti lähetetään. Näin ollen LSN, jossa liitin tällä hetkellä sijaitsee (replikointipaikassa), päivitetään jatkuvasti. Näin DBMS voi poistaa WAL-tiedostoja, joita ei enää tarvita. Voit oppia lisää siitä, miten vaihtoehdot toimivat dokumentointi.

Toinen vaihtoehto, johon kannattaa kiinnittää huomiota transforms. Vaikka kyse on enemmän mukavuudesta ja kauneudesta...

Oletuksena Debezium luo aiheita käyttämällä seuraavaa nimeämiskäytäntöä: serverName.schemaName.tableName. Tämä ei välttämättä ole aina kätevää. Vaihtoehdot transforms Säännöllisten lausekkeiden avulla voit määrittää luettelon taulukoista, joista tapahtumat täytyy reitittää tietynnimiseen aiheeseen.

Kokoonpanossamme kiitos transforms tapahtuu seuraavaa: kaikki valvotun tietokannan CDC-tapahtumat siirtyvät aiheeseen, jolla on nimi data.cdc.dbname. Muuten (ilman näitä asetuksia) Debezium luo oletuksena aiheen jokaiselle taulukolle, kuten: pg-dev.public.<table_name>.

Liittimen rajoitukset

PostgreSQL:n liitinkokoonpanon kuvauksen päätteeksi kannattaa puhua seuraavista sen toiminnan ominaisuuksista/rajoituksista:

  1. PostgreSQL:n liittimen toiminnallisuus perustuu loogiseen dekoodaukseen. Siksi hän ei seuraa tietokannan rakenteen muutospyyntöjä (DDL) - vastaavasti näitä tietoja ei ole aiheissa.
  2. Koska replikointipaikkoja käytetään, liittimen yhdistäminen on mahdollista vain johtavaan DBMS-instanssiin.
  3. Jos käyttäjällä, jolla liitin muodostaa yhteyden tietokantaan, on vain luku -oikeudet, sinun on ennen ensimmäistä käynnistystä luotava manuaalisesti replikointipaikka ja julkaistava tietokantaan.

Määrityksen käyttöönotto

Joten ladataan kokoonpanomme liittimeen:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Tarkistamme, että lataus onnistui ja liitin käynnistyi:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Hienoa: se on asennettu ja valmis käyttöön. Oletetaan nyt kuluttajana ja yhdistetään Kafkaan, jonka jälkeen lisäämme ja muutamme merkinnän taulukkoon:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Aiheessamme se näytetään seuraavasti:

Erittäin pitkä JSON muutostemme kanssa

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Molemmissa tapauksissa tietueet koostuvat muutetun tietueen avaimesta (PK) ja muutosten olemuksesta: mitä tietue oli ennen ja mitä siitä tuli sen jälkeen.

  • Jos kyseessä on INSERT: arvo ennen (before) on yhtä suuri null, ja sen jälkeen - rivi, joka lisättiin.
  • Jos kyseessä on UPDATE: klo payload.before rivin edellinen tila näytetään, ja sisään payload.after — uutta ja muutosten ydin.

2.2 MongoDB

Tämä liitin käyttää tavallista MongoDB-replikointimekanismia, joka lukee tiedot ensisijaisen DBMS-solmun oplogista.

Kuten jo kuvatussa PgSQL-liittimessä, myös tässä ensimmäisessä käynnistyksessä otetaan ensisijainen datavedos, jonka jälkeen liitin siirtyy oplog-lukutilaan.

Esimerkki kokoonpanosta:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kuten näette, tässä ei ole uusia vaihtoehtoja edelliseen esimerkkiin verrattuna, vaan vain tietokantaan yhteyden muodostamisesta vastaavien vaihtoehtojen ja niiden etuliitteiden määrää on vähennetty.

Asetukset transforms tällä kertaa he tekevät seuraavaa: he muuttavat kohdeaiheen nimen skeemasta <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

vikasietoisuus

Ongelma vikasietoisuudesta ja korkeasta käytettävyydestä meidän aikanamme on akuutimpi kuin koskaan - varsinkin kun puhutaan tiedoista ja tapahtumista, eikä datamuutosten seuranta jää tässä asiassa sivuun. Katsotaanpa, mikä voi mennä pieleen periaatteessa ja mitä tapahtuu Debeziumille kussakin tapauksessa.

On kolme kieltäytymisvaihtoehtoa:

  1. Kafka Connect -virhe. Jos Connect on määritetty toimimaan hajautetussa tilassa, useiden työntekijöiden on asetettava sama group.id. Sitten, jos jokin niistä epäonnistuu, liitin käynnistetään uudelleen toisella työntekijällä ja jatka lukemista Kafkan aiheen viimeisestä sitoutuneesta paikasta.
  2. Yhteyden katkeaminen Kafka-klusterin kanssa. Liitin yksinkertaisesti lopettaa lukemisen kohdassa, jota ei voitu lähettää Kafkalle, ja yrittää ajoittain lähettää sen uudelleen, kunnes yritys onnistuu.
  3. Tietolähde ei ole käytettävissä. Liitin yrittää muodostaa yhteyden lähteeseen uudelleen määritetyllä tavalla. Oletusarvo on 16 käyttöyritystä eksponentiaalinen perääntyminen. 16. epäonnistuneen yrityksen jälkeen tehtävä merkitään nimellä epäonnistui ja sinun on käynnistettävä se manuaalisesti uudelleen Kafka Connect REST -liitännän kautta.
    • Jos kyseessä on PostgreSQL tiedot eivät häviä, koska Replikointipaikkojen käyttäminen estää sinua poistamasta WAL-tiedostoja, joita liitin ei lue. Tässä tapauksessa kolikolla on myös haittapuoli: jos liittimen ja DBMS:n välinen verkkoyhteys katkeaa pitkään, on mahdollista, että levytila ​​loppuu, mikä voi johtaa koko DBMS.
    • Jos kyseessä on MySQL DBMS voi itse kääntää binlog-tiedostoja ennen yhteyden palauttamista. Tämä aiheuttaa sen, että liitin menee epäonnistuneeseen tilaan, ja normaalin toiminnan palauttamiseksi sinun on käynnistettävä uudelleen alkuperäisessä tilannekuvatilassa jatkaaksesi lukemista binlogeista.
    • Про MongoDB. Dokumentaatiossa todetaan: liittimen käyttäytyminen siinä tapauksessa, että loki-/oplog-tiedostot on poistettu eikä liitin voi jatkaa lukemista kohdasta, johon se jäi, on sama kaikissa DBMS-järjestelmissä. Se tarkoittaa, että liitin menee tilaan epäonnistui ja vaatii uudelleenkäynnistyksen tilassa ensimmäinen tilannekuva.

      Poikkeuksia kuitenkin löytyy. Jos liitin oli katkaistu pitkään (tai ei päässyt MongoDB-esiintymään) ja oplog kävi kierron aikana tänä aikana, niin kun yhteys palautetaan, liitin jatkaa rauhallisesti tietojen lukemista ensimmäisestä saatavilla olevasta paikasta, siksi osa Kafkan tiedoista ei tulee osumaan.

Johtopäätös

Debezium on ensimmäinen kokemukseni CDC-järjestelmistä ja kaiken kaikkiaan erittäin myönteinen. Projekti voitti tuella suurille tietokantajärjestelmille, konfiguroinnin helppoudella, klusterointituella ja aktiivisella yhteisöllä. Käytännöstä kiinnostuneille suosittelen oppaan lukemista Kafka Connect и Debezium.

Kafka Connectin JDBC-liittimeen verrattuna Debeziumin tärkein etu on, että muutokset luetaan DBMS-lokeista, mikä mahdollistaa tietojen vastaanottamisen mahdollisimman pienellä viiveellä. JDBC-liitin (Kafka Connectilta) kysyy valvotusta taulukosta tietyin väliajoin eikä (samasta syystä) luo viestejä, kun tietoja poistetaan (miten voit kysyä tietoja, joita ei ole olemassa?).

Samankaltaisten ongelmien ratkaisemiseksi voit kiinnittää huomiota seuraaviin ratkaisuihin (Debeziumin lisäksi):

PS.

Lue myös blogistamme:

Lähde: will.com

Lisää kommentti