அப்பாச்சி காஃப்காவிற்காக Debezium - CDC அறிமுகம்

அப்பாச்சி காஃப்காவிற்காக Debezium - CDC அறிமுகம்

எனது பணியில், நான் அடிக்கடி புதிய தொழில்நுட்ப தீர்வுகள்/மென்பொருள் தயாரிப்புகளை சந்திக்கிறேன், ரஷ்ய மொழி இணையத்தில் இது பற்றிய தகவல்கள் குறைவாகவே உள்ளன. இந்தக் கட்டுரையின் மூலம், எனது சமீபத்திய நடைமுறையில் இருந்து அத்தகைய இடைவெளியை நிரப்ப முயற்சிப்பேன், CDC நிகழ்வுகளை இரண்டு பிரபலமான DBMS களில் இருந்து (PostgreSQL மற்றும் MongoDB) Debezium ஐப் பயன்படுத்தி காஃப்கா கிளஸ்டருக்கு அனுப்புவதை உள்ளமைக்க வேண்டும். செய்த பணியின் பலனாக வெளிவரும் இந்த ஆய்வுக் கட்டுரை மற்றவர்களுக்குப் பயன்படும் என்று நம்புகிறேன்.

பொதுவாக Debezium மற்றும் CDC என்றால் என்ன?

டெபேசியம் - CDC மென்பொருள் வகையின் பிரதிநிதி (தரவு மாற்றத்தைப் படமெடுக்கவும்), அல்லது இன்னும் துல்லியமாக, இது Apache Kafka Connect கட்டமைப்புடன் இணக்கமான பல்வேறு DBMSகளுக்கான இணைப்பிகளின் தொகுப்பாகும்.

இந்த திறந்த மூல திட்டம், Apache உரிமம் v2.0 இன் கீழ் உரிமம் பெற்றது மற்றும் Red Hat ஆல் ஸ்பான்சர் செய்யப்பட்டது. மேம்பாடு 2016 முதல் நடந்து வருகிறது, தற்போது இது பின்வரும் DBMS களுக்கு அதிகாரப்பூர்வ ஆதரவை வழங்குகிறது: MySQL, PostgreSQL, MongoDB, SQL சர்வர். கசாண்ட்ரா மற்றும் ஆரக்கிளுக்கான இணைப்பிகள் உள்ளன, ஆனால் தற்போது அவை "ஆரம்ப அணுகல்" நிலையில் உள்ளன, மேலும் புதிய வெளியீடுகள் பின்தங்கிய இணக்கத்தன்மைக்கு உத்தரவாதம் அளிக்காது.

சிடிசியை பாரம்பரிய அணுகுமுறையுடன் ஒப்பிட்டுப் பார்த்தால் (பயன்பாடு டிபிஎம்எஸ்ஸிலிருந்து தரவை நேரடியாகப் படிக்கும் போது), அதன் முக்கிய நன்மைகள் குறைந்த தாமதம், அதிக நம்பகத்தன்மை மற்றும் கிடைக்கும் தன்மையுடன் வரிசை மட்டத்தில் தரவு மாற்ற ஸ்ட்ரீமிங்கை செயல்படுத்துவது அடங்கும். CDC நிகழ்வுகளுக்கான களஞ்சியமாக காஃப்கா கிளஸ்டரைப் பயன்படுத்துவதன் மூலம் கடைசி இரண்டு புள்ளிகள் அடையப்படுகின்றன.

மற்றொரு நன்மை என்னவென்றால், நிகழ்வுகளைச் சேமிக்க ஒரு மாதிரியானது பயன்படுத்தப்படுகிறது, எனவே இறுதிப் பயன்பாடு வெவ்வேறு DBMSகளை இயக்கும் நுணுக்கங்களைப் பற்றி கவலைப்பட வேண்டியதில்லை.

இறுதியாக, ஒரு செய்தி தரகரைப் பயன்படுத்துவது, தரவுகளில் ஏற்படும் மாற்றங்களைக் கண்காணிக்கும் பயன்பாடுகளை கிடைமட்டமாக அளவிட அனுமதிக்கிறது. அதே நேரத்தில், தரவு மூலத்தின் தாக்கம் குறைக்கப்படுகிறது, ஏனெனில் தரவு நேரடியாக DBMS இலிருந்து பெறப்படவில்லை, மாறாக காஃப்கா கிளஸ்டரிலிருந்து பெறப்படுகிறது.

Debezium கட்டிடக்கலை பற்றி

Debezium ஐப் பயன்படுத்துவது இந்த எளிய திட்டத்திற்கு வருகிறது:

DBMS (ஒரு தரவு ஆதாரமாக) → காஃப்கா இணைப்பில் உள்ள இணைப்பான் → அப்பாச்சி காஃப்கா → நுகர்வோர்

ஒரு விளக்கமாக, திட்ட வலைத்தளத்திலிருந்து ஒரு வரைபடம் இங்கே:

அப்பாச்சி காஃப்காவிற்காக Debezium - CDC அறிமுகம்

இருப்பினும், இந்த திட்டத்தை நான் உண்மையில் விரும்பவில்லை, ஏனென்றால் ஒரு மடு இணைப்பியின் பயன்பாடு மட்டுமே சாத்தியம் என்று தோன்றுகிறது.

உண்மையில், நிலைமை வேறுபட்டது: உங்கள் தரவு ஏரியை நிரப்புதல் (மேலே உள்ள வரைபடத்தில் கடைசி இணைப்பு) Debezium ஐப் பயன்படுத்துவதற்கான ஒரே வழி இதுவல்ல. அப்பாச்சி காஃப்காவிற்கு அனுப்பப்படும் நிகழ்வுகள் பல்வேறு சூழ்நிலைகளைக் கையாள உங்கள் பயன்பாடுகளால் பயன்படுத்தப்படலாம். உதாரணத்திற்கு:

  • தற்காலிக சேமிப்பிலிருந்து பொருத்தமற்ற தரவை நீக்குதல்;
  • அறிவிப்புகளை அனுப்புதல்;
  • தேடல் குறியீட்டு மேம்படுத்தல்கள்;
  • சில வகையான தணிக்கை பதிவுகள்;
  • ...

உங்களிடம் ஜாவா பயன்பாடு இருந்தால் மற்றும் காஃப்கா கிளஸ்டரைப் பயன்படுத்த வேண்டிய அவசியம்/சாத்தியம் இல்லை என்றால், வேலை செய்யும் வாய்ப்பும் உள்ளது. உட்பொதிக்கப்பட்ட-இணைப்பான். வெளிப்படையான நன்மை என்னவென்றால், இது கூடுதல் உள்கட்டமைப்பின் தேவையை நீக்குகிறது (இணைப்பான் மற்றும் காஃப்கா வடிவத்தில்). இருப்பினும், இந்த தீர்வு பதிப்பு 1.1 இலிருந்து நிராகரிக்கப்பட்டது மற்றும் இனி பயன்படுத்த பரிந்துரைக்கப்படவில்லை (எதிர்கால வெளியீடுகளில் இதற்கான ஆதரவு அகற்றப்படலாம்).

இந்த கட்டுரை டெவலப்பர்களால் பரிந்துரைக்கப்படும் கட்டிடக்கலை பற்றி விவாதிக்கும், இது தவறு சகிப்புத்தன்மை மற்றும் அளவிடுதல் ஆகியவற்றை வழங்குகிறது.

இணைப்பான் கட்டமைப்பு

மிக முக்கியமான மதிப்பு - தரவு - மாற்றங்களைக் கண்காணிக்கத் தொடங்க, நமக்குத் தேவை:

  1. தரவு மூலமானது, பதிப்பு 5.7, PostgreSQL 9.6+, MongoDB 3.2+ இலிருந்து MySQL ஆக இருக்கலாம் (முழுமையான பட்டியல்);
  2. அப்பாச்சி காஃப்கா கிளஸ்டர்;
  3. காஃப்கா இணைப்பு நிகழ்வு (பதிப்புகள் 1.x, 2.x);
  4. கட்டமைக்கப்பட்ட Debezium இணைப்பு.

முதல் இரண்டு புள்ளிகளில் வேலை செய்யுங்கள், அதாவது. DBMS மற்றும் Apache Kafka இன் நிறுவல் செயல்முறை கட்டுரையின் நோக்கத்திற்கு அப்பாற்பட்டது. இருப்பினும், சாண்ட்பாக்ஸில் அனைத்தையும் பயன்படுத்த விரும்புவோருக்கு, எடுத்துக்காட்டுகளுடன் கூடிய அதிகாரப்பூர்வ களஞ்சியத்தில் தயாராக உள்ளது docker-compose.yaml.

கடைசி இரண்டு புள்ளிகளில் நாம் இன்னும் விரிவாக வாழ்வோம்.

0. காஃப்கா இணைப்பு

இங்கே மேலும் கட்டுரையில், அனைத்து உள்ளமைவு எடுத்துக்காட்டுகளும் Debezium டெவலப்பர்களால் விநியோகிக்கப்படும் Docker படத்தின் பின்னணியில் விவாதிக்கப்படுகின்றன. இது தேவையான அனைத்து செருகுநிரல் கோப்புகளையும் (இணைப்பிகள்) கொண்டுள்ளது மற்றும் சூழல் மாறிகளைப் பயன்படுத்தி காஃப்கா இணைப்பின் உள்ளமைவை வழங்குகிறது.

நீங்கள் Confluent இலிருந்து Kafka Connect ஐப் பயன்படுத்த விரும்பினால், அதில் குறிப்பிடப்பட்டுள்ள கோப்பகத்தில் தேவையான இணைப்பிகளின் செருகுநிரல்களை நீங்கள் சுயாதீனமாகச் சேர்க்க வேண்டும். plugin.path அல்லது சூழல் மாறி மூலம் அமைக்கவும் CLASSPATH. காஃப்கா கனெக்ட் தொழிலாளி மற்றும் இணைப்பான்களுக்கான அமைப்புகள் உள்ளமைவு கோப்புகள் மூலம் தீர்மானிக்கப்படுகின்றன, அவை தொழிலாளர் வெளியீட்டு கட்டளைக்கு வாதங்களாக அனுப்பப்படுகின்றன. மேலும் விவரங்களுக்கு, பார்க்கவும் ஆவணங்கள்.

இணைப்பான் பதிப்பில் Debeizum அமைப்பதற்கான முழு செயல்முறையும் இரண்டு நிலைகளில் மேற்கொள்ளப்படுகிறது. அவை ஒவ்வொன்றையும் பார்ப்போம்:

1. காஃப்கா இணைப்பு கட்டமைப்பை அமைத்தல்

அப்பாச்சி காஃப்கா கிளஸ்டருக்கு தரவை ஸ்ட்ரீம் செய்ய, குறிப்பிட்ட அளவுருக்கள் காஃப்கா கனெக்ட் கட்டமைப்பில் அமைக்கப்பட்டுள்ளன, அவை:

  • கிளஸ்டருடன் இணைப்பதற்கான அளவுருக்கள்,
  • இணைப்பியின் உள்ளமைவு நேரடியாகச் சேமிக்கப்படும் தலைப்புகளின் பெயர்கள்,
  • இணைப்பான் இயங்கும் குழுவின் பெயர் (விநியோக முறை பயன்படுத்தப்பட்டால்).

திட்டத்தின் அதிகாரப்பூர்வ டோக்கர் படம் சூழல் மாறிகளைப் பயன்படுத்தி உள்ளமைவை ஆதரிக்கிறது - இதைத்தான் நாங்கள் பயன்படுத்துவோம். எனவே, படத்தைப் பதிவிறக்கவும்:

docker pull debezium/connect

இணைப்பியை இயக்குவதற்கு தேவையான குறைந்தபட்ச சூழல் மாறிகளின் தொகுப்பு பின்வருமாறு:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - கிளஸ்டர் உறுப்பினர்களின் முழுமையான பட்டியலைப் பெற காஃப்கா கிளஸ்டர் சேவையகங்களின் ஆரம்ப பட்டியல்;
  • OFFSET_STORAGE_TOPIC=connector-offsets - இணைப்பான் தற்போது அமைந்துள்ள நிலைகளை சேமிப்பதற்கான தலைப்பு;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - இணைப்பியின் நிலை மற்றும் அதன் பணிகளை சேமிப்பதற்கான தலைப்பு;
  • CONFIG_STORAGE_TOPIC=connector-config - இணைப்பான் உள்ளமைவு தரவு மற்றும் அதன் பணிகளை சேமிப்பதற்கான தலைப்பு;
  • GROUP_ID=1 - இணைப்பான் பணியை நிறைவேற்றக்கூடிய தொழிலாளர்களின் குழுவின் அடையாளங்காட்டி; விநியோகிக்கப்பட்ட பயன்படுத்தும் போது அவசியம் (விநியோகிக்கப்பட்டது) ஆட்சி.

இந்த மாறிகள் கொண்ட கொள்கலனை நாங்கள் தொடங்குகிறோம்:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

அவ்ரோ பற்றிய குறிப்பு

இயல்பாக, Debezium JSON வடிவத்தில் தரவை எழுதுகிறது, இது சாண்ட்பாக்ஸ்கள் மற்றும் சிறிய அளவிலான தரவுகளுக்கு ஏற்கத்தக்கது, ஆனால் அதிக ஏற்றப்பட்ட தரவுத்தளங்களில் சிக்கலாக இருக்கலாம். JSON மாற்றிக்கு மாற்றாக செய்திகளை வரிசைப்படுத்துவது அவ்ரோ பைனரி வடிவத்தில், இது அப்பாச்சி காஃப்காவில் உள்ள I/O துணை அமைப்பில் சுமையை குறைக்கிறது.

அவ்ரோவைப் பயன்படுத்த, நீங்கள் ஒரு தனித்தனியை வரிசைப்படுத்த வேண்டும் திட்டப் பதிவேடு (வரைபடங்களை சேமிப்பதற்காக). மாற்றிக்கான மாறிகள் இப்படி இருக்கும்:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

அவ்ரோவைப் பயன்படுத்துவது மற்றும் அதற்கான பதிவேட்டை அமைப்பது பற்றிய விவரங்கள் இந்தக் கட்டுரையின் எல்லைக்கு அப்பாற்பட்டவை - மேலும், தெளிவுக்காக, நாங்கள் JSON ஐப் பயன்படுத்துவோம்.

2. இணைப்பியையே கட்டமைத்தல்

இப்போது நீங்கள் நேரடியாக இணைப்பியின் உள்ளமைவுக்குச் செல்லலாம், இது மூலத்திலிருந்து தரவைப் படிக்கும்.

இரண்டு DBMSகளுக்கான இணைப்பிகளின் உதாரணத்தைப் பார்ப்போம்: PostgreSQL மற்றும் MongoDB, இதில் எனக்கு அனுபவம் உள்ளது மற்றும் அதில் வேறுபாடுகள் உள்ளன (சிறியதாக இருந்தாலும், சில சந்தர்ப்பங்களில் குறிப்பிடத்தக்கவை!).

உள்ளமைவு JSON குறியீட்டில் விவரிக்கப்பட்டுள்ளது மற்றும் POST கோரிக்கையைப் பயன்படுத்தி Kafka Connect இல் பதிவேற்றப்பட்டது.

2.1 PostgreSQL

PostgreSQL க்கான உதாரணம் இணைப்பான் கட்டமைப்பு:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

இந்த அமைப்பிற்குப் பிறகு இணைப்பியின் செயல்பாட்டின் கொள்கை மிகவும் எளிது:

  • முதல் முறையாக தொடங்கப்படும் போது, ​​அது உள்ளமைவில் குறிப்பிடப்பட்ட தரவுத்தளத்துடன் இணைக்கப்பட்டு பயன்முறையில் தொடங்குகிறது. ஆரம்ப ஸ்னாப்ஷாட், நிபந்தனையைப் பயன்படுத்தி பெறப்பட்ட தரவுகளின் ஆரம்ப தொகுப்பை காஃப்காவிற்கு அனுப்புகிறது SELECT * FROM table_name.
  • துவக்கம் முடிந்ததும், PostgreSQL WAL கோப்புகளிலிருந்து மாற்றங்களைப் படிக்க இணைப்பான் பயன்முறையில் செல்கிறது.

பயன்படுத்தப்படும் விருப்பங்களைப் பற்றி:

  • name - கீழே விவரிக்கப்பட்டுள்ள உள்ளமைவு பயன்படுத்தப்படும் இணைப்பியின் பெயர்; எதிர்காலத்தில், காஃப்கா கனெக்ட் REST API வழியாக இணைப்பாளருடன் (அதாவது, நிலையைப் பார்க்க/மறுதொடக்கம்/கட்டமைப்பைப் புதுப்பிக்கவும்) வேலை செய்ய இந்தப் பெயர் பயன்படுத்தப்படுகிறது;
  • connector.class - கட்டமைக்கப்பட்ட இணைப்பினால் பயன்படுத்தப்படும் DBMS இணைப்பு வகுப்பு;
  • plugin.name - WAL கோப்புகளிலிருந்து தரவின் தருக்க டிகோடிங்கிற்கான செருகுநிரலின் பெயர். தேர்வு செய்யக் கிடைக்கிறது wal2json, decoderbuffs и pgoutput. முதல் இரண்டு DBMS இல் பொருத்தமான நீட்டிப்புகளை நிறுவ வேண்டும், மற்றும் pgoutput PostgreSQL பதிப்பு 10 மற்றும் அதற்கு மேல் கூடுதல் கையாளுதல்கள் தேவையில்லை;
  • database.* — தரவுத்தளத்துடன் இணைப்பதற்கான விருப்பங்கள், எங்கே database.server.name - PostgreSQL நிகழ்வு பெயர் காஃப்கா கிளஸ்டரில் தலைப்புப் பெயரை உருவாக்கப் பயன்படுகிறது;
  • table.include.list - மாற்றங்களைக் கண்காணிக்க விரும்பும் அட்டவணைகளின் பட்டியல்; வடிவத்தில் குறிப்பிடப்பட்டுள்ளது schema.table_name; உடன் பயன்படுத்த முடியாது table.exclude.list;
  • heartbeat.interval.ms - இடைவெளி (மில்லி விநாடிகளில்) இணைப்பான் ஒரு சிறப்பு தலைப்புக்கு இதய துடிப்பு செய்திகளை அனுப்புகிறது;
  • heartbeat.action.query - ஒவ்வொரு இதயத்துடிப்பு செய்தியையும் அனுப்பும் போது செயல்படுத்தப்படும் கோரிக்கை (பதிப்பு 1.1 இல் தோன்றியது);
  • slot.name - இணைப்பாளரால் பயன்படுத்தப்படும் பிரதி ஸ்லாட்டின் பெயர்;
  • publication.name - பெயர் வெளியீடு இணைப்பான் பயன்படுத்தும் PostgreSQL இல். அது இல்லை என்றால், Debezium அதை உருவாக்க முயற்சிக்கும். இணைப்பு செய்யப்பட்ட பயனருக்கு இந்த செயலுக்கான போதுமான உரிமைகள் இல்லை என்றால், இணைப்பான் பிழையுடன் நிறுத்தப்படும்;
  • transforms இலக்கு தலைப்பின் பெயரை எவ்வாறு மாற்றுவது என்பதைத் தீர்மானிக்கிறது:
    • transforms.AddPrefix.type வழக்கமான வெளிப்பாடுகளைப் பயன்படுத்துவோம் என்பதைக் குறிக்கிறது;
    • transforms.AddPrefix.regex - இலக்கு தலைப்பின் பெயரை மறுவரையறை செய்யும் முகமூடி;
    • transforms.AddPrefix.replacement - நேரடியாக நாம் மறுவரையறை செய்கிறோம்.

இதயத் துடிப்பு மற்றும் உருமாற்றம் பற்றி மேலும்

முன்னிருப்பாக, ஒவ்வொரு உறுதியான பரிவர்த்தனைக்கும் இணைப்பான் காஃப்காவுக்குத் தரவை அனுப்புகிறது, மேலும் அதன் LSN (பதிவு வரிசை எண்) சேவைத் தலைப்பில் பதிவு செய்யப்படும். offset. முழு தரவுத்தளத்தையும் படிக்காமல், அதன் அட்டவணையின் ஒரு பகுதியை மட்டுமே (தரவு புதுப்பிப்புகள் அடிக்கடி நிகழாத) இணைப்பான் கட்டமைக்கப்பட்டால் என்ன நடக்கும்?

  • இணைப்பான் WAL கோப்புகளைப் படிக்கும் மற்றும் அது கண்காணிக்கும் அட்டவணையில் எந்தப் பரிவர்த்தனை செய்தாலும் அதைக் கண்டறியாது.
  • எனவே, இது தலைப்பில் அல்லது பிரதியிடல் ஸ்லாட்டில் அதன் தற்போதைய நிலையை புதுப்பிக்காது.
  • இதன் விளைவாக, WAL கோப்புகள் வட்டில் வைக்கப்படும் மற்றும் வட்டு இடம் இல்லாமல் போகும்.

இங்குதான் விருப்பங்கள் மீட்புக்கு வருகின்றன. heartbeat.interval.ms и heartbeat.action.query. இந்த விருப்பங்களை ஜோடிகளாகப் பயன்படுத்துவது ஒவ்வொரு முறை இதயத்துடிப்புச் செய்தி அனுப்பப்படும்போதும் ஒரு தனி அட்டவணையில் தரவை மாற்றுவதற்கான கோரிக்கையைச் செயல்படுத்துவதை சாத்தியமாக்குகிறது. எனவே, இணைப்பான் தற்போது அமைந்துள்ள LSN (பிரதி ஸ்லாட்டில்) தொடர்ந்து புதுப்பிக்கப்படுகிறது. இது DBMS ஐ இனி தேவைப்படாத WAL கோப்புகளை அகற்ற அனுமதிக்கிறது. விருப்பங்கள் எவ்வாறு செயல்படுகின்றன என்பது பற்றிய கூடுதல் தகவலுக்கு, பார்க்கவும் ஆவணங்கள்.

நெருக்கமான கவனத்திற்கு தகுதியான மற்றொரு விருப்பம் transforms. இது வசதி மற்றும் அழகு பற்றியது என்றாலும் ...

இயல்பாக, Debezium பின்வரும் பெயரிடும் கொள்கையைப் பயன்படுத்தி தலைப்புகளை உருவாக்குகிறது: serverName.schemaName.tableName. இது எப்போதும் வசதியாக இருக்காது. விருப்பங்கள் transforms அட்டவணைகளின் பட்டியலை வரையறுக்க வழக்கமான வெளிப்பாடுகளைப் பயன்படுத்தலாம், குறிப்பிட்ட பெயருடன் ஒரு தலைப்புக்கு அனுப்ப வேண்டிய நிகழ்வுகள்.

எங்கள் கட்டமைப்பில் நன்றி transforms பின்வருபவை நடக்கும்: கண்காணிக்கப்படும் தரவுத்தளத்திலிருந்து அனைத்து CDC நிகழ்வுகளும் பெயருடன் ஒரு தலைப்புக்கு செல்லும் data.cdc.dbname. இல்லையெனில் (இந்த அமைப்புகள் இல்லாமல்), Debezium முன்னிருப்பாக ஒவ்வொரு அட்டவணைக்கும் ஒரு தலைப்பை உருவாக்கும்: pg-dev.public.<table_name>.

இணைப்பான் வரம்புகள்

PostgreSQL க்கான இணைப்பான் உள்ளமைவின் விளக்கத்தை முடிக்க, அதன் செயல்பாட்டின் பின்வரும் அம்சங்கள்/வரம்புகளைப் பற்றி பேசுவது மதிப்பு:

  1. PostgreSQL க்கான இணைப்பியின் செயல்பாடு லாஜிக்கல் டிகோடிங் கருத்தைச் சார்ந்துள்ளது. எனவே அவர் தரவுத்தள கட்டமைப்பை மாற்றுவதற்கான கோரிக்கைகளை கண்காணிக்காது (DDL) - அதன்படி, இந்தத் தரவு தலைப்புகளில் இருக்காது.
  2. பிரதி இடங்கள் பயன்படுத்தப்படுவதால், இணைப்பியின் இணைப்பு சாத்தியமாகும் மட்டுமே முதன்மை DBMS உதாரணத்திற்கு.
  3. தரவுத்தளத்துடன் இணைப்பான் இணைக்கும் பயனருக்கு படிக்க-மட்டும் உரிமைகள் வழங்கப்பட்டால், முதல் வெளியீட்டிற்கு முன் நீங்கள் கைமுறையாக ஒரு பிரதி ஸ்லாட்டை உருவாக்கி தரவுத்தளத்தில் வெளியிட வேண்டும்.

உள்ளமைவைப் பயன்படுத்துதல்

எனவே, இணைப்பியில் எங்கள் உள்ளமைவை ஏற்றுவோம்:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

பதிவிறக்கம் வெற்றிகரமாக இருந்ததா என்பதை நாங்கள் சரிபார்க்கிறோம் மற்றும் இணைப்பு தொடங்கப்பட்டது:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

அருமை: இது அமைக்கப்பட்டு, செல்லத் தயாராக உள்ளது. இப்போது நாம் ஒரு நுகர்வோர் போல் நடித்து காஃப்காவுடன் இணைவோம், அதன் பிறகு அட்டவணையில் உள்ளீட்டைச் சேர்த்து மாற்றுவோம்:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

எங்கள் தலைப்பில் இது பின்வருமாறு காட்டப்படும்:

எங்கள் மாற்றங்களுடன் மிக நீண்ட JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

இரண்டு சந்தர்ப்பங்களிலும், பதிவுகள் மாற்றப்பட்ட பதிவின் விசை (PK) மற்றும் மாற்றங்களின் சாராம்சம்: பதிவுக்கு முன் என்ன, பிறகு என்ன ஆனது.

  • வழக்கில் INSERT: முன் மதிப்பு (before) சமம் null, மற்றும் பின் - செருகப்பட்ட வரி.
  • வழக்கில் UPDATE: இல் payload.before வரியின் முந்தைய நிலை காட்டப்படும், மற்றும் payload.after - மாற்றங்களின் சாரத்துடன் புதியது.

2.2 மோங்கோடிபி

இந்த இணைப்பான் நிலையான மோங்கோடிபி ரெப்ளிகேஷன் பொறிமுறையைப் பயன்படுத்துகிறது, முதன்மை டிபிஎம்எஸ் நோட்டின் ஓப்லாக் மூலம் தகவல்களைப் படிக்கிறது.

PgSQL க்கான ஏற்கனவே விவரிக்கப்பட்ட இணைப்பியைப் போலவே, இங்கேயும், முதல் தொடக்கத்தில், முதன்மை தரவு ஸ்னாப்ஷாட் எடுக்கப்பட்டது, அதன் பிறகு இணைப்பான் ஓப்லாக் வாசிப்பு முறைக்கு மாறுகிறது.

கட்டமைப்பு உதாரணம்:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

நீங்கள் பார்க்க முடியும் என, முந்தைய உதாரணத்துடன் ஒப்பிடும்போது புதிய விருப்பங்கள் எதுவும் இல்லை, ஆனால் தரவுத்தளத்துடன் இணைப்பதற்கும் அவற்றின் முன்னொட்டுகளுக்கும் பொறுப்பான விருப்பங்களின் எண்ணிக்கை மட்டுமே குறைக்கப்பட்டுள்ளது.

அமைப்புகளை transforms இந்த நேரத்தில் அவர்கள் பின்வருவனவற்றைச் செய்கிறார்கள்: அவர்கள் இலக்கு தலைப்பின் பெயரை திட்டத்திலிருந்து மாற்றுகிறார்கள் <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

தவறு சகிப்புத்தன்மை

நம் காலத்தில் தவறு சகிப்புத்தன்மை மற்றும் அதிக கிடைக்கும் தன்மை ஆகியவை முன்னெப்போதையும் விட மிகவும் கடுமையானது - குறிப்பாக நாங்கள் தரவு மற்றும் பரிவர்த்தனைகளைப் பற்றி பேசும்போது, ​​​​தரவு மாற்றங்களைக் கண்காணிப்பது இந்த சிக்கலில் ஒதுங்கி நிற்காது. கொள்கையளவில் என்ன தவறு நடக்கலாம் மற்றும் ஒவ்வொரு விஷயத்திலும் Debezium க்கு என்ன நடக்கும் என்பதைப் பார்ப்போம்.

மூன்று விலகல் விருப்பங்கள் உள்ளன:

  1. காஃப்கா இணைப்பு தோல்வி. விநியோகிக்கப்பட்ட பயன்முறையில் வேலை செய்ய இணைப்பு உள்ளமைக்கப்பட்டிருந்தால், ஒரே group.id ஐ அமைக்க பல பணியாளர்கள் தேவை. பின்னர், அவற்றில் ஒன்று தோல்வியுற்றால், இணைப்பான் மற்றொரு பணியாளருக்கு மறுதொடக்கம் செய்யப்பட்டு, காஃப்காவில் உள்ள தலைப்பில் கடைசியாக உறுதிசெய்யப்பட்ட நிலையில் இருந்து தொடர்ந்து படிக்கும்.
  2. காஃப்கா கிளஸ்டருடன் இணைப்பு இழப்பு. இணைப்பான் காஃப்காவிற்கு அனுப்பத் தவறிய நிலையில் படிப்பதை நிறுத்திவிடும், மேலும் முயற்சி வெற்றிபெறும் வரை அவ்வப்போது அதை மீண்டும் அனுப்ப முயற்சிக்கும்.
  3. தரவு ஆதாரம் கிடைக்கவில்லை. இணைப்பான் கட்டமைக்கப்பட்ட மூலத்துடன் மீண்டும் இணைக்க முயற்சிக்கும். இயல்புநிலை 16 முயற்சிகளைப் பயன்படுத்துகிறது அதிவேக பின்னடைவு. 16 ஆம் தேதி தோல்வியுற்ற பிறகு, பணி குறிக்கப்படும் தோல்வி நீங்கள் அதை Kafka Connect REST இடைமுகம் வழியாக கைமுறையாக மறுதொடக்கம் செய்ய வேண்டும்.
    • வழக்கில் போஸ்ட்கெரே தரவு இழக்கப்படாது, ஏனெனில் பிரதியிடல் ஸ்லாட்டுகளைப் பயன்படுத்துவது இணைப்பாளரால் படிக்கப்படாத WAL கோப்புகளை நீக்குவதைத் தடுக்கும். இந்த வழக்கில், நாணயத்திற்கு ஒரு எதிர்மறையும் உள்ளது: இணைப்பான் மற்றும் DBMS க்கு இடையிலான பிணைய இணைப்பு நீண்ட காலத்திற்கு இடையூறு ஏற்பட்டால், வட்டு இடம் தீர்ந்துவிடும் வாய்ப்பு உள்ளது, மேலும் இது தோல்விக்கு வழிவகுக்கும். முழு DBMS.
    • வழக்கில் MySQL, binlog கோப்புகளை DBMS மூலம் சுழற்ற முடியும். இது இணைப்பான் தோல்வியடைந்த நிலைக்குச் செல்லும், மேலும் இயல்பான செயல்பாட்டை மீட்டெடுக்க, பின்லாக்களிலிருந்து தொடர்ந்து படிக்க ஆரம்ப ஸ்னாப்ஷாட் பயன்முறையில் நீங்கள் மறுதொடக்கம் செய்ய வேண்டும்.
    • பற்றி MongoDB. ஆவணம் கூறுகிறது: லாக்/ஒப்லாக் கோப்புகள் நீக்கப்பட்டால் இணைப்பாளரின் நடத்தை மற்றும் இணைப்பான் அதை விட்ட இடத்தில் இருந்து படிக்க முடியாது. இணைப்பான் மாநிலத்திற்குள் செல்லும் என்று அர்த்தம் தோல்வி மற்றும் பயன்முறையில் மறுதொடக்கம் தேவைப்படும் ஆரம்ப ஸ்னாப்ஷாட்.

      இருப்பினும், விதிவிலக்குகள் உள்ளன. இணைப்பான் நீண்ட நேரம் துண்டிக்கப்பட்டிருந்தால் (அல்லது மோங்கோடிபி நிகழ்வை அடைய முடியவில்லை), மற்றும் இந்த நேரத்தில் ஓப்லாக் சுழற்சியின் வழியாக சென்றால், இணைப்பு மீட்டமைக்கப்படும் போது, ​​இணைப்பான் அமைதியாக இருக்கும் முதல் நிலையிலிருந்து தரவைப் படிக்கும், அதனால்தான் காஃப்காவில் உள்ள சில தரவுகள் இல்லை அடிப்பார்கள்.

முடிவுக்கு

Debezium CDC அமைப்புகளுடன் எனது முதல் அனுபவம் மற்றும் ஒட்டுமொத்தமாக மிகவும் நேர்மறையானது. முக்கிய டிபிஎம்எஸ்கள், எளிதாக உள்ளமைவு, கிளஸ்டரிங் ஆதரவு மற்றும் செயலில் உள்ள சமூகம் ஆகியவற்றிற்கான ஆதரவுடன் இந்த திட்டம் வெற்றி பெற்றது. நடைமுறையில் ஆர்வமுள்ளவர்கள், அதற்கான வழிகாட்டிகளைப் படிக்குமாறு நான் பரிந்துரைக்கிறேன் காஃப்கா இணைப்பு и டெபேசியம்.

காஃப்கா கனெக்டிற்கான ஜேடிபிசி இணைப்பியுடன் ஒப்பிடும்போது, ​​டிபிஎம்எஸ் பதிவுகளிலிருந்து மாற்றங்கள் படிக்கப்படுவதே Debezium இன் முக்கிய நன்மையாகும், இது குறைந்தபட்ச தாமதத்துடன் தரவைப் பெற அனுமதிக்கிறது. JDBC Connector (Kafka Connect இலிருந்து) ஒரு நிலையான இடைவெளியில் கண்காணிக்கப்படும் அட்டவணையை வினவுகிறது மேலும் (அதே காரணத்திற்காக) தரவு நீக்கப்படும் போது செய்திகளை உருவாக்காது (இல்லாத தரவை நீங்கள் எப்படி வினவலாம்?).

இதே போன்ற சிக்கல்களைத் தீர்க்க, பின்வரும் தீர்வுகளுக்கு நீங்கள் கவனம் செலுத்தலாம் (Debezium கூடுதலாக):

சோசலிஸ்ட் கட்சி

எங்கள் வலைப்பதிவிலும் படிக்கவும்:

ஆதாரம்: www.habr.com

கருத்தைச் சேர்