
Իմ աշխատանքում հաճախ եմ հանդիպում նոր տեխնիկական լուծումների/ծրագրային արտադրանքների, որոնց մասին տեղեկատվությունը բավականին սակավ է ռուսալեզու ինտերնետում։ Այս հոդվածով ես կփորձեմ լրացնել նման բացը իմ վերջին պրակտիկայի օրինակով, երբ ինձ անհրաժեշտ էր կարգավորել CDC իրադարձությունների ուղարկումը երկու հանրաճանաչ DBMS-ներից (PostgreSQL և MongoDB) դեպի Kafka կլաստեր՝ օգտագործելով Debezium: Հուսով եմ, որ այս վերանայման հոդվածը, որը հայտնվել է կատարված աշխատանքի արդյունքում, օգտակար կլինի մյուսների համար:
Ի՞նչ է Debezium-ը և CDC-ն ընդհանրապես:
- CDC ծրագրային ապահովման կատեգորիայի ներկայացուցիչ (), կամ ավելի ճիշտ՝ դա տարբեր DBMS-ների համար միակցիչների մի շարք է, որոնք համատեղելի են Apache Kafka Connect շրջանակի հետ։
այս լիցենզավորված Apache License v2.0-ի համաձայն և հովանավորվում է Red Hat-ի կողմից: Մշակումն իրականացվում է 2016 թվականից և այս պահին այն պաշտոնական աջակցություն է տրամադրում հետևյալ DBMS-ներին՝ MySQL, PostgreSQL, MongoDB, SQL Server: Կան նաև Cassandra-ի և Oracle-ի միակցիչներ, բայց դրանք ներկայումս գտնվում են «վաղ մուտքի» կարգավիճակում, և նոր թողարկումները չեն երաշխավորում հետամնաց համատեղելիությունը:
Եթե մենք համեմատում ենք CDC-ն ավանդական մոտեցման հետ (երբ հավելվածն ուղղակիորեն կարդում է տվյալները DBMS-ից), ապա դրա հիմնական առավելությունները ներառում են տվյալների փոփոխության հոսքի իրականացումը տողերի մակարդակում՝ ցածր ուշացումով, բարձր հուսալիությամբ և հասանելիությամբ: Վերջին երկու կետերը ձեռք են բերվում Կաֆկա կլաստերի օգտագործմամբ որպես CDC իրադարձությունների պահոց:
Նաև առավելությունները ներառում են այն փաստը, որ իրադարձությունները պահելու համար օգտագործվում է մեկ մոդել, ուստի վերջնական հավելվածը չպետք է անհանգստանա տարբեր DBMS-ների գործարկման նրբությունների մասին:
Վերջապես, հաղորդագրության բրոքերի օգտագործումը բացում է տվյալների փոփոխությանը հետևող հավելվածների հորիզոնական մասշտաբի շրջանակը: Միևնույն ժամանակ, տվյալների աղբյուրի վրա ազդեցությունը նվազագույնի է հասցվում, քանի որ տվյալները ստացվում են ոչ թե ուղղակիորեն DBMS-ից, այլ Կաֆկա կլաստերից:
Debezium ճարտարապետության մասին
Debezium-ի օգտագործումը հանգում է այս պարզ սխեմայի.
DBMS (որպես տվյալների աղբյուր) → միակցիչ Kafka Connect-ում → Apache Kafka → սպառող
Որպես օրինակ, ես կտամ դիագրամ նախագծի կայքից.

Այնուամենայնիվ, ես իսկապես չեմ սիրում այս սխեման, քանի որ թվում է, որ հնարավոր է միայն լվացարանի միակցիչ:
Իրականում իրավիճակն այլ է՝ լցնել ձեր Տվյալների լիճը (վերջին հղումը վերևի գծապատկերում) Debezium-ի օգտագործման միակ միջոցը չէ: Apache Kafka-ին ուղարկված իրադարձությունները կարող են օգտագործվել ձեր հավելվածների կողմից՝ տարբեր իրավիճակներ լուծելու համար: Օրինակ:
- քեշից անհամապատասխան տվյալների հեռացում;
- ծանուցումներ ուղարկելը;
- որոնման ինդեքսի թարմացումներ;
- որոշակի տեսակի աուդիտի տեղեկամատյաններ;
- ...
Այն դեպքում, երբ դուք ունեք Java հավելված, և չկան Կաֆկա կլաստեր օգտագործելու անհրաժեշտություն/հնարավորություն, կա նաև աշխատելու հնարավորություն. . Ակնհայտ պլյուսն այն է, որ դրա հետ դուք կարող եք հրաժարվել լրացուցիչ ենթակառուցվածքից (միակցիչի և Կաֆկայի տեսքով): Այնուամենայնիվ, այս լուծումը հնացել է 1.1 տարբերակից և այլևս խորհուրդ չի տրվում օգտագործել (այն կարող է հեռացվել հետագա թողարկումներում):
Այս հոդվածը կքննարկի մշակողների կողմից առաջարկվող ճարտարապետությունը, որն ապահովում է սխալների հանդուրժողականություն և մասշտաբայնություն:
Միակցիչի կոնֆիգուրացիա
Որպեսզի սկսենք հետևել ամենակարևոր արժեքի՝ տվյալների փոփոխություններին, մեզ անհրաժեշտ է.
- տվյալների աղբյուր, որը կարող է լինել MySQL՝ սկսած 5.7 տարբերակից, PostgreSQL 9.6+, MongoDB 3.2+ ();
- Ապաչի Կաֆկա կլաստեր
- Kafka Connect օրինակ (տարբերակներ 1.x, 2.x);
- կազմաձևված Debezium միակցիչ:
Աշխատեք առաջին երկու կետերի վրա, այսինքն. DBMS-ի և Apache Kafka-ի տեղադրման գործընթացը դուրս է հոդվածի շրջանակներից: Այնուամենայնիվ, նրանց համար, ովքեր ցանկանում են ամեն ինչ տեղակայել ավազատուփում, պաշտոնական պահոցում կա պատրաստի օրինակներ. .
Մենք ավելի մանրամասն կկենտրոնանանք վերջին երկու կետերի վրա:
0. Kafka Connect
Այստեղ և ավելի ուշ հոդվածում, բոլոր կազմաձևման օրինակները դիտարկվում են Debezium-ի մշակողների կողմից տարածված Docker պատկերի համատեքստում: Այն պարունակում է բոլոր անհրաժեշտ plugin ֆայլերը (միակցիչներ) և ապահովում է Kafka Connect-ի կոնֆիգուրացիան՝ օգտագործելով շրջակա միջավայրի փոփոխականները:
Եթե դուք մտադիր եք օգտագործել Kafka Connect-ը Confluent-ից, ապա ձեզ անհրաժեշտ կլինի ինքներդ ավելացնել անհրաժեշտ միակցիչների հավելումները նշված գրացուցակում: plugin.path կամ սահմանել շրջակա միջավայրի փոփոխականի միջոցով CLASSPATH. Kafka Connect աշխատողի և միակցիչների կարգավորումները սահմանվում են կազմաձևման ֆայլերի միջոցով, որոնք փոխանցվում են որպես արգումենտ աշխատանքային մեկնարկի հրամանին: Մանրամասների համար տես .
Debeizum-ի միակցիչի տարբերակում տեղադրելու ողջ գործընթացը իրականացվում է երկու փուլով. Դիտարկենք դրանցից յուրաքանչյուրը.
1. Kafka Connect շրջանակի կարգավորում
Apache Kafka կլաստերին տվյալների հոսքի համար Kafka Connect շրջանակում սահմանվում են հատուկ պարամետրեր, ինչպիսիք են՝
- կլաստերի միացման կարգավորումներ,
- թեմաների անուններ, որոնցում կպահվի միակցիչի կոնֆիգուրացիան,
- խմբի անվանումը, որում աշխատում է միակցիչը (բաշխված ռեժիմի օգտագործման դեպքում):
Ծրագրի պաշտոնական Docker պատկերն աջակցում է կոնֆիգուրացիան՝ օգտագործելով շրջակա միջավայրի փոփոխականները. սա այն է, ինչ մենք կօգտագործենք: Այսպիսով, եկեք ներբեռնենք պատկերը.
docker pull debezium/connectՄիակցիչի գործարկման համար անհրաժեշտ շրջակա միջավայրի փոփոխականների նվազագույն փաթեթը հետևյալն է.
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092- Կաֆկա կլաստերի սերվերների սկզբնական ցուցակը՝ կլաստերի անդամների ամբողջական ցուցակը ստանալու համար. -
OFFSET_STORAGE_TOPIC=connector-offsets— թեմա պահելու այն դիրքերը, որտեղ ներկայումս գտնվում է միակցիչը. -
CONNECT_STATUS_STORAGE_TOPIC=connector-status- միակցիչի կարգավիճակը և դրա առաջադրանքները պահելու թեմա. -
CONFIG_STORAGE_TOPIC=connector-config- միակցիչի կազմաձևման տվյալները և դրա առաջադրանքները պահելու թեմա. -
GROUP_ID=1- աշխատողների խմբի նույնացուցիչը, որի վրա կարող է կատարվել միակցիչի առաջադրանքը. պահանջվում է բաշխված օգտագործելիս (բաշխված) ռեժիմ.
Մենք սկսում ենք կոնտեյները այս փոփոխականներով.
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2Նշում Ավրոյի մասին
Լռելյայնորեն, Debezium-ը գրում է տվյալները JSON ձևաչափով, ինչը ընդունելի է ավազարկղերի և փոքր քանակությամբ տվյալների համար, բայց կարող է խնդիր լինել խիստ բեռնված տվյալների բազաներում: JSON փոխարկիչի այլընտրանքը հաղորդագրությունների սերիականացումն է՝ օգտագործելով երկուական ձևաչափով, որը նվազեցնում է I/O ենթահամակարգի բեռը Apache Kafka-ում:
Avro-ն օգտագործելու համար հարկավոր է առանձին տեղակայել (սխեմաների պահպանման համար): Փոխարկիչի փոփոխականները այսպիսի տեսք կունենան.
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverterAvro-ն օգտագործելու և դրա համար ռեեստրի ստեղծման վերաբերյալ մանրամասները դուրս են հոդվածի շրջանակներից. ավելին, պարզության համար մենք կօգտագործենք JSON:
2. Միակցիչի ինքնին կարգավորումը
Այժմ դուք կարող եք ուղղակիորեն գնալ հենց միակցիչի կազմաձևմանը, որը կկարդա տվյալները աղբյուրից:
Եկեք նայենք երկու DBMS-ի միակցիչների օրինակին՝ PostgreSQL և MongoDB, որոնց համար ես փորձ ունեմ և որոնց համար կան տարբերություններ (թեև փոքր, բայց որոշ դեպքերում նշանակալի):
Կազմաձևը նկարագրված է JSON նշումով և վերբեռնվում է Kafka Connect՝ օգտագործելով POST հարցումը:
2.1. PostgreSQL
Միակցիչի կազմաձևման օրինակ PostgreSQL-ի համար.
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}Այս կոնֆիգուրացիայից հետո միակցիչի շահագործման սկզբունքը բավականին պարզ է.
- Առաջին մեկնարկի ժամանակ այն միանում է կոնֆիգուրացիայի մեջ նշված տվյալների բազային և սկսում ռեժիմում նախնական նկարը, Կաֆկային ուղարկելով պայմանականով ստացված տվյալների նախնական փաթեթը
SELECT * FROM table_name. - Նախաստորագրման ավարտից հետո միակցիչը մտնում է PostgreSQL WAL ֆայլերից փոփոխությունների ընթերցման ռեժիմ:
Օգտագործված տարբերակների մասին.
-
name— միակցիչի անվանումը, որի համար օգտագործվում է ստորև նկարագրված կոնֆիգուրացիան. Հետագայում այս անունը օգտագործվում է միակցիչի հետ աշխատելու համար (այսինքն՝ դիտել կարգավիճակը / վերագործարկել / թարմացնել կազմաձևումը) Kafka Connect REST API-ի միջոցով; -
connector.class— DBMS միակցիչի դասը, որը կօգտագործվի կազմաձևված միակցիչի կողմից. -
plugin.nameWAL ֆայլերից տվյալների տրամաբանական վերծանման հավելյալի անվանումն է: Հասանելի է ընտրելու համարwal2json,decoderbuffsиpgoutput. Առաջին երկուսը պահանջում են DBMS-ում համապատասխան ընդլայնումների տեղադրում, ևpgoutputPostgreSQL 10 և ավելի բարձր տարբերակի համար չի պահանջում լրացուցիչ մանիպուլյացիաներ. -
database.*— տվյալների շտեմարանին միանալու տարբերակներ, որտեղdatabase.server.name- PostgreSQL օրինակի անվանումը, որն օգտագործվում է Կաֆկա կլաստերում թեմայի անվանումը ձևավորելու համար. -
table.include.list- աղյուսակների ցանկ, որոնցում մենք ցանկանում ենք հետևել փոփոխություններին. տրված ձևաչափովschema.table_name; հետ չի կարող օգտագործվելtable.exclude.list; -
heartbeat.interval.ms— ընդմիջում (միլիվայրկյաններով), որով միակցիչը սրտի բաբախման հաղորդագրություններ է ուղարկում հատուկ թեմայի. -
heartbeat.action.query- հարցում, որը կկատարվի սրտի զարկերի յուրաքանչյուր հաղորդագրություն ուղարկելիս (տարբերակը հայտնվել է 1.1 տարբերակից); -
slot.name- կրկնօրինակման անցքի անվանումը, որը կօգտագործվի միակցիչի կողմից. publication.name- Անուն PostgreSQL-ում, որն օգտագործում է միակցիչը: Այն չգոյության դեպքում Debezium-ը կփորձի ստեղծել այն։ Եթե օգտատերը, որի հետ կապը հաստատված է, այս գործողության համար բավարար իրավունքներ չունի, միակցիչը սխալմամբ դուրս կգա.-
transformsորոշում է, թե կոնկրետ ինչպես փոխել թիրախային թեմայի անունը.-
transforms.AddPrefix.typeցույց է տալիս, որ մենք կօգտագործենք կանոնավոր արտահայտություններ. -
transforms.AddPrefix.regex— դիմակ, որով վերասահմանվում է թիրախային թեմայի անվանումը. -
transforms.AddPrefix.replacement- ուղղակիորեն այն, ինչ մենք վերասահմանում ենք:
-
Ավելին սրտի բաբախման և փոխակերպումների մասին
Լռելյայնորեն, միակցիչը տվյալներ է ուղարկում Kafka-ին յուրաքանչյուր կատարված գործարքի համար և գրում է իր LSN-ը (Log Sequence Number) ծառայության թեմայում: offset. Բայց ի՞նչ կլինի, եթե միակցիչը կարգավորվի այնպես, որ կարդա ոչ թե ամբողջ տվյալների բազան, այլ նրա աղյուսակների միայն մի մասը (որոնցում տվյալները հազվադեպ են թարմացվում):
- Միակցիչը կկարդա WAL ֆայլերը և չի հայտնաբերի դրանցում կատարված գործարքների պարտավորությունները այն աղյուսակների նկատմամբ, որոնք վերահսկում է:
- Հետևաբար, այն չի թարմացնի իր ներկայիս դիրքը ոչ թեմայում, ոչ էլ կրկնօրինակման բնիկում:
- Սա, իր հերթին, կհանգեցնի նրան, որ WAL ֆայլերը «կպչեն» սկավառակի վրա և, հավանաբար, կսպառվեն սկավառակի տարածքը:
Եվ ահա տարբերակները օգնության են հասնում։ heartbeat.interval.ms и heartbeat.action.query. Այս ընտրանքների զույգերով օգտագործումը հնարավորություն է տալիս յուրաքանչյուր անգամ սրտի բաբախման հաղորդագրություն ուղարկելիս առանձին աղյուսակում տվյալները փոխելու հարցում կատարել: Այսպիսով, LSN-ը, որի վրա ներկայումս գտնվում է միակցիչը (կրկնօրինակման բնիկում) անընդհատ թարմացվում է: Սա թույլ է տալիս DBMS-ին հեռացնել WAL ֆայլերը, որոնք այլևս անհրաժեշտ չեն: Լրացուցիչ տեղեկությունների համար, թե ինչպես են աշխատում ընտրանքները, տե՛ս .
Մեկ այլ տարբերակ, որն ավելի մեծ ուշադրության է արժանի transforms. Չնայած դա ավելի շատ հարմարության և գեղեցկության մասին է...
Լռելյայնորեն, Debezium-ը թեմաներ է ստեղծում՝ օգտագործելով հետևյալ անվանման քաղաքականությունը. serverName.schemaName.tableName. Սա միշտ չէ, որ հարմար է: Ընտրանքներ transforms օգտագործելով կանոնավոր արտահայտություններ, դուք կարող եք սահմանել աղյուսակների ցանկ, որոնց իրադարձությունները պետք է ուղղորդվեն կոնկրետ անունով թեմա:
Մեր կազմաձևում շնորհիվ transforms Հետևյալ տվյալների բազայից CDC-ի բոլոր իրադարձությունները կանցնեն անվան հետ կապված թեմային data.cdc.dbname. Հակառակ դեպքում (առանց այս կարգավորումների), Debezium-ը լռելյայնորեն թեմա կստեղծեր ձևի յուրաքանչյուր աղյուսակի համար. pg-dev.public.<table_name>.
Միակցիչի սահմանափակումները
PostgreSQL-ի միակցիչի կազմաձևման նկարագրության վերջում արժե խոսել դրա աշխատանքի հետևյալ հատկանիշների / սահմանափակումների մասին.
- PostgreSQL-ի միակցիչի գործառույթը հիմնված է տրամաբանական վերծանման հայեցակարգի վրա: Ուստի նա չի հետևում տվյալների բազայի կառուցվածքը փոխելու հարցումներին (DDL) - համապատասխանաբար, այս տվյալները թեմաներում չեն լինի:
- Քանի որ օգտագործվում են կրկնօրինակման անցքեր, միակցիչի միացումը հնարավոր է միայն գլխավոր DBMS օրինակին:
- Եթե օգտագործողը, որի միջոցով միակցիչը միանում է տվյալների շտեմարանին, ունի միայն կարդալու իրավունքներ, ապա նախքան առաջին գործարկումը, դուք պետք է ձեռքով ստեղծեք կրկնօրինակման բնիկ և հրապարակեք տվյալների բազայում:
Կիրառելով կոնֆիգուրացիա
Այսպիսով, եկեք բեռնենք մեր կոնֆիգուրացիան միակցիչի մեջ.
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.jsonՄենք ստուգում ենք, որ ներբեռնումը հաջող է, և միակցիչը սկսվել է.
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}Հիանալի է. այն ստեղծվել է և պատրաստ է գնալու: Հիմա եկեք ձևացնենք սպառող և միանանք Կաֆկային, որից հետո աղյուսակում ավելացնենք և փոխենք գրառումը.
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1Մեր թեմայում սա կցուցադրվի հետևյալ կերպ.
Շատ երկար JSON՝ մեր փոփոխություններով
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}Երկու դեպքում էլ գրառումները բաղկացած են փոփոխված ձայնագրության բանալին (PK) և փոփոխությունների բուն էությունից.
- Դեպքում
INSERT: արժեքը նախկինում (before) հավասար էnullհաջորդում է լարը, որը տեղադրվել է: - Դեպքում
UPDATE: ներpayload.beforeցուցադրվում է տողի նախորդ վիճակը, և inpayload.after- նոր՝ փոփոխության էությամբ:
2.2 MongoDB
Այս միակցիչը օգտագործում է ստանդարտ MongoDB վերարտադրման մեխանիզմը՝ կարդալով տեղեկատվություն DBMS-ի առաջնային հանգույցի oplog-ից:
PgSQL-ի համար արդեն նկարագրված միակցիչի նման, այստեղ նույնպես, առաջին մեկնարկի ժամանակ, վերցվում է տվյալների առաջնային նկարը, որից հետո միակցիչն անցնում է օպլոգի ընթերցման ռեժիմին:
Կազմաձևման օրինակ.
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}Ինչպես տեսնում եք, նախորդ օրինակի համեմատ նոր տարբերակներ չկան, այլ միայն տվյալների բազային միանալու համար պատասխանատու ընտրանքների և դրանց նախածանցների թիվը կրճատվել է։
Կառավարում transforms այս անգամ նրանք անում են հետևյալը. սխեմայից անջատում են թիրախային թեմայի անվանումը <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.
սխալների հանդուրժողականություն
Սխալների հանդուրժողականության և բարձր հասանելիության հարցը մեր ժամանակներում ավելի սուր է, քան երբևէ, հատկապես, երբ մենք խոսում ենք տվյալների և գործարքների մասին, և տվյալների փոփոխության հետագծումը այս հարցում կողքից չէ: Եկեք նայենք, թե ինչ կարող է սխալ լինել սկզբունքորեն, և ինչ կլինի Դեբեզիումի հետ յուրաքանչյուր դեպքում:
Չեղարկման երեք տարբերակ կա.
- Kafka Connect-ի ձախողում. Եթե Connect-ը կազմաձևված է բաշխված ռեժիմով աշխատելու համար, ապա դա պահանջում է մի քանի աշխատողների՝ նույն group.id-ը սահմանելու համար: Այնուհետև, եթե դրանցից մեկը ձախողվի, միակցիչը կվերագործարկվի մյուս աշխատողի վրա և կշարունակի կարդալ Կաֆկայի թեմայի վերջին պարտավորված դիրքից:
- Կաֆկա կլաստերի հետ կապի կորուստ. Միակցիչը պարզապես կդադարի կարդալ այն դիրքում, որը չկարողացավ ուղարկել Կաֆկային և պարբերաբար կփորձի այն նորից ուղարկել, մինչև փորձը հաջողվի:
- Տվյալների աղբյուրն անհասանելի է. Միակցիչը կփորձի նորից միանալ աղբյուրին ըստ կազմաձևման: Լռելյայն օգտագործումը 16 փորձ է . 16-րդ անհաջող փորձից հետո առաջադրանքը կնշվի որպես տապալվեց և այն պետք է ձեռքով վերագործարկվի Kafka Connect REST ինտերֆեյսի միջոցով:
- Դեպքում PostgreSQL տվյալները չեն կորչի, քանի որ կրկնօրինակման սլոտների օգտագործումը կկանխի միակցիչի կողմից չընթերցված WAL ֆայլերի ջնջումը: Այս դեպքում կա մի բացասական կողմ. եթե միակցիչի և DBMS-ի միջև ցանցային կապը երկար ժամանակ խաթարվում է, կա հավանականություն, որ սկավառակի տարածքը կսպառվի, և դա կարող է հանգեցնել ամբողջ DBMS-ի ձախողման:
- Դեպքում MySQL binlog ֆայլերը կարող են պտտվել հենց DBMS-ի կողմից, նախքան կապի վերականգնումը: Սա կհանգեցնի, որ միակցիչը կմտնի ձախողված վիճակ, և այն պետք է վերագործարկվի սկզբնական պատկերի ռեժիմում, որպեսզի շարունակի կարդալ բինլոգներից՝ նորմալ աշխատանքը վերականգնելու համար:
- Մոտ MongoDB- ը. Փաստաթղթում ասվում է. միակցիչի վարքագիծը այն դեպքում, երբ log/oplog ֆայլերը ջնջվել են, և միակցիչը չի կարող շարունակել կարդալ այն դիրքից, որտեղ այն դադարեցվել է, նույնն է բոլոր DBMS-ների համար: Դա կայանում է նրանում, որ միակցիչը կմտնի վիճակ տապալվեց և կպահանջի ռեժիմում վերագործարկել նախնական նկարը.
Այնուամենայնիվ, կան բացառություններ. Եթե միակցիչը երկար ժամանակ եղել է անջատված վիճակում (կամ չի կարողացել հասնել MongoDB օրինակին), և oplog-ը պտտվել է այս ընթացքում, ապա երբ կապը վերականգնվի, միակցիչը հանգիստ կշարունակի կարդալ տվյալները առաջին հասանելի դիրքից։ , որի պատճառով որոշ տվյալներ Կաֆկայում ոչ կհարվածի.
Ամփոփում
Debezium-ն իմ առաջին փորձն է CDC համակարգերի հետ և ընդհանուր առմամբ շատ դրական է եղել: Ծրագիրը կաշառեց հիմնական DBMS-ի աջակցությունը, կոնֆիգուրացիայի հեշտությունը, կլաստերի աջակցումը և ակտիվ համայնքը: Նրանց համար, ովքեր հետաքրքրված են գործնականում, խորհուրդ եմ տալիս կարդալ ուղեցույցները и .
Kafka Connect-ի JDBC միակցիչի համեմատ՝ Debezium-ի հիմնական առավելությունն այն է, որ փոփոխությունները կարդացվում են DBMS տեղեկամատյաններից, ինչը թույլ է տալիս ստանալ տվյալները նվազագույն ուշացումով: JDBC միակցիչը (տրամադրված է Kafka Connect-ի կողմից) ֆիքսված ընդմիջումով հարցումներ է կատարում հետագծվող աղյուսակին և (նույն պատճառով) հաղորդագրություններ չի առաջացնում, երբ տվյալները ջնջվում են (ինչպե՞ս կարող եք հարցումներ կատարել այնտեղ չկան տվյալների համար):
Նմանատիպ խնդիրներ լուծելու համար կարող եք ուշադրություն դարձնել հետևյալ լուծումներին (բացի Debezium-ից).
- Մի քանի MySQL լուծումներ.
- , բայց սա բոլորովին այլ «քաշային կատեգորիա» է։
PS
Կարդացեք նաև մեր բլոգում.
- «";
- «";
- «.
Source: www.habr.com
