Ներկայացնում ենք Debezium - CDC-ն Apache Kafka-ի համար

Ներկայացնում ենք Debezium - CDC-ն Apache Kafka-ի համար

Իմ աշխատանքում հաճախ եմ հանդիպում նոր տեխնիկական լուծումների/ծրագրային արտադրանքների, որոնց մասին տեղեկատվությունը բավականին սակավ է ռուսալեզու ինտերնետում։ Այս հոդվածով ես կփորձեմ լրացնել նման բացը իմ վերջին պրակտիկայի օրինակով, երբ ինձ անհրաժեշտ էր կարգավորել CDC իրադարձությունների ուղարկումը երկու հանրաճանաչ DBMS-ներից (PostgreSQL և MongoDB) դեպի Kafka կլաստեր՝ օգտագործելով Debezium: Հուսով եմ, որ այս վերանայման հոդվածը, որը հայտնվել է կատարված աշխատանքի արդյունքում, օգտակար կլինի մյուսների համար:

Ի՞նչ է Debezium-ը և CDC-ն ընդհանրապես:

Դեբեզիում - CDC ծրագրային ապահովման կատեգորիայի ներկայացուցիչ (Լուսանկարել տվյալների փոփոխությունը), կամ ավելի ճիշտ՝ դա տարբեր DBMS-ների համար միակցիչների մի շարք է, որոնք համատեղելի են Apache Kafka Connect շրջանակի հետ։

այս բաց կոդով նախագիծ, լիցենզավորված Apache License v2.0-ի համաձայն և հովանավորվում է Red Hat-ի կողմից: Մշակումն իրականացվում է 2016 թվականից և այս պահին այն պաշտոնական աջակցություն է տրամադրում հետևյալ DBMS-ներին՝ MySQL, PostgreSQL, MongoDB, SQL Server: Կան նաև Cassandra-ի և Oracle-ի միակցիչներ, բայց դրանք ներկայումս գտնվում են «վաղ մուտքի» կարգավիճակում, և նոր թողարկումները չեն երաշխավորում հետամնաց համատեղելիությունը:

Եթե ​​մենք համեմատում ենք CDC-ն ավանդական մոտեցման հետ (երբ հավելվածն ուղղակիորեն կարդում է տվյալները DBMS-ից), ապա դրա հիմնական առավելությունները ներառում են տվյալների փոփոխության հոսքի իրականացումը տողերի մակարդակում՝ ցածր ուշացումով, բարձր հուսալիությամբ և հասանելիությամբ: Վերջին երկու կետերը ձեռք են բերվում Կաֆկա կլաստերի օգտագործմամբ որպես CDC իրադարձությունների պահոց:

Նաև առավելությունները ներառում են այն փաստը, որ իրադարձությունները պահելու համար օգտագործվում է մեկ մոդել, ուստի վերջնական հավելվածը չպետք է անհանգստանա տարբեր DBMS-ների գործարկման նրբությունների մասին:

Վերջապես, հաղորդագրության բրոքերի օգտագործումը բացում է տվյալների փոփոխությանը հետևող հավելվածների հորիզոնական մասշտաբի շրջանակը: Միևնույն ժամանակ, տվյալների աղբյուրի վրա ազդեցությունը նվազագույնի է հասցվում, քանի որ տվյալները ստացվում են ոչ թե ուղղակիորեն DBMS-ից, այլ Կաֆկա կլաստերից:

Debezium ճարտարապետության մասին

Debezium-ի օգտագործումը հանգում է այս պարզ սխեմայի.

DBMS (որպես տվյալների աղբյուր) → միակցիչ Kafka Connect-ում → Apache Kafka → սպառող

Որպես օրինակ, ես կտամ դիագրամ նախագծի կայքից.

Ներկայացնում ենք Debezium - CDC-ն Apache Kafka-ի համար

Այնուամենայնիվ, ես իսկապես չեմ սիրում այս սխեման, քանի որ թվում է, որ հնարավոր է միայն լվացարանի միակցիչ:

Իրականում իրավիճակն այլ է՝ լցնել ձեր Տվյալների լիճը (վերջին հղումը վերևի գծապատկերում) Debezium-ի օգտագործման միակ միջոցը չէ: Apache Kafka-ին ուղարկված իրադարձությունները կարող են օգտագործվել ձեր հավելվածների կողմից՝ տարբեր իրավիճակներ լուծելու համար: Օրինակ:

  • քեշից անհամապատասխան տվյալների հեռացում;
  • ծանուցումներ ուղարկելը;
  • որոնման ինդեքսի թարմացումներ;
  • որոշակի տեսակի աուդիտի տեղեկամատյաններ;
  • ...

Այն դեպքում, երբ դուք ունեք Java հավելված, և չկան Կաֆկա կլաստեր օգտագործելու անհրաժեշտություն/հնարավորություն, կա նաև աշխատելու հնարավորություն. ներկառուցված միակցիչ. Ակնհայտ պլյուսն այն է, որ դրա հետ դուք կարող եք հրաժարվել լրացուցիչ ենթակառուցվածքից (միակցիչի և Կաֆկայի տեսքով): Այնուամենայնիվ, այս լուծումը հնացել է 1.1 տարբերակից և այլևս խորհուրդ չի տրվում օգտագործել (այն կարող է հեռացվել հետագա թողարկումներում):

Այս հոդվածը կքննարկի մշակողների կողմից առաջարկվող ճարտարապետությունը, որն ապահովում է սխալների հանդուրժողականություն և մասշտաբայնություն:

Միակցիչի կոնֆիգուրացիա

Որպեսզի սկսենք հետևել ամենակարևոր արժեքի՝ տվյալների փոփոխություններին, մեզ անհրաժեշտ է.

  1. տվյալների աղբյուր, որը կարող է լինել MySQL՝ սկսած 5.7 տարբերակից, PostgreSQL 9.6+, MongoDB 3.2+ (լրիվ ցանկը);
  2. Ապաչի Կաֆկա կլաստեր
  3. Kafka Connect օրինակ (տարբերակներ 1.x, 2.x);
  4. կազմաձևված Debezium միակցիչ:

Աշխատեք առաջին երկու կետերի վրա, այսինքն. DBMS-ի և Apache Kafka-ի տեղադրման գործընթացը դուրս է հոդվածի շրջանակներից: Այնուամենայնիվ, նրանց համար, ովքեր ցանկանում են ամեն ինչ տեղակայել ավազատուփում, պաշտոնական պահոցում կա պատրաստի օրինակներ. docker-compose.yaml.

Մենք ավելի մանրամասն կկենտրոնանանք վերջին երկու կետերի վրա:

0. Kafka Connect

Այստեղ և ավելի ուշ հոդվածում, բոլոր կազմաձևման օրինակները դիտարկվում են Debezium-ի մշակողների կողմից տարածված Docker պատկերի համատեքստում: Այն պարունակում է բոլոր անհրաժեշտ plugin ֆայլերը (միակցիչներ) և ապահովում է Kafka Connect-ի կոնֆիգուրացիան՝ օգտագործելով շրջակա միջավայրի փոփոխականները:

Եթե ​​դուք մտադիր եք օգտագործել Kafka Connect-ը Confluent-ից, ապա ձեզ անհրաժեշտ կլինի ինքներդ ավելացնել անհրաժեշտ միակցիչների հավելումները նշված գրացուցակում: plugin.path կամ սահմանել շրջակա միջավայրի փոփոխականի միջոցով CLASSPATH. Kafka Connect աշխատողի և միակցիչների կարգավորումները սահմանվում են կազմաձևման ֆայլերի միջոցով, որոնք փոխանցվում են որպես արգումենտ աշխատանքային մեկնարկի հրամանին: Մանրամասների համար տես փաստաթղթավորում.

Debeizum-ի միակցիչի տարբերակում տեղադրելու ողջ գործընթացը իրականացվում է երկու փուլով. Դիտարկենք դրանցից յուրաքանչյուրը.

1. Kafka Connect շրջանակի կարգավորում

Apache Kafka կլաստերին տվյալների հոսքի համար Kafka Connect շրջանակում սահմանվում են հատուկ պարամետրեր, ինչպիսիք են՝

  • կլաստերի միացման կարգավորումներ,
  • թեմաների անուններ, որոնցում կպահվի միակցիչի կոնֆիգուրացիան,
  • խմբի անվանումը, որում աշխատում է միակցիչը (բաշխված ռեժիմի օգտագործման դեպքում):

Ծրագրի պաշտոնական Docker պատկերն աջակցում է կոնֆիգուրացիան՝ օգտագործելով շրջակա միջավայրի փոփոխականները. սա այն է, ինչ մենք կօգտագործենք: Այսպիսով, եկեք ներբեռնենք պատկերը.

docker pull debezium/connect

Միակցիչի գործարկման համար անհրաժեշտ շրջակա միջավայրի փոփոխականների նվազագույն փաթեթը հետևյալն է.

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - Կաֆկա կլաստերի սերվերների սկզբնական ցուցակը՝ կլաստերի անդամների ամբողջական ցուցակը ստանալու համար.
  • OFFSET_STORAGE_TOPIC=connector-offsets — թեմա պահելու այն դիրքերը, որտեղ ներկայումս գտնվում է միակցիչը.
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - միակցիչի կարգավիճակը և դրա առաջադրանքները պահելու թեմա.
  • CONFIG_STORAGE_TOPIC=connector-config - միակցիչի կազմաձևման տվյալները և դրա առաջադրանքները պահելու թեմա.
  • GROUP_ID=1 - աշխատողների խմբի նույնացուցիչը, որի վրա կարող է կատարվել միակցիչի առաջադրանքը. պահանջվում է բաշխված օգտագործելիս (բաշխված) ռեժիմ.

Մենք սկսում ենք կոնտեյները այս փոփոխականներով.

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Նշում Ավրոյի մասին

Լռելյայնորեն, Debezium-ը գրում է տվյալները JSON ձևաչափով, ինչը ընդունելի է ավազարկղերի և փոքր քանակությամբ տվյալների համար, բայց կարող է խնդիր լինել խիստ բեռնված տվյալների բազաներում: JSON փոխարկիչի այլընտրանքը հաղորդագրությունների սերիականացումն է՝ օգտագործելով Avro երկուական ձևաչափով, որը նվազեցնում է I/O ենթահամակարգի բեռը Apache Kafka-ում:

Avro-ն օգտագործելու համար հարկավոր է առանձին տեղակայել սխեմա-գրանցամատյան (սխեմաների պահպանման համար): Փոխարկիչի փոփոխականները այսպիսի տեսք կունենան.

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro-ն օգտագործելու և դրա համար ռեեստրի ստեղծման վերաբերյալ մանրամասները դուրս են հոդվածի շրջանակներից. ավելին, պարզության համար մենք կօգտագործենք JSON:

2. Միակցիչի ինքնին կարգավորումը

Այժմ դուք կարող եք ուղղակիորեն գնալ հենց միակցիչի կազմաձևմանը, որը կկարդա տվյալները աղբյուրից:

Եկեք նայենք երկու DBMS-ի միակցիչների օրինակին՝ PostgreSQL և MongoDB, որոնց համար ես փորձ ունեմ և որոնց համար կան տարբերություններ (թեև փոքր, բայց որոշ դեպքերում նշանակալի):

Կազմաձևը նկարագրված է JSON նշումով և վերբեռնվում է Kafka Connect՝ օգտագործելով POST հարցումը:

2.1. PostgreSQL

Միակցիչի կազմաձևման օրինակ PostgreSQL-ի համար.

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Այս կոնֆիգուրացիայից հետո միակցիչի շահագործման սկզբունքը բավականին պարզ է.

  • Առաջին մեկնարկի ժամանակ այն միանում է կոնֆիգուրացիայի մեջ նշված տվյալների բազային և սկսում ռեժիմում նախնական նկարը, Կաֆկային ուղարկելով պայմանականով ստացված տվյալների նախնական փաթեթը SELECT * FROM table_name.
  • Նախաստորագրման ավարտից հետո միակցիչը մտնում է PostgreSQL WAL ֆայլերից փոփոխությունների ընթերցման ռեժիմ:

Օգտագործված տարբերակների մասին.

  • name — միակցիչի անվանումը, որի համար օգտագործվում է ստորև նկարագրված կոնֆիգուրացիան. Հետագայում այս անունը օգտագործվում է միակցիչի հետ աշխատելու համար (այսինքն՝ դիտել կարգավիճակը / վերագործարկել / թարմացնել կազմաձևումը) Kafka Connect REST API-ի միջոցով;
  • connector.class — DBMS միակցիչի դասը, որը կօգտագործվի կազմաձևված միակցիչի կողմից.
  • plugin.name WAL ֆայլերից տվյալների տրամաբանական վերծանման հավելյալի անվանումն է: Հասանելի է ընտրելու համար wal2json, decoderbuffs и pgoutput. Առաջին երկուսը պահանջում են DBMS-ում համապատասխան ընդլայնումների տեղադրում, և pgoutput PostgreSQL 10 և ավելի բարձր տարբերակի համար չի պահանջում լրացուցիչ մանիպուլյացիաներ.
  • database.* — տվյալների շտեմարանին միանալու տարբերակներ, որտեղ database.server.name - PostgreSQL օրինակի անվանումը, որն օգտագործվում է Կաֆկա կլաստերում թեմայի անվանումը ձևավորելու համար.
  • table.include.list - աղյուսակների ցանկ, որոնցում մենք ցանկանում ենք հետևել փոփոխություններին. տրված ձևաչափով schema.table_name; հետ չի կարող օգտագործվել table.exclude.list;
  • heartbeat.interval.ms — ընդմիջում (միլիվայրկյաններով), որով միակցիչը սրտի բաբախման հաղորդագրություններ է ուղարկում հատուկ թեմայի.
  • heartbeat.action.query - հարցում, որը կկատարվի սրտի զարկերի յուրաքանչյուր հաղորդագրություն ուղարկելիս (տարբերակը հայտնվել է 1.1 տարբերակից);
  • slot.name - կրկնօրինակման անցքի անվանումը, որը կօգտագործվի միակցիչի կողմից.
  • publication.name - Անուն Հրատարակություն PostgreSQL-ում, որն օգտագործում է միակցիչը: Այն չգոյության դեպքում Debezium-ը կփորձի ստեղծել այն։ Եթե ​​օգտատերը, որի հետ կապը հաստատված է, այս գործողության համար բավարար իրավունքներ չունի, միակցիչը սխալմամբ դուրս կգա.
  • transforms որոշում է, թե կոնկրետ ինչպես փոխել թիրախային թեմայի անունը.
    • transforms.AddPrefix.type ցույց է տալիս, որ մենք կօգտագործենք կանոնավոր արտահայտություններ.
    • transforms.AddPrefix.regex — դիմակ, որով վերասահմանվում է թիրախային թեմայի անվանումը.
    • transforms.AddPrefix.replacement - ուղղակիորեն այն, ինչ մենք վերասահմանում ենք:

Ավելին սրտի բաբախման և փոխակերպումների մասին

Լռելյայնորեն, միակցիչը տվյալներ է ուղարկում Kafka-ին յուրաքանչյուր կատարված գործարքի համար և գրում է իր LSN-ը (Log Sequence Number) ծառայության թեմայում: offset. Բայց ի՞նչ կլինի, եթե միակցիչը կարգավորվի այնպես, որ կարդա ոչ թե ամբողջ տվյալների բազան, այլ նրա աղյուսակների միայն մի մասը (որոնցում տվյալները հազվադեպ են թարմացվում):

  • Միակցիչը կկարդա WAL ֆայլերը և չի հայտնաբերի դրանցում կատարված գործարքների պարտավորությունները այն աղյուսակների նկատմամբ, որոնք վերահսկում է:
  • Հետևաբար, այն չի թարմացնի իր ներկայիս դիրքը ոչ թեմայում, ոչ էլ կրկնօրինակման բնիկում:
  • Սա, իր հերթին, կհանգեցնի նրան, որ WAL ֆայլերը «կպչեն» սկավառակի վրա և, հավանաբար, կսպառվեն սկավառակի տարածքը:

Եվ ահա տարբերակները օգնության են հասնում։ heartbeat.interval.ms и heartbeat.action.query. Այս ընտրանքների զույգերով օգտագործումը հնարավորություն է տալիս յուրաքանչյուր անգամ սրտի բաբախման հաղորդագրություն ուղարկելիս առանձին աղյուսակում տվյալները փոխելու հարցում կատարել: Այսպիսով, LSN-ը, որի վրա ներկայումս գտնվում է միակցիչը (կրկնօրինակման բնիկում) անընդհատ թարմացվում է: Սա թույլ է տալիս DBMS-ին հեռացնել WAL ֆայլերը, որոնք այլևս անհրաժեշտ չեն: Լրացուցիչ տեղեկությունների համար, թե ինչպես են աշխատում ընտրանքները, տե՛ս փաստաթղթավորում.

Մեկ այլ տարբերակ, որն ավելի մեծ ուշադրության է արժանի transforms. Չնայած դա ավելի շատ հարմարության և գեղեցկության մասին է...

Լռելյայնորեն, Debezium-ը թեմաներ է ստեղծում՝ օգտագործելով հետևյալ անվանման քաղաքականությունը. serverName.schemaName.tableName. Սա միշտ չէ, որ հարմար է: Ընտրանքներ transforms օգտագործելով կանոնավոր արտահայտություններ, դուք կարող եք սահմանել աղյուսակների ցանկ, որոնց իրադարձությունները պետք է ուղղորդվեն կոնկրետ անունով թեմա:

Մեր կազմաձևում շնորհիվ transforms Հետևյալ տվյալների բազայից CDC-ի բոլոր իրադարձությունները կանցնեն անվան հետ կապված թեմային data.cdc.dbname. Հակառակ դեպքում (առանց այս կարգավորումների), Debezium-ը լռելյայնորեն թեմա կստեղծեր ձևի յուրաքանչյուր աղյուսակի համար. pg-dev.public.<table_name>.

Միակցիչի սահմանափակումները

PostgreSQL-ի միակցիչի կազմաձևման նկարագրության վերջում արժե խոսել դրա աշխատանքի հետևյալ հատկանիշների / սահմանափակումների մասին.

  1. PostgreSQL-ի միակցիչի գործառույթը հիմնված է տրամաբանական վերծանման հայեցակարգի վրա: Ուստի նա չի հետևում տվյալների բազայի կառուցվածքը փոխելու հարցումներին (DDL) - համապատասխանաբար, այս տվյալները թեմաներում չեն լինի:
  2. Քանի որ օգտագործվում են կրկնօրինակման անցքեր, միակցիչի միացումը հնարավոր է միայն գլխավոր DBMS օրինակին:
  3. Եթե ​​օգտագործողը, որի միջոցով միակցիչը միանում է տվյալների շտեմարանին, ունի միայն կարդալու իրավունքներ, ապա նախքան առաջին գործարկումը, դուք պետք է ձեռքով ստեղծեք կրկնօրինակման բնիկ և հրապարակեք տվյալների բազայում:

Կիրառելով կոնֆիգուրացիա

Այսպիսով, եկեք բեռնենք մեր կոնֆիգուրացիան միակցիչի մեջ.

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Մենք ստուգում ենք, որ ներբեռնումը հաջող է, և միակցիչը սկսվել է.

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Հիանալի է. այն ստեղծվել է և պատրաստ է գնալու: Հիմա եկեք ձևացնենք սպառող և միանանք Կաֆկային, որից հետո աղյուսակում ավելացնենք և փոխենք գրառումը.

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Մեր թեմայում սա կցուցադրվի հետևյալ կերպ.

Շատ երկար JSON՝ մեր փոփոխություններով

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Երկու դեպքում էլ գրառումները բաղկացած են փոփոխված ձայնագրության բանալին (PK) և փոփոխությունների բուն էությունից.

  • Դեպքում INSERT: արժեքը նախկինում (before) հավասար է nullհաջորդում է լարը, որը տեղադրվել է:
  • Դեպքում UPDATE: ներ payload.before ցուցադրվում է տողի նախորդ վիճակը, և in payload.after - նոր՝ փոփոխության էությամբ:

2.2 MongoDB

Այս միակցիչը օգտագործում է ստանդարտ MongoDB վերարտադրման մեխանիզմը՝ կարդալով տեղեկատվություն DBMS-ի առաջնային հանգույցի oplog-ից:

PgSQL-ի համար արդեն նկարագրված միակցիչի նման, այստեղ նույնպես, առաջին մեկնարկի ժամանակ, վերցվում է տվյալների առաջնային նկարը, որից հետո միակցիչն անցնում է օպլոգի ընթերցման ռեժիմին:

Կազմաձևման օրինակ.

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Ինչպես տեսնում եք, նախորդ օրինակի համեմատ նոր տարբերակներ չկան, այլ միայն տվյալների բազային միանալու համար պատասխանատու ընտրանքների և դրանց նախածանցների թիվը կրճատվել է։

Կառավարում transforms այս անգամ նրանք անում են հետևյալը. սխեմայից անջատում են թիրախային թեմայի անվանումը <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

սխալների հանդուրժողականություն

Սխալների հանդուրժողականության և բարձր հասանելիության հարցը մեր ժամանակներում ավելի սուր է, քան երբևէ, հատկապես, երբ մենք խոսում ենք տվյալների և գործարքների մասին, և տվյալների փոփոխության հետագծումը այս հարցում կողքից չէ: Եկեք նայենք, թե ինչ կարող է սխալ լինել սկզբունքորեն, և ինչ կլինի Դեբեզիումի հետ յուրաքանչյուր դեպքում:

Չեղարկման երեք տարբերակ կա.

  1. Kafka Connect-ի ձախողում. Եթե ​​Connect-ը կազմաձևված է բաշխված ռեժիմով աշխատելու համար, ապա դա պահանջում է մի քանի աշխատողների՝ նույն group.id-ը սահմանելու համար: Այնուհետև, եթե դրանցից մեկը ձախողվի, միակցիչը կվերագործարկվի մյուս աշխատողի վրա և կշարունակի կարդալ Կաֆկայի թեմայի վերջին պարտավորված դիրքից:
  2. Կաֆկա կլաստերի հետ կապի կորուստ. Միակցիչը պարզապես կդադարի կարդալ այն դիրքում, որը չկարողացավ ուղարկել Կաֆկային և պարբերաբար կփորձի այն նորից ուղարկել, մինչև փորձը հաջողվի:
  3. Տվյալների աղբյուրն անհասանելի է. Միակցիչը կփորձի նորից միանալ աղբյուրին ըստ կազմաձևման: Լռելյայն օգտագործումը 16 փորձ է էքսպոնենցիալ հետընթաց. 16-րդ անհաջող փորձից հետո առաջադրանքը կնշվի որպես տապալվեց և այն պետք է ձեռքով վերագործարկվի Kafka Connect REST ինտերֆեյսի միջոցով:
    • Դեպքում PostgreSQL տվյալները չեն կորչի, քանի որ կրկնօրինակման սլոտների օգտագործումը կկանխի միակցիչի կողմից չընթերցված WAL ֆայլերի ջնջումը: Այս դեպքում կա մի բացասական կողմ. եթե միակցիչի և DBMS-ի միջև ցանցային կապը երկար ժամանակ խաթարվում է, կա հավանականություն, որ սկավառակի տարածքը կսպառվի, և դա կարող է հանգեցնել ամբողջ DBMS-ի ձախողման:
    • Դեպքում MySQL binlog ֆայլերը կարող են պտտվել հենց DBMS-ի կողմից, նախքան կապի վերականգնումը: Սա կհանգեցնի, որ միակցիչը կմտնի ձախողված վիճակ, և այն պետք է վերագործարկվի սկզբնական պատկերի ռեժիմում, որպեսզի շարունակի կարդալ բինլոգներից՝ նորմալ աշխատանքը վերականգնելու համար:
    • Մոտ MongoDB- ը. Փաստաթղթում ասվում է. միակցիչի վարքագիծը այն դեպքում, երբ log/oplog ֆայլերը ջնջվել են, և միակցիչը չի կարող շարունակել կարդալ այն դիրքից, որտեղ այն դադարեցվել է, նույնն է բոլոր DBMS-ների համար: Դա կայանում է նրանում, որ միակցիչը կմտնի վիճակ տապալվեց և կպահանջի ռեժիմում վերագործարկել նախնական նկարը.

      Այնուամենայնիվ, կան բացառություններ. Եթե ​​միակցիչը երկար ժամանակ եղել է անջատված վիճակում (կամ չի կարողացել հասնել MongoDB օրինակին), և oplog-ը պտտվել է այս ընթացքում, ապա երբ կապը վերականգնվի, միակցիչը հանգիստ կշարունակի կարդալ տվյալները առաջին հասանելի դիրքից։ , որի պատճառով որոշ տվյալներ Կաֆկայում ոչ կհարվածի.

Ամփոփում

Debezium-ն իմ առաջին փորձն է CDC համակարգերի հետ և ընդհանուր առմամբ շատ դրական է եղել: Ծրագիրը կաշառեց հիմնական DBMS-ի աջակցությունը, կոնֆիգուրացիայի հեշտությունը, կլաստերի աջակցումը և ակտիվ համայնքը: Նրանց համար, ովքեր հետաքրքրված են գործնականում, խորհուրդ եմ տալիս կարդալ ուղեցույցները Kafka Connect и Դեբեզիում.

Kafka Connect-ի JDBC միակցիչի համեմատ՝ Debezium-ի հիմնական առավելությունն այն է, որ փոփոխությունները կարդացվում են DBMS տեղեկամատյաններից, ինչը թույլ է տալիս ստանալ տվյալները նվազագույն ուշացումով: JDBC միակցիչը (տրամադրված է Kafka Connect-ի կողմից) ֆիքսված ընդմիջումով հարցումներ է կատարում հետագծվող աղյուսակին և (նույն պատճառով) հաղորդագրություններ չի առաջացնում, երբ տվյալները ջնջվում են (ինչպե՞ս կարող եք հարցումներ կատարել այնտեղ չկան տվյալների համար):

Նմանատիպ խնդիրներ լուծելու համար կարող եք ուշադրություն դարձնել հետևյալ լուծումներին (բացի Debezium-ից).

PS

Կարդացեք նաև մեր բլոգում.

Source: www.habr.com

Добавить комментарий