Apache Kafka සඳහා Debezium - CDC හඳුන්වා දීම

Apache Kafka සඳහා Debezium - CDC හඳුන්වා දීම

මගේ වැඩ වලදී, මට බොහෝ විට නව තාක්ෂණික විසඳුම් / මෘදුකාංග නිෂ්පාදන හමු වේ, රුසියානු භාෂාව කතා කරන අන්තර්ජාලයේ තරමක් දුර්ලභ තොරතුරු. මෙම ලිපිය සමඟින්, Debezium භාවිතා කරමින් ජනප්‍රිය DBMS දෙකකින් (PostgreSQL සහ MongoDB) CDC සිද්ධීන් Kafka පොකුරකට යැවීමට මට අවශ්‍ය වූ විට, මගේ මෑතකාලීන පුහුණුවෙන් එවැනි පරතරයක් පිරවීමට මම උත්සාහ කරමි. කරන ලද කාර්යයේ ප්‍රතිඵලයක් ලෙස පළ වූ මෙම සමාලෝචන ලිපිය අන් අයට ප්‍රයෝජනවත් වනු ඇතැයි මම බලාපොරොත්තු වෙමි.

සාමාන්යයෙන් Debezium සහ CDC යනු කුමක්ද?

ඩෙබෙසියම් - CDC මෘදුකාංග කාණ්ඩයේ නියෝජිතයා (දත්ත වෙනස් කිරීම ග්‍රහණය කරන්න), හෝ වඩාත් නිවැරදිව, එය Apache Kafka Connect රාමුව සමඟ අනුකූල වන විවිධ DBMS සඳහා සම්බන්ධක කට්ටලයකි.

මෙම විවෘත මූලාශ්‍ර ව්‍යාපෘතිය, Apache බලපත්‍රය v2.0 යටතේ බලපත්‍ර ලබා ඇති අතර Red Hat විසින් අනුග්‍රහය දක්වා ඇත. 2016 සිට සංවර්ධනය සිදු වෙමින් පවතින අතර මේ මොහොතේ එය පහත DBMS සඳහා නිල සහාය සපයයි: MySQL, PostgreSQL, MongoDB, SQL Server. කැසැන්ඩ්‍රා සහ ඔරකල් සඳහා සම්බන්ධක ද ඇත, නමුත් ඒවා දැනට "මුල් ප්‍රවේශ" තත්ත්වයේ පවතින අතර නව නිකුතු පසුගාමී අනුකූලතාව සහතික නොකරයි.

අපි CDC සාම්ප්‍රදායික ප්‍රවේශය සමඟ සංසන්දනය කරන්නේ නම් (යෙදුම DBMS වෙතින් දත්ත කෙලින්ම කියවන විට), එවිට එහි ප්‍රධාන වාසි අතර අඩු ප්‍රමාදය, ඉහළ විශ්වසනීයත්වය සහ ලබා ගැනීමේ හැකියාව සමඟ පේළි මට්ටමින් දත්ත වෙනස් කිරීමේ ප්‍රවාහය ක්‍රියාත්මක කිරීම ඇතුළත් වේ. CDC සිදුවීම් සඳහා ගබඩාවක් ලෙස Kafka පොකුරක් භාවිතා කිරීමෙන් අවසාන කරුණු දෙක සාක්ෂාත් කරගනු ලැබේ.

එසේම, වාසි අතර සිදුවීම් ගබඩා කිරීම සඳහා තනි ආකෘතියක් භාවිතා කරයි, එබැවින් අවසාන යෙදුම විවිධ DBMS ක්‍රියාත්මක කිරීමේ සූක්ෂ්මතාවයන් ගැන කරදර විය යුතු නැත.

අවසාන වශයෙන්, පණිවිඩ තැරැව්කරුවෙකු භාවිතා කිරීමෙන් දත්තවල වෙනස්කම් නිරීක්ෂණය කරන යෙදුම්වල තිරස් පරිමාණය සඳහා විෂය පථය විවෘත වේ. ඒ අතරම, දත්ත සෘජුවම DBMS වෙතින් නොව, Kafka පොකුරෙන් ලැබෙන බැවින්, දත්ත මූලාශ්‍රය මත ඇති බලපෑම අවම වේ.

Debezium ගෘහ නිර්මාණ ශිල්පය ගැන

Debezium භාවිතා කිරීම මෙම සරල යෝජනා ක්රමයට පැමිණේ:

DBMS (දත්ත මූලාශ්‍රය ලෙස) → Kafka Connect හි සම්බන්ධකය → Apache Kafka → පාරිභෝගික

නිදර්ශනයක් ලෙස, මම ව්‍යාපෘති වෙබ් අඩවියෙන් රූප සටහනක් දෙන්නෙමි:

Apache Kafka සඳහා Debezium - CDC හඳුන්වා දීම

කෙසේ වෙතත්, මම මෙම යෝජනා ක්‍රමයට සැබවින්ම කැමති නැත, මන්ද සින්ක් සම්බන්ධකයක් පමණක් කළ හැකි බව පෙනේ.

යථාර්ථයේ දී, තත්වය වෙනස් ය: ඔබේ දත්ත විල පිරවීම (ඉහත රූප සටහනේ අවසාන සබැඳිය) Debezium භාවිතා කිරීමට ඇති එකම මාර්ගය නොවේ. Apache Kafka වෙත යවන ලද සිදුවීම් විවිධ තත්වයන් සමඟ කටයුතු කිරීමට ඔබගේ යෙදුම් මගින් භාවිතා කළ හැක. උදාහරණ වශයෙන්:

  • හැඹිලියෙන් අදාල නොවන දත්ත ඉවත් කිරීම;
  • දැනුම්දීම් යැවීම;
  • සෙවුම් දර්ශක යාවත්කාලීන කිරීම්;
  • යම් ආකාරයක විගණන ලඝු-සටහන්;
  • ...

ඔබට ජාවා යෙදුමක් තිබේ නම් සහ කෆ්කා පොකුරක් භාවිතා කිරීමට අවශ්‍යතාවයක් / හැකියාවක් නොමැති නම්, ඒ හරහා ක්‍රියා කිරීමේ හැකියාව ද ඇත. කාවැද්දූ සම්බන්ධකය. පැහැදිලි ප්ලස් එය සමඟ ඔබට අතිරේක යටිතල පහසුකම් (සම්බන්ධකයක් සහ කෆ්කා ආකාරයෙන්) ප්රතික්ෂේප කළ හැකිය. කෙසේ වෙතත්, මෙම විසඳුම 1.1 අනුවාදයේ සිට අවලංගු කර ඇති අතර එය තවදුරටත් භාවිතය සඳහා නිර්දේශ කර නොමැත (එය අනාගත නිකුතු වලදී ඉවත් කළ හැක).

මෙම ලිපියෙන් සංවර්ධකයින් විසින් නිර්දේශ කරනු ලබන ගෘහ නිර්මාණ ශිල්පය පිළිබඳව සාකච්ඡා කරනු ඇත, එය දෝෂ ඉවසීම සහ පරිමාණය සපයයි.

සම්බන්ධක වින්යාසය

වඩාත්ම වැදගත් අගය - දත්ත - වෙනස්කම් නිරීක්ෂණය කිරීම ආරම්භ කිරීම සඳහා අපට අවශ්‍ය වන්නේ:

  1. දත්ත මූලාශ්‍රය, එය MySQL විය හැකි අනුවාදය 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (සම්පූර්ණ ලැයිස්තුව);
  2. Apache Kafka පොකුර
  3. Kafka Connect උදාහරණය (අනුවාද 1.x, 2.x);
  4. වින්යාසගත Debezium සම්බන්ධකය.

පළමු කරුණු දෙක මත වැඩ කරන්න, i.e. DBMS සහ Apache Kafka ස්ථාපනය කිරීමේ ක්‍රියාවලිය ලිපියේ විෂය පථයෙන් ඔබ්බට ය. කෙසේ වෙතත්, වැලිපිල්ලක සෑම දෙයක්ම යෙදවීමට කැමති අය සඳහා, උදාහරණ සමඟ නිල ගබඩාවේ සූදානම් කළ එකක් තිබේ. docker-compose.yaml.

අපි අවසාන කරුණු දෙක වඩාත් විස්තරාත්මකව අවධානය යොමු කරමු.

0. කෆ්කා කනෙක්ට්

මෙහි සහ පසුව ලිපියේ, Debezium සංවර්ධකයින් විසින් බෙදා හරින ලද Docker රූපයේ සන්දර්භය තුළ සියලුම වින්‍යාස උදාහරණ සලකා බලනු ලැබේ. එහි අවශ්‍ය සියලුම ප්ලගින ගොනු (සම්බන්ධක) අඩංගු වන අතර පරිසර විචල්‍යයන් භාවිතයෙන් Kafka Connect වින්‍යාසය සපයයි.

ඔබ Confluent වෙතින් Kafka Connect භාවිතා කිරීමට අදහස් කරන්නේ නම්, ඔබට අවශ්‍ය සම්බන්ධකවල ප්ලගීන ඔබ විසින්ම සඳහන් කර ඇති නාමාවලියට එක් කිරීමට අවශ්‍ය වනු ඇත. plugin.path හෝ පරිසර විචල්‍යයක් හරහා සකසන්න CLASSPATH. Kafka Connect සේවක සහ සම්බන්ධක සඳහා වන සැකසුම් කම්කරු ආරම්භක විධානයට තර්ක ලෙස යවන වින්‍යාස ගොනු හරහා අර්ථ දක්වා ඇත. විස්තර සඳහා බලන්න ලියකියවිලි.

සම්බන්ධක අනුවාදයේ Debeizum සැකසීමේ සම්පූර්ණ ක්රියාවලිය අදියර දෙකකින් සිදු කෙරේ. අපි ඒ එක් එක් සලකා බලමු:

1. Kafka Connect රාමුව සැකසීම

Apache Kafka පොකුරකට දත්ත ප්‍රවාහ කිරීමට, Kafka Connect රාමුව තුළ නිශ්චිත පරාමිති සකසා ඇත, එවැනි:

  • පොකුරු සම්බන්ධතා සැකසුම්,
  • සම්බන්ධකයේ වින්‍යාසය ගබඩා කරන මාතෘකා වල නම්,
  • සම්බන්ධකය ක්රියාත්මක වන කණ්ඩායමේ නම (බෙදා හරින ලද මාදිලිය භාවිතා කරන විට).

ව්‍යාපෘතියේ නිල ඩොකර් රූපය පරිසර විචල්‍යයන් භාවිතයෙන් වින්‍යාස කිරීමට සහය දක්වයි - මෙය අප භාවිතා කරනු ඇත. ඉතින් අපි පින්තූරය බාගත කරමු:

docker pull debezium/connect

සම්බන්ධකය ක්‍රියාත්මක කිරීමට අවශ්‍ය අවම පරිසර විචල්‍ය කට්ටලය පහත පරිදි වේ:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - පොකුරු සාමාජිකයින්ගේ සම්පූර්ණ ලැයිස්තුවක් ලබා ගැනීම සඳහා Kafka පොකුරු සේවාදායකයන්ගේ මූලික ලැයිස්තුව;
  • OFFSET_STORAGE_TOPIC=connector-offsets - සම්බන්ධකය දැනට පිහිටා ඇති ස්ථාන ගබඩා කිරීම සඳහා මාතෘකාවක්;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - සම්බන්ධකයේ තත්ත්වය සහ එහි කාර්යයන් ගබඩා කිරීම සඳහා මාතෘකාවක්;
  • CONFIG_STORAGE_TOPIC=connector-config - සම්බන්ධක වින්යාස දත්ත සහ එහි කාර්යයන් ගබඩා කිරීම සඳහා මාතෘකාවක්;
  • GROUP_ID=1 - සම්බන්ධක කාර්යය ක්රියාත්මක කළ හැකි සේවකයින්ගේ කණ්ඩායමේ හඳුනාගැනීම; බෙදා හැරීම භාවිතා කරන විට අවශ්ය වේ (බෙදාහැර ඇත) තන්ත්රය.

අපි මෙම විචල්‍යයන් සමඟ කන්ටේනරය ආරම්භ කරමු:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Avro ගැන සටහන

පෙරනිමියෙන්, Debezium JSON ආකෘතියෙන් දත්ත ලියයි, එය වැලි පෙට්ටි සහ කුඩා දත්ත සඳහා පිළිගත හැකි නමුත්, අධික ලෙස පටවන ලද දත්ත සමුදායන්හි ගැටලුවක් විය හැක. JSON පරිවර්තකය සඳහා විකල්පයක් වන්නේ භාවිතා කරමින් පණිවිඩ අනුක්‍රමික කිරීමයි අබ්රෝ ද්විමය ආකෘතියකට, එය Apache Kafka හි I / O උප පද්ධතිය මත පැටවීම අඩු කරයි.

Avro භාවිතා කිරීමට, ඔබ වෙනම යෙදවිය යුතුය schema-registry (රූප සටහන් ගබඩා කිරීම සඳහා). පරිවර්තකය සඳහා විචල්යයන් මේ ආකාරයෙන් පෙනෙනු ඇත:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro භාවිතා කිරීම සහ ඒ සඳහා රෙජිස්ට්‍රියක් පිහිටුවීම පිළිබඳ විස්තර ලිපියේ විෂය පථයෙන් ඔබ්බට ය - තවදුරටත්, පැහැදිලිකම සඳහා, අපි JSON භාවිතා කරන්නෙමු.

2. සම්බන්ධකයම සැකසීම

දැන් ඔබට සම්බන්ධකයේ වින්‍යාසය වෙත කෙලින්ම යා හැකිය, එමඟින් මූලාශ්‍රයෙන් දත්ත කියවනු ඇත.

DBMS දෙකක් සඳහා සම්බන්ධක උදාහරණය දෙස බලමු: PostgreSQL සහ MongoDB, ඒ සඳහා මට අත්දැකීම් ඇති සහ වෙනස්කම් ඇති (කුඩා වුවද, නමුත් සමහර අවස්ථාවල වැදගත් වේ!).

වින්‍යාසය JSON අංකනයෙහි විස්තර කර ඇති අතර POST ඉල්ලීමක් භාවිතයෙන් Kafka Connect වෙත උඩුගත කර ඇත.

2.1 PostgreSQL

PostgreSQL සඳහා උදාහරණ සම්බන්ධක වින්‍යාසය:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

මෙම වින්‍යාසයෙන් පසු සම්බන්ධකයේ ක්‍රියාකාරිත්වයේ මූලධර්මය තරමක් සරල ය:

  • පළමු ආරම්භයේදී, එය වින්‍යාසයේ දක්වා ඇති දත්ත සමුදායට සම්බන්ධ වන අතර ප්‍රකාරයේදී ආරම්භ වේ. ආරම්භක ඡායාරූපය, කොන්දේසි සහිතව ලැබුණු මූලික දත්ත කට්ටලය Kafka වෙත යැවීම SELECT * FROM table_name.
  • ආරම්භ කිරීම අවසන් වූ පසු, සම්බන්ධකය PostgreSQL WAL ගොනු වලින් කියවීමේ වෙනස්කම් වලට ඇතුල් වේ.

භාවිතා කරන විකල්ප ගැන:

  • name - පහත විස්තර කර ඇති වින්‍යාසය භාවිතා කරන සම්බන්ධකයේ නම; අනාගතයේදී, මෙම නම Kafka Connect REST API හරහා සම්බන්ධකය සමඟ වැඩ කිරීමට (එනම් තත්වය බැලීම / නැවත ආරම්භ කිරීම / වින්‍යාසය යාවත්කාලීන කිරීම) භාවිතා කරයි;
  • connector.class - වින්‍යාසගත සම්බන්ධකය විසින් භාවිතා කරනු ලබන DBMS සම්බන්ධක පන්තිය;
  • plugin.name WAL ගොනු වලින් දත්ත තාර්කික විකේතනය කිරීම සඳහා වන ප්ලගිනයේ නම වේ. තෝරා ගැනීමට තිබේ wal2json, decoderbuffs и pgoutput. පළමු දෙක DBMS හි සුදුසු දිගු ස්ථාපනය කිරීම අවශ්ය වේ, සහ pgoutput PostgreSQL අනුවාදය 10 සහ ඉහළ සඳහා අමතර උපාමාරු අවශ්‍ය නොවේ;
  • database.* - දත්ත සමුදායට සම්බන්ධ වීමට විකල්ප, එහිදී database.server.name - Kafka පොකුරේ මාතෘකාවේ නම සැකසීමට භාවිතා කරන PostgreSQL උදාහරණයේ නම;
  • table.include.list - අපට වෙනස්කම් නිරීක්ෂණය කිරීමට අවශ්‍ය වගු ලැයිස්තුවක්; ආකෘතියෙන් ලබා දී ඇත schema.table_name; සමඟ එකට භාවිතා කළ නොහැක table.exclude.list;
  • heartbeat.interval.ms - සම්බන්ධකය විශේෂ මාතෘකාවකට හෘද ස්පන්දන පණිවිඩ යවන පරතරය (මිලි තත්පර වලින්);
  • heartbeat.action.query - එක් එක් හෘද ස්පන්දන පණිවිඩ යැවීමේදී ක්‍රියාත්මක වන ඉල්ලීමක් (විකල්පය 1.1 අනුවාදයේ සිට දර්ශනය වී ඇත);
  • slot.name - සම්බන්ධකය විසින් භාවිතා කරනු ලබන අනුකරණ ස්ලට් එකේ නම;
  • publication.name - නම ප්රකාශන සම්බන්ධකය භාවිතා කරන PostgreSQL හි. එය නොපවතියි නම්, Debezium එය නිර්මාණය කිරීමට උත්සාහ කරයි. සම්බන්ධතාවය ඇති පරිශීලකයාට මෙම ක්‍රියාව සඳහා ප්‍රමාණවත් හිමිකම් නොමැති නම්, සම්බන්ධකය දෝෂයක් සමඟ පිටව යනු ඇත;
  • transforms ඉලක්ක මාතෘකාවේ නම හරියටම වෙනස් කරන්නේ කෙසේද යන්න තීරණය කරයි:
    • transforms.AddPrefix.type අපි නිත්‍ය ප්‍රකාශන භාවිතා කරන බව අඟවයි;
    • transforms.AddPrefix.regex - ඉලක්ක මාතෘකාවේ නම නැවත අර්ථ දක්වා ඇති වෙස් මුහුණ;
    • transforms.AddPrefix.replacement - කෙලින්ම අපි නැවත අර්ථ දක්වන දේ.

හෘද ස්පන්දනය සහ පරිවර්තනයන් පිළිබඳ වැඩි විස්තර

පෙරනිමියෙන්, සම්බන්ධකය සෑම කැපවූ ගනුදෙනුවක් සඳහාම Kafka වෙත දත්ත යවන අතර, එහි LSN (ලොග් අනුක්‍රමික අංකය) සේවා මාතෘකාවට ලියයි. offset. නමුත් සම්බන්ධකය සම්පූර්ණ දත්ත ගබඩාව නොව එහි වගු වලින් කොටසක් පමණක් (දත්ත නිතර යාවත්කාලීන වන) කියවීමට වින්‍යාස කර ඇත්නම් කුමක් සිදුවේද?

  • සම්බන්ධකය විසින් WAL ගොනු කියවනු ඇති අතර එය නිරීක්ෂණය කරන වගු වෙත ගනුදෙනු සිදුකිරීම් අනාවරණය නොකරයි.
  • එබැවින්, එය මාතෘකාව තුළ හෝ අනුවර්තනය කිරීමේ කොටසෙහි එහි වත්මන් තත්ත්වය යාවත්කාලීන නොකරනු ඇත.
  • මෙය, අනෙක් අතට, WAL ගොනු තැටියේ "හිරවී" ඇති අතර තැටි ඉඩ හිඟ වීමට ඉඩ ඇත.

මෙන්න විකල්ප ගලවා ගැනීමට පැමිණේ. heartbeat.interval.ms и heartbeat.action.query. මෙම විකල්ප යුගල වශයෙන් භාවිතා කිරීමෙන් හෘද ස්පන්දන පණිවිඩයක් යවන සෑම අවස්ථාවකම වෙනම වගුවක දත්ත වෙනස් කිරීමට ඉල්ලීමක් ක්‍රියාත්මක කිරීමට හැකි වේ. මේ අනුව, සම්බන්ධකය දැනට පිහිටා ඇති LSN (අනුවර්තන ස්ලට් එකේ) නිරන්තරයෙන් යාවත්කාලීන වේ. මෙය DBMS හට තවදුරටත් අවශ්‍ය නොවන WAL ගොනු ඉවත් කිරීමට ඉඩ සලසයි. විකල්ප ක්‍රියා කරන ආකාරය පිළිබඳ වැඩි විස්තර සඳහා, බලන්න ලියකියවිලි.

සමීප අවධානයක් ලැබිය යුතු තවත් විකල්පයකි transforms. එය පහසුව සහ අලංකාරය ගැන වැඩි යමක් වුවද ...

පෙරනිමියෙන්, Debezium පහත නම් කිරීමේ ප්‍රතිපත්තිය භාවිතා කරමින් මාතෘකා නිර්මාණය කරයි: serverName.schemaName.tableName. මෙය සැමවිටම පහසු නොවිය හැකිය. විකල්ප transforms සාමාන්‍ය ප්‍රකාශන භාවිතයෙන්, ඔබට නිශ්චිත නමක් සහිත මාතෘකාවක් වෙත සිදුවීම් යොමු කළ යුතු වගු ලැයිස්තුවක් අර්ථ දැක්විය හැක.

අපගේ වින්‍යාසය තුළ ස්තූතියි transforms පහත දේ සිදුවේ: නිරීක්ෂණය කරන ලද දත්ත සමුදායෙන් සියලුම CDC සිදුවීම් නම සහිත මාතෘකාවට යයි data.cdc.dbname. එසේ නොමැතිනම් (මෙම සිටුවම් නොමැතිව), Debezium පෙරනිමියෙන් පෝරමයේ එක් එක් වගුව සඳහා මාතෘකාවක් සාදනු ඇත: pg-dev.public.<table_name>.

සම්බන්ධක සීමාවන්

PostgreSQL සඳහා සම්බන්ධක වින්‍යාසය පිළිබඳ විස්තරය අවසානයේ, එහි කාර්යයේ පහත විශේෂාංග / සීමාවන් ගැන කතා කිරීම වටී:

  1. PostgreSQL සඳහා සම්බන්ධක ක්‍රියාකාරීත්වය තාර්කික විකේතනය පිළිබඳ සංකල්පය මත රඳා පවතී. එබැවින් ඔහු දත්ත සමුදායේ ව්යුහය වෙනස් කිරීම සඳහා ඉල්ලීම් නිරීක්ෂණය නොකරයි (DDL) - ඒ අනුව, මෙම දත්ත මාතෘකා වල නොතිබෙනු ඇත.
  2. අනුකරණ ස්ලට් භාවිතා කරන බැවින්, සම්බන්ධකයේ සම්බන්ධතාවය හැකි ය පමණි ප්‍රධාන DBMS අවස්ථාවට.
  3. සම්බන්ධකය දත්ත සමුදායට සම්බන්ධ වන පරිශීලකයාට කියවීමට පමණක් හිමිකම් තිබේ නම්, පළමු දියත් කිරීමට පෙර, ඔබ අතින් අනුරූ ස්ලට් එකක් සාදා දත්ත ගබඩාවට ප්‍රකාශ කිරීමට අවශ්‍ය වනු ඇත.

වින්‍යාසයක් යෙදීම

එබැවින් අපි අපගේ වින්‍යාසය සම්බන්ධකයට පූරණය කරමු:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

බාගත කිරීම සාර්ථක වූ අතර සම්බන්ධකය ආරම්භ වූ බව අපි පරීක්ෂා කරමු:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

නියමයි: එය සකසා ඇති අතර යාමට සූදානම්ය. දැන් අපි පාරිභෝගිකයෙකු ලෙස පෙනී සිටිමින් කෆ්කා වෙත සම්බන්ධ වෙමු, ඉන්පසු අපි වගුවේ ඇතුළත් කිරීමක් එකතු කර වෙනස් කරමු:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

අපගේ මාතෘකාව තුළ, මෙය පහත පරිදි පෙන්වනු ඇත:

අපගේ වෙනස්කම් සමඟ ඉතා දිගු JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

අවස්ථා දෙකේදීම, වාර්තා වෙනස් කරන ලද වාර්තාවේ යතුර (PK) සහ වෙනස්කම්වල සාරය සමන්විත වේ: වාර්තාව පෙර කුමක්ද සහ එය පසුව වූයේ කුමක්ද.

  • සම්බන්ධයෙන් INSERT: පෙර අගය (before) සමාන nullඇතුල් කරන ලද තන්තුව අනුගමනය කරයි.
  • සම්බන්ධයෙන් UPDATE: හිදී payload.before පේළියේ පෙර තත්වය පෙන්වනු ලැබේ, සහ in payload.after - වෙනස් වීමේ සාරය සමඟ අලුත්.

2.2 MongoDB

මෙම සම්බන්ධකය DBMS ප්‍රාථමික නෝඩයේ ඔප්ලොග් වෙතින් තොරතුරු කියවීම, සම්මත MongoDB අනුකරණ යාන්ත්‍රණය භාවිතා කරයි.

PgSQL සඳහා දැනටමත් විස්තර කර ඇති සම්බන්ධකය හා සමානව, මෙහිද, පළමු ආරම්භයේදීම, ප්‍රාථමික දත්ත ස්නැප්ෂොට් ගනු ලැබේ, ඉන්පසු සම්බන්ධකය ඔප්ලොග් කියවීමේ මාදිලියට මාරු වේ.

මානකරන උදාහරණය:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

ඔබට පෙනෙන පරිදි, පෙර උදාහරණයට සාපේක්ෂව නව විකල්ප නොමැත, නමුත් දත්ත සමුදායට සම්බන්ධ කිරීම සඳහා වගකිව යුතු විකල්ප ගණන සහ ඒවායේ උපසර්ගයන් පමණක් අඩු කර ඇත.

සැකසුම් transforms මෙවර ඔවුන් පහත දේ කරයි: යෝජනා ක්‍රමයෙන් ඉලක්ක මාතෘකාවේ නම හරවන්න <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

වැරදි ඉවසීම

අපගේ කාලය තුළ වැරදි ඉවසීම සහ ඉහළ ලබා ගැනීමේ ගැටලුව වෙන කවරදාටත් වඩා උග්‍ර වේ - විශේෂයෙන් අපි දත්ත සහ ගනුදෙනු ගැන කතා කරන විට සහ දත්ත වෙනස් කිරීමේ ලුහුබැඳීම මේ කාරණයේදී පැත්තක නැත. ප්‍රතිපත්තිමය වශයෙන් වැරදි විය හැකි දේ සහ එක් එක් අවස්ථා වලදී Debezium හට කුමක් සිදුවේද යන්න අපි බලමු.

ඉවත් වීමේ විකල්ප තුනක් ඇත:

  1. Kafka Connect අසාර්ථකයි. Connect බෙදා හරින ලද මාදිලියේ වැඩ කිරීමට වින්‍යාස කර තිබේ නම්, මේ සඳහා එකම group.id සැකසීමට බහු කම්කරුවන් අවශ්‍ය වේ. ඉන්පසුව, ඔවුන්ගෙන් එක් අයෙකු අසමත් වුවහොත්, සම්බන්ධකය අනෙක් සේවකයා මත නැවත ආරම්භ කර කෆ්කා හි මාතෘකාවේ අවසාන කැපවූ ස්ථානයේ සිට කියවීම දිගටම කරගෙන යනු ඇත.
  2. කෆ්කා පොකුර සමඟ සම්බන්ධතාව නැතිවීම. සම්බන්ධකය Kafka වෙත යැවීමට අසමත් වූ ස්ථානයේ කියවීම නවත්වන අතර උත්සාහය සාර්ථක වන තෙක් වරින් වර එය නැවත යැවීමට උත්සාහ කරයි.
  3. දත්ත මූලාශ්‍රය නොමැත. සම්බන්ධකය වින්‍යාසය අනුව ප්‍රභවයට නැවත සම්බන්ධ වීමට උත්සාහ කරයි. පෙරනිමිය උත්සාහයන් 16ක් භාවිතා කරයි ඝාතීය පසුබෑම. 16 වැනි අසාර්ථක උත්සාහයෙන් පසුව, කාර්යය ලෙස සලකුණු කරනු ලැබේ අසාර්ථක විය එය Kafka Connect REST අතුරුමුහුණත හරහා අතින් නැවත ආරම්භ කිරීමට අවශ්‍ය වනු ඇත.
    • සම්බන්ධයෙන් PostgreSQL දත්ත නැති නොවනු ඇත, මන්ද අනුවර්තන තව් භාවිතා කිරීම සම්බන්ධකය මගින් කියවා නැති WAL ගොනු මකා දැමීම වලක්වනු ඇත. මෙම අවස්ථාවෙහිදී, අවාසියක් ඇත: සම්බන්ධකය සහ DBMS අතර ජාල සම්බන්ධතාවය දිගු කලක් කඩාකප්පල් වී ඇත්නම්, තැටි අවකාශය අවසන් වීමට ඉඩ ඇති අතර, මෙය සමස්ත DBMS හි අසාර්ථකත්වයට හේතු විය හැක.
    • සම්බන්ධයෙන් MySQL සම්බන්ධතාවය ප්‍රතිස්ථාපනය කිරීමට පෙර binlog ගොනු DBMS විසින්ම කරකැවිය හැක. මෙය සම්බන්ධකය අසාර්ථක තත්ත්වයට යාමට හේතු වනු ඇති අතර, සාමාන්‍ය ක්‍රියාකාරිත්වය ප්‍රතිසාධනය කිරීම සඳහා බින්ලොග් වලින් කියවීම දිගටම කරගෙන යාමට එය ආරම්භක ස්නැප්ෂොට් ප්‍රකාරයේදී නැවත ආරම්භ කිරීමට අවශ්‍ය වනු ඇත.
    • මත MongoDB. ප්‍රලේඛනය පවසන්නේ: ලොග්/ඔප්ලොග් ගොනු මකා ඇති අවස්ථාවක සම්බන්ධකයේ හැසිරීම සහ සම්බන්ධකය නතර කළ ස්ථානයේ සිට කියවීම දිගටම කරගෙන යාමට නොහැකි නම්, සියලුම DBMS සඳහා සමාන වේ. එය සම්බන්ධකය රාජ්යයට යන කාරනය තුල පවතී අසාර්ථක විය සහ මාදිලියේ නැවත ආරම්භ කිරීම අවශ්ය වනු ඇත ආරම්භක ඡායාරූපය.

      කෙසේ වෙතත්, ව්යතිරේක පවතී. සම්බන්ධකය දිගු වේලාවක් විසන්ධි වූ තත්වයක පැවතියේ නම් (හෝ MongoDB අවස්ථාවට ළඟා වීමට නොහැකි විය), සහ මෙම කාලය තුළ ඔප්ලොග් කරකවනු ලැබුවේ නම්, සම්බන්ධතාවය ප්‍රතිසාධනය කළ විට, සම්බන්ධකය සන්සුන්ව පවතින පළමු ස්ථානයේ සිට දත්ත කියවීම දිගටම කරගෙන යනු ඇත. , කෆ්කා හි සමහර දත්ත ඒ නිසයි නෑ පහර දෙනු ඇත.

නිගමනය

Debezium CDC පද්ධති සමඟ මගේ පළමු අත්දැකීම වන අතර එය සමස්තයක් ලෙස ඉතා ධනාත්මකයි. ව්‍යාපෘතිය ප්‍රධාන DBMS හි සහය, වින්‍යාස කිරීමේ පහසුව, පොකුරු සහාය සහ ක්‍රියාකාරී ප්‍රජාවක් අල්ලස් ලබා දුන්නේය. ප්රායෝගිකව උනන්දුවක් දක්වන අය සඳහා, ඔබ සඳහා මාර්ගෝපදේශ කියවීමට මම නිර්දේශ කරමි කෆ්කා කනෙක්ට් и ඩෙබෙසියම්.

Kafka Connect සඳහා වන JDBC සම්බන්ධකය හා සසඳන විට, Debezium හි ප්‍රධාන වාසිය නම්, DBMS ලොග් වලින් වෙනස්කම් කියවීමයි, එමඟින් දත්ත අවම ප්‍රමාදයකින් ලබා ගැනීමට ඉඩ සලසයි. JDBC සම්බන්ධකය (Kafka Connect විසින් සපයනු ලැබේ) ලුහුබැඳ ගිය වගුව ස්ථාවර කාල පරතරයකින් විමසන අතර (එම හේතුව නිසා) දත්ත මකා දැමූ විට පණිවිඩ ජනනය නොකරයි (නොපවතින දත්ත සඳහා ඔබට විමසිය හැක්කේ කෙසේද?).

සමාන ගැටළු විසඳීම සඳහා, ඔබට පහත විසඳුම් කෙරෙහි අවධානය යොමු කළ හැකිය (Debezium වලට අමතරව):

ප්රාදේශීය සභා

අපගේ බ්ලොග් අඩවියේ ද කියවන්න:

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න