Przedstawiamy Debezium - CDC dla Apache Kafka

Przedstawiamy Debezium - CDC dla Apache Kafka

W swojej pracy często spotykam się z nowymi rozwiązaniami technicznymi/programami, o których w rosyjskojęzycznym Internecie jest raczej mało informacji. W tym artykule postaram się wypełnić jedną z takich luk przykładem z mojej niedawnej praktyki, kiedy potrzebowałem skonfigurować wysyłanie zdarzeń CDC z dwóch popularnych baz danych DBMS (PostgreSQL i MongoDB) do klastra Kafki przy użyciu Debezium. Mam nadzieję, że ten artykuł poglądowy, który pojawi się w wyniku wykonanej pracy, będzie przydatny dla innych.

Co to jest Debezium i CDC w ogóle?

debezium — przedstawiciel kategorii oprogramowania CDC (Przechwytuj zmianę danych), a dokładniej jest to zestaw konektorów dla różnych systemów DBMS kompatybilnych z frameworkiem Apache Kafka Connect.

To Projekt Open Source, licencjonowany na podstawie licencji Apache v2.0 i sponsorowany przez firmę Red Hat. Rozwój trwa od 2016 roku i obecnie zapewnia oficjalne wsparcie dla następujących systemów DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Istnieją również konektory dla Cassandry i Oracle, ale na razie są one w fazie „wczesnego dostępu”, a nowe wydania nie gwarantują kompatybilności wstecznej.

Jeśli porównamy CDC z podejściem tradycyjnym (kiedy aplikacja bezpośrednio odczytuje dane z SZBD), to do jego głównych zalet można zaliczyć implementację strumieniowania zmian danych na poziomie wiersza z niskimi opóźnieniami, wysoką niezawodnością i dostępnością. Dwa ostatnie punkty osiąga się poprzez wykorzystanie klastra Kafki jako repozytorium zdarzeń CDC.

Kolejną zaletą jest fakt, że do przechowywania zdarzeń wykorzystywany jest jeden model, dzięki czemu aplikacja końcowa nie musi martwić się niuansami obsługi różnych systemów DBMS.

Wreszcie użycie brokera komunikatów umożliwia aplikacjom monitorującym zmiany danych skalowanie w poziomie. Jednocześnie minimalizowany jest wpływ na źródło danych, gdyż dane pozyskiwane są nie bezpośrednio z DBMS, a z klastra Kafki.

O architekturze Debezium

Korzystanie z Debezium sprowadza się do tego prostego schematu:

DBMS (jako źródło danych) → konektor w Kafka Connect → Apache Kafka → konsument

Dla ilustracji podaję diagram ze strony internetowej projektu:

Przedstawiamy Debezium - CDC dla Apache Kafka

Jednak ten schemat nie do końca mi się podoba, bo wydaje mi się, że możliwe jest jedynie zastosowanie łącznika do zlewu.

W rzeczywistości sytuacja jest inna: zapełnianie jeziora danych (ostatni link na powyższym schemacie) To nie jedyny sposób wykorzystania Debezium. Zdarzenia wysyłane do Apache Kafka mogą być wykorzystywane przez aplikacje do obsługi różnych sytuacji. Na przykład:

  • usuwanie nieistotnych danych z pamięci podręcznej;
  • wysyłanie powiadomień;
  • wyszukiwanie aktualizacji indeksu;
  • pewnego rodzaju dzienniki audytu;
  • ...

W przypadku, gdy posiadasz aplikację Java i nie ma potrzeby/możliwości korzystania z klastra Kafki, istnieje również możliwość pracy przez złącze wbudowane. Oczywistą zaletą jest to, że eliminuje potrzebę dodatkowej infrastruktury (w postaci konektora i Kafki). Jednak to rozwiązanie stało się przestarzałe od wersji 1.1 i nie jest już zalecane do użytku (obsługa tego rozwiązania może zostać usunięta w przyszłych wersjach).

W tym artykule omówiona zostanie architektura zalecana przez programistów, która zapewnia odporność na awarie i skalowalność.

Konfiguracja złącza

Aby rozpocząć śledzenie zmian najważniejszej wartości – danych – potrzebujemy:

  1. źródłem danych, którym może być MySQL począwszy od wersji 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (pełna lista);
  2. Klaster Apache Kafka;
  3. Instancja Kafka Connect (wersje 1.x, 2.x);
  4. skonfigurowane złącze Debezium.

Pracuj nad pierwszymi dwoma punktami, tj. Proces instalacji DBMS i Apache Kafka wykracza poza zakres artykułu. Jednak dla tych, którzy chcą wdrożyć wszystko w piaskownicy, oficjalne repozytorium z przykładami ma gotowe rozwiązanie docker-compose.yaml.

Zastanowimy się bardziej szczegółowo nad dwoma ostatnimi punktami.

0. Połączenie Kafki

Tutaj i w dalszej części artykułu wszystkie przykłady konfiguracji zostały omówione w kontekście obrazu Dockera dystrybuowanego przez programistów Debezium. Zawiera wszystkie niezbędne pliki wtyczek (łączników) oraz umożliwia konfigurację Kafka Connect przy użyciu zmiennych środowiskowych.

Jeśli zamierzasz korzystać z Kafka Connect z Confluent, będziesz musiał samodzielnie dodać wtyczki niezbędnych konektorów do katalogu określonego w plugin.path lub ustawić za pomocą zmiennej środowiskowej CLASSPATH. Ustawienia procesu roboczego i łączników Kafka Connect są określane za pomocą plików konfiguracyjnych przekazywanych jako argumenty do polecenia uruchomienia procesu roboczego. Więcej szczegółów znajdziesz w dokumentacja.

Cały proces konfiguracji Debeizum w wersji konektorowej odbywa się dwuetapowo. Przyjrzyjmy się każdemu z nich:

1. Konfiguracja frameworku Kafka Connect

Aby przesyłać strumieniowo dane do klastra Apache Kafka, w środowisku Kafka Connect ustawiane są określone parametry, takie jak:

  • parametry podłączenia do klastra,
  • nazwy tematów, w których bezpośrednio będzie zapisana konfiguracja samego konektora,
  • nazwa grupy, w której działa konektor (jeśli używany jest tryb rozproszony).

Oficjalny obraz Dockera projektu obsługuje konfigurację przy użyciu zmiennych środowiskowych - to właśnie z nich skorzystamy. Zatem pobierz obraz:

docker pull debezium/connect

Minimalny zestaw zmiennych środowiskowych wymaganych do uruchomienia konektora jest następujący:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — wstępna lista serwerów klastra Kafka w celu uzyskania pełnej listy członków klastra;
  • OFFSET_STORAGE_TOPIC=connector-offsets — temat przechowywania pozycji, w których aktualnie znajduje się złącze;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — temat przechowywania stanu konektora i jego zadań;
  • CONFIG_STORAGE_TOPIC=connector-config — temat przechowywania danych konfiguracyjnych złącza i jego zadania;
  • GROUP_ID=1 — identyfikator grupy pracowników, na której można wykonać zadanie łącznika; konieczne w przypadku korzystania z rozproszonego (Rozpowszechniane) reżim.

Uruchamiamy kontener z tymi zmiennymi:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Uwaga na temat Avro

Domyślnie Debezium zapisuje dane w formacie JSON, który jest akceptowalny w przypadku piaskownic i małych ilości danych, ale może stać się problemem w mocno obciążonych bazach danych. Alternatywą dla konwertera JSON jest serializacja wiadomości przy użyciu euro do formatu binarnego, co zmniejsza obciążenie podsystemu we/wy w Apache Kafka.

Aby korzystać z Avro, musisz wdrożyć oddzielny plik rejestr-schemat (do przechowywania diagramów). Zmienne konwertera będą wyglądać następująco:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Szczegóły dotyczące korzystania z Avro i konfigurowania dla niego rejestru wykraczają poza zakres tego artykułu - w dalszej części, dla przejrzystości, użyjemy JSON.

2. Konfiguracja samego konektora

Teraz możesz przejść bezpośrednio do konfiguracji samego konektora, który wczyta dane ze źródła.

Spójrzmy na przykład konektorów dla dwóch baz danych DBMS: PostgreSQL i MongoDB, w których mam doświadczenie i w których występują różnice (choć niewielkie, ale w niektórych przypadkach znaczące!).

Konfiguracja jest opisana w notacji JSON i przesłana do Kafka Connect za pomocą żądania POST.

2.1. PostgreSQL

Przykładowa konfiguracja konektora dla PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Zasada działania złącza po takiej konfiguracji jest dość prosta:

  • Przy pierwszym uruchomieniu łączy się z bazą danych określoną w konfiguracji i uruchamia się w trybie wstępna migawka, wysyłając do Kafki początkowy zestaw danych uzyskanych za pomocą warunku SELECT * FROM table_name.
  • Po zakończeniu inicjalizacji konektor przechodzi w tryb odczytywania zmian z plików PostgreSQL WAL.

O zastosowanych opcjach:

  • name — nazwę złącza, dla którego stosowana jest opisana poniżej konfiguracja; w przyszłości nazwa ta będzie wykorzystywana do pracy z konektorem (czyli podglądu stanu/restartu/aktualizacji konfiguracji) poprzez API REST Kafka Connect;
  • connector.class — klasa konektora DBMS, która będzie używana przez konfigurowany konektor;
  • plugin.name — nazwa wtyczki do logicznego dekodowania danych z plików WAL. Dostępne do wyboru wal2json, decoderbuffs и pgoutput. Pierwsze dwa wymagają instalacji odpowiednich rozszerzeń w systemie DBMS, oraz pgoutput dla PostgreSQL w wersji 10 i wyższej nie wymaga dodatkowych manipulacji;
  • database.* — opcje połączenia z bazą danych, gdzie database.server.name — nazwa instancji PostgreSQL używana do tworzenia nazwy tematu w klastrze Kafka;
  • table.include.list — lista tabel, w których chcemy śledzić zmiany; określone w formacie schema.table_name; nie można używać razem z table.exclude.list;
  • heartbeat.interval.ms — interwał (w milisekundach), z jakim konektor wysyła komunikaty pulsu do specjalnego tematu;
  • heartbeat.action.query — żądanie, które będzie realizowane przy wysyłaniu każdej wiadomości pulsu (opcja pojawiła się w wersji 1.1);
  • slot.name — nazwa slotu replikacyjnego, który będzie używany przez konektor;
  • publication.name - Nazwa Publikacja w PostgreSQL, którego używa konektor. Jeśli nie istnieje, Debezium spróbuje go stworzyć. Jeżeli użytkownik, pod którym nawiązywane jest połączenie, nie ma wystarczających uprawnień do tej akcji, konektor zakończy się błędem;
  • transforms określa dokładnie, jak zmienić nazwę tematu docelowego:
    • transforms.AddPrefix.type wskazuje, że będziemy używać wyrażeń regularnych;
    • transforms.AddPrefix.regex — maska, która na nowo definiuje nazwę tematu docelowego;
    • transforms.AddPrefix.replacement - bezpośrednio to, co redefiniujemy.

Więcej o biciu serca i przemianach

Domyślnie łącznik wysyła dane do Kafki dla każdej zatwierdzonej transakcji, a jego numer LSN (log sekwencyjny) jest rejestrowany w temacie usługi offset. Co się jednak stanie, jeśli konektor zostanie skonfigurowany tak, aby odczytywał nie całą bazę danych, a jedynie część jej tabel (w których aktualizacja danych nie następuje często)?

  • Łącznik odczyta pliki WAL i nie wykryje żadnych zatwierdzeń transakcji w monitorowanych tabelach.
  • Dlatego nie zaktualizuje swojej bieżącej pozycji ani w temacie, ani w gnieździe replikacji.
  • To z kolei spowoduje, że pliki WAL będą przechowywane na dysku i prawdopodobnie zabraknie miejsca na dysku.

I tu na ratunek przychodzą opcje. heartbeat.interval.ms и heartbeat.action.query. Użycie tych opcji w parach umożliwia wykonanie żądania zmiany danych w osobnej tabeli za każdym razem, gdy wysyłany jest komunikat pulsu. Dzięki temu numer LSN, na którym aktualnie znajduje się złącze (w gnieździe replikacyjnym), jest stale aktualizowany. Dzięki temu system DBMS może usunąć pliki WAL, które nie są już potrzebne. Możesz dowiedzieć się więcej o tym, jak działają te opcje dokumentacja.

Inną opcją godną bliższej uwagi jest transforms. Choć tu bardziej chodzi o wygodę i piękno...

Domyślnie Debezium tworzy tematy stosując następującą politykę nazewnictwa: serverName.schemaName.tableName. Nie zawsze może to być wygodne. Opcje transforms Za pomocą wyrażeń regularnych możesz zdefiniować listę tabel, zdarzeń, z których należy kierować do tematu o określonej nazwie.

W naszej konfiguracji dzięki transforms dzieje się tak: wszystkie zdarzenia CDC z monitorowanej bazy danych trafią do tematu o nazwie data.cdc.dbname. W przeciwnym razie (bez tych ustawień) Debezium domyślnie utworzy temat dla każdej tabeli, taki jak: pg-dev.public.<table_name>.

Ograniczenia złącza

Na zakończenie opisu konfiguracji konektora dla PostgreSQL warto wspomnieć o następujących cechach/ograniczeniach jego działania:

  1. Funkcjonalność konektora dla PostgreSQL opiera się na koncepcji dekodowania logicznego. Dlatego on nie śledzi żądań zmiany struktury bazy danych (DDL) - w związku z tym dane te nie znajdą się w tematach.
  2. Ponieważ wykorzystywane są gniazda replikacyjne, możliwe jest podłączenie łącznika tylko do wiodącej instancji DBMS.
  3. Jeśli użytkownik, pod którym konektor łączy się z bazą danych, ma uprawnienia tylko do odczytu, to przed pierwszym uruchomieniem trzeba będzie ręcznie utworzyć slot replikacji i opublikować go w bazie danych.

Zastosowanie konfiguracji

Załadujmy więc naszą konfigurację do konektora:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Sprawdzamy, czy pobieranie przebiegło pomyślnie i konektor został uruchomiony:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Świetnie: jest skonfigurowany i gotowy do pracy. Teraz wcielmy się w konsumenta i połączmy się z Kafką, po czym dodamy i zmienimy wpis w tabeli:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

W naszym temacie będzie to wyświetlane w następujący sposób:

Bardzo długi JSON z naszymi zmianami

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

W obu przypadkach akta składają się z klucza (PK) zmienionego zapisu oraz samej istoty zmian: czym był zapis wcześniej i czym stał się później.

  • W przypadku INSERT: wartość przed (before) równa się null, a po - wstawiony wiersz.
  • W przypadku UPDATE: w payload.before wyświetlany jest poprzedni stan linii i in payload.after — nowość z istotą zmian.

2.2 MongoDB

Złącze to wykorzystuje standardowy mechanizm replikacji MongoDB, odczytując informacje z oplogu głównego węzła DBMS.

Podobnie jak w już opisanym konektorze dla PgSQL, również tutaj przy pierwszym uruchomieniu wykonywany jest snapshot danych pierwotnych, po czym konektor przechodzi w tryb odczytu oplogu.

Przykład konfiguracji:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Jak widać nie ma tu żadnych nowych opcji w porównaniu do poprzedniego przykładu, a jedynie zmniejszono ilość opcji odpowiedzialnych za łączenie się z bazą danych oraz ich prefiksy.

Ustawienia transforms tym razem wykonują następującą czynność: przekształcają nazwę tematu docelowego ze schematu <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolerancja błędów

Kwestia odporności na awarie i wysokiej dostępności jest w naszych czasach bardziej dotkliwa niż kiedykolwiek - szczególnie gdy mówimy o danych i transakcjach, a śledzenie zmian danych nie pozostaje w tej kwestii na uboczu. Przyjrzyjmy się, co w zasadzie może pójść nie tak i co stanie się z Debezium w każdym przypadku.

Istnieją trzy możliwości rezygnacji:

  1. Awaria Kafki Connect. Jeśli Connect jest skonfigurowany do pracy w trybie rozproszonym, wymaga to ustawienia tego samego identyfikatora grupy przez wielu pracowników. Następnie, jeśli któryś z nich zawiedzie, konektor zostanie zrestartowany na innym workeru i będzie kontynuował czytanie od ostatniej zatwierdzonej pozycji w temacie w Kafce.
  2. Utrata łączności z klastrem Kafka. Konektor po prostu przestanie czytać w miejscu, którego nie udało się wysłać do Kafki, i okresowo będzie próbował wysłać go ponownie, aż próba się powiedzie.
  3. Niedostępność źródła danych. Złącze podejmie próbę ponownego połączenia ze źródłem zgodnie z konfiguracją. Wartość domyślna to 16 prób użycia wycofywanie wykładnicze. Po 16 nieudanej próbie zadanie zostanie oznaczone jako powiodło i będziesz musiał ręcznie uruchomić go ponownie za pomocą interfejsu REST Kafka Connect.
    • W przypadku PostgreSQL dane nie zostaną utracone, ponieważ Korzystanie ze szczelin replikacyjnych zapobiegnie usunięciu plików WAL, które nie są odczytywane przez łącznik. W tym przypadku moneta ma również wadę: jeśli łączność sieciowa pomiędzy złączem a systemem DBMS zostanie zakłócona na dłuższy czas, istnieje możliwość, że zabraknie miejsca na dysku, co może prowadzić do awarii cały DBMS.
    • W przypadku MySQL pliki binlog mogą być obracane przez sam system DBMS przed przywróceniem łączności. Spowoduje to, że łącznik przejdzie w stan awarii i aby przywrócić normalne działanie, konieczne będzie ponowne uruchomienie w trybie początkowej migawki, aby kontynuować czytanie dzienników binarnych.
    • Про MongoDB. Dokumentacja stwierdza: zachowanie konektora w przypadku, gdy pliki log/oplog zostały usunięte i konektor nie może kontynuować odczytu od miejsca, w którym został przerwany, jest takie samo dla wszystkich systemów DBMS. Oznacza to, że złącze przejdzie w stan powiodło i będzie wymagać ponownego uruchomienia w trybie wstępna migawka.

      Są jednak wyjątki. Jeżeli konektor był przez dłuższy czas odłączony (lub nie mógł dotrzeć do instancji MongoDB), a oplog w tym czasie przeszedł rotację, to po przywróceniu połączenia konektor będzie spokojnie kontynuował odczyt danych z pierwszej dostępnej pozycji, dlatego część danych w Kafce nie uderzy.

wniosek

Debezium to moje pierwsze doświadczenie z systemami CDC i ogólnie bardzo pozytywne. Projekt przekonał się wsparciem dla głównych systemów DBMS, łatwością konfiguracji, obsługą klastrów i aktywną społecznością. Osobom zainteresowanym praktyką polecam zapoznanie się z poradnikami dot Kafka Połącz и debezium.

W porównaniu do złącza JDBC dla Kafka Connect, główną zaletą Debezium jest to, że zmiany są odczytywane z logów DBMS, co pozwala na odbiór danych z minimalnym opóźnieniem. Złącze JDBC (z Kafka Connect) wysyła zapytania do monitorowanej tabeli w ustalonych odstępach czasu i (z tego samego powodu) nie generuje komunikatów w przypadku usunięcia danych (jak można wysyłać zapytania do danych, które nie istnieją?).

Aby rozwiązać podobne problemy, możesz zwrócić uwagę na następujące rozwiązania (oprócz Debezium):

PS

Przeczytaj także na naszym blogu:

Źródło: www.habr.com

Dodaj komentarz