Debezium təqdimatı - Apache Kafka üçün CDC

Debezium təqdimatı - Apache Kafka üçün CDC

İşimdə tez-tez rusdilli İnternetdə məlumat olduqca az olan yeni texniki həllər / proqram məhsulları ilə rastlaşıram. Bu məqalə ilə mən iki məşhur DBMS-dən (PostgreSQL və MongoDB) Debezium-dan istifadə edərək Kafka klasterinə CDC hadisələrinin göndərilməsini qurmaq lazım gələndə son təcrübəmdən bir nümunə ilə belə bir boşluğu doldurmağa çalışacağam. Ümid edirəm ki, görülən işlərin nəticəsi olaraq ortaya çıxan bu icmal məqaləsi başqaları üçün faydalı olacaqdır.

Debezium və ümumiyyətlə CDC nədir?

Debezium - CDC proqram təminatı kateqoriyasının nümayəndəsi (Məlumat dəyişikliyini çəkin) və ya daha dəqiq desək, Apache Kafka Connect çərçivəsi ilə uyğun gələn müxtəlif DBMS-lər üçün birləşdiricilər dəstidir.

O açıq mənbə layihəsi, Apache License v2.0 altında lisenziyalı və Red Hat tərəfindən sponsorluq edilmişdir. İnkişaf 2016-cı ildən davam edir və hazırda aşağıdakı DBMS-lərə rəsmi dəstək verir: MySQL, PostgreSQL, MongoDB, SQL Server. Cassandra və Oracle üçün birləşdiricilər də var, lakin hazırda onlar "erkən giriş" statusundadırlar və yeni buraxılışlar geriyə uyğun uyğunluğa zəmanət vermir.

CDC-ni ənənəvi yanaşma ilə (tətbiq DBMS-dən məlumatları birbaşa oxuduqda) müqayisə etsək, onun əsas üstünlükləri sıra səviyyəsində aşağı gecikmə, yüksək etibarlılıq və əlçatanlıq ilə məlumatların dəyişdirilməsi axınının həyata keçirilməsini əhatə edir. Son iki nöqtəyə CDC hadisələri üçün depo kimi Kafka klasterindən istifadə etməklə nail olunur.

Həmçinin, üstünlüklərə hadisələrin saxlanması üçün tək modelin istifadə edilməsi daxildir, beləliklə, son proqram müxtəlif DBMS-lərin işlədilməsinin nüanslarından narahat olmur.

Nəhayət, mesaj brokerindən istifadə məlumatlarda dəyişiklikləri izləyən proqramların üfüqi miqyası üçün geniş imkanlar açır. Eyni zamanda, məlumatların birbaşa DBMS-dən deyil, Kafka klasterindən qəbul edildiyi üçün məlumat mənbəyinə təsir minimuma endirilir.

Debezium memarlığı haqqında

Debezium-dan istifadə bu sadə sxemə gəlir:

DBMS (məlumat mənbəyi kimi) → Kafka Connect-də birləşdirici → Apache Kafka → istehlakçı

Bir illüstrasiya olaraq layihənin saytından bir diaqram verəcəyəm:

Debezium təqdimatı - Apache Kafka üçün CDC

Ancaq bu sxemi həqiqətən sevmirəm, çünki görünür ki, yalnız bir lavabo birləşdiricisi mümkündür.

Reallıqda vəziyyət fərqlidir: Data Lake doldurmaq (yuxarıdakı diaqramdakı son keçid) Debezium istifadə etməyin yeganə yolu deyil. Apache Kafka-ya göndərilən hadisələr müxtəlif vəziyyətləri həll etmək üçün tətbiqləriniz tərəfindən istifadə edilə bilər. Misal üçün:

  • önbelleğe aid olmayan məlumatların çıxarılması;
  • bildirişlərin göndərilməsi;
  • axtarış indeksi yeniləmələri;
  • bir növ audit jurnalları;
  • ...

Əgər sizin Java proqramınız varsa və Kafka klasterindən istifadə etməyə ehtiyac/mümkün olmadıqda, üzərində işləmək imkanı da var. quraşdırılmış bağlayıcı. Aşkar artı odur ki, onunla əlavə infrastrukturdan (konnektor və Kafka şəklində) imtina edə bilərsiniz. Bununla belə, bu həll 1.1 versiyasından bəri köhnəlmişdir və artıq istifadə üçün tövsiyə edilmir (gələcək buraxılışlarda silinə bilər).

Bu məqalədə səhvlərə dözümlülük və miqyaslılığı təmin edən tərtibatçılar tərəfindən tövsiyə edilən arxitektura müzakirə olunacaq.

Bağlayıcı konfiqurasiya

Ən vacib dəyərdə - verilənlərdə dəyişiklikləri izləməyə başlamaq üçün bizə lazımdır:

  1. 5.7 versiyasından başlayaraq MySQL ola bilən məlumat mənbəyi, PostgreSQL 9.6+, MongoDB 3.2+ (tam siyahı);
  2. Apache Kafka çoxluğu
  3. Kafka Connect nümunəsi (versiya 1.x, 2.x);
  4. konfiqurasiya edilmiş Debezium konnektoru.

İlk iki nöqtə üzərində işləyin, yəni. DBMS və Apache Kafka quraşdırma prosesi məqalənin əhatə dairəsindən kənardadır. Bununla birlikdə, hər şeyi bir sandboxda yerləşdirmək istəyənlər üçün rəsmi depoda nümunələri olan hazır bir şey var. docker-compose.yaml.

Son iki məqama daha ətraflı diqqət yetirəcəyik.

0. Kafka Connect

Burada və daha sonra məqalədə bütün konfiqurasiya nümunələri Debezium tərtibatçıları tərəfindən paylanmış Docker təsviri kontekstində nəzərdən keçirilir. O, bütün lazımi plagin fayllarını (bağlayıcıları) ehtiva edir və ətraf mühit dəyişənlərindən istifadə edərək Kafka Connect konfiqurasiyasını təmin edir.

Confluent-dən Kafka Connect-dən istifadə etmək niyyətindəsinizsə, lazımi konnektorların plaginlərini özünüz burada göstərilən qovluğa əlavə etməlisiniz. plugin.path və ya mühit dəyişəni vasitəsilə təyin oluna bilər CLASSPATH. Kafka Connect işçisi və birləşdiriciləri üçün parametrlər işçi işə salma əmrinə arqumentlər kimi ötürülən konfiqurasiya faylları vasitəsilə müəyyən edilir. Ətraflı məlumat üçün baxın sənədləşdirmə.

Bağlayıcı versiyada Debeizumun qurulmasının bütün prosesi iki mərhələdə həyata keçirilir. Onların hər birini nəzərdən keçirək:

1. Kafka Connect çərçivəsinin qurulması

Məlumatı Apache Kafka klasterinə ötürmək üçün Kafka Connect çərçivəsində xüsusi parametrlər təyin edilir, məsələn:

  • klaster bağlantısı parametrləri,
  • konnektorun konfiqurasiyasının özünün saxlanacağı mövzuların adları,
  • konnektorun işlədiyi qrupun adı (paylanmış rejimdən istifadə edildikdə).

Layihənin rəsmi Docker təsviri mühit dəyişənlərindən istifadə edərək konfiqurasiyanı dəstəkləyir - istifadə edəcəyimiz budur. Beləliklə, şəkli yükləyək:

docker pull debezium/connect

Konnektoru işə salmaq üçün tələb olunan minimum mühit dəyişənləri dəsti aşağıdakı kimidir:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - klaster üzvlərinin tam siyahısını əldə etmək üçün Kafka klaster serverlərinin ilkin siyahısı;
  • OFFSET_STORAGE_TOPIC=connector-offsets — birləşdiricinin hazırda yerləşdiyi mövqelərin saxlanması üçün mövzu;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - birləşdiricinin vəziyyətini və onun vəzifələrini saxlamaq üçün mövzu;
  • CONFIG_STORAGE_TOPIC=connector-config - birləşdirici konfiqurasiya məlumatlarının və onun vəzifələrinin saxlanması üçün bir mövzu;
  • GROUP_ID=1 — birləşdirici tapşırığın yerinə yetirilə biləcəyi işçilər qrupunun identifikatoru; paylanmış istifadə zamanı tələb olunur (paylanmış) rejim.

Konteyneri bu dəyişənlərlə başlayırıq:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Avro haqqında qeyd

Varsayılan olaraq, Debezium məlumatları JSON formatında yazır, bu, sandboxlar və az miqdarda məlumat üçün məqbuldur, lakin çox yüklənmiş verilənlər bazalarında problem ola bilər. JSON çeviricisinə alternativ istifadə edərək mesajları seriallaşdırmaqdır avro Apache Kafka-da I/O alt sistemindəki yükü azaldan ikili formata.

Avro-dan istifadə etmək üçün ayrıca yerləşdirməlisiniz sxem-reyestr (sxemləri saxlamaq üçün). Dönüştürücü üçün dəyişənlər belə görünəcək:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro-dan istifadə və onun üçün reyestrin yaradılması ilə bağlı təfərrüatlar məqalənin əhatə dairəsindən kənardadır - daha sonra aydınlıq üçün JSON-dan istifadə edəcəyik.

2. Bağlayıcının özünün qurulması

İndi mənbədən məlumatları oxuyacaq konnektorun özünün konfiqurasiyasına birbaşa keçə bilərsiniz.

İki DBMS üçün birləşdiricilərin nümunəsinə baxaq: PostgreSQL və MongoDB, mənim təcrübəm var və fərqlər var (kiçik olsa da, lakin bəzi hallarda əhəmiyyətli!).

Konfiqurasiya JSON notasiyasında təsvir edilir və POST sorğusu vasitəsilə Kafka Connect-ə yüklənir.

2.1. PostgreSQL

PostgreSQL üçün birləşdirici konfiqurasiya nümunəsi:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Bu konfiqurasiyadan sonra bağlayıcının işləmə prinsipi olduqca sadədir:

  • İlk başlanğıcda konfiqurasiyada göstərilən verilənlər bazasına qoşulur və rejimdə başlayır ilkin snapshot, şərti ilə alınan ilkin məlumat toplusunu Kafkaya göndərir SELECT * FROM table_name.
  • Başlama tamamlandıqdan sonra bağlayıcı PostgreSQL WAL fayllarından dəyişiklikləri oxumaq rejiminə daxil olur.

İstifadə olunan variantlar haqqında:

  • name — aşağıda təsvir edilən konfiqurasiyanın istifadə olunduğu birləşdiricinin adı; gələcəkdə bu ad Kafka Connect REST API vasitəsilə konnektorla işləmək üçün istifadə olunur (yəni statusa baxmaq / yenidən başladın / konfiqurasiyanı yeniləmək);
  • connector.class — konfiqurasiya edilmiş konnektor tərəfindən istifadə olunacaq DBMS birləşdirici sinfi;
  • plugin.name WAL fayllarından verilənlərin məntiqi dekodlanması üçün plaqinin adıdır. Seçmək üçün mövcuddur wal2json, decoderbuffs и pgoutput. İlk ikisi DBMS-də müvafiq genişləndirmələrin quraşdırılmasını tələb edir və pgoutput PostgreSQL 10 və daha yüksək versiyalar üçün əlavə manipulyasiya tələb etmir;
  • database.* — verilənlər bazasına qoşulma variantları, burada database.server.name - Kafka klasterində mövzunun adını formalaşdırmaq üçün istifadə edilən PostgreSQL instansiyasının adı;
  • table.include.list - dəyişiklikləri izləmək istədiyimiz cədvəllərin siyahısı; formatda verilir schema.table_name; ilə birlikdə istifadə edilə bilməz table.exclude.list;
  • heartbeat.interval.ms — konnektorun xüsusi mövzuya ürək döyüntüsü mesajları göndərdiyi interval (millisaniyələrlə);
  • heartbeat.action.query - hər ürək döyüntüsü mesajı göndərilərkən yerinə yetiriləcək sorğu (seçim 1.1 versiyasından bəri ortaya çıxdı);
  • slot.name — konnektor tərəfindən istifadə olunacaq replikasiya yuvasının adı;
  • publication.name - ad Nəşr bağlayıcının istifadə etdiyi PostgreSQL-də. Əgər mövcud deyilsə, Debezium onu ​​yaratmağa çalışacaq. Bağlantının qurulduğu istifadəçinin bu hərəkət üçün kifayət qədər hüquqları yoxdursa, konnektor xəta ilə çıxacaq;
  • transforms hədəf mövzunun adının dəqiq necə dəyişdiriləcəyini müəyyən edir:
    • transforms.AddPrefix.type müntəzəm ifadələrdən istifadə edəcəyimizi göstərir;
    • transforms.AddPrefix.regex — hədəf mövzunun adının yenidən təyin olunduğu maska;
    • transforms.AddPrefix.replacement - birbaşa nəyi yenidən təyin edirik.

Ürək döyüntüsü və çevrilmələr haqqında daha çox

Varsayılan olaraq, konnektor hər bir əməliyyat üçün məlumatı Kafkaya göndərir və xidmət mövzusuna öz LSN (Log Sequence Number) yazır. offset. Bəs bağlayıcı bütün verilənlər bazasını deyil, cədvəllərinin yalnız bir hissəsini oxumaq üçün konfiqurasiya edilərsə nə baş verir (məlumatlar nadir hallarda yenilənir)

  • Konnektor WAL fayllarını oxuyacaq və monitorinq etdiyi cədvəllərdə onlarda əməliyyat öhdəliyini aşkar etməyəcək.
  • Buna görə də, o, nə mövzuda, nə də təkrarlama yuvasındakı cari mövqeyini yeniləməyəcəkdir.
  • Bu da öz növbəsində WAL fayllarının diskdə "ilişməsinə" səbəb olacaq və çox güman ki, diskdə yer tükənəcək.

Və burada seçimlər köməyə gəlir. heartbeat.interval.ms и heartbeat.action.query. Bu seçimlərin cüt-cüt istifadəsi hər dəfə ürək döyüntüsü mesajı göndərildikdə ayrıca cədvəldə məlumatların dəyişdirilməsi sorğusunu yerinə yetirməyə imkan verir. Beləliklə, konnektorun hazırda yerləşdiyi LSN (replikasiya yuvasında) daim yenilənir. Bu, DBMS-ə artıq lazım olmayan WAL fayllarını silməyə imkan verir. Seçimlərin necə işlədiyi barədə ətraflı məlumat üçün baxın sənədləşdirmə.

Daha çox diqqətə layiq olan başqa bir seçimdir transforms. Baxmayaraq ki, söhbət daha çox rahatlıq və gözəllikdən gedir...

Varsayılan olaraq, Debezium aşağıdakı adlandırma siyasətindən istifadə edərək mövzular yaradır: serverName.schemaName.tableName. Bu həmişə əlverişli olmaya bilər. Seçimlər transforms müntəzəm ifadələrdən istifadə edərək hadisələri müəyyən bir adla mövzuya yönləndirilməli olan cədvəllərin siyahısını müəyyən edə bilərsiniz.

Bizim konfiqurasiyamız sayəsində transforms aşağıdakılar baş verir: izlənilən verilənlər bazasından bütün CDC hadisələri adı ilə mövzuya gedəcək data.cdc.dbname. Əks halda (bu parametrlər olmadan), Debezium defolt olaraq formanın hər bir cədvəli üçün mövzu yaradır: pg-dev.public.<table_name>.

Bağlayıcı məhdudiyyətlər

PostgreSQL üçün birləşdirici konfiqurasiyasının təsvirinin sonunda onun işinin aşağıdakı xüsusiyyətləri / məhdudiyyətləri haqqında danışmağa dəyər:

  1. PostgreSQL üçün birləşdirici funksionallıq məntiqi dekodlaşdırma konsepsiyasına əsaslanır. Ona görə də o verilənlər bazası strukturunu dəyişdirmək üçün sorğuları izləmir (DDL) - müvafiq olaraq, bu məlumatlar mövzularda olmayacaq.
  2. Replikasiya yuvalarından istifadə edildiyi üçün konnektorun qoşulması mümkündür yalnız əsas DBMS nümunəsinə.
  3. Bağlayıcının verilənlər bazasına qoşulduğu istifadəçi yalnız oxumaq hüququna malikdirsə, ilk işə salınmazdan əvvəl əl ilə təkrarlama yuvası yaratmalı və verilənlər bazasında dərc etməlisiniz.

Konfiqurasiyanın tətbiqi

Beləliklə, konfiqurasiyamızı konnektora yükləyək:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Yükləmənin uğurlu olduğunu və bağlayıcının işə salındığını yoxlayırıq:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Əla: qurulub və getməyə hazırdır. İndi gəlin özümüzü istehlakçı kimi göstərək və Kafka ilə əlaqə saxlayaq, bundan sonra cədvələ bir giriş əlavə edib dəyişdiririk:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Mövzumuzda bu aşağıdakı kimi göstəriləcək:

Dəyişikliklərimizlə çox uzun JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Hər iki halda qeydlər dəyişdirilmiş qeydin açarından (PK) və dəyişikliklərin mahiyyətindən ibarətdir: qeyd əvvəl nə idi və sonra nə oldu.

  • Vəziyyətdə INSERT: əvvəl dəyər (before) bərabərdir nullardınca daxil edilmiş sətir.
  • Vəziyyətdə UPDATE: in payload.before sətrin əvvəlki vəziyyəti göstərilir və içərisində payload.after - dəyişikliyin mahiyyəti ilə yeni.

2.2 MongoDB

Bu bağlayıcı DBMS əsas qovşağının oploqundan məlumatları oxuyan standart MongoDB replikasiya mexanizmindən istifadə edir.

PgSQL üçün artıq təsvir edilmiş bağlayıcıya bənzər şəkildə, burada da ilk başlanğıcda ilkin məlumat görüntüsü alınır, bundan sonra bağlayıcı oplog oxu rejiminə keçir.

Konfiqurasiya nümunəsi:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Gördüyünüz kimi, əvvəlki nümunə ilə müqayisədə yeni seçimlər yoxdur, ancaq verilənlər bazasına qoşulmaq üçün məsul olan variantların sayı və onların prefiksləri azaldılıb.

Parametrlər transforms bu dəfə aşağıdakıları edirlər: hədəf mövzunun adını sxemdən çevirin <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

xətaya dözümlülük

Zəmanəmizdə səhvlərə dözümlülük və yüksək əlçatanlıq məsələsi həmişəkindən daha kəskindir - xüsusən də məlumat və əməliyyatlar haqqında danışarkən və məlumatların dəyişməsinin izlənməsi bu məsələdə kənarda deyil. Prinsipcə nəyin səhv ola biləcəyinə və hər bir halda Debeziumun başına nə gələcəyinə baxaq.

Üç imtina variantı var:

  1. Kafka Connect uğursuzluğu. Qoşulma paylanmış rejimdə işləmək üçün konfiqurasiya edilibsə, bu, bir neçə işçinin eyni group.id-ni təyin etməsini tələb edir. Daha sonra, onlardan biri uğursuz olarsa, birləşdirici digər işçidə yenidən işə salınacaq və Kafkadakı mövzuda sonuncu yerinə yetirilən mövqedən oxumağa davam edəcək.
  2. Kafka klasteri ilə əlaqənin itirilməsi. Bağlayıcı sadəcə olaraq Kafkaya göndərə bilmədiyi mövqedə oxumağı dayandıracaq və cəhd uğur qazanana qədər vaxtaşırı onu yenidən göndərməyə çalışacaq.
  3. Data mənbəyi əlçatan deyil. Bağlayıcı konfiqurasiyaya uyğun olaraq mənbəyə yenidən qoşulmağa çalışacaq. Varsayılan olaraq istifadə edilən 16 cəhddir eksponensial geriləmə. 16-cı uğursuz cəhddən sonra tapşırıq kimi qeyd olunacaq uğursuz və onu Kafka Connect REST interfeysi vasitəsilə əl ilə yenidən işə salmaq lazımdır.
    • Vəziyyətdə PostgreSQL məlumatlar itirilməyəcək, çünki təkrarlama yuvalarından istifadə konnektor tərəfindən oxunmayan WAL fayllarının silinməsinin qarşısını alacaq. Bu halda bir mənfi cəhət var: əgər konnektor ilə DBMS arasında şəbəkə bağlantısı uzun müddət pozularsa, disk sahəsinin tükənmə ehtimalı var və bu, bütün DBMS-nin sıradan çıxmasına səbəb ola bilər.
    • Vəziyyətdə MySQL binlog faylları əlaqə bərpa edilməzdən əvvəl DBMS-nin özü tərəfindən fırlana bilər. Bu, konnektorun uğursuz vəziyyətə düşməsinə səbəb olacaq və normal işləməyi bərpa etmək üçün binloglardan oxumağa davam etmək üçün ilkin snapshot rejimində yenidən başlamalı olacaq.
    • haqqında MongoDB. Sənədlərdə deyilir: log/oplog faylları silindikdə və bağlayıcı qaldığı yerdən oxumağa davam edə bilmədikdə konnektorun davranışı bütün DBMS üçün eynidir. Bu, konnektorun dövlətə gedəcəyinə bağlıdır uğursuz və rejimdə yenidən başlamağı tələb edəcək ilkin snapshot.

      Bununla belə, istisnalar var. Bağlayıcı uzun müddət əlaqəsiz vəziyyətdə idisə (və ya MongoDB instansiyasına çata bilmədi) və bu müddət ərzində oplog fırlandısa, əlaqə bərpa edildikdə, bağlayıcı sakitcə məlumatları ilk mövcud mövqedən oxumağa davam edəcəkdir. , buna görə də Kafkadakı bəzi məlumatlar heç bir vuracaq.

Nəticə

Debezium CDC sistemləri ilə ilk təcrübəmdir və ümumilikdə çox müsbət oldu. Layihə əsas DBMS-nin dəstəyini, konfiqurasiyanın asanlığını, klaster dəstəyini və aktiv icmanı rüşvətlə təmin etdi. Təcrübə ilə maraqlananlar üçün təlimatları oxumağı məsləhət görürəm Kafka Connect и Debezium.

Kafka Connect üçün JDBC konnektoru ilə müqayisədə Debezium-un əsas üstünlüyü dəyişikliklərin DBMS jurnallarından oxunmasıdır ki, bu da verilənlərin minimal gecikmə ilə qəbul edilməsinə imkan verir. JDBC Konnektoru (Kafka Connect tərəfindən təmin edilir) izlənilən cədvəli müəyyən edilmiş intervalla sorğulayır və (eyni səbəbdən) məlumat silinəndə mesajlar yaratmır (orada olmayan məlumatları necə sorğulaya bilərsiniz?).

Bənzər problemləri həll etmək üçün aşağıdakı həll yollarına diqqət yetirə bilərsiniz (Debezium-a əlavə olaraq):

PS

Bloqumuzda da oxuyun:

Mənbə: www.habr.com

Добавить комментарий