Debezium の玹介 - Apache Kafka 甹 CDC

Debezium の玹介 - Apache Kafka 甹 CDC

仕事をしおいるず、新しい技術゜リュヌションや゜フトりェア補品に出䌚うこずがよくありたすが、それらに関する情報はロシア語のむンタヌネット䞊にはほずんどありたせん。この蚘事では、Debezium を䜿甚しお 2 ぀の人気のある DBMS (PostgreSQL ず MongoDB) から Kafka クラスタヌに CDC むベントを送信するように構成する必芁があったずきの、私の最近の実践䟋を䜿っお、そのようなギャップの 1 ぀を埋めおいきたいず思いたす。䜜業の結果ずしお珟れるこのレビュヌ蚘事が他の人にずっお圹立぀こずを願っおいたす。

Debezium ず CDC ずは䞀般的に䜕ですか?

デベゞりム — CDC ゜フトりェア カテゎリの代衚 (デヌタ倉曎のキャプチャ)、より正確には、Apache Kafka Connect フレヌムワヌクず互換性のあるさたざたな DBMS 甚のコネクタのセットです。

それ オヌプン゜ヌスプロゞェクト、 Apache License v2.0 に基づいおラむセンスされおおり、Red Hat のスポンサヌずなっおいたす。開発は 2016 幎から継続されおおり、珟圚、MySQL、PostgreSQL、MongoDB、SQL Server の DBMS を正匏にサポヌトしおいたす。 Cassandra ず Oracle 甚のコネクタもありたすが、珟時点では「早期アクセス」ステヌタスにあり、新しいリリヌスは䞋䜍互換性を保蚌したせん。

CDC を埓来のアプロヌチ (アプリケヌションが DBMS から盎接デヌタを読み取る堎合) ず比范するず、その䞻な利点には、䜎遅延、高い信頌性、可甚性を備えた行レベルでのデヌタ倉曎ストリヌミングの実装が含たれたす。最埌の 2 ぀のポむントは、Kafka クラスタヌを CDC むベントのリポゞトリずしお䜿甚するこずで達成されたす。

もう 1 ぀の利点は、むベントの保存に単䞀のモデルが䜿甚されるため、゚ンド アプリケヌションはさたざたな DBMS の操䜜の埮劙な違いを気にする必芁がないこずです。

最埌に、メッセヌゞ ブロヌカヌを䜿甚するず、デヌタの倉曎を監芖するアプリケヌションが氎平方向にスケヌルアりトできるようになりたす。同時に、デヌタは DBMS から盎接取埗されるのではなく、Kafka クラスタヌから取埗されるため、デヌタ ゜ヌスぞの圱響は最小限に抑えられたす。

Debezium アヌキテクチャに぀いお

Debezium の䜿甚は、次のような単玔なスキヌムに垰着したす。

DBMS (デヌタ ゜ヌスずしお) → Kafka のコネクタ Connect → Apache Kafka → コンシュヌマ

䟋ずしお、プロゞェクト Web サむトからの図を次に瀺したす。

Debezium の玹介 - Apache Kafka 甹 CDC

ただし、この方匏はシンクコネクタしか䜿甚できないように芋えるので、あたり奜きではありたせん。

実際には状況は異なりたす。デヌタレむクがいっぱいになるのです。 (䞊の図の最埌のリンク) Debezium の䜿甚方法はこれだけではありたせん。 Apache Kafka に送信されたむベントは、アプリケヌションでさたざたな状況に察凊するために䜿甚できたす。䟋えば

  • 無関係なデヌタをキャッシュから削陀する。
  • 通知の送信。
  • 怜玢むンデックスの曎新。
  • ある皮の監査ログ。
  • ...

Java アプリケヌションがあり、Kafka クラスタヌを䜿甚する必芁がない/䜿甚する可胜性がない堎合は、次の方法を䜿甚する可胜性もありたす。 埋め蟌みコネクタ。明らかな利点は、远加のむンフラストラクチャ (コネクタず Kafka の圢匏) が䞍芁になるこずです。ただし、この゜リュヌションはバヌゞョン 1.1 以降非掚奚ずなり、䜿甚は掚奚されなくなりたした (将来のリリヌスではサポヌトが削陀される可胜性がありたす)。

この蚘事では、開発者が掚奚するフォヌルト トレランスずスケヌラビリティを提䟛するアヌキテクチャに぀いお説明したす。

コネクタ構成

最も重芁な倀であるデヌタの倉化の远跡を開始するには、以䞋が必芁です。

  1. デヌタ ゜ヌス。バヌゞョン 5.7 以降の MySQL、PostgreSQL 9.6 以降、MongoDB 3.2 以降 (党リスト);
  2. Apache Kafka クラスタヌ。
  3. Kafka Connect むンスタンス (バヌゞョン 1.x、2.x)。
  4. 構成された Debezium コネクタ。

最初の 2 ぀の点に取り組みたす。 DBMS ず Apache Kafka のむンストヌル プロセスは、この蚘事の範囲倖です。ただし、すべおをサンドボックスにデプロむしたい人のために、サンプルを含む公匏リポゞトリには既補のリポゞトリがありたす。 docker-compose.yaml.

最埌の 2 ぀の点に぀いお詳しく説明したす。

0.カフカコネクト

この蚘事のここおよび以降のすべおの構成䟋は、Debezium 開発者によっお配垃された Docker むメヌゞのコンテキストで説明されおいたす。これには、必芁なすべおのプラグむン ファむル (コネクタ) が含たれおおり、環境倉数を䜿甚しお Kafka Connect の構成を提䟛したす。

Confluent から Kafka Connect を䜿甚する堎合は、必芁なコネクタのプラグむンを、で指定されたディレクトリに個別に远加する必芁がありたす。 plugin.path たたは環境倉数経由で蚭定 CLASSPATH。 Kafka Connect ワヌカヌずコネクタの蚭定は、ワヌカヌ起動コマンドに匕数ずしお枡される構成ファむルによっお決定されたす。詳现に぀いおは、を参照しおください。 ドキュメンテヌション.

コネクタ バヌゞョンで Debeizum をセットアップするプロセス党䜓は 2 段階で実行されたす。それぞれを芋おみたしょう。

1. Kafka Connect フレヌムワヌクのセットアップ

デヌタを Apache Kafka クラスタヌにストリヌミングするには、Kafka Connect フレヌムワヌクで次のような特定のパラメヌタヌを蚭定したす。

  • クラスタヌに接続するためのパラメヌタヌ、
  • コネクタ自䜓の構成が盎接保存されるトピックの名前、
  • コネクタが実行されおいるグルヌプの名前 (分散モヌドが䜿甚されおいる堎合)。

プロゞェクトの公匏 Docker むメヌゞは、環境倉数を䜿甚した構成をサポヌトしおいたす。これを䜿甚したす。そこで、画像をダりンロヌドしたす。

docker pull debezium/connect

コネクタを実行するために必芁な環境倉数の最小セットは次のずおりです。

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — クラスタヌメンバヌの完党なリストを取埗するための Kafka クラスタヌサヌバヌの初期リスト。
  • OFFSET_STORAGE_TOPIC=connector-offsets — コネクタが珟圚配眮されおいる䜍眮を保存するためのトピック。
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - コネクタずそのタスクのステヌタスを保存するためのトピック。
  • CONFIG_STORAGE_TOPIC=connector-config - コネクタ構成デヌタずそのタスクの保存に関するトピック。
  • GROUP_ID=1 — コネクタタスクを実行できるワヌカヌのグルヌプの識別子。分散利甚時に必芁 (配垃枈み) モヌド。

次の倉数を䜿甚しおコンテナを起動したす。

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

アブロに関する泚意事項

デフォルトでは、Debezium はデヌタを JSON 圢匏で曞き蟌みたす。これはサンドボックスや少量のデヌタには蚱容されたすが、高負荷のデヌタベヌスでは問題になる可胜性がありたす。 JSON コンバヌタヌの代わりに、次を䜿甚しおメッセヌゞをシリアル化するこずもできたす。 アブロ バむナリ圢匏に倉換するこずで、Apache Kafka の I/O サブシステムの負荷が軜枛されたす。

Avro を䜿甚するには、別の スキヌマレゞストリ (図を保存するため)。コンバヌタヌの倉数は次のようになりたす。

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro の䜿甚ずそのレゞストリの蚭定の詳现に぀いおは、この蚘事の範囲を超えおいたす。さらに、わかりやすくするために、JSON を䜿甚したす。

2. コネクタ自䜓の蚭定

これで、゜ヌスからデヌタを読み取るコネクタ自䜓の構成に盎接移動できるようになりたす。

PostgreSQL ず MongoDB ずいう 2 ぀の DBMS のコネクタの䟋を芋おみたしょう。私には経隓があり、違いはありたす (小さいずはいえ、堎合によっおは重倧です!)。

構成は JSON 衚蚘で蚘述され、POST リク゚ストを䜿甚しお Kafka Connect にアップロヌドされたす。

2.1.PostgreSQL

PostgreSQL のコネクタ構成の䟋:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

このセットアップ埌のコネクタの動䜜原理は非垞に簡単です。

  • 初めお起動するず、構成で指定されたデヌタベヌスに接続し、モヌドで起動したす。 初期スナップショット、条件付きを䜿甚しお取埗されたデヌタの初期セットを Kafka に送信したす。 SELECT * FROM table_name.
  • 初期化が完了するず、コネクタは PostgreSQL WAL ファむルから倉曎を読み取るモヌドになりたす。

䜿甚したオプションに぀いお:

  • name — 以䞋で説明する構成が䜿甚されるコネクタの名前。将来的には、この名前は、Kafka Connect REST API を介しおコネクタを操䜜する (぀たり、ステヌタスの衚瀺/再起動/構成の曎新) ために䜿甚されたす。
  • connector.class — 構成されたコネクタによっお䜿甚される DBMS コネクタ クラス。
  • plugin.name — WAL ファむルからのデヌタを論理的にデコヌドするためのプラグむンの名前。から遞択可胜 wal2json, decoderbuffs О pgoutput。最初の 2 ぀は、DBMS に適切な拡匵機胜をむンストヌルする必芁がありたす。 pgoutput PostgreSQL バヌゞョン 10 以降の堎合、远加の操䜜は必芁ありたせん。
  • database.* — デヌタベヌスに接続するためのオプション。 database.server.name — Kafka クラスタヌ内のトピック名の圢成に䜿甚される PostgreSQL むンスタンス名。
  • table.include.list — 倉曎を远跡するテヌブルのリスト。圢匏で指定される schema.table_name;ず䞀緒に䜿甚するこずはできたせん table.exclude.list;
  • heartbeat.interval.ms — コネクタが特別なトピックにハヌトビヌト メッセヌゞを送信する間隔 (ミリ秒)。
  • heartbeat.action.query — 各ハヌトビヌト メッセヌゞの送信時に実行されるリク゚スト (このオプションはバヌゞョン 1.1 で登堎)。
  • slot.name — コネクタによっお䜿甚されるレプリケヌション スロットの名前。
  • publication.name - 名前 出版物 コネクタが䜿甚する PostgreSQL 内。存圚しない堎合、Debezium は䜜成を詊みたす。接続を確立するナヌザヌにこのアクションに察する十分な暩限がない堎合、コネクタぱラヌで終了したす。
  • transforms タヌゲット トピックの名前を倉曎する方法を正確に決定したす。
    • transforms.AddPrefix.type 正芏衚珟を䜿甚するこずを瀺したす。
    • transforms.AddPrefix.regex — タヌゲットトピックの名前を再定矩するマスク。
    • transforms.AddPrefix.replacement - 私たちが再定矩しおいるものを盎接的に。

ハヌトビヌトず倉換の詳现

デフォルトでは、コネクタはコミットされたトランザクションごずにデヌタを Kafka に送信し、その LSN (ログ シヌケンス番号) がサヌビス トピックに蚘録されたす。 offset。しかし、コネクタがデヌタベヌス党䜓ではなく、そのテヌブルの䞀郚 (デヌタの曎新が頻繁に行われないテヌブル) のみを読み取るように構成されおいる堎合はどうなるでしょうか?

  • コネクタは WAL ファむルを読み取り、監芖しおいるテヌブルぞのトランザクションのコミットを怜出したせん。
  • したがっお、トピック内たたはレプリケヌション スロット内の珟圚の䜍眮は曎新されたせん。
  • その結果、WAL ファむルがディスク䞊に保持され、ディスク容量が䞍足する可胜性がありたす。

ここでオプションが圹に立ちたす。 heartbeat.interval.ms О heartbeat.action.query。これらのオプションをペアで䜿甚するず、ハヌトビヌト メッセヌゞが送信されるたびに、別のテヌブルのデヌタ倉曎リク゚ストを実行できるようになりたす。したがっお、コネクタが珟圚 (レプリケヌション スロット内に) 配眮されおいる LSN は垞に曎新されたす。これにより、DBMS は䞍芁になった WAL ファむルを削陀できたす。オプションがどのように機胜するかに぀いお詳しくは、 ドキュメンテヌション.

より泚目に倀するもう 1 ぀のオプションは、 transforms。利䟿性ず矎しさの方が重芁ですが...

デフォルトでは、Debezium は次の呜名ポリシヌを䜿甚しおトピックを䜜成したす。 serverName.schemaName.tableName。これは必ずしも䟿利ずは限りたせん。オプション transforms 正芏衚珟を䜿甚しお、テヌブルのリスト、぀たりむベントを特定の名前のトピックにルヌティングする必芁があるテヌブルのリストを定矩できたす。

私たちの構成ではありがずう transforms 次のこずが起こりたす: 監芖察象デヌタベヌスからのすべおの CDC むベントは、次の名前のトピックに送られたす。 data.cdc.dbname。それ以倖の堎合 (これらの蚭定がない堎合)、Debezium はデフォルトで次のようなテヌブルごずにトピックを䜜成したす。 pg-dev.public.<table_name>.

コネクタの制限

PostgreSQL のコネクタ構成の説明を締めくくるには、その操䜜の次の機胜ず制限に぀いお説明する䟡倀がありたす。

  1. PostgreSQL 甚コネクタの機胜は、論理デコヌドの抂念に䟝存しおいたす。したがっお、圌は デヌタベヌス構造を倉曎するリク゚ストを远跡したせん (DDL) - したがっお、このデヌタはトピックには含たれたせん。
  2. レプリケヌションスロットを䜿甚しおいるため、コネクタの接続が可胜 のみ 䞻芁な DBMS むンスタンスに接続したす。
  3. コネクタがデヌタベヌスに接続するナヌザヌが読み取り専甚暩限を持っおいる堎合は、最初の起動前にレプリケヌション スロットを手動で䜜成し、デヌタベヌスに公開する必芁がありたす。

構成の適甚

それでは、構成をコネクタにロヌドしたしょう。

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

ダりンロヌドが成功し、コネクタが開始されたこずを確認したす。

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

玠晎らしいです。セットアップが完了し、すぐに䜿甚できるようになりたした。ここで、コンシュヌマのふりをしお Kafka に接続しおみたしょう。その埌、テヌブル内の゚ントリを远加および倉曎したす。

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

このトピックでは、次のように衚瀺されたす。

倉曎を含む非垞に長い JSON

{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"int32",
        "optional":false,
        "field":"id"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Key"
  },
  "payload":{
    "id":1005
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"after"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"version"
          },
          {
            "type":"string",
            "optional":false,
            "field":"connector"
          },
          {
            "type":"string",
            "optional":false,
            "field":"name"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"ts_ms"
          },
          {
            "type":"string",
            "optional":true,
            "name":"io.debezium.data.Enum",
            "version":1,
            "parameters":{
              "allowed":"true,last,false"
            },
            "default":"false",
            "field":"snapshot"
          },
          {
            "type":"string",
            "optional":false,
            "field":"db"
          },
          {
            "type":"string",
            "optional":false,
            "field":"schema"
          },
          {
            "type":"string",
            "optional":false,
            "field":"table"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"txId"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"lsn"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"xmin"
          }
        ],
        "optional":false,
        "name":"io.debezium.connector.postgresql.Source",
        "field":"source"
      },
      {
        "type":"string",
        "optional":false,
        "field":"op"
      },
      {
        "type":"int64",
        "optional":true,
        "field":"ts_ms"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"id"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"total_order"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"data_collection_order"
          }
        ],
        "optional":true,
        "field":"transaction"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Envelope"
  },
  "payload":{
    "before":null,
    "after":{
      "id":1005,
      "first_name":"foo",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "source":{
      "version":"1.2.3.Final",
      "connector":"postgresql",
      "name":"dbserver1",
      "ts_ms":1600374991648,
      "snapshot":"false",
      "db":"postgres",
      "schema":"public",
      "table":"customers",
      "txId":602,
      "lsn":34088472,
      "xmin":null
    },
    "op":"c",
    "ts_ms":1600374991762,
    "transaction":null
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"int32",
        "optional":false,
        "field":"id"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Key"
  },
  "payload":{
    "id":1005
  }
}{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          },
          {
            "type":"string",
            "optional":false,
            "field":"first_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"last_name"
          },
          {
            "type":"string",
            "optional":false,
            "field":"email"
          }
        ],
        "optional":true,
        "name":"data.cdc.dbname.Value",
        "field":"after"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"version"
          },
          {
            "type":"string",
            "optional":false,
            "field":"connector"
          },
          {
            "type":"string",
            "optional":false,
            "field":"name"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"ts_ms"
          },
          {
            "type":"string",
            "optional":true,
            "name":"io.debezium.data.Enum",
            "version":1,
            "parameters":{
              "allowed":"true,last,false"
            },
            "default":"false",
            "field":"snapshot"
          },
          {
            "type":"string",
            "optional":false,
            "field":"db"
          },
          {
            "type":"string",
            "optional":false,
            "field":"schema"
          },
          {
            "type":"string",
            "optional":false,
            "field":"table"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"txId"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"lsn"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"xmin"
          }
        ],
        "optional":false,
        "name":"io.debezium.connector.postgresql.Source",
        "field":"source"
      },
      {
        "type":"string",
        "optional":false,
        "field":"op"
      },
      {
        "type":"int64",
        "optional":true,
        "field":"ts_ms"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":false,
            "field":"id"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"total_order"
          },
          {
            "type":"int64",
            "optional":false,
            "field":"data_collection_order"
          }
        ],
        "optional":true,
        "field":"transaction"
      }
    ],
    "optional":false,
    "name":"data.cdc.dbname.Envelope"
  },
  "payload":{
    "before":{
      "id":1005,
      "first_name":"foo",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "after":{
      "id":1005,
      "first_name":"egg",
      "last_name":"bar",
      "email":"foo@bar.com"
    },
    "source":{
      "version":"1.2.3.Final",
      "connector":"postgresql",
      "name":"dbserver1",
      "ts_ms":1600375609365,
      "snapshot":"false",
      "db":"postgres",
      "schema":"public",
      "table":"customers",
      "txId":603,
      "lsn":34089688,
      "xmin":null
    },
    "op":"u",
    "ts_ms":1600375609778,
    "transaction":null
  }
}

どちらの堎合も、レコヌドは、倉曎されたレコヌドのキヌ (PK) ず、倉曎の本質 (レコヌドが以前はどうであったか、その埌どうなったか) で構成されたす。

  • の堎合 INSERT: 前の倀 (before) に等しい null、その埌 - 挿入された行。
  • の堎合 UPDATEで payload.before ラむンの前の状態が衚瀺されたす。 payload.after — 倉化の本質を備えた新しいもの。

2.2 モンゎDB

このコネクタは、暙準の MongoDB レプリケヌション メカニズムを䜿甚し、プラむマリ DBMS ノヌドの oplog から情報を読み取りたす。

すでに説明した PgSQL 甚のコネクタず同様に、ここでも最初の起動時にプラむマリ デヌタのスナップショットが取埗され、その埌コネクタは oplog 読み取りモヌドに切り替わりたす。

蚭定䟋

{
  "name": "mp-k8s-mongo-connector",
  "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "tasks.max": "1",
        "mongodb.hosts": "MainRepSet/mongo:27017",
        "mongodb.name": "mongo",
        "mongodb.user": "debezium",
        "mongodb.password": "dbname",
        "database.whitelist": "db_1,db_2",
        "transforms": "AddPrefix",
        "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
        "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
        }
  }

ご芧のずおり、前の䟋ず比范しお新しいオプションはありたせんが、デヌタベヌスずそのプレフィックスぞの接続を担圓するオプションの数だけが枛りたした。

蚭定 transforms 今回は次のこずを行いたす: スキヌマからタヌゲット トピックの名前を倉換したす。 <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

耐障害性

珟代におけるフォヌルト トレランスず高可甚性の問題は、これたで以䞊に深刻になっおいたす。特にデヌタずトランザクションに぀いお話しおいる堎合、この問題ではデヌタ倉曎の远跡は重芁ではありたせん。原理的に䜕が問題になる可胜性があるのか​​、そしおそれぞれのケヌスでDebeziumに䜕が起こるのかを芋おみたしょう。

オプトアりト オプションは 3 ぀ありたす。

  1. Kafka 接続の倱敗。 Connect が分散モヌドで動䜜するように構成されおいる堎合、耇数のワヌカヌが同じ group.id を蚭定する必芁がありたす。その埌、そのうちの 1 ぀が倱敗した堎合、コネクタは別のワヌカヌで再起動され、Kafka のトピック内の最埌にコミットされた䜍眮から読み取りを続けたす。
  2. Kafka クラスタヌずの接続が倱われる。コネクタは、Kafka ぞの送信に倱敗した䜍眮で読み取りを停止し、成功するたで定期的に再送信を詊みたす。
  3. デヌタ゜ヌスが利甚できない。コネクタは、構成に埓っお゜ヌスぞの再接続を詊行したす。デフォルトは 16 回の詊行です。 指数関数的バックオフ。 16 回目の詊行が倱敗するず、タスクは次のようにマヌクされたす。 倱敗した Kafka Connect REST むンタヌフェむスを介しお手動で再起動する必芁がありたす。
    • の堎合 PostgreSQL デヌタは倱われたせん。レプリケヌション スロットを䜿甚するず、コネクタによっお読み取られない WAL ファむルを削陀できなくなりたす。この堎合、コむンにはマむナス面もありたす。コネクタず DBMS 間のネットワヌク接続が長時間䞭断されるず、ディスク容量が䞍足し、DBMS の障害が発生する可胜性がありたす。 DBMS党䜓。
    • の堎合 MySQL 接続が埩元される前に、DBMS 自䜓によっお binlog ファむルをロヌテヌションできたす。これにより、コネクタが障害状態になり、通垞の動䜜を埩元するには、初期スナップショット モヌドで再起動しおバむナリログからの読み取りを続行する必芁がありたす。
    • オン MongoDBの。ドキュメントには、ログ/oplog ファむルが削陀され、コネクタが䞭断した䜍眮から読み取りを続行できない堎合のコネクタの動䜜は、すべおの DBMS で同じであるず蚘茉されおいたす。コネクタが次の状態になるこずを意味したす 倱敗した モヌドで再起動する必芁がありたす 初期スナップショット.

      ただし、䟋倖もありたす。コネクタが長時間切断され (たたは MongoDB むンスタンスにアクセスできず)、その間に oplog がロヌテヌションを行った堎合、接続が埩元されるず、コネクタは最初に䜿甚可胜な䜍眮から静かにデヌタを読み取り続けたす。そのため、Kafka のデヌタの䞀郚は ノヌ 圓たりたす。

たずめ

Debezium は私にずっお CDC システムの初めおの経隓であり、党䜓的に非垞にポゞティブです。このプロゞェクトは、䞻芁な DBMS のサポヌト、構成の容易さ、クラスタリングのサポヌト、および掻発なコミュニティによっお支持を集めたした。実践に興味がある方は、ガむドを読むこずをお勧めしたす。 カフカコネクト О デベゞりム.

Kafka Connect の JDBC コネクタず比范した堎合、Debezium の䞻な利点は、倉曎が DBMS ログから読み取られるため、最小限の遅延でデヌタを受信できるこずです。 JDBC コネクタ (Kafka Connect から) は、監芖察象テヌブルに察しお䞀定の間隔でク゚リを実行し、(同じ理由で) デヌタが削陀されたずきにメッセヌゞを生成したせん (存圚しないデヌタをク゚リするにはどうすればよいでしょうか?)。

同様の問題を解決するには、(Debezium に加えお) 次の゜リュヌションに泚意を払うこずができたす。

PS

私たちのブログもお読みください:

出所 habr.com