මගේ වැඩ වලදී, මට බොහෝ විට නව තාක්ෂණික විසඳුම් / මෘදුකාංග නිෂ්පාදන හමු වේ, රුසියානු භාෂාව කතා කරන අන්තර්ජාලයේ තරමක් දුර්ලභ තොරතුරු. මෙම ලිපිය සමඟින්, Debezium භාවිතා කරමින් ජනප්රිය DBMS දෙකකින් (PostgreSQL සහ MongoDB) CDC සිද්ධීන් Kafka පොකුරකට යැවීමට මට අවශ්ය වූ විට, මගේ මෑතකාලීන පුහුණුවෙන් එවැනි පරතරයක් පිරවීමට මම උත්සාහ කරමි. කරන ලද කාර්යයේ ප්රතිඵලයක් ලෙස පළ වූ මෙම සමාලෝචන ලිපිය අන් අයට ප්රයෝජනවත් වනු ඇතැයි මම බලාපොරොත්තු වෙමි.
සාමාන්යයෙන් Debezium සහ CDC යනු කුමක්ද?
මෙම
අපි CDC සාම්ප්රදායික ප්රවේශය සමඟ සංසන්දනය කරන්නේ නම් (යෙදුම DBMS වෙතින් දත්ත කෙලින්ම කියවන විට), එවිට එහි ප්රධාන වාසි අතර අඩු ප්රමාදය, ඉහළ විශ්වසනීයත්වය සහ ලබා ගැනීමේ හැකියාව සමඟ පේළි මට්ටමින් දත්ත වෙනස් කිරීමේ ප්රවාහය ක්රියාත්මක කිරීම ඇතුළත් වේ. CDC සිදුවීම් සඳහා ගබඩාවක් ලෙස Kafka පොකුරක් භාවිතා කිරීමෙන් අවසාන කරුණු දෙක සාක්ෂාත් කරගනු ලැබේ.
එසේම, වාසි අතර සිදුවීම් ගබඩා කිරීම සඳහා තනි ආකෘතියක් භාවිතා කරයි, එබැවින් අවසාන යෙදුම විවිධ DBMS ක්රියාත්මක කිරීමේ සූක්ෂ්මතාවයන් ගැන කරදර විය යුතු නැත.
අවසාන වශයෙන්, පණිවිඩ තැරැව්කරුවෙකු භාවිතා කිරීමෙන් දත්තවල වෙනස්කම් නිරීක්ෂණය කරන යෙදුම්වල තිරස් පරිමාණය සඳහා විෂය පථය විවෘත වේ. ඒ අතරම, දත්ත සෘජුවම DBMS වෙතින් නොව, Kafka පොකුරෙන් ලැබෙන බැවින්, දත්ත මූලාශ්රය මත ඇති බලපෑම අවම වේ.
Debezium ගෘහ නිර්මාණ ශිල්පය ගැන
Debezium භාවිතා කිරීම මෙම සරල යෝජනා ක්රමයට පැමිණේ:
DBMS (දත්ත මූලාශ්රය ලෙස) → Kafka Connect හි සම්බන්ධකය → Apache Kafka → පාරිභෝගික
නිදර්ශනයක් ලෙස, මම ව්යාපෘති වෙබ් අඩවියෙන් රූප සටහනක් දෙන්නෙමි:
කෙසේ වෙතත්, මම මෙම යෝජනා ක්රමයට සැබවින්ම කැමති නැත, මන්ද සින්ක් සම්බන්ධකයක් පමණක් කළ හැකි බව පෙනේ.
යථාර්ථයේ දී, තත්වය වෙනස් ය: ඔබේ දත්ත විල පිරවීම (ඉහත රූප සටහනේ අවසාන සබැඳිය) Debezium භාවිතා කිරීමට ඇති එකම මාර්ගය නොවේ. Apache Kafka වෙත යවන ලද සිදුවීම් විවිධ තත්වයන් සමඟ කටයුතු කිරීමට ඔබගේ යෙදුම් මගින් භාවිතා කළ හැක. උදාහරණ වශයෙන්:
- හැඹිලියෙන් අදාල නොවන දත්ත ඉවත් කිරීම;
- දැනුම්දීම් යැවීම;
- සෙවුම් දර්ශක යාවත්කාලීන කිරීම්;
- යම් ආකාරයක විගණන ලඝු-සටහන්;
- ...
ඔබට ජාවා යෙදුමක් තිබේ නම් සහ කෆ්කා පොකුරක් භාවිතා කිරීමට අවශ්යතාවයක් / හැකියාවක් නොමැති නම්, ඒ හරහා ක්රියා කිරීමේ හැකියාව ද ඇත.
මෙම ලිපියෙන් සංවර්ධකයින් විසින් නිර්දේශ කරනු ලබන ගෘහ නිර්මාණ ශිල්පය පිළිබඳව සාකච්ඡා කරනු ඇත, එය දෝෂ ඉවසීම සහ පරිමාණය සපයයි.
සම්බන්ධක වින්යාසය
වඩාත්ම වැදගත් අගය - දත්ත - වෙනස්කම් නිරීක්ෂණය කිරීම ආරම්භ කිරීම සඳහා අපට අවශ්ය වන්නේ:
- දත්ත මූලාශ්රය, එය MySQL විය හැකි අනුවාදය 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (
සම්පූර්ණ ලැයිස්තුව ); - Apache Kafka පොකුර
- Kafka Connect උදාහරණය (අනුවාද 1.x, 2.x);
- වින්යාසගත Debezium සම්බන්ධකය.
පළමු කරුණු දෙක මත වැඩ කරන්න, i.e. DBMS සහ Apache Kafka ස්ථාපනය කිරීමේ ක්රියාවලිය ලිපියේ විෂය පථයෙන් ඔබ්බට ය. කෙසේ වෙතත්, වැලිපිල්ලක සෑම දෙයක්ම යෙදවීමට කැමති අය සඳහා, උදාහරණ සමඟ නිල ගබඩාවේ සූදානම් කළ එකක් තිබේ.
අපි අවසාන කරුණු දෙක වඩාත් විස්තරාත්මකව අවධානය යොමු කරමු.
0. කෆ්කා කනෙක්ට්
මෙහි සහ පසුව ලිපියේ, Debezium සංවර්ධකයින් විසින් බෙදා හරින ලද Docker රූපයේ සන්දර්භය තුළ සියලුම වින්යාස උදාහරණ සලකා බලනු ලැබේ. එහි අවශ්ය සියලුම ප්ලගින ගොනු (සම්බන්ධක) අඩංගු වන අතර පරිසර විචල්යයන් භාවිතයෙන් Kafka Connect වින්යාසය සපයයි.
ඔබ Confluent වෙතින් Kafka Connect භාවිතා කිරීමට අදහස් කරන්නේ නම්, ඔබට අවශ්ය සම්බන්ධකවල ප්ලගීන ඔබ විසින්ම සඳහන් කර ඇති නාමාවලියට එක් කිරීමට අවශ්ය වනු ඇත. plugin.path
හෝ පරිසර විචල්යයක් හරහා සකසන්න CLASSPATH
. Kafka Connect සේවක සහ සම්බන්ධක සඳහා වන සැකසුම් කම්කරු ආරම්භක විධානයට තර්ක ලෙස යවන වින්යාස ගොනු හරහා අර්ථ දක්වා ඇත. විස්තර සඳහා බලන්න
සම්බන්ධක අනුවාදයේ Debeizum සැකසීමේ සම්පූර්ණ ක්රියාවලිය අදියර දෙකකින් සිදු කෙරේ. අපි ඒ එක් එක් සලකා බලමු:
1. Kafka Connect රාමුව සැකසීම
Apache Kafka පොකුරකට දත්ත ප්රවාහ කිරීමට, Kafka Connect රාමුව තුළ නිශ්චිත පරාමිති සකසා ඇත, එවැනි:
- පොකුරු සම්බන්ධතා සැකසුම්,
- සම්බන්ධකයේ වින්යාසය ගබඩා කරන මාතෘකා වල නම්,
- සම්බන්ධකය ක්රියාත්මක වන කණ්ඩායමේ නම (බෙදා හරින ලද මාදිලිය භාවිතා කරන විට).
ව්යාපෘතියේ නිල ඩොකර් රූපය පරිසර විචල්යයන් භාවිතයෙන් වින්යාස කිරීමට සහය දක්වයි - මෙය අප භාවිතා කරනු ඇත. ඉතින් අපි පින්තූරය බාගත කරමු:
docker pull debezium/connect
සම්බන්ධකය ක්රියාත්මක කිරීමට අවශ්ය අවම පරිසර විචල්ය කට්ටලය පහත පරිදි වේ:
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
- පොකුරු සාමාජිකයින්ගේ සම්පූර්ණ ලැයිස්තුවක් ලබා ගැනීම සඳහා Kafka පොකුරු සේවාදායකයන්ගේ මූලික ලැයිස්තුව; -
OFFSET_STORAGE_TOPIC=connector-offsets
- සම්බන්ධකය දැනට පිහිටා ඇති ස්ථාන ගබඩා කිරීම සඳහා මාතෘකාවක්; -
CONNECT_STATUS_STORAGE_TOPIC=connector-status
- සම්බන්ධකයේ තත්ත්වය සහ එහි කාර්යයන් ගබඩා කිරීම සඳහා මාතෘකාවක්; -
CONFIG_STORAGE_TOPIC=connector-config
- සම්බන්ධක වින්යාස දත්ත සහ එහි කාර්යයන් ගබඩා කිරීම සඳහා මාතෘකාවක්; -
GROUP_ID=1
- සම්බන්ධක කාර්යය ක්රියාත්මක කළ හැකි සේවකයින්ගේ කණ්ඩායමේ හඳුනාගැනීම; බෙදා හැරීම භාවිතා කරන විට අවශ්ය වේ (බෙදාහැර ඇත) තන්ත්රය.
අපි මෙම විචල්යයන් සමඟ කන්ටේනරය ආරම්භ කරමු:
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2
Avro ගැන සටහන
පෙරනිමියෙන්, Debezium JSON ආකෘතියෙන් දත්ත ලියයි, එය වැලි පෙට්ටි සහ කුඩා දත්ත සඳහා පිළිගත හැකි නමුත්, අධික ලෙස පටවන ලද දත්ත සමුදායන්හි ගැටලුවක් විය හැක. JSON පරිවර්තකය සඳහා විකල්පයක් වන්නේ භාවිතා කරමින් පණිවිඩ අනුක්රමික කිරීමයි
Avro භාවිතා කිරීමට, ඔබ වෙනම යෙදවිය යුතුය
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
Avro භාවිතා කිරීම සහ ඒ සඳහා රෙජිස්ට්රියක් පිහිටුවීම පිළිබඳ විස්තර ලිපියේ විෂය පථයෙන් ඔබ්බට ය - තවදුරටත්, පැහැදිලිකම සඳහා, අපි JSON භාවිතා කරන්නෙමු.
2. සම්බන්ධකයම සැකසීම
දැන් ඔබට සම්බන්ධකයේ වින්යාසය වෙත කෙලින්ම යා හැකිය, එමඟින් මූලාශ්රයෙන් දත්ත කියවනු ඇත.
DBMS දෙකක් සඳහා සම්බන්ධක උදාහරණය දෙස බලමු: PostgreSQL සහ MongoDB, ඒ සඳහා මට අත්දැකීම් ඇති සහ වෙනස්කම් ඇති (කුඩා වුවද, නමුත් සමහර අවස්ථාවල වැදගත් වේ!).
වින්යාසය JSON අංකනයෙහි විස්තර කර ඇති අතර POST ඉල්ලීමක් භාවිතයෙන් Kafka Connect වෙත උඩුගත කර ඇත.
2.1 PostgreSQL
PostgreSQL සඳහා උදාහරණ සම්බන්ධක වින්යාසය:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
මෙම වින්යාසයෙන් පසු සම්බන්ධකයේ ක්රියාකාරිත්වයේ මූලධර්මය තරමක් සරල ය:
- පළමු ආරම්භයේදී, එය වින්යාසයේ දක්වා ඇති දත්ත සමුදායට සම්බන්ධ වන අතර ප්රකාරයේදී ආරම්භ වේ. ආරම්භක ඡායාරූපය, කොන්දේසි සහිතව ලැබුණු මූලික දත්ත කට්ටලය Kafka වෙත යැවීම
SELECT * FROM table_name
. - ආරම්භ කිරීම අවසන් වූ පසු, සම්බන්ධකය PostgreSQL WAL ගොනු වලින් කියවීමේ වෙනස්කම් වලට ඇතුල් වේ.
භාවිතා කරන විකල්ප ගැන:
-
name
- පහත විස්තර කර ඇති වින්යාසය භාවිතා කරන සම්බන්ධකයේ නම; අනාගතයේදී, මෙම නම Kafka Connect REST API හරහා සම්බන්ධකය සමඟ වැඩ කිරීමට (එනම් තත්වය බැලීම / නැවත ආරම්භ කිරීම / වින්යාසය යාවත්කාලීන කිරීම) භාවිතා කරයි; -
connector.class
- වින්යාසගත සම්බන්ධකය විසින් භාවිතා කරනු ලබන DBMS සම්බන්ධක පන්තිය; -
plugin.name
WAL ගොනු වලින් දත්ත තාර්කික විකේතනය කිරීම සඳහා වන ප්ලගිනයේ නම වේ. තෝරා ගැනීමට තිබේwal2json
,decoderbuffs
иpgoutput
. පළමු දෙක DBMS හි සුදුසු දිගු ස්ථාපනය කිරීම අවශ්ය වේ, සහpgoutput
PostgreSQL අනුවාදය 10 සහ ඉහළ සඳහා අමතර උපාමාරු අවශ්ය නොවේ; -
database.*
- දත්ත සමුදායට සම්බන්ධ වීමට විකල්ප, එහිදීdatabase.server.name
- Kafka පොකුරේ මාතෘකාවේ නම සැකසීමට භාවිතා කරන PostgreSQL උදාහරණයේ නම; -
table.include.list
- අපට වෙනස්කම් නිරීක්ෂණය කිරීමට අවශ්ය වගු ලැයිස්තුවක්; ආකෘතියෙන් ලබා දී ඇතschema.table_name
; සමඟ එකට භාවිතා කළ නොහැකtable.exclude.list
; -
heartbeat.interval.ms
- සම්බන්ධකය විශේෂ මාතෘකාවකට හෘද ස්පන්දන පණිවිඩ යවන පරතරය (මිලි තත්පර වලින්); -
heartbeat.action.query
- එක් එක් හෘද ස්පන්දන පණිවිඩ යැවීමේදී ක්රියාත්මක වන ඉල්ලීමක් (විකල්පය 1.1 අනුවාදයේ සිට දර්ශනය වී ඇත); -
slot.name
- සම්බන්ධකය විසින් භාවිතා කරනු ලබන අනුකරණ ස්ලට් එකේ නම; publication.name
- නමප්රකාශන සම්බන්ධකය භාවිතා කරන PostgreSQL හි. එය නොපවතියි නම්, Debezium එය නිර්මාණය කිරීමට උත්සාහ කරයි. සම්බන්ධතාවය ඇති පරිශීලකයාට මෙම ක්රියාව සඳහා ප්රමාණවත් හිමිකම් නොමැති නම්, සම්බන්ධකය දෝෂයක් සමඟ පිටව යනු ඇත;-
transforms
ඉලක්ක මාතෘකාවේ නම හරියටම වෙනස් කරන්නේ කෙසේද යන්න තීරණය කරයි:-
transforms.AddPrefix.type
අපි නිත්ය ප්රකාශන භාවිතා කරන බව අඟවයි; -
transforms.AddPrefix.regex
- ඉලක්ක මාතෘකාවේ නම නැවත අර්ථ දක්වා ඇති වෙස් මුහුණ; -
transforms.AddPrefix.replacement
- කෙලින්ම අපි නැවත අර්ථ දක්වන දේ.
-
හෘද ස්පන්දනය සහ පරිවර්තනයන් පිළිබඳ වැඩි විස්තර
පෙරනිමියෙන්, සම්බන්ධකය සෑම කැපවූ ගනුදෙනුවක් සඳහාම Kafka වෙත දත්ත යවන අතර, එහි LSN (ලොග් අනුක්රමික අංකය) සේවා මාතෘකාවට ලියයි. offset
. නමුත් සම්බන්ධකය සම්පූර්ණ දත්ත ගබඩාව නොව එහි වගු වලින් කොටසක් පමණක් (දත්ත නිතර යාවත්කාලීන වන) කියවීමට වින්යාස කර ඇත්නම් කුමක් සිදුවේද?
- සම්බන්ධකය විසින් WAL ගොනු කියවනු ඇති අතර එය නිරීක්ෂණය කරන වගු වෙත ගනුදෙනු සිදුකිරීම් අනාවරණය නොකරයි.
- එබැවින්, එය මාතෘකාව තුළ හෝ අනුවර්තනය කිරීමේ කොටසෙහි එහි වත්මන් තත්ත්වය යාවත්කාලීන නොකරනු ඇත.
- මෙය, අනෙක් අතට, WAL ගොනු තැටියේ "හිරවී" ඇති අතර තැටි ඉඩ හිඟ වීමට ඉඩ ඇත.
මෙන්න විකල්ප ගලවා ගැනීමට පැමිණේ. heartbeat.interval.ms
и heartbeat.action.query
. මෙම විකල්ප යුගල වශයෙන් භාවිතා කිරීමෙන් හෘද ස්පන්දන පණිවිඩයක් යවන සෑම අවස්ථාවකම වෙනම වගුවක දත්ත වෙනස් කිරීමට ඉල්ලීමක් ක්රියාත්මක කිරීමට හැකි වේ. මේ අනුව, සම්බන්ධකය දැනට පිහිටා ඇති LSN (අනුවර්තන ස්ලට් එකේ) නිරන්තරයෙන් යාවත්කාලීන වේ. මෙය DBMS හට තවදුරටත් අවශ්ය නොවන WAL ගොනු ඉවත් කිරීමට ඉඩ සලසයි. විකල්ප ක්රියා කරන ආකාරය පිළිබඳ වැඩි විස්තර සඳහා, බලන්න
සමීප අවධානයක් ලැබිය යුතු තවත් විකල්පයකි transforms
. එය පහසුව සහ අලංකාරය ගැන වැඩි යමක් වුවද ...
පෙරනිමියෙන්, Debezium පහත නම් කිරීමේ ප්රතිපත්තිය භාවිතා කරමින් මාතෘකා නිර්මාණය කරයි: serverName.schemaName.tableName
. මෙය සැමවිටම පහසු නොවිය හැකිය. විකල්ප transforms
සාමාන්ය ප්රකාශන භාවිතයෙන්, ඔබට නිශ්චිත නමක් සහිත මාතෘකාවක් වෙත සිදුවීම් යොමු කළ යුතු වගු ලැයිස්තුවක් අර්ථ දැක්විය හැක.
අපගේ වින්යාසය තුළ ස්තූතියි transforms
පහත දේ සිදුවේ: නිරීක්ෂණය කරන ලද දත්ත සමුදායෙන් සියලුම CDC සිදුවීම් නම සහිත මාතෘකාවට යයි data.cdc.dbname
. එසේ නොමැතිනම් (මෙම සිටුවම් නොමැතිව), Debezium පෙරනිමියෙන් පෝරමයේ එක් එක් වගුව සඳහා මාතෘකාවක් සාදනු ඇත: pg-dev.public.<table_name>
.
සම්බන්ධක සීමාවන්
PostgreSQL සඳහා සම්බන්ධක වින්යාසය පිළිබඳ විස්තරය අවසානයේ, එහි කාර්යයේ පහත විශේෂාංග / සීමාවන් ගැන කතා කිරීම වටී:
- PostgreSQL සඳහා සම්බන්ධක ක්රියාකාරීත්වය තාර්කික විකේතනය පිළිබඳ සංකල්පය මත රඳා පවතී. එබැවින් ඔහු දත්ත සමුදායේ ව්යුහය වෙනස් කිරීම සඳහා ඉල්ලීම් නිරීක්ෂණය නොකරයි (DDL) - ඒ අනුව, මෙම දත්ත මාතෘකා වල නොතිබෙනු ඇත.
- අනුකරණ ස්ලට් භාවිතා කරන බැවින්, සම්බන්ධකයේ සම්බන්ධතාවය හැකි ය පමණි ප්රධාන DBMS අවස්ථාවට.
- සම්බන්ධකය දත්ත සමුදායට සම්බන්ධ වන පරිශීලකයාට කියවීමට පමණක් හිමිකම් තිබේ නම්, පළමු දියත් කිරීමට පෙර, ඔබ අතින් අනුරූ ස්ලට් එකක් සාදා දත්ත ගබඩාවට ප්රකාශ කිරීමට අවශ්ය වනු ඇත.
වින්යාසයක් යෙදීම
එබැවින් අපි අපගේ වින්යාසය සම්බන්ධකයට පූරණය කරමු:
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.json
බාගත කිරීම සාර්ථක වූ අතර සම්බන්ධකය ආරම්භ වූ බව අපි පරීක්ෂා කරමු:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
නියමයි: එය සකසා ඇති අතර යාමට සූදානම්ය. දැන් අපි පාරිභෝගිකයෙකු ලෙස පෙනී සිටිමින් කෆ්කා වෙත සම්බන්ධ වෙමු, ඉන්පසු අපි වගුවේ ඇතුළත් කිරීමක් එකතු කර වෙනස් කරමු:
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
අපගේ මාතෘකාව තුළ, මෙය පහත පරිදි පෙන්වනු ඇත:
අපගේ වෙනස්කම් සමඟ ඉතා දිගු JSON
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
අවස්ථා දෙකේදීම, වාර්තා වෙනස් කරන ලද වාර්තාවේ යතුර (PK) සහ වෙනස්කම්වල සාරය සමන්විත වේ: වාර්තාව පෙර කුමක්ද සහ එය පසුව වූයේ කුමක්ද.
- සම්බන්ධයෙන්
INSERT
: පෙර අගය (before
) සමානnull
ඇතුල් කරන ලද තන්තුව අනුගමනය කරයි. - සම්බන්ධයෙන්
UPDATE
: හිදීpayload.before
පේළියේ පෙර තත්වය පෙන්වනු ලැබේ, සහ inpayload.after
- වෙනස් වීමේ සාරය සමඟ අලුත්.
2.2 MongoDB
මෙම සම්බන්ධකය DBMS ප්රාථමික නෝඩයේ ඔප්ලොග් වෙතින් තොරතුරු කියවීම, සම්මත MongoDB අනුකරණ යාන්ත්රණය භාවිතා කරයි.
PgSQL සඳහා දැනටමත් විස්තර කර ඇති සම්බන්ධකය හා සමානව, මෙහිද, පළමු ආරම්භයේදීම, ප්රාථමික දත්ත ස්නැප්ෂොට් ගනු ලැබේ, ඉන්පසු සම්බන්ධකය ඔප්ලොග් කියවීමේ මාදිලියට මාරු වේ.
මානකරන උදාහරණය:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
ඔබට පෙනෙන පරිදි, පෙර උදාහරණයට සාපේක්ෂව නව විකල්ප නොමැත, නමුත් දත්ත සමුදායට සම්බන්ධ කිරීම සඳහා වගකිව යුතු විකල්ප ගණන සහ ඒවායේ උපසර්ගයන් පමණක් අඩු කර ඇත.
සැකසුම් transforms
මෙවර ඔවුන් පහත දේ කරයි: යෝජනා ක්රමයෙන් ඉලක්ක මාතෘකාවේ නම හරවන්න <server_name>.<db_name>.<collection_name>
в data.cdc.mongo_<db_name>
.
වැරදි ඉවසීම
අපගේ කාලය තුළ වැරදි ඉවසීම සහ ඉහළ ලබා ගැනීමේ ගැටලුව වෙන කවරදාටත් වඩා උග්ර වේ - විශේෂයෙන් අපි දත්ත සහ ගනුදෙනු ගැන කතා කරන විට සහ දත්ත වෙනස් කිරීමේ ලුහුබැඳීම මේ කාරණයේදී පැත්තක නැත. ප්රතිපත්තිමය වශයෙන් වැරදි විය හැකි දේ සහ එක් එක් අවස්ථා වලදී Debezium හට කුමක් සිදුවේද යන්න අපි බලමු.
ඉවත් වීමේ විකල්ප තුනක් ඇත:
- Kafka Connect අසාර්ථකයි. Connect බෙදා හරින ලද මාදිලියේ වැඩ කිරීමට වින්යාස කර තිබේ නම්, මේ සඳහා එකම group.id සැකසීමට බහු කම්කරුවන් අවශ්ය වේ. ඉන්පසුව, ඔවුන්ගෙන් එක් අයෙකු අසමත් වුවහොත්, සම්බන්ධකය අනෙක් සේවකයා මත නැවත ආරම්භ කර කෆ්කා හි මාතෘකාවේ අවසාන කැපවූ ස්ථානයේ සිට කියවීම දිගටම කරගෙන යනු ඇත.
- කෆ්කා පොකුර සමඟ සම්බන්ධතාව නැතිවීම. සම්බන්ධකය Kafka වෙත යැවීමට අසමත් වූ ස්ථානයේ කියවීම නවත්වන අතර උත්සාහය සාර්ථක වන තෙක් වරින් වර එය නැවත යැවීමට උත්සාහ කරයි.
- දත්ත මූලාශ්රය නොමැත. සම්බන්ධකය වින්යාසය අනුව ප්රභවයට නැවත සම්බන්ධ වීමට උත්සාහ කරයි. පෙරනිමිය උත්සාහයන් 16ක් භාවිතා කරයි
ඝාතීය පසුබෑම . 16 වැනි අසාර්ථක උත්සාහයෙන් පසුව, කාර්යය ලෙස සලකුණු කරනු ලැබේ අසාර්ථක විය එය Kafka Connect REST අතුරුමුහුණත හරහා අතින් නැවත ආරම්භ කිරීමට අවශ්ය වනු ඇත.- සම්බන්ධයෙන් PostgreSQL දත්ත නැති නොවනු ඇත, මන්ද අනුවර්තන තව් භාවිතා කිරීම සම්බන්ධකය මගින් කියවා නැති WAL ගොනු මකා දැමීම වලක්වනු ඇත. මෙම අවස්ථාවෙහිදී, අවාසියක් ඇත: සම්බන්ධකය සහ DBMS අතර ජාල සම්බන්ධතාවය දිගු කලක් කඩාකප්පල් වී ඇත්නම්, තැටි අවකාශය අවසන් වීමට ඉඩ ඇති අතර, මෙය සමස්ත DBMS හි අසාර්ථකත්වයට හේතු විය හැක.
- සම්බන්ධයෙන් MySQL සම්බන්ධතාවය ප්රතිස්ථාපනය කිරීමට පෙර binlog ගොනු DBMS විසින්ම කරකැවිය හැක. මෙය සම්බන්ධකය අසාර්ථක තත්ත්වයට යාමට හේතු වනු ඇති අතර, සාමාන්ය ක්රියාකාරිත්වය ප්රතිසාධනය කිරීම සඳහා බින්ලොග් වලින් කියවීම දිගටම කරගෙන යාමට එය ආරම්භක ස්නැප්ෂොට් ප්රකාරයේදී නැවත ආරම්භ කිරීමට අවශ්ය වනු ඇත.
- මත MongoDB. ප්රලේඛනය පවසන්නේ: ලොග්/ඔප්ලොග් ගොනු මකා ඇති අවස්ථාවක සම්බන්ධකයේ හැසිරීම සහ සම්බන්ධකය නතර කළ ස්ථානයේ සිට කියවීම දිගටම කරගෙන යාමට නොහැකි නම්, සියලුම DBMS සඳහා සමාන වේ. එය සම්බන්ධකය රාජ්යයට යන කාරනය තුල පවතී අසාර්ථක විය සහ මාදිලියේ නැවත ආරම්භ කිරීම අවශ්ය වනු ඇත ආරම්භක ඡායාරූපය.
කෙසේ වෙතත්, ව්යතිරේක පවතී. සම්බන්ධකය දිගු වේලාවක් විසන්ධි වූ තත්වයක පැවතියේ නම් (හෝ MongoDB අවස්ථාවට ළඟා වීමට නොහැකි විය), සහ මෙම කාලය තුළ ඔප්ලොග් කරකවනු ලැබුවේ නම්, සම්බන්ධතාවය ප්රතිසාධනය කළ විට, සම්බන්ධකය සන්සුන්ව පවතින පළමු ස්ථානයේ සිට දත්ත කියවීම දිගටම කරගෙන යනු ඇත. , කෆ්කා හි සමහර දත්ත ඒ නිසයි නෑ පහර දෙනු ඇත.
නිගමනය
Debezium CDC පද්ධති සමඟ මගේ පළමු අත්දැකීම වන අතර එය සමස්තයක් ලෙස ඉතා ධනාත්මකයි. ව්යාපෘතිය ප්රධාන DBMS හි සහය, වින්යාස කිරීමේ පහසුව, පොකුරු සහාය සහ ක්රියාකාරී ප්රජාවක් අල්ලස් ලබා දුන්නේය. ප්රායෝගිකව උනන්දුවක් දක්වන අය සඳහා, ඔබ සඳහා මාර්ගෝපදේශ කියවීමට මම නිර්දේශ කරමි
Kafka Connect සඳහා වන JDBC සම්බන්ධකය හා සසඳන විට, Debezium හි ප්රධාන වාසිය නම්, DBMS ලොග් වලින් වෙනස්කම් කියවීමයි, එමඟින් දත්ත අවම ප්රමාදයකින් ලබා ගැනීමට ඉඩ සලසයි. JDBC සම්බන්ධකය (Kafka Connect විසින් සපයනු ලැබේ) ලුහුබැඳ ගිය වගුව ස්ථාවර කාල පරතරයකින් විමසන අතර (එම හේතුව නිසා) දත්ත මකා දැමූ විට පණිවිඩ ජනනය නොකරයි (නොපවතින දත්ත සඳහා ඔබට විමසිය හැක්කේ කෙසේද?).
සමාන ගැටළු විසඳීම සඳහා, ඔබට පහත විසඳුම් කෙරෙහි අවධානය යොමු කළ හැකිය (Debezium වලට අමතරව):
-
JDBC සම්බන්ධකය Kafka Connect - MySQL-පමණි විසඳුම් කිහිපයක්:
-
ඔරකල් ගෝල්ඩන් ගේට් , නමුත් මෙය සම්පූර්ණයෙන්ම වෙනස් "බර කාණ්ඩයක්" වේ.
ප්රාදේශීය සභා
අපගේ බ්ලොග් අඩවියේ ද කියවන්න:
- «
Kubernetes හි Kafka පොකුරක් සඳහා සුදුසු ප්රමාණය තීරණය කරන්න »; - «
අපගේ SRE එදිනෙදා ජීවිතයෙන් ප්රායෝගික කථා. 2 කොටස »; - «
Kubernetes සඳහා PostgreSQL ප්රකාශයන් පිළිබඳ කෙටි දළ විශ්ලේෂණයක්, අපගේ තේරීම් සහ අත්දැකීම් ".
මූලාශ්රය: www.habr.com