Debezium በማስተዋወቅ ላይ - CDC ለ Apache Kafka

Debezium በማስተዋወቅ ላይ - CDC ለ Apache Kafka

በስራዬ ውስጥ ፣ ብዙ ጊዜ አዳዲስ ቴክኒካዊ መፍትሄዎች / የሶፍትዌር ምርቶች ያጋጥሙኛል ፣ ስለ ሩሲያኛ ተናጋሪው በይነመረብ በጣም አናሳ የሆነ መረጃ። በዚህ መጣጥፍ፣ ደብዚየምን በመጠቀም የሲዲሲ ዝግጅቶችን ከሁለት ታዋቂ ዲቢኤምኤስ (PostgreSQL እና MongoDB) ወደ ካፍ ክላስተር መላክ ሲያስፈልገኝ ከቅርብ ጊዜ ልምዴ ምሳሌ ጋር አንድ ክፍተት ለመሙላት እሞክራለሁ። በተሠራው ሥራ ምክንያት የወጣው ይህ የግምገማ ጽሑፍ ለሌሎች ጠቃሚ እንደሚሆን ተስፋ አደርጋለሁ።

Debezium እና CDC በአጠቃላይ ምንድነው?

ደበዚየም - የሲዲሲ ሶፍትዌር ምድብ ተወካይ (የውሂብ ለውጥ ያንሱ), ወይም የበለጠ በትክክል, ከ Apache Kafka Connect ማእቀፍ ጋር የሚጣጣሙ ለተለያዩ ዲቢኤምኤስዎች ማገናኛዎች ስብስብ ነው.

ይህ ክፍት ምንጭ ፕሮጀክት ፣ በApache License v2.0 ፈቃድ ያለው እና በ Red Hat ስፖንሰር የተደረገ። ልማት ከ 2016 ጀምሮ እየተካሄደ ነው እና በአሁኑ ጊዜ ለሚከተሉት ዲቢኤምኤስ ይፋዊ ድጋፍ ይሰጣል፡ MySQL፣ PostgreSQL፣ MongoDB፣ SQL Server። እንዲሁም ለካሳንድራ እና ኦራክል ማገናኛዎች አሉ ነገርግን በአሁኑ ጊዜ በ"ቅድመ መዳረሻ" ሁኔታ ላይ ናቸው እና አዲስ የተለቀቁት የኋላ ተኳሃኝነት ዋስትና አይሰጡም።

ሲዲሲን ከተለምዷዊ አቀራረብ ጋር ካነጻጸርን (መተግበሪያው ከዲቢኤምኤስ በቀጥታ መረጃ ሲያነብ) ዋና ጥቅሞቹ ዝቅተኛ መዘግየት፣ ከፍተኛ አስተማማኝነት እና ተገኝነት በረድፍ ደረጃ የውሂብ ለውጥ ዥረት መተግበርን ያጠቃልላል። የመጨረሻዎቹ ሁለት ነጥቦች የካፍካ ክላስተር እንደ ሲዲሲ ክስተቶች ማከማቻነት በመጠቀም የተገኙ ናቸው።

እንዲሁም ፣ ጥቅሞቹ አንድ ነጠላ ሞዴል ክስተቶችን ለማከማቸት ጥቅም ላይ የሚውሉ መሆናቸው ያካትታል ፣ ስለሆነም የመጨረሻው መተግበሪያ የተለያዩ ዲቢኤምኤስን ስለመሥራት ልዩነቶች መጨነቅ የለበትም።

በመጨረሻም የመልእክት ደላላን መጠቀም በውሂብ ላይ የሚደረጉ ለውጦችን የሚከታተሉ አፕሊኬሽኖችን በአግድም ለመለካት ወሰን ይከፍታል። በተመሳሳይ ጊዜ መረጃ ከዲቢኤምኤስ በቀጥታ ሳይሆን ከካፍካ ክላስተር ስለሚደርሰው በመረጃው ምንጭ ላይ ያለው ተጽእኖ ይቀንሳል።

ስለ ደበዚየም አርክቴክቸር

Debezium ን መጠቀም ወደዚህ ቀላል እቅድ ይመጣል፡-

DBMS (እንደ የውሂብ ምንጭ) → ማገናኛ በካፍካ አገናኝ → Apache Kafka → ሸማች

እንደ ምሳሌ፣ ከፕሮጀክቱ ድህረ ገጽ ላይ ሥዕላዊ መግለጫ እሰጣለሁ፡-

Debezium በማስተዋወቅ ላይ - CDC ለ Apache Kafka

ሆኖም ፣ ይህንን እቅድ አልወደውም ፣ ምክንያቱም የውሃ ማጠቢያ ማገናኛ ብቻ የሚቻል ይመስላል።

እንደ እውነቱ ከሆነ, ሁኔታው ​​​​የተለየ ነው: የእርስዎን የውሂብ ሐይቅ መሙላት (ከላይ ባለው ሥዕል ላይ የመጨረሻው አገናኝ) Debezium ለመጠቀም ብቸኛው መንገድ አይደለም. ወደ Apache Kafka የተላኩ ክስተቶች የተለያዩ ሁኔታዎችን ለመፍታት በመተግበሪያዎችዎ መጠቀም ይችላሉ። ለምሳሌ:

  • አግባብነት የሌለውን መረጃ ከመሸጎጫው ውስጥ ማስወገድ;
  • ማሳወቂያዎችን መላክ;
  • የፍለጋ ኢንዴክስ ዝመናዎች;
  • አንዳንድ ዓይነት የኦዲት መዝገቦች;
  • ...

የጃቫ አፕሊኬሽን ካለህ እና የካፍካ ክላስተር ለመጠቀም ካላስፈለገህ የመሥራት እድልም አለ የተከተተ ማገናኛ. ግልጽ የሆነ ተጨማሪው በእሱ አማካኝነት ተጨማሪ መሠረተ ልማት (በግንኙነት እና በካፍካ መልክ) እምቢ ማለት ነው. ነገር ግን፣ ይህ መፍትሔ ከስሪት 1.1 ጀምሮ ተቋርጧል እና ከአሁን በኋላ ጥቅም ላይ እንዲውል አይመከርም (ወደፊት በሚለቀቁት ሊወገድ ይችላል)።

ይህ መጣጥፍ በገንቢዎች የተመከረውን የስነ-ህንፃ ንድፍ ያብራራል።

የማገናኛ ውቅር

በጣም አስፈላጊ በሆነው እሴት ውስጥ ለውጦችን መከታተል ለመጀመር - ውሂብ - እኛ እንፈልጋለን

  1. የውሂብ ምንጭ፣ ከስሪት 5.7፣ PostgreSQL 9.6+፣ MongoDB 3.2+ የሚጀምር MySQL ሊሆን ይችላል (የተሟላ ዝርዝር።);
  2. Apache Kafka ዘለላ
  3. የካፍካ አገናኝ ምሳሌ (ስሪቶች 1.x, 2.x);
  4. የተዋቀረ Debezium አያያዥ.

በመጀመሪያዎቹ ሁለት ነጥቦች ላይ ይስሩ, ማለትም. DBMS እና Apache Kafka የመጫን ሂደት ከጽሁፉ ወሰን በላይ ናቸው። ሆኖም ግን, ሁሉንም ነገር በአሸዋ ሳጥን ውስጥ ለማሰማራት ለሚፈልጉ, በኦፊሴላዊው ማከማቻ ውስጥ ምሳሌዎችን የያዘ ዝግጁ የሆነ አለ. ዶከር-አቀናብር.yaml.

በመጨረሻዎቹ ሁለት ነጥቦች ላይ በዝርዝር እናተኩራለን።

0. የካፍካ ግንኙነት

እዚህ እና በኋላ በአንቀጹ ውስጥ ሁሉም የማዋቀሪያ ምሳሌዎች በዲቤዚየም ገንቢዎች በተሰራጨው የዶከር ምስል አውድ ውስጥ ተወስደዋል። ሁሉንም አስፈላጊ ተሰኪ ፋይሎች (ማገናኛዎች) ይይዛል እና የአካባቢ ተለዋዋጮችን በመጠቀም የካፍካ አገናኝ ውቅር ያቀርባል።

የ Kafka Connect from Confluent ለመጠቀም ካሰቡ አስፈላጊ የሆኑትን ማገናኛዎች ፕለጊን እራስዎ በተገለጸው ማውጫ ላይ ማከል ያስፈልግዎታል plugin.path ወይም በአካባቢ ተለዋዋጭ በኩል ተዘጋጅቷል CLASSPATH. የካፍካ አገናኝ ሰራተኛ እና ማገናኛዎች ቅንጅቶች የሚገለጹት ለሠራተኛው ጅምር ትእዛዝ እንደ ክርክር በሚተላለፉ የውቅረት ፋይሎች ነው። ለዝርዝሩ ይመልከቱ ሰነድ.

በአገናኝ ሥሪት ውስጥ Debeizum ን የማዘጋጀት አጠቃላይ ሂደት በሁለት ደረጃዎች ይከናወናል። እያንዳንዳቸውን እንመልከታቸው፡-

1. የካፍካ ማገናኛ ማእቀፍ ማዘጋጀት

ውሂብን ወደ Apache Kafka ክላስተር ለማሰራጨት የተወሰኑ መለኪያዎች በካፍካ ማገናኛ ማዕቀፍ ውስጥ ተቀምጠዋል፣ ለምሳሌ፡-

  • የክላስተር ግንኙነት ቅንብሮች ፣
  • የማገናኛው ውቅር ልሹ የሚከማችባቸው የርዕሶች ስሞች ፣
  • ማገናኛው የሚሰራበት የቡድኑ ስም (የተከፋፈለ ሁነታን በሚጠቀሙበት ጊዜ).

የፕሮጀክቱ ኦፊሴላዊ Docker ምስል የአካባቢ ተለዋዋጮችን በመጠቀም ውቅረትን ይደግፋል - እኛ የምንጠቀመው ይህንን ነው። ስለዚህ ምስሉን እናውርድ፡-

docker pull debezium/connect

ማገናኛውን ለማሄድ የሚያስፈልገው ዝቅተኛው የአካባቢ ተለዋዋጮች ስብስብ እንደሚከተለው ነው።

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - የተሟላ የክላስተር አባላትን ዝርዝር ለማግኘት የካፍካ ክላስተር አገልጋዮች የመጀመሪያ ዝርዝር;
  • OFFSET_STORAGE_TOPIC=connector-offsets - ማገናኛው በአሁኑ ጊዜ የሚገኝበትን ቦታዎችን ለማከማቸት ርዕስ;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - የግንኙነት ሁኔታን እና ተግባሮቹን ለማከማቸት ርዕስ;
  • CONFIG_STORAGE_TOPIC=connector-config - የአገናኝ ውቅር ውሂብን እና ተግባሮቹን ለማከማቸት ርዕስ;
  • GROUP_ID=1 - የማገናኛው ተግባር የሚከናወንበት የሰራተኞች ቡድን መለያ; ሲሰራጭ ያስፈልጋል (የተከፋፈለ) አገዛዝ።

መያዣውን በሚከተሉት ተለዋዋጮች እንጀምራለን-

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

ስለ አቭሮ ማስታወሻ

በነባሪ፣ Debezium ውሂብን በJSON ቅርጸት ይጽፋል፣ ይህም ለማጠሪያ ሳጥኖች እና አነስተኛ መጠን ያለው ውሂብ ተቀባይነት ያለው፣ ነገር ግን በጣም በተጫኑ የውሂብ ጎታዎች ውስጥ ችግር ሊሆን ይችላል። ከJSON መቀየሪያ ሌላ አማራጭ መልእክቶችን በመጠቀም ተከታታይ ማድረግ ነው። Avro በአፓቼ ካፍካ ውስጥ በ I / O ንዑስ ስርዓት ላይ ያለውን ጭነት የሚቀንስ ወደ ሁለትዮሽ ቅርጸት።

Avroን ለመጠቀም የተለየ ማሰማራት ያስፈልግዎታል schema-መዝገብ ቤት (መርሃግብሮችን ለማከማቸት). የመቀየሪያው ተለዋዋጮች ይህን ይመስላል።

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avroን ስለመጠቀም እና ለእሱ መዝገብ ስለማዘጋጀት ዝርዝሮች ከጽሁፉ ወሰን በላይ ናቸው - የበለጠ ግልፅ ለማድረግ JSON ን እንጠቀማለን።

2. ማገናኛውን በራሱ ማዘጋጀት

አሁን በቀጥታ ወደ ማገናኛው ራሱ ውቅር መሄድ ይችላሉ, ይህም ከምንጩ ላይ መረጃን ያነባል.

ለሁለት ዲቢኤምኤስ የማገናኛዎች ምሳሌ እንመልከት፡ PostgreSQL እና MongoDB፣ ለዚህም ልምድ አለኝ እና ለነሱም ልዩነቶች አሉ (ምንም እንኳን ትንሽ ቢሆንም በአንዳንድ ሁኔታዎች ግን አስፈላጊ ነው!)።

ውቅሩ በJSON ማስታወሻ ውስጥ ተገልጿል እና የPOST ጥያቄን በመጠቀም ወደ Kafka Connect ተሰቅሏል።

2.1. PostgreSQL

የPostgreSQL ምሳሌ አያያዥ ውቅር፡

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

ከዚህ ውቅር በኋላ የግንኙነት መርህ በጣም ቀላል ነው-

  • በመጀመሪያው ጅምር ላይ, በማዋቀሪያው ውስጥ ከተጠቀሰው የውሂብ ጎታ ጋር ይገናኛል እና በ ሁነታ ይጀምራል የመጀመሪያ ቅጽበታዊ እይታከቅድመ ሁኔታው ​​ጋር የተቀበለውን የመጀመሪያ የውሂብ ስብስብ ለካፍካ በመላክ ላይ SELECT * FROM table_name.
  • ጅምር ከተጠናቀቀ በኋላ ማገናኛ ከ PostgreSQL WAL ፋይሎች ለውጦችን የማንበብ ዘዴን ያስገባል።

ስለተጠቀሙባቸው አማራጮች፡-

  • name - ከዚህ በታች የተገለጸው ውቅር ጥቅም ላይ የሚውልበት የማገናኛ ስም; ለወደፊቱ ይህ ስም ከማገናኛ ጋር አብሮ ለመስራት ጥቅም ላይ ይውላል (ማለትም ሁኔታውን ይመልከቱ / እንደገና ያስጀምሩ / አወቃቀሩን ያዘምኑ) በ Kafka Connect REST API;
  • connector.class - በተዋቀረው ማገናኛ ጥቅም ላይ የሚውለው የ DBMS ማገናኛ ክፍል;
  • plugin.name ከWAL ፋይሎች መረጃን ሎጂካዊ ዲኮዲንግ ለማድረግ የተሰኪው ስም ነው። ለመምረጥ ይገኛል። wal2json, decoderbuffs и pgoutput. የመጀመሪያዎቹ ሁለቱ በዲቢኤምኤስ ውስጥ ተገቢውን ቅጥያ መጫን ያስፈልጋቸዋል, እና pgoutput ለ PostgreSQL ስሪት 10 እና ከዚያ በላይ ተጨማሪ ማጭበርበሮችን አያስፈልግም;
  • database.* - ከመረጃ ቋቱ ጋር የመገናኘት አማራጮች ፣ የት database.server.name - በካፍካ ክላስተር ውስጥ የርዕሱን ስም ለመመስረት ጥቅም ላይ የዋለው የ PostgreSQL ምሳሌ ስም;
  • table.include.list - ለውጦችን ለመከታተል የምንፈልግባቸው የጠረጴዛዎች ዝርዝር; በቅርጸት ተሰጥቷል schema.table_name; ጋር አብሮ መጠቀም አይቻልም table.exclude.list;
  • heartbeat.interval.ms - ማገናኛው የልብ ምት መልዕክቶችን ወደ ልዩ ርዕስ የሚልክበት ክፍተት (በሚሊሰከንዶች);
  • heartbeat.action.query - እያንዳንዱን የልብ ምት መልእክት ሲላክ የሚተገበር ጥያቄ (አማራጩ ከስሪት 1.1 ጀምሮ ታይቷል);
  • slot.name - በማገናኛ ጥቅም ላይ የሚውለው የማባዛት ማስገቢያ ስም;
  • publication.name - ስም ጽሑፎች ማገናኛው በሚጠቀመው PostgreSQL ውስጥ። ከሌለ ደበዚየም ለመፍጠር ይሞክራል። ግንኙነቱ የተደረገበት ተጠቃሚ ለዚህ ድርጊት በቂ መብቶች ከሌሉት, ማገናኛው በስህተት ይወጣል;
  • transforms የታለመውን ርዕስ ስም በትክክል እንዴት መቀየር እንደሚቻል ይወስናል፡-
    • transforms.AddPrefix.type መደበኛ መግለጫዎችን እንደምንጠቀም ይጠቁማል;
    • transforms.AddPrefix.regex - የታለመው ርዕስ ስም እንደገና የሚገለጽበት ጭምብል;
    • transforms.AddPrefix.replacement - በቀጥታ የምንገልጸው.

ስለ የልብ ምት እና ለውጦች ተጨማሪ

በነባሪነት ማገናኛው ለእያንዳንዱ የተፈጸመ ግብይት ውሂብ ወደ ካፍ ይልካል እና LSN (Log Sequence Number) በአገልግሎት ርዕስ ላይ ይጽፋል offset. ግን ማገናኛው ሙሉውን ዳታቤዝ ሳይሆን እንዲያነብ ከተዋቀረ ምን ይሆናል የሠንጠረዦቹ ክፍል ብቻ (በየትኛው ውሂቡ ብዙም ያልዘመነው)?

  • ማገናኛው የWAL ፋይሎችን ያነባል እና በውስጣቸው ያለውን ግብይት ወደ ሚከታተላቸው ጠረጴዛዎች አያገኝም።
  • ስለዚህ፣ በርዕሱም ሆነ በማባዛት ማስገቢያ ውስጥ አሁን ያለውን ቦታ አያዘምንም።
  • ይህ ደግሞ የWAL ፋይሎች በዲስክ ላይ "እንዲጣበቁ" እና የዲስክ ቦታ ሊያልቅባቸው ይችላል።

እና እዚህ አማራጮች ለማዳን ይመጣሉ. heartbeat.interval.ms и heartbeat.action.query. እነዚህን አማራጮች በጥንድ መጠቀም የልብ ምት መልእክት በተላከ ቁጥር በተለየ ሠንጠረዥ ውስጥ ያለውን መረጃ የመቀየር ጥያቄን ለማስፈጸም ያስችላል። ስለዚህ, ማገናኛው በአሁኑ ጊዜ የሚገኝበት LSN (በማባዛት ማስገቢያ ውስጥ) ያለማቋረጥ ይሻሻላል. ይህ DBMS ከአሁን በኋላ የማያስፈልጉትን የWAL ፋይሎች እንዲያስወግድ ያስችለዋል። አማራጮች እንዴት እንደሚሠሩ የበለጠ መረጃ ለማግኘት ይመልከቱ ሰነድ.

የበለጠ ትኩረት ሊሰጠው የሚገባው ሌላው አማራጭ ነው transforms. ምንም እንኳን ስለ ምቾት እና ውበት የበለጠ ቢሆንም ...

በነባሪ፣ Debezium የሚከተለውን የስም ፖሊሲ በመጠቀም ርዕሶችን ይፈጥራል፡- serverName.schemaName.tableName. ይህ ሁልጊዜ ምቹ ላይሆን ይችላል. አማራጮች transforms መደበኛ አገላለጾችን በመጠቀም ክስተቶቻቸው የተወሰነ ስም ወዳለው ርዕስ መምራት ያለባቸውን የሰንጠረዦች ዝርዝር መግለጽ ይችላሉ።

በእኛ ውቅር እናመሰግናለን transforms የሚከተሉት ይከናወናሉ፡ ሁሉም ከተከታተለው ዳታቤዝ የሚመጡ የሲዲሲ ክስተቶች በስሙ ወደ ርዕስ ይሄዳሉ data.cdc.dbname. ያለበለዚያ (ያለ እነዚህ ቅንብሮች) Debezium በነባሪነት ለእያንዳንዱ የቅጹ ሠንጠረዥ ርዕስ ይፈጥራል፡ pg-dev.public.<table_name>.

የግንኙነት ገደቦች

ለ PostgreSQL አያያዥ ውቅር መግለጫ መጨረሻ ላይ ስለ ሥራው ባህሪዎች / ገደቦች ማውራት ጠቃሚ ነው-

  1. የ PostgreSQL አያያዥ ተግባር በሎጂክ ዲኮዲንግ ጽንሰ-ሀሳብ ላይ የተመሠረተ ነው። ስለዚህም እሱ የውሂብ ጎታውን መዋቅር ለመለወጥ ጥያቄዎችን አይከታተልም (ዲኤልኤል) - በዚህ መሠረት, ይህ ውሂብ በርዕሶች ውስጥ አይሆንም.
  2. የማባዛት ክፍተቶች ጥቅም ላይ ስለሚውሉ, የማገናኛው ግንኙነት ይቻላል ብቻ ወደ ዋና DBMS ምሳሌ።
  3. ማገናኛው ከመረጃ ቋቱ ጋር የሚገናኝበት ተጠቃሚ የማንበብ ብቻ መብቶች ካሉት፣ ከመጀመሪያው ጅምር በፊት እራስዎ የማባዛት ማስገቢያ መፍጠር እና ወደ ዳታቤዝ ማተም ያስፈልግዎታል።

ውቅረትን በመተግበር ላይ

ስለዚህ የእኛን ውቅረት ወደ ማገናኛ ውስጥ እንጫን፡-

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

ማውረዱ የተሳካ መሆኑን እና ማገናኛው መጀመሩን እናረጋግጣለን።

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

በጣም ጥሩ፡ ተዘጋጅቶ ለመሄድ ዝግጁ ነው። አሁን ሸማች መስለን ከካፍካ ጋር እንገናኝ ከዛ በኋላ በሰንጠረዡ ላይ ግቤት ጨምረን እንለውጣለን።

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

በእኛ ርዕስ ውስጥ, ይህ እንደሚከተለው ይታያል.

በጣም ረጅም JSON ከለውጦቻችን ጋር

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

በሁለቱም ሁኔታዎች መዝገቦቹ የተቀየረውን የመዝገቡ ቁልፍ (ፒኬ) እና የለውጦቹን ይዘት ያካትታሉ፡ መዝገቡ ከዚህ በፊት ምን እንደነበረ እና በኋላ ምን እንደ ሆነ።

  • በዚህ ረገድ INSERTዋጋ በፊትbefore) እኩል ነው። nullየገባው ሕብረቁምፊ ተከትሎ.
  • በዚህ ረገድ UPDATE: በ payload.before የረድፉ ቀዳሚው ሁኔታ ይታያል እና በ ውስጥ payload.after - ከለውጥ ይዘት ጋር አዲስ።

2.2 MongoDB

ይህ ማገናኛ ከዲቢኤምኤስ የመጀመሪያ ደረጃ መስቀለኛ መንገድ ኦፕሎግ ላይ መረጃን በማንበብ መደበኛውን MongoDB የማባዛት ዘዴን ይጠቀማል።

በተመሳሳይ ለ PgSQL ቀደም ሲል ከተገለጸው ማገናኛ ጋር፣ እዚህም እንዲሁ፣ በመጀመሪያው ጅምር ላይ ዋናው የውሂብ ቅጽበታዊ ገጽ እይታ ይወሰዳል፣ ከዚያ በኋላ ማገናኛው ወደ ኦፕሎግ ንባብ ሁነታ ይቀየራል።

የማዋቀር ምሳሌ፡-

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

እንደሚመለከቱት ፣ ከቀዳሚው ምሳሌ ጋር ሲነፃፀሩ ምንም አዲስ አማራጮች የሉም ፣ ግን ከመረጃ ቋቱ ጋር ለመገናኘት ኃላፊነት ያላቸው የአማራጮች ብዛት እና ቅድመ ቅጥያዎቻቸው ቀንሰዋል።

ቅንብሮች transforms በዚህ ጊዜ የሚከተሉትን ያደርጋሉ: የታለመውን ርዕስ ስም ከመርሃግብሩ ላይ ያዙሩት <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

ስህተትን መታገስ

በዘመናችን የስህተት መቻቻል እና ከፍተኛ አቅርቦት ጉዳይ ከመቼውም ጊዜ በበለጠ አሳሳቢ ነው - በተለይም ስለ መረጃ እና ግብይቶች ስናወራ እና የውሂብ ለውጥን መከታተል በዚህ ጉዳይ ላይ ከጎን አይደለም ። በመርህ ደረጃ ስህተት የሚሆነውን እና በእያንዳንዱ ጉዳይ ላይ ደበዚየም ምን እንደሚሆን እንይ።

ሶስት የመርጦ መውጣት አማራጮች አሉ፡-

  1. የካፍካ ግንኙነት አለመሳካት።. Connect በተከፋፈለ ሁነታ እንዲሰራ ከተዋቀረ ይህ ተመሳሳይ group.id ለማዘጋጀት ብዙ ሰራተኞችን ይፈልጋል። ከዚያም ከመካከላቸው አንዱ ካልተሳካ, ማገናኛው በሌላኛው ሰራተኛ ላይ እንደገና ይጀመራል እና በካፍካ ውስጥ ባለው ርዕስ ውስጥ ከመጨረሻው ቁርጠኛ አቋም ማንበብ ይቀጥላል.
  2. ከካፍካ ክላስተር ጋር ያለው ግንኙነት ማጣት. ማገናኛው በቀላሉ ወደ ካፍካ መላክ ያልቻለውን ቦታ ማንበብ ያቆማል እና ሙከራው እስኪሳካ ድረስ በየጊዜው እንደገና ለመላክ ይሞክራል።
  3. የውሂብ ምንጭ አይገኝም. ማገናኛው እንደ አወቃቀሩ መሰረት ከምንጩ ጋር እንደገና ለመገናኘት ይሞክራል። ነባሪው 16 ሙከራዎችን መጠቀም ነው። ገላጭ ጀርባ. ከ 16 ኛው ያልተሳካ ሙከራ በኋላ, ተግባሩ እንደ ምልክት ይደረግበታል አልተሳካም እና በካፍካ አገናኝ REST በይነገጽ በኩል እራስዎ እንደገና መጀመር ያስፈልገዋል.
    • በዚህ ረገድ PostgreSQL ውሂብ አይጠፋም, ምክንያቱም የማባዛት ክፍተቶችን መጠቀም በኮንክተሩ ያልተነበቡ የWAL ፋይሎች እንዳይሰረዙ ይከላከላል። በዚህ ሁኔታ, አሉታዊ ጎኖች አሉ-በማገናኛ እና በዲቢኤምኤስ መካከል ያለው የአውታረ መረብ ግንኙነት ለረጅም ጊዜ ከተስተጓጎለ, የዲስክ ቦታው ሊያልቅ የሚችልበት እድል አለ, እና ይህ ወደ አጠቃላይ ዲቢኤምኤስ ውድቀት ሊያመራ ይችላል.
    • በዚህ ረገድ MySQL የቢንሎግ ፋይሎች ግንኙነቱ ወደነበረበት ከመመለሱ በፊት በራሱ በ DBMS ሊሽከረከር ይችላል። ይህ ማገናኛው ወደ አልተሳካለት ሁኔታ እንዲገባ ያደርገዋል፣ እና መደበኛ ስራውን ወደነበረበት ለመመለሾ ከቢንሎግ ማንበብ ለመቀጠል በመነሻ ቅጽበታዊ ገጽ እይታ ውስጥ እንደገና መጀመር አለበት።
    • ላይ MongODB. ሰነዱ እንዲህ ይላል፡ የሎግ/oplog ፋይሎች ከተሰረዙ እና ማገናኛው ካቆመበት ቦታ ማንበብ መቀጠል ካልቻለ የማገናኛው ባህሪ ለሁሉም DBMS ተመሳሳይ ነው። ማገናኛው ወደ ስቴቱ ውስጥ ስለሚገባ ነው አልተሳካም እና በ ሁነታ ውስጥ እንደገና ማስጀመር ያስፈልገዋል የመጀመሪያ ቅጽበታዊ እይታ.

      ሆኖም ግን, ልዩ ሁኔታዎች አሉ. ማገናኛው ለረጅም ጊዜ በተቋረጠ ሁኔታ ውስጥ ከነበረ (ወይም MongoDB ምሳሌ መድረስ ካልቻለ) እና ኦፕሎግ በዚህ ጊዜ ዞሯል፣ ከዚያ ግንኙነቱ ወደነበረበት ሲመለስ ማገናኛው በረጋ መንፈስ ከመጀመሪያው የሚገኝ ቦታ መረጃን ማንበብ ይቀጥላል። , ለዚህም ነው በካፍካ ውስጥ ያሉ አንዳንድ መረጃዎች አይደለም ይመታል ።

መደምደሚያ

ዴቤዚየም በሲዲሲ ሲስተሞች የመጀመሪያ ልምዴ ነው እና በአጠቃላይ በጣም አዎንታዊ ነበር። ፕሮጀክቱ የዋናውን ዲቢኤምኤስ ድጋፍ፣ የመዋቅር ቀላልነት፣ ለክላስተር ድጋፍ እና የነቃ ማህበረሰብ ድጋፍ አድርጓል። በተግባር ላይ ላሉት, መመሪያዎቹን እንዲያነቡ እመክራችኋለሁ የካፍካ ግንኙነት и ደበዚየም.

ለካፍካ ኮኔክተር ከJDBC አያያዥ ጋር ሲወዳደር የደብዚየም ዋነኛ ጥቅም ከዲቢኤምኤስ ምዝግብ ማስታወሻዎች ላይ ለውጦች መነበቡ ነው፣ ይህም መረጃ በትንሹ መዘግየት እንዲቀበል ያስችላል። የJDBC ኮኔክተር (በካፍካ ኮኔክተር የቀረበ) የተከታተለውን ሠንጠረዥ በተወሰነ የጊዜ ክፍተት ይጠይቃል እና (በተመሳሳይ ምክንያት) ውሂቡ በሚሰረዝበት ጊዜ መልዕክቶችን አያመነጭም (እዚያ የሌለበትን ውሂብ እንዴት መጠየቅ ይችላሉ?)።

ተመሳሳይ ችግሮችን ለመፍታት ለሚከተሉት መፍትሄዎች ትኩረት መስጠት ይችላሉ (ከደብዚየም በተጨማሪ)

PS

በብሎጋችን ላይ ያንብቡ፡-

ምንጭ: hab.com

አስተያየት ያክሉ