అపాచీ కాఫ్కా కోసం Debezium - CDCని పరిచయం చేస్తున్నాము

అపాచీ కాఫ్కా కోసం Debezium - CDCని పరిచయం చేస్తున్నాము

నా పనిలో, నేను తరచుగా కొత్త సాంకేతిక పరిష్కారాలు / సాఫ్ట్‌వేర్ ఉత్పత్తులను చూస్తాను, రష్యన్ మాట్లాడే ఇంటర్నెట్‌లో దీని గురించి సమాచారం చాలా తక్కువగా ఉంటుంది. ఈ కథనంతో, నేను రెండు ప్రసిద్ధ DBMSల (PostgreSQL మరియు MongoDB) నుండి Debeziumని ఉపయోగించి కాఫ్కా క్లస్టర్‌కి CDC ఈవెంట్‌లను పంపడాన్ని సెటప్ చేయవలసి వచ్చినప్పుడు, నా ఇటీవలి అభ్యాసం నుండి అటువంటి ఖాళీని పూరించడానికి ప్రయత్నిస్తాను. చేసిన కృషి ఫలితంగా వెలువడిన ఈ సమీక్షా వ్యాసం ఇతరులకు ఉపయోగపడుతుందని ఆశిస్తున్నాను.

సాధారణంగా Debezium మరియు CDC అంటే ఏమిటి?

డెబెజియం - CDC సాఫ్ట్‌వేర్ వర్గం యొక్క ప్రతినిధి (డేటా మార్పును క్యాప్చర్ చేయండి), లేదా మరింత ఖచ్చితంగా, ఇది Apache Kafka Connect ఫ్రేమ్‌వర్క్‌కు అనుకూలంగా ఉండే వివిధ DBMSల కోసం కనెక్టర్‌ల సమితి.

ఓపెన్ సోర్స్ ప్రాజెక్ట్, Apache లైసెన్స్ v2.0 క్రింద లైసెన్స్ పొందింది మరియు Red Hat స్పాన్సర్ చేయబడింది. 2016 నుండి అభివృద్ధి జరుగుతోంది మరియు ప్రస్తుతం ఇది క్రింది DBMS కోసం అధికారిక మద్దతును అందిస్తుంది: MySQL, PostgreSQL, MongoDB, SQL సర్వర్. కాసాండ్రా మరియు ఒరాకిల్ కోసం కనెక్టర్‌లు కూడా ఉన్నాయి, కానీ అవి ప్రస్తుతం "ప్రారంభ యాక్సెస్" స్థితిలో ఉన్నాయి మరియు కొత్త విడుదలలు వెనుకబడిన అనుకూలతకు హామీ ఇవ్వవు.

మేము CDCని సాంప్రదాయ విధానంతో పోల్చినట్లయితే (అప్లికేషన్ DBMS నుండి నేరుగా డేటాను చదివినప్పుడు), అప్పుడు దాని ప్రధాన ప్రయోజనాలు తక్కువ జాప్యం, అధిక విశ్వసనీయత మరియు లభ్యతతో వరుస స్థాయిలో డేటా మార్పు స్ట్రీమింగ్‌ను అమలు చేయడం. CDC ఈవెంట్‌ల కోసం కాఫ్కా క్లస్టర్‌ని రిపోజిటరీగా ఉపయోగించడం ద్వారా చివరి రెండు పాయింట్లు సాధించబడతాయి.

అలాగే, ప్రయోజనాలలో ఈవెంట్‌లను నిల్వ చేయడానికి ఒకే మోడల్ ఉపయోగించబడుతుందనే వాస్తవాన్ని కలిగి ఉంటుంది, కాబట్టి తుది అప్లికేషన్ వేర్వేరు DBMS నిర్వహణ యొక్క సూక్ష్మ నైపుణ్యాల గురించి ఆందోళన చెందాల్సిన అవసరం లేదు.

చివరగా, మెసేజ్ బ్రోకర్‌ని ఉపయోగించడం వల్ల డేటాలో మార్పులను ట్రాక్ చేసే అప్లికేషన్‌ల క్షితిజ సమాంతర స్కేలింగ్ కోసం స్కోప్ తెరవబడుతుంది. అదే సమయంలో, డేటా నేరుగా DBMS నుండి కాకుండా కాఫ్కా క్లస్టర్ నుండి స్వీకరించబడినందున, డేటా సోర్స్‌పై ప్రభావం తగ్గించబడుతుంది.

డెబెజియం ఆర్కిటెక్చర్ గురించి

Debeziumని ఉపయోగించడం ఈ సాధారణ పథకానికి వస్తుంది:

DBMS (డేటా మూలంగా) → కాఫ్కా కనెక్ట్‌లో కనెక్టర్ → అపాచీ కాఫ్కా → వినియోగదారు

ఒక ఉదాహరణగా, నేను ప్రాజెక్ట్ వెబ్‌సైట్ నుండి ఒక రేఖాచిత్రాన్ని ఇస్తాను:

అపాచీ కాఫ్కా కోసం Debezium - CDCని పరిచయం చేస్తున్నాము

అయినప్పటికీ, నేను ఈ పథకాన్ని నిజంగా ఇష్టపడను, ఎందుకంటే సింక్ కనెక్టర్ మాత్రమే సాధ్యమవుతుందని తెలుస్తోంది.

వాస్తవానికి, పరిస్థితి భిన్నంగా ఉంది: మీ డేటా సరస్సును నింపడం (పై రేఖాచిత్రంలో చివరి లింక్) Debezium ఉపయోగించడానికి ఏకైక మార్గం కాదు. అపాచీ కాఫ్కాకు పంపబడిన ఈవెంట్‌లను మీ అప్లికేషన్‌లు వివిధ పరిస్థితులను ఎదుర్కోవడానికి ఉపయోగించవచ్చు. ఉదాహరణకి:

  • కాష్ నుండి అసంబద్ధ డేటా తొలగింపు;
  • నోటిఫికేషన్లను పంపడం;
  • శోధన సూచిక నవీకరణలు;
  • కొన్ని రకాల ఆడిట్ లాగ్‌లు;
  • ...

మీకు జావా అప్లికేషన్ ఉంటే మరియు కాఫ్కా క్లస్టర్‌ను ఉపయోగించాల్సిన అవసరం/అవకాశం లేనట్లయితే, దాని ద్వారా పని చేసే అవకాశం కూడా ఉంది ఎంబెడెడ్ కనెక్టర్. స్పష్టమైన ప్లస్ ఏమిటంటే, దానితో మీరు అదనపు మౌలిక సదుపాయాలను (కనెక్టర్ మరియు కాఫ్కా రూపంలో) తిరస్కరించవచ్చు. అయినప్పటికీ, సంస్కరణ 1.1 నుండి ఈ పరిష్కారం నిలిపివేయబడింది మరియు ఇకపై ఉపయోగం కోసం సిఫార్సు చేయబడదు (భవిష్యత్తు విడుదలలలో ఇది తీసివేయబడవచ్చు).

ఈ కథనం డెవలపర్లు సిఫార్సు చేసిన నిర్మాణాన్ని చర్చిస్తుంది, ఇది తప్పు సహనం మరియు స్కేలబిలిటీని అందిస్తుంది.

కనెక్టర్ కాన్ఫిగరేషన్

అత్యంత ముఖ్యమైన విలువ - డేటాలో మార్పులను ట్రాక్ చేయడం ప్రారంభించడానికి మాకు ఇది అవసరం:

  1. డేటా మూలం, ఇది MySQL వెర్షన్ 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (పూర్తి జాబితా);
  2. అపాచీ కాఫ్కా క్లస్టర్
  3. కాఫ్కా కనెక్ట్ ఉదాహరణ (వెర్షన్లు 1.x, 2.x);
  4. కాన్ఫిగర్ చేయబడిన Debezium కనెక్టర్.

మొదటి రెండు పాయింట్లపై పని చేయండి, అనగా. DBMS మరియు Apache Kafkaని ఇన్‌స్టాల్ చేసే ప్రక్రియ వ్యాసం పరిధికి మించినది. అయినప్పటికీ, శాండ్‌బాక్స్‌లో ప్రతిదీ అమర్చాలనుకునే వారికి, ఉదాహరణలతో కూడిన అధికారిక రిపోజిటరీలో రెడీమేడ్ ఉంటుంది డాకర్-compose.yaml.

మేము చివరి రెండు పాయింట్లపై మరింత వివరంగా దృష్టి పెడతాము.

0. కాఫ్కా కనెక్ట్

ఇక్కడ మరియు తరువాత వ్యాసంలో, అన్ని కాన్ఫిగరేషన్ ఉదాహరణలు Debezium డెవలపర్లు పంపిణీ చేసిన డాకర్ ఇమేజ్ సందర్భంలో పరిగణించబడతాయి. ఇది అవసరమైన అన్ని ప్లగ్ఇన్ ఫైల్‌లను (కనెక్టర్‌లు) కలిగి ఉంది మరియు ఎన్విరాన్‌మెంట్ వేరియబుల్స్ ఉపయోగించి కాఫ్కా కనెక్ట్ కాన్ఫిగరేషన్‌ను అందిస్తుంది.

మీరు కాన్‌ఫ్లూయెంట్ నుండి కాఫ్కా కనెక్ట్‌ని ఉపయోగించాలనుకుంటే, మీరు పేర్కొన్న డైరెక్టరీకి అవసరమైన కనెక్టర్‌ల ప్లగిన్‌లను మీరే జోడించాలి. plugin.path లేదా ఎన్విరాన్మెంట్ వేరియబుల్ ద్వారా సెట్ చేయండి CLASSPATH. కాఫ్కా కనెక్ట్ వర్కర్ మరియు కనెక్టర్‌ల సెట్టింగ్‌లు వర్కర్ స్టార్ట్ కమాండ్‌కు ఆర్గ్యుమెంట్‌లుగా పంపబడే కాన్ఫిగరేషన్ ఫైల్‌ల ద్వారా నిర్వచించబడతాయి. వివరాల కోసం చూడండి డాక్యుమెంటేషన్.

కనెక్టర్ వెర్షన్‌లో డెబీజమ్‌ను సెటప్ చేసే మొత్తం ప్రక్రియ రెండు దశల్లో నిర్వహించబడుతుంది. వాటిలో ప్రతి ఒక్కటి పరిశీలిద్దాం:

1. కాఫ్కా కనెక్ట్ ఫ్రేమ్‌వర్క్‌ను ఏర్పాటు చేస్తోంది

అపాచీ కాఫ్కా క్లస్టర్‌కి డేటాను ప్రసారం చేయడానికి, నిర్దిష్ట పారామితులు కాఫ్కా కనెక్ట్ ఫ్రేమ్‌వర్క్‌లో సెట్ చేయబడతాయి, అవి:

  • క్లస్టర్ కనెక్షన్ సెట్టింగ్‌లు,
  • కనెక్టర్ యొక్క కాన్ఫిగరేషన్ నిల్వ చేయబడే అంశాల పేర్లు,
  • కనెక్టర్ నడుస్తున్న సమూహం పేరు (పంపిణీ మోడ్‌ను ఉపయోగిస్తున్నప్పుడు).

ప్రాజెక్ట్ యొక్క అధికారిక డాకర్ చిత్రం ఎన్విరాన్మెంట్ వేరియబుల్స్ ఉపయోగించి కాన్ఫిగరేషన్‌కు మద్దతు ఇస్తుంది - ఇది మేము ఉపయోగిస్తాము. కాబట్టి చిత్రాన్ని డౌన్‌లోడ్ చేద్దాం:

docker pull debezium/connect

కనెక్టర్‌ను అమలు చేయడానికి అవసరమైన ఎన్విరాన్‌మెంట్ వేరియబుల్స్ యొక్క కనీస సెట్ క్రింది విధంగా ఉంటుంది:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - క్లస్టర్ సభ్యుల పూర్తి జాబితాను పొందడానికి కాఫ్కా క్లస్టర్ సర్వర్‌ల ప్రారంభ జాబితా;
  • OFFSET_STORAGE_TOPIC=connector-offsets - కనెక్టర్ ప్రస్తుతం ఉన్న స్థానాలను నిల్వ చేయడానికి ఒక అంశం;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - కనెక్టర్ యొక్క స్థితిని మరియు దాని పనులను నిల్వ చేయడానికి ఒక అంశం;
  • CONFIG_STORAGE_TOPIC=connector-config - కనెక్టర్ కాన్ఫిగరేషన్ డేటా మరియు దాని పనులను నిల్వ చేయడానికి ఒక అంశం;
  • GROUP_ID=1 - కనెక్టర్ పనిని అమలు చేయగల కార్మికుల సమూహం యొక్క ఐడెంటిఫైయర్; పంపిణీని ఉపయోగిస్తున్నప్పుడు అవసరం (పంపిణీ చేయబడింది) పాలన.

మేము ఈ వేరియబుల్స్‌తో కంటైనర్‌ను ప్రారంభిస్తాము:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

అవ్రో గురించి గమనించండి

డిఫాల్ట్‌గా, Debezium JSON ఫార్మాట్‌లో డేటాను వ్రాస్తుంది, ఇది శాండ్‌బాక్స్‌లు మరియు తక్కువ మొత్తంలో డేటాకు ఆమోదయోగ్యమైనది, కానీ ఎక్కువగా లోడ్ చేయబడిన డేటాబేస్‌లలో సమస్య కావచ్చు. JSON కన్వర్టర్‌కు ప్రత్యామ్నాయం సందేశాలను ఉపయోగించి సీరియలైజ్ చేయడం యూరో బైనరీ ఆకృతికి, ఇది అపాచీ కాఫ్కాలోని I/O సబ్‌సిస్టమ్‌పై లోడ్‌ను తగ్గిస్తుంది.

Avroని ఉపయోగించడానికి, మీరు ప్రత్యేకంగా అమర్చాలి స్కీమా-రిజిస్ట్రీ (స్కీమాలను నిల్వ చేయడానికి). కన్వర్టర్ కోసం వేరియబుల్స్ ఇలా కనిపిస్తాయి:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avroని ఉపయోగించడం మరియు దాని కోసం రిజిస్ట్రీని సెటప్ చేయడం గురించిన వివరాలు కథనం యొక్క పరిధికి మించినవి - ఇంకా, స్పష్టత కోసం, మేము JSONని ఉపయోగిస్తాము.

2. కనెక్టర్‌ను సెటప్ చేయడం

ఇప్పుడు మీరు నేరుగా కనెక్టర్ యొక్క కాన్ఫిగరేషన్‌కు వెళ్లవచ్చు, ఇది మూలం నుండి డేటాను చదువుతుంది.

రెండు DBMS కోసం కనెక్టర్‌ల ఉదాహరణను చూద్దాం: PostgreSQL మరియు MongoDB, దీని కోసం నాకు అనుభవం ఉంది మరియు వాటి కోసం తేడాలు ఉన్నాయి (చిన్నవి అయినప్పటికీ కొన్ని సందర్భాల్లో ముఖ్యమైనవి!).

కాన్ఫిగరేషన్ JSON సంజ్ఞామానంలో వివరించబడింది మరియు POST అభ్యర్థనను ఉపయోగించి కాఫ్కా కనెక్ట్‌కి అప్‌లోడ్ చేయబడింది.

2.1 PostgreSQL

PostgreSQL కోసం ఉదాహరణ కనెక్టర్ కాన్ఫిగరేషన్:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

ఈ కాన్ఫిగరేషన్ తర్వాత కనెక్టర్ యొక్క ఆపరేషన్ సూత్రం చాలా సులభం:

  • మొదటి ప్రారంభంలో, ఇది కాన్ఫిగరేషన్‌లో పేర్కొన్న డేటాబేస్‌కు కనెక్ట్ చేస్తుంది మరియు మోడ్‌లో ప్రారంభమవుతుంది ప్రారంభ స్నాప్‌షాట్, షరతులతో అందుకున్న ప్రాథమిక డేటా సెట్‌ను కాఫ్కాకు పంపడం SELECT * FROM table_name.
  • ప్రారంభించడం పూర్తయిన తర్వాత, కనెక్టర్ PostgreSQL WAL ఫైల్‌ల నుండి రీడింగ్ మార్పుల మోడ్‌లోకి ప్రవేశిస్తుంది.

ఉపయోగించిన ఎంపికల గురించి:

  • name - క్రింద వివరించిన కాన్ఫిగరేషన్ ఉపయోగించబడే కనెక్టర్ పేరు; భవిష్యత్తులో, ఈ పేరు Kafka Connect REST API ద్వారా కనెక్టర్‌తో పని చేయడానికి (అంటే స్థితిని వీక్షించడానికి / పునఃప్రారంభించండి / కాన్ఫిగరేషన్‌ను నవీకరించండి);
  • connector.class - కాన్ఫిగర్ చేయబడిన కనెక్టర్ ద్వారా ఉపయోగించబడే DBMS కనెక్టర్ క్లాస్;
  • plugin.name అనేది WAL ఫైల్స్ నుండి డేటా యొక్క లాజికల్ డీకోడింగ్ కోసం ప్లగ్ఇన్ పేరు. ఎంచుకోవడానికి అందుబాటులో ఉంది wal2json, decoderbuffs и pgoutput. మొదటి రెండింటికి DBMSలో తగిన పొడిగింపుల సంస్థాపన అవసరం, మరియు pgoutput PostgreSQL వెర్షన్ 10 మరియు అంతకంటే ఎక్కువ కోసం అదనపు అవకతవకలు అవసరం లేదు;
  • database.* — డేటాబేస్కు కనెక్ట్ చేయడానికి ఎంపికలు, ఎక్కడ database.server.name - కాఫ్కా క్లస్టర్‌లో టాపిక్ పేరును రూపొందించడానికి ఉపయోగించే PostgreSQL ఉదాహరణ పేరు;
  • table.include.list - మేము మార్పులను ట్రాక్ చేయాలనుకుంటున్న పట్టికల జాబితా; ఫార్మాట్‌లో ఇవ్వబడింది schema.table_name; తో కలిసి ఉపయోగించబడదు table.exclude.list;
  • heartbeat.interval.ms - కనెక్టర్ ప్రత్యేక అంశానికి హృదయ స్పందన సందేశాలను పంపే విరామం (మిల్లీసెకన్లలో);
  • heartbeat.action.query - ప్రతి హృదయ స్పందన సందేశాన్ని పంపేటప్పుడు అమలు చేయబడే అభ్యర్థన (వెర్షన్ 1.1 నుండి ఎంపిక కనిపించింది);
  • slot.name - కనెక్టర్ ఉపయోగించే రెప్లికేషన్ స్లాట్ పేరు;
  • publication.name - పేరు ప్రచురణ కనెక్టర్ ఉపయోగించే PostgreSQLలో. ఒకవేళ అది ఉనికిలో లేనట్లయితే, Debezium దానిని సృష్టించడానికి ప్రయత్నిస్తుంది. కనెక్షన్ చేయబడిన వినియోగదారుకు ఈ చర్య కోసం తగినంత హక్కులు లేకుంటే, కనెక్టర్ లోపంతో నిష్క్రమిస్తుంది;
  • transforms లక్ష్య అంశం పేరును సరిగ్గా ఎలా మార్చాలో నిర్ణయిస్తుంది:
    • transforms.AddPrefix.type మేము సాధారణ వ్యక్తీకరణలను ఉపయోగిస్తామని సూచిస్తుంది;
    • transforms.AddPrefix.regex - లక్ష్యం అంశం పేరు పునర్నిర్వచించబడిన ముసుగు;
    • transforms.AddPrefix.replacement - నేరుగా మనం పునర్నిర్వచించేది.

హృదయ స్పందన మరియు రూపాంతరాల గురించి మరింత

డిఫాల్ట్‌గా, ప్రతి నిబద్ధత లావాదేవీకి సంబంధించిన డేటాను కనెక్టర్ కాఫ్కాకు పంపుతుంది మరియు దాని LSN (లాగ్ సీక్వెన్స్ నంబర్)ని సేవా అంశానికి వ్రాస్తుంది offset. కనెక్టర్ మొత్తం డేటాబేస్ కాకుండా దాని పట్టికలలో కొంత భాగాన్ని మాత్రమే చదవడానికి కాన్ఫిగర్ చేయబడితే (ఇందులో డేటా చాలా అరుదుగా నవీకరించబడుతుంది) ఏమి జరుగుతుంది?

  • కనెక్టర్ WAL ఫైల్‌లను చదువుతుంది మరియు అది పర్యవేక్షించే టేబుల్‌లలో లావాదేవీలను గుర్తించదు.
  • కాబట్టి, ఇది టాపిక్‌లో లేదా రెప్లికేషన్ స్లాట్‌లో దాని ప్రస్తుత స్థితిని నవీకరించదు.
  • ఇది, WAL ఫైల్‌లు డిస్క్‌లో "స్టక్" అయ్యేలా చేస్తుంది మరియు డిస్క్ స్పేస్ అయిపోతుంది.

మరియు ఇక్కడ ఎంపికలు రక్షించటానికి వస్తాయి. heartbeat.interval.ms и heartbeat.action.query. ఈ ఎంపికలను జంటగా ఉపయోగించడం వలన హృదయ స్పందన సందేశం పంపబడిన ప్రతిసారి ప్రత్యేక పట్టికలో డేటాను మార్చడానికి అభ్యర్థనను అమలు చేయడం సాధ్యపడుతుంది. అందువలన, కనెక్టర్ ప్రస్తుతం ఉన్న LSN (రెప్లికేషన్ స్లాట్‌లో) నిరంతరం నవీకరించబడుతుంది. ఇది ఇకపై అవసరం లేని WAL ఫైల్‌లను తీసివేయడానికి DBMSని అనుమతిస్తుంది. ఎంపికలు ఎలా పని చేస్తాయి అనే దాని గురించి మరింత సమాచారం కోసం, చూడండి డాక్యుమెంటేషన్.

దగ్గరి శ్రద్ధకు అర్హమైన మరొక ఎంపిక transforms. ఇది సౌలభ్యం మరియు అందం గురించి ఎక్కువ అయినప్పటికీ ...

డిఫాల్ట్‌గా, Debezium క్రింది నామకరణ విధానాన్ని ఉపయోగించి అంశాలను సృష్టిస్తుంది: serverName.schemaName.tableName. ఇది ఎల్లప్పుడూ సౌకర్యవంతంగా ఉండకపోవచ్చు. ఎంపికలు transforms సాధారణ వ్యక్తీకరణలను ఉపయోగించి, మీరు నిర్దిష్ట పేరుతో ఒక అంశానికి ఈవెంట్‌లను మళ్లించాల్సిన పట్టికల జాబితాను నిర్వచించవచ్చు.

మా కాన్ఫిగరేషన్‌లో ధన్యవాదాలు transforms కిందివి జరుగుతాయి: ట్రాక్ చేయబడిన డేటాబేస్ నుండి అన్ని CDC ఈవెంట్‌లు పేరుతో టాపిక్‌కి వెళ్తాయి data.cdc.dbname. లేకపోతే (ఈ సెట్టింగ్‌లు లేకుండా), Debezium డిఫాల్ట్‌గా ఫారమ్‌లోని ప్రతి టేబుల్‌కి ఒక టాపిక్‌ను సృష్టిస్తుంది: pg-dev.public.<table_name>.

కనెక్టర్ పరిమితులు

PostgreSQL కోసం కనెక్టర్ కాన్ఫిగరేషన్ యొక్క వివరణ ముగింపులో, దాని పని యొక్క క్రింది లక్షణాలు / పరిమితుల గురించి మాట్లాడటం విలువ:

  1. PostgreSQL కోసం కనెక్టర్ ఫంక్షనాలిటీ లాజికల్ డీకోడింగ్ భావనపై ఆధారపడి ఉంటుంది. అందువలన అతను డేటాబేస్ యొక్క నిర్మాణాన్ని మార్చడానికి అభ్యర్థనలను ట్రాక్ చేయదు (DDL) - తదనుగుణంగా, ఈ డేటా అంశాలలో ఉండదు.
  2. ప్రతిరూపణ స్లాట్‌లు ఉపయోగించబడుతున్నందున, కనెక్టర్ యొక్క కనెక్షన్ సాధ్యమవుతుంది మాత్రమే మాస్టర్ DBMS ఉదాహరణకి.
  3. కనెక్టర్ డేటాబేస్‌కు కనెక్ట్ చేసే వినియోగదారుకు చదవడానికి మాత్రమే హక్కులు ఉంటే, మొదటి లాంచ్‌కు ముందు, మీరు మాన్యువల్‌గా రెప్లికేషన్ స్లాట్‌ను సృష్టించి డేటాబేస్‌లో ప్రచురించాలి.

కాన్ఫిగరేషన్‌ను వర్తింపజేస్తోంది

కాబట్టి మన కాన్ఫిగరేషన్‌ను కనెక్టర్‌లోకి లోడ్ చేద్దాం:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

డౌన్‌లోడ్ విజయవంతమైందని మరియు కనెక్టర్ ప్రారంభించబడిందని మేము తనిఖీ చేస్తాము:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

గొప్పది: ఇది సెటప్ చేయబడింది మరియు సిద్ధంగా ఉంది. ఇప్పుడు మనం వినియోగదారునిగా నటిస్తాము మరియు కాఫ్కాకు కనెక్ట్ చేద్దాం, దాని తర్వాత మేము పట్టికలో ఒక ఎంట్రీని జోడించి, మారుస్తాము:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

మా అంశంలో, ఇది క్రింది విధంగా ప్రదర్శించబడుతుంది:

మా మార్పులతో చాలా కాలం JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

రెండు సందర్భాల్లో, రికార్డ్‌లు మార్చబడిన రికార్డ్ యొక్క కీ (PK) మరియు మార్పుల యొక్క సారాంశాన్ని కలిగి ఉంటాయి: రికార్డ్ ముందు ఏమిటి మరియు అది ఏమి అయ్యింది.

  • విషయంలో INSERT: ముందు విలువ (before) సమానం nullచొప్పించిన స్ట్రింగ్ తర్వాత.
  • విషయంలో UPDATE: వద్ద payload.before అడ్డు వరుస యొక్క మునుపటి స్థితి ప్రదర్శించబడుతుంది మరియు ఇన్ payload.after - మార్పు యొక్క సారాంశంతో కొత్తది.

2.2 మొంగోడిబి

ఈ కనెక్టర్ ప్రామాణిక MongoDB రెప్లికేషన్ మెకానిజంను ఉపయోగిస్తుంది, DBMS ప్రైమరీ నోడ్ యొక్క ఆప్లాగ్ నుండి సమాచారాన్ని చదవడం.

PgSQL కోసం ఇప్పటికే వివరించిన కనెక్టర్‌కు అదేవిధంగా, ఇక్కడ కూడా, మొదటి ప్రారంభంలో, ప్రాథమిక డేటా స్నాప్‌షాట్ తీసుకోబడుతుంది, ఆ తర్వాత కనెక్టర్ ఆప్లాగ్ రీడింగ్ మోడ్‌కి మారుతుంది.

కాన్ఫిగరేషన్ ఉదాహరణ:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

మీరు చూడగలిగినట్లుగా, మునుపటి ఉదాహరణతో పోలిస్తే కొత్త ఎంపికలు లేవు, కానీ డేటాబేస్ మరియు వాటి ఉపసర్గలకు కనెక్ట్ చేయడానికి బాధ్యత వహించే ఎంపికల సంఖ్య మాత్రమే తగ్గించబడింది.

సెట్టింగులను transforms ఈసారి వారు ఈ క్రింది వాటిని చేస్తారు: పథకం నుండి లక్ష్య అంశం పేరును మార్చండి <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

తప్పు సహనం

మన కాలంలో తప్పు సహనం మరియు అధిక లభ్యత సమస్య గతంలో కంటే చాలా తీవ్రంగా ఉంది - ప్రత్యేకించి మేము డేటా మరియు లావాదేవీల గురించి మాట్లాడేటప్పుడు మరియు డేటా మార్పు ట్రాకింగ్ ఈ విషయంలో పక్కన ఉండదు. సూత్రప్రాయంగా ఏది తప్పు కావచ్చు మరియు ప్రతి సందర్భంలో డెబెజియంకు ఏమి జరుగుతుందో చూద్దాం.

మూడు నిలిపివేత ఎంపికలు ఉన్నాయి:

  1. కాఫ్కా కనెక్ట్ వైఫల్యం. డిస్ట్రిబ్యూటెడ్ మోడ్‌లో పని చేయడానికి కనెక్ట్ కాన్ఫిగర్ చేయబడితే, ఒకే group.idని సెట్ చేయడానికి బహుళ కార్మికులు అవసరం. అప్పుడు, వాటిలో ఒకటి విఫలమైతే, కనెక్టర్ మరొక వర్కర్‌పై పునఃప్రారంభించబడుతుంది మరియు కాఫ్కాలో టాపిక్‌లో చివరిగా కట్టుబడి ఉన్న స్థానం నుండి చదవడం కొనసాగిస్తుంది.
  2. కాఫ్కా క్లస్టర్‌తో కనెక్టివిటీ కోల్పోవడం. కనెక్టర్ కాఫ్కాకు పంపడంలో విఫలమైన స్థానం వద్ద చదవడం ఆపివేస్తుంది మరియు ప్రయత్నం విజయవంతమయ్యే వరకు క్రమానుగతంగా దాన్ని మళ్లీ పంపడానికి ప్రయత్నిస్తుంది.
  3. డేటా మూలం అందుబాటులో లేదు. కాన్ఫిగరేషన్ ప్రకారం కనెక్టర్ మూలానికి మళ్లీ కనెక్ట్ చేయడానికి ప్రయత్నిస్తుంది. డిఫాల్ట్ 16 ప్రయత్నాలను ఉపయోగించడం ఘాతాంక బ్యాక్‌ఆఫ్. 16వ ప్రయత్నం విఫలమైన తర్వాత, టాస్క్‌గా మార్క్ చేయబడుతుంది విఫలమైంది మరియు ఇది కాఫ్కా కనెక్ట్ REST ఇంటర్‌ఫేస్ ద్వారా మానవీయంగా పునఃప్రారంభించబడాలి.
    • విషయంలో PostgreSQL డేటా కోల్పోదు, ఎందుకంటే రెప్లికేషన్ స్లాట్‌లను ఉపయోగించడం వల్ల కనెక్టర్ చదవని WAL ఫైల్‌ల తొలగింపు నిరోధిస్తుంది. ఈ సందర్భంలో, ఒక ప్రతికూలత ఉంది: కనెక్టర్ మరియు DBMS మధ్య నెట్‌వర్క్ కనెక్టివిటీ చాలా కాలం పాటు అంతరాయం కలిగితే, డిస్క్ స్థలం అయిపోయే అవకాశం ఉంది మరియు ఇది మొత్తం DBMS యొక్క వైఫల్యానికి దారితీయవచ్చు.
    • విషయంలో MySQL కనెక్టివిటీని పునరుద్ధరించడానికి ముందు బిన్‌లాగ్ ఫైల్‌లను DBMS స్వయంగా తిప్పవచ్చు. ఇది కనెక్టర్ విఫలమైన స్థితికి వెళ్లేలా చేస్తుంది మరియు సాధారణ ఆపరేషన్‌ను పునరుద్ధరించడానికి బిన్‌లాగ్‌ల నుండి చదవడం కొనసాగించడానికి ఇది ప్రారంభ స్నాప్‌షాట్ మోడ్‌లో పునఃప్రారంభించవలసి ఉంటుంది.
    • MongoDB. డాక్యుమెంటేషన్ ఇలా చెబుతోంది: లాగ్/ఆప్లాగ్ ఫైల్‌లు తొలగించబడినప్పుడు మరియు కనెక్టర్ ఆపివేసిన స్థానం నుండి చదవడం కొనసాగించలేనప్పుడు కనెక్టర్ యొక్క ప్రవర్తన అన్ని DBMSకి ఒకే విధంగా ఉంటుంది. ఇది కనెక్టర్ రాష్ట్రంలోకి వెళుతుందనే వాస్తవంలో ఉంది విఫలమైంది మరియు మోడ్‌లో పునఃప్రారంభించవలసి ఉంటుంది ప్రారంభ స్నాప్‌షాట్.

      అయితే, మినహాయింపులు ఉన్నాయి. కనెక్టర్ చాలా కాలం పాటు డిస్‌కనెక్ట్ చేయబడిన స్థితిలో ఉంటే (లేదా మొంగోడిబి ఉదాహరణకి చేరుకోలేకపోయింది), మరియు ఈ సమయంలో ఆప్లాగ్ తిప్పబడితే, కనెక్షన్ పునరుద్ధరించబడినప్పుడు, కనెక్టర్ ప్రశాంతంగా అందుబాటులో ఉన్న మొదటి స్థానం నుండి డేటాను చదవడం కొనసాగిస్తుంది. , అందుకే కాఫ్కాలోని కొన్ని డేటా కాదు కొట్టేస్తుంది.

తీర్మానం

Debezium CDC సిస్టమ్‌లతో నా మొదటి అనుభవం మరియు మొత్తం మీద చాలా సానుకూలంగా ఉంది. ప్రాజెక్ట్ ప్రధాన DBMS మద్దతు, కాన్ఫిగరేషన్ సౌలభ్యం, క్లస్టరింగ్‌కు మద్దతు మరియు క్రియాశీల కమ్యూనిటీకి లంచం ఇచ్చింది. ఆచరణలో ఆసక్తి ఉన్నవారికి, మీరు గైడ్‌లను చదవమని నేను సిఫార్సు చేస్తున్నాను కాఫ్కా కనెక్ట్ и డెబెజియం.

కాఫ్కా కనెక్ట్ కోసం JDBC కనెక్టర్‌తో పోలిస్తే, Debezium యొక్క ప్రధాన ప్రయోజనం ఏమిటంటే, మార్పులు DBMS లాగ్‌ల నుండి చదవబడతాయి, ఇది డేటాను కనిష్ట ఆలస్యంతో స్వీకరించడానికి అనుమతిస్తుంది. JDBC కనెక్టర్ (కాఫ్కా కనెక్ట్ ద్వారా అందించబడింది) ట్రాక్ చేయబడిన పట్టికను నిర్ణీత వ్యవధిలో ప్రశ్నిస్తుంది మరియు (అదే కారణంతో) డేటా తొలగించబడినప్పుడు సందేశాలను రూపొందించదు (అక్కడ లేని డేటా కోసం మీరు ఎలా ప్రశ్నించవచ్చు?).

ఇలాంటి సమస్యలను పరిష్కరించడానికి, మీరు ఈ క్రింది పరిష్కారాలకు శ్రద్ధ వహించవచ్చు (డెబెజియంతో పాటు):

PS

మా బ్లాగులో కూడా చదవండి:

మూలం: www.habr.com

ఒక వ్యాఖ్యను జోడించండి