Debezium Tanıtımı - Apache Kafka için CDC

Debezium Tanıtımı - Apache Kafka için CDC

İşimde sık sık yeni teknik çözümler/yazılım ürünleriyle karşılaşıyorum; bunlar hakkında Rusça internette oldukça az bilgi var. Bu makale ile böyle bir boşluğu, Debezium kullanarak iki popüler DBMS'den (PostgreSQL ve MongoDB) bir Kafka kümesine CDC olaylarının gönderilmesini yapılandırmam gerektiğinde, son uygulamalarımdan bir örnekle doldurmaya çalışacağım. Yapılan çalışmalar sonucunda ortaya çıkan bu inceleme yazısının başkalarına faydalı olmasını diliyorum.

Genel olarak Debezium ve CDC nedir?

debezyum - CDC yazılım kategorisinin temsilcisi (Veri Değişikliğini Yakala) veya daha doğrusu, Apache Kafka Connect çerçevesiyle uyumlu çeşitli DBMS'ler için bir dizi bağlayıcıdır.

O Açık Kaynak projesi, Apache Lisansı v2.0 kapsamında lisanslıdır ve Red Hat sponsorluğundadır. Geliştirme 2016'dan beri devam etmektedir ve şu anda aşağıdaki DBMS'ler için resmi destek sağlamaktadır: MySQL, PostgreSQL, MongoDB, SQL Server. Cassandra ve Oracle için de bağlayıcılar mevcut ancak bunlar şu anda "erken erişim" durumunda ve yeni sürümler geriye dönük uyumluluğu garanti etmiyor.

CDC'yi geleneksel yaklaşımla karşılaştırırsak (uygulama doğrudan DBMS'den veri okuduğunda), ana avantajları arasında düşük gecikme, yüksek güvenilirlik ve kullanılabilirlik ile satır düzeyinde veri değişikliği akışının uygulanması yer alır. Son iki noktaya, CDC olayları için bir depo olarak Kafka kümesi kullanılarak ulaşılır.

Diğer bir avantaj ise olayları depolamak için tek bir modelin kullanılmasıdır, böylece son uygulamanın farklı DBMS'leri çalıştırmanın nüansları konusunda endişelenmesine gerek kalmaz.

Son olarak, bir mesaj aracısının kullanılması, verilerdeki değişiklikleri izleyen uygulamaların yatay olarak ölçeklendirilmesine olanak tanır. Aynı zamanda veriler doğrudan DBMS'den değil Kafka kümesinden alındığı için veri kaynağı üzerindeki etki en aza indirilir.

Debezium mimarisi hakkında

Debezium'u kullanmak şu basit şemaya iniyor:

DBMS (veri kaynağı olarak) → Kafka Connect'teki bağlayıcı → Apache Kafka → tüketici

Örnek olarak aşağıda proje web sitesinden bir diyagram verilmiştir:

Debezium Tanıtımı - Apache Kafka için CDC

Ancak bu şemayı pek sevmiyorum çünkü sadece lavabo konektörünün kullanılması mümkün görünüyor.

Gerçekte durum farklıdır: Veri Gölünüzü doldurmak (yukarıdaki diyagramdaki son bağlantı) Debezium'u kullanmanın tek yolu bu değil. Apache Kafka'ya gönderilen olaylar, uygulamalarınız tarafından çeşitli durumların üstesinden gelmek için kullanılabilir. Örneğin:

  • ilgisiz verilerin önbellekten kaldırılması;
  • bildirim gönderme;
  • arama dizini güncellemeleri;
  • bir tür denetim günlüğü;
  • ...

Bir Java uygulamanız varsa ve Kafka kümesini kullanmaya gerek yok/olasılık yoksa, üzerinde çalışma olasılığı da vardır. gömülü bağlayıcı. Bariz avantajı, ek altyapı ihtiyacını (bağlayıcı ve Kafka şeklinde) ortadan kaldırmasıdır. Ancak bu çözüm, sürüm 1.1'den beri kullanımdan kaldırılmıştır ve artık kullanılması önerilmemektedir (gelecek sürümlerde bu çözüme yönelik destek kaldırılabilir).

Bu makalede geliştiriciler tarafından önerilen, hata toleransı ve ölçeklenebilirlik sağlayan mimari ele alınacaktır.

Bağlayıcı yapılandırması

En önemli değer olan verilerdeki değişiklikleri izlemeye başlamak için şunlara ihtiyacımız var:

  1. veri kaynağı; sürüm 5.7'den itibaren MySQL, PostgreSQL 9.6+, MongoDB 3.2+ (tam listesini);
  2. Apache Kafka kümesi;
  3. Kafka Connect örneği (sürüm 1.x, 2.x);
  4. yapılandırılmış Debezium konektörü.

İlk iki nokta üzerinde çalışın, yani. DBMS ve Apache Kafka'nın kurulum süreci makalenin kapsamı dışındadır. Ancak her şeyi sanal alanda dağıtmak isteyenler için örneklerin bulunduğu resmi depoda hazır bir docker-compose.yaml.

Son iki nokta üzerinde daha ayrıntılı olarak duracağız.

0. Kafka Bağlantısı

Burada ve makalenin ilerleyen kısımlarında tüm konfigürasyon örnekleri Debezium geliştiricileri tarafından dağıtılan Docker görüntüsü bağlamında tartışılmaktadır. Gerekli tüm eklenti dosyalarını (konektörleri) içerir ve ortam değişkenlerini kullanarak Kafka Connect'in yapılandırılmasını sağlar.

Confluent'ten Kafka Connect'i kullanmayı düşünüyorsanız, gerekli bağlayıcıların eklentilerini aşağıda belirtilen dizine kendiniz eklemeniz gerekecektir. plugin.path veya bir ortam değişkeni aracılığıyla ayarlanır CLASSPATH. Kafka Connect çalışanı ve bağlayıcılarına ilişkin ayarlar, çalışan başlatma komutuna argüman olarak iletilen yapılandırma dosyaları aracılığıyla belirlenir. Daha fazla ayrıntı için bkz. belgeleme.

Debeizum'u konnektör versiyonunda kurma sürecinin tamamı iki aşamada gerçekleştirilir. Her birini ele alalım:

1. Kafka Connect çerçevesini kurma

Apache Kafka kümesine veri akışı sağlamak için Kafka Connect çerçevesinde belirli parametreler ayarlanır:

  • küme bağlantı ayarları,
  • Bağlayıcının konfigürasyonunun doğrudan saklanacağı konuların adları,
  • bağlayıcının çalıştığı grubun adı (dağıtılmış mod kullanılıyorsa).

Projenin resmi Docker görüntüsü, ortam değişkenlerini kullanan yapılandırmayı destekliyor; kullanacağımız şey bu. Yani, resmi indirin:

docker pull debezium/connect

Bağlayıcıyı çalıştırmak için gereken minimum ortam değişkenleri kümesi aşağıdaki gibidir:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — küme üyelerinin tam listesini elde etmek için Kafka küme sunucularının ilk listesi;
  • OFFSET_STORAGE_TOPIC=connector-offsets — konektörün halihazırda bulunduğu konumların saklanması için bir konu;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — bağlayıcının ve görevlerinin durumunu saklama konusu;
  • CONFIG_STORAGE_TOPIC=connector-config — bağlayıcı yapılandırma verilerinin ve görevlerinin saklanmasına ilişkin konu;
  • GROUP_ID=1 - bağlayıcı görevin yürütülebileceği çalışan grubunun tanımlayıcısı; dağıtılmış kullanırken gerekli (dağıtılmış) modu.

Konteyneri şu değişkenlerle başlatıyoruz:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Avro hakkında not

Debezium, varsayılan olarak verileri JSON formatında yazar; bu, sanal alanlar ve küçük miktardaki veriler için kabul edilebilir, ancak yüksek yüklü veritabanlarında sorun haline gelebilir. JSON dönüştürücüye bir alternatif, mesajları kullanarak serileştirmektir. Avro Apache Kafka'daki G/Ç alt sistemindeki yükü azaltan ikili biçime dönüştürülür.

Avro'yu kullanmak için ayrı bir dağıtım yapmanız gerekir şema kaydı (diyagramları saklamak için). Dönüştürücünün değişkenleri şöyle görünecektir:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro'yu kullanma ve bunun için kayıt defterini ayarlamayla ilgili ayrıntılar bu makalenin kapsamı dışındadır - daha sonra netlik sağlamak için JSON'u kullanacağız.

2. Bağlayıcının kendisini yapılandırma

Artık doğrudan kaynaktan veri okuyacak olan konektörün yapılandırmasına gidebilirsiniz.

İki DBMS için bağlayıcı örneğine bakalım: Deneyimim olduğu ve farklılıkların olduğu (küçük de olsa, ancak bazı durumlarda önemli!) PostgreSQL ve MongoDB.

Yapılandırma JSON gösteriminde açıklanır ve POST isteği kullanılarak Kafka Connect'e yüklenir.

2.1. Postgre SQL

PostgreSQL için örnek bağlayıcı yapılandırması:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Bu kurulumdan sonra konektörün çalışma prensibi oldukça basittir:

  • İlk çalıştırmada konfigürasyonda belirtilen veritabanına bağlanır ve modunda başlar. ilk anlık görüntü, koşullu kullanılarak elde edilen ilk veri kümesini Kafka'ya göndererek SELECT * FROM table_name.
  • Başlatma işlemi tamamlandıktan sonra bağlayıcı, PostgreSQL WAL dosyalarındaki değişiklikleri okuma moduna girer.

Kullanılan seçenekler hakkında:

  • name - aşağıda açıklanan konfigürasyonun kullanıldığı konektörün adı; gelecekte bu ad, Kafka Connect REST API aracılığıyla bağlayıcıyla çalışmak (yani durumu görüntülemek/yapılandırmayı yeniden başlatmak/güncellemek) için kullanılacak;
  • connector.class — Yapılandırılmış bağlayıcı tarafından kullanılacak DBMS bağlayıcı sınıfı;
  • plugin.name — WAL dosyalarından verilerin mantıksal kodunun çözülmesine yönelik eklentinin adı. Aralarından seçim yapılabilir wal2json, decoderbuffs и pgoutput. İlk ikisi DBMS'ye uygun uzantıların kurulmasını gerektirir ve pgoutput PostgreSQL sürüm 10 ve üzeri için ek manipülasyon gerektirmez;
  • database.* — veritabanına bağlanma seçenekleri, burada database.server.name — Kafka kümesindeki konu adını oluşturmak için kullanılan PostgreSQL örnek adı;
  • table.include.list — değişiklikleri izlemek istediğimiz tabloların listesi; formatta belirtilen schema.table_name; ile birlikte kullanılamaz table.exclude.list;
  • heartbeat.interval.ms — bağlayıcının özel bir konuya kalp atışı mesajları gönderdiği aralık (milisaniye cinsinden);
  • heartbeat.action.query — her kalp atışı mesajı gönderilirken yürütülecek bir istek (seçenek 1.1 sürümünde ortaya çıkmıştır);
  • slot.name — bağlayıcı tarafından kullanılacak çoğaltma yuvasının adı;
  • publication.name - İsim Yayın bağlayıcının kullandığı PostgreSQL'de. Eğer mevcut değilse Debezium onu ​​oluşturmaya çalışacaktır. Bağlantının yapıldığı kullanıcı bu işlem için yeterli haklara sahip değilse bağlayıcı bir hata vererek sonlandırılacaktır;
  • transforms hedef konunun adının tam olarak nasıl değiştirileceğini belirler:
    • transforms.AddPrefix.type normal ifadeler kullanacağımızı belirtir;
    • transforms.AddPrefix.regex — hedef konunun adını yeniden tanımlayan bir maske;
    • transforms.AddPrefix.replacement - doğrudan yeniden tanımladığımız şey.

Kalp atışı ve dönüşümler hakkında daha fazla bilgi

Varsayılan olarak bağlayıcı, taahhüt edilen her işlem için Kafka'ya veri gönderir ve bunun LSN'si (Günlük Sıra Numarası) hizmet konusuna kaydedilir. offset. Ancak bağlayıcı veritabanının tamamını değil de tablolarının yalnızca bir kısmını (veri güncellemelerinin sık gerçekleşmediği) okuyacak şekilde yapılandırılırsa ne olur?

  • Bağlayıcı WAL dosyalarını okuyacak ve izlediği tablolara yapılan herhangi bir işlem işlemini algılamayacaktır.
  • Bu nedenle konu veya çoğaltma yuvasındaki mevcut konumunu güncellemeyecektir.
  • Bu da WAL dosyalarının diskte tutulmasına ve muhtemelen disk alanının tükenmesine neden olacaktır.

Ve burada seçenekler kurtarmaya geliyor. heartbeat.interval.ms и heartbeat.action.query. Bu seçeneklerin çiftler halinde kullanılması, her kalp atışı mesajı gönderildiğinde ayrı bir tablodaki verileri değiştirme isteğinin gerçekleştirilmesini mümkün kılar. Böylece konektörün halihazırda bulunduğu LSN (çoğaltma yuvasında) sürekli olarak güncellenir. Bu, DBMS'nin artık ihtiyaç duyulmayan WAL dosyalarını kaldırmasına olanak tanır. Seçeneklerin nasıl çalıştığı hakkında daha fazla bilgi edinebilirsiniz. belgeleme.

Daha yakından ilgilenmeye değer başka bir seçenek de transforms. Her ne kadar daha çok rahatlık ve güzellikle ilgili olsa da...

Varsayılan olarak Debezium, aşağıdaki adlandırma politikasını kullanarak konuları oluşturur: serverName.schemaName.tableName. Bu her zaman uygun olmayabilir. Seçenekler transforms Belirli bir ada sahip bir konuya yönlendirilmesi gereken olayların, tabloların bir listesini tanımlamak için normal ifadeleri kullanabilirsiniz.

Yapılandırmamızda teşekkürler transforms şu gerçekleşir: izlenen veri tabanındaki tüm CDC olayları şu addaki bir konuya gider: data.cdc.dbname. Aksi takdirde (bu ayarlar olmadan), Debezium varsayılan olarak her tablo için aşağıdaki gibi bir konu oluşturur: pg-dev.public.<table_name>.

Bağlayıcı Sınırlamaları

PostgreSQL için bağlayıcı yapılandırmasının açıklamasının sonunda, çalışmasının aşağıdaki özelliklerinden / sınırlamalarından bahsetmeye değer:

  1. PostgreSQL için bağlayıcının işlevselliği mantıksal kod çözme kavramına dayanır. Bu nedenle o veritabanı yapısını değiştirme isteklerini izlemiyor (DDL) - buna göre bu veriler konularda olmayacak.
  2. Çoğaltma yuvaları kullanıldığından konektör bağlamak mümkündür sadece önde gelen DBMS örneğine.
  3. Bağlayıcının veritabanına bağlandığı kullanıcıya salt okuma hakları verilmişse, ilk başlatmadan önce manuel olarak bir çoğaltma yuvası oluşturmanız ve veritabanında yayınlamanız gerekecektir.

Yapılandırmanın uygulanması

Şimdi konfigürasyonumuzu bağlayıcıya yükleyelim:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

İndirmenin başarılı olduğunu ve bağlayıcının başlatıldığını kontrol ediyoruz:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Harika: Kuruldu ve kullanıma hazır. Şimdi tüketici gibi davranıp Kafka'ya bağlanalım, ardından tabloya bir girdi ekleyip değiştireceğiz:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Konumuz aşağıdaki gibi görüntülenecektir:

Değişikliklerimizle birlikte çok uzun JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Her iki durumda da kayıtlar, değiştirilen kaydın anahtarından (PK) ve değişikliklerin özünden oluşur: kaydın öncesinde ne olduğu ve sonrasında ne olduğu.

  • Durumunda INSERT: önceki değer (before) eşittir nullve sonrasında - eklenen satır.
  • Durumunda UPDATE: içinde payload.before satırın önceki durumu görüntülenir ve payload.after - değişimin özüyle yeni.

2.2 MongoDB

Bu bağlayıcı, birincil DBMS düğümünün oplog'undan bilgileri okuyarak standart MongoDB çoğaltma mekanizmasını kullanır.

PgSQL için daha önce açıklanan bağlayıcıya benzer şekilde, burada da ilk başlangıçta birincil veri anlık görüntüsü alınır ve ardından bağlayıcı oplog okuma moduna geçer.

Yapılandırma örneği:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Gördüğünüz gibi, önceki örneğe kıyasla burada yeni bir seçenek yok, yalnızca veritabanına bağlanmaktan sorumlu seçeneklerin ve bunların öneklerinin sayısı azaltıldı.

Ayarlar transforms bu sefer şunu yapıyorlar: hedef konunun adını şemadan dönüştürüyorlar <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

hata toleransı

Günümüzde hata toleransı ve yüksek kullanılabilirlik sorunu her zamankinden daha ciddi bir hal alıyor; özellikle veri ve işlemlerden bahsederken ve veri değişikliklerini takip etmek bu konuda bir kenara bırakılmıyor. Prensipte neyin yanlış gidebileceğine ve her durumda Debezium'a ne olacağına bakalım.

Üç vazgeçme seçeneği vardır:

  1. Kafka Bağlantısı hatası. Connect, dağıtılmış modda çalışacak şekilde yapılandırıldıysa bu, birden fazla çalışanın aynı group.id'yi ayarlamasını gerektirir. Daha sonra bunlardan biri arızalanırsa bağlayıcı diğer çalışanda yeniden başlatılacak ve Kafka'da konuya ilişkin son kararlı konumdan okumaya devam edilecektir.
  2. Kafka kümesiyle bağlantı kaybı. Bağlayıcı, Kafka'ya gönderilemeyen konumda okumayı durduracak ve deneme başarılı olana kadar periyodik olarak yeniden göndermeyi deneyecektir.
  3. Veri kaynağının kullanılamaması. Bağlayıcı, yapılandırıldığı gibi kaynağa yeniden bağlanmayı deneyecektir. Varsayılan, 16 denemedir. üstel geri çekilme. 16. başarısız denemeden sonra görev şu şekilde işaretlenecektir: başarısız ve Kafka Connect REST arayüzü aracılığıyla manuel olarak yeniden başlatmanız gerekecektir.
    • Durumunda PostgreSQL veriler kaybolmayacaktır çünkü Çoğaltma yuvalarının kullanılması, bağlayıcı tarafından okunmayan WAL dosyalarını silmenizi önleyecektir. Bu durumda madalyonun bir dezavantajı da var: Konektör ile DBMS arasındaki ağ bağlantısı uzun süre kesilirse, disk alanının tükenmesi olasılığı vardır ve bu da bir arızaya yol açabilir. DBMS'nin tamamı.
    • Durumunda MySQL binlog dosyaları, bağlantı yeniden sağlanmadan önce DBMS'nin kendisi tarafından döndürülebilir. Bu, bağlayıcının arızalı duruma geçmesine ve normal çalışmaya geri dönmesine neden olacaktır; binlog'lardan okumaya devam etmek için ilk anlık görüntü modunda yeniden başlatmanız gerekecektir.
    • Hakkında MongoDB. Dokümantasyon şunu belirtir: Log/oplog dosyalarının silinmesi ve bağlayıcının kaldığı yerden okumaya devam edememesi durumunda bağlayıcının davranışı tüm DBMS'ler için aynıdır. Bu, bağlayıcının duruma geçeceği anlamına gelir başarısız ve modda yeniden başlatmayı gerektirecek ilk anlık görüntü.

      Ancak istisnalar da var. Konektörün bağlantısı uzun süre kesildiyse (veya MongoDB örneğine ulaşamadıysa) ve oplog bu süre zarfında dönüş yaptıysa, bağlantı yeniden kurulduğunda konektör sakin bir şekilde ilk kullanılabilir konumdan verileri okumaya devam edecektir. bu yüzden Kafka'daki bazı veriler hayır vuracak.

Sonuç

Debezium, CDC sistemleriyle ilgili ilk deneyimim ve genel olarak çok olumlu. Proje, büyük DBMS'lere verdiği destek, yapılandırma kolaylığı, kümeleme desteği ve aktif topluluğuyla beğeni topladı. Uygulamaya ilgi duyanların rehberlerini okumalarını tavsiye ederim. Kafka Bağlantısı и debezyum.

Kafka Connect için JDBC bağlayıcıyla karşılaştırıldığında Debezium'un temel avantajı, değişikliklerin DBMS günlüklerinden okunmasıdır, bu da verilerin minimum gecikmeyle alınmasına olanak tanır. JDBC Bağlayıcısı (Kafka Connect'ten) izlenen tabloyu sabit aralıklarla sorgular ve (aynı nedenden dolayı) veriler silindiğinde mesaj oluşturmaz (var olmayan verileri nasıl sorgulayabilirsiniz?).

Benzer sorunları çözmek için aşağıdaki çözümlere (Debezium'a ek olarak) dikkat edebilirsiniz:

PS

Blogumuzda da okuyun:

Kaynak: habr.com

Yorum ekle