ከካፍካ የተቀበሉትን ክስተቶች እንደገና በማቀናበር ላይ

ከካፍካ የተቀበሉትን ክስተቶች እንደገና በማቀናበር ላይ

ሃይ ሀብር።

በቅርቡ I ልምዱን አካፍሏል። ወደ ዋስትና ማድረስ ለመቅረብ እኛ እንደ ቡድን ለካፍካ አምራች እና ሸማች ብዙ ጊዜ የምንጠቀመው በምን አይነት መለኪያዎች ነው። በዚህ ጽሑፍ ውስጥ የውጭ ስርዓቱ ጊዜያዊ አለመገኘት ምክንያት ከካፍ የተቀበለውን ክስተት እንደገና ለማቀናበር እንዴት እንዳደራጀን ልነግርዎ እፈልጋለሁ።

ዘመናዊ አፕሊኬሽኖች በጣም ውስብስብ በሆነ አካባቢ ውስጥ ይሰራሉ. የንግድ አመክንዮ በዘመናዊ የቴክኖሎጂ ቁልል ተጠቅልሎ፣ እንደ Kubernetes ወይም OpenShift ባሉ ኦርኬስትራ በሚተዳደረው Docker ምስል ውስጥ የሚሰራ፣ እና ከሌሎች መተግበሪያዎች ወይም የድርጅት መፍትሄዎች ጋር በአካል እና በምናባዊ ራውተሮች ሰንሰለት መገናኘት። በእንደዚህ ዓይነት አከባቢ ውስጥ አንድ ነገር ሁል ጊዜ ሊሰበር ይችላል ፣ ስለሆነም ከውጫዊ ስርዓቶች ውስጥ አንዱ ከሌለ ክስተቶችን እንደገና ማቀናበር የንግድ ሂደታችን አስፈላጊ አካል ነው።

ከካፍካ በፊት እንዴት ነበር

ቀደም ሲል በፕሮጄክቱ ውስጥ IBM MQ ለተመሳሳይ መልእክት ማድረሻ ተጠቀምን። በአገልግሎቱ ሂደት ውስጥ ማንኛውም ስህተት ከተከሰተ የተቀበለው መልእክት ለበለጠ በእጅ መተንተን በሙት ፊደል ወረፋ (DLQ) ውስጥ ሊቀመጥ ይችላል። DLQ የተፈጠረው ከመጪው ወረፋ ቀጥሎ ነው፣ መልእክቱ በ IBM MQ ውስጥ ተላልፏል።

ስህተቱ ጊዜያዊ ከሆነ እና ልንወስነው ከቻልን (ለምሳሌ፣ በHTTP ጥሪ ላይ ResourceAccessException ወይም MongoTimeoutException በMongoDb ጥያቄ)፣ የድጋሚ ሙከራ ስልቱ ተግባራዊ ይሆናል። የአፕሊኬሽኑ ቅርንጫፍ አመክንዮ ምንም ይሁን ምን ዋናው መልእክት ለመላክ መዘግየት ወደ ሲስተም ወረፋ ወይም ወደ ሌላ መተግበሪያ መልእክቶችን ለመላክ ከረጅም ጊዜ በፊት ተወስዷል። ይህ በመልእክት ራስጌ ውስጥ የዳግም መላኪያ ቁጥርን ያካትታል፣ እሱም ከመዘግየቱ ክፍተት ወይም ከመተግበሪያ-ደረጃ ስትራቴጂ መጨረሻ ጋር የተያያዘ። የስትራቴጂው መጨረሻ ላይ ከደረስን ግን ውጫዊ ስርዓቱ አሁንም አይገኝም፣ መልዕክቱ በእጅ ለመተንተን በዲኤልኪው ውስጥ ይቀመጣል።

መፍትሄ መፈለግ

በይነመረብ ላይ መፈለግ, የሚከተሉትን ማግኘት ይችላሉ መፍትሄ. ባጭሩ ለእያንዳንዱ የዘገየ ጊዜ ርዕስ ለመፍጠር እና የሸማቾች አፕሊኬሽኖችን በጎን በኩል ተግባራዊ ለማድረግ ታቅዷል፣ ይህም ከተፈለገ መዘግየት ጋር መልዕክቶችን ያነባል።

ከካፍካ የተቀበሉትን ክስተቶች እንደገና በማቀናበር ላይ

ምንም እንኳን ብዙ ቁጥር ያላቸው አዎንታዊ ግምገማዎች ፣ ለእኔ ሙሉ በሙሉ የተሳካ አይመስለኝም። በመጀመሪያ ደረጃ, ምክንያቱም ገንቢው, የንግድ ሥራ መስፈርቶችን ከመተግበሩ በተጨማሪ, የተገለጸውን ዘዴ በመተግበር ብዙ ጊዜ ማሳለፍ ይኖርበታል.

በተጨማሪም፣ የመዳረሻ መቆጣጠሪያ በካፍካ ክላስተር ላይ ከነቃ፣ አርእስቶችን በመፍጠር እና አስፈላጊውን መዳረሻ በማቅረብ የተወሰነ ጊዜ ማሳለፍ አለቦት። ከዚህ በተጨማሪ መልእክቶች ለመማረክ ጊዜ እንዲኖራቸው እና ከሱ እንዳይጠፉ ለእያንዳንዱ የድጋሚ ሙከራ ርዕስ ትክክለኛውን retention.ms መለኪያ መምረጥ ያስፈልግዎታል። ለእያንዳንዱ ነባር ወይም አዲስ አገልግሎት ትግበራ እና የመዳረሻ ጥያቄ መደገም አለበት።

አሁን በአጠቃላይ የፀደይ ወቅት ምን አይነት ዘዴዎች እና በተለይ ጸደይ-ካፍካ ለመልእክት ማቀናበሪያ ምን እንደሚሰጡን እንይ። ስፕሪንግ-ካፍካ የተለያዩ የBackOffPolicies ለማስተዳደር ረቂቅ ነገሮችን የሚያቀርብ በፀደይ-እንደገና ሙከራ ላይ ጊዜያዊ ጥገኛ አለው። ይህ በትክክል ተለዋዋጭ መሳሪያ ነው፣ ነገር ግን ጉልህ ጉዳቱ በመተግበሪያ ማህደረ ትውስታ ውስጥ ለመላክ መልዕክቶችን ማከማቸት ነው። ይህ ማለት በዝማኔ ወይም በአሰራር ስህተት ምክንያት አፕሊኬሽኑን እንደገና ማስጀመር ሁሉንም መልሶ ማቀናበር በመጠባበቅ ላይ ያሉ መልዕክቶችን ማጣት ያስከትላል። ይህ ነጥብ ለስርዓታችን ወሳኝ ስለሆነ ከዚህ በላይ አላሰብነውም።

spring-kafka ራሱ ContainerAwareErrorHandler ለምሳሌ በርካታ አተገባበርን ያቀርባል SeekToCurrentErrorHandlerስህተት ከተፈጠረ ማካካሻ ሳያስቀያይር በኋላ መልእክቱን ማስተናገድ ይችላሉ። ከስፕሪንግ-ካፍካ 2.3 ስሪት ጀምሮ BackOffPolicy ማዘጋጀት ተችሏል።

ይህ አካሄድ እንደገና የተስተካከሉ መልእክቶች የመተግበሪያ ዳግም መጀመርን እንዲቀጥሉ ያስችላቸዋል፣ነገር ግን አሁንም ምንም የDLQ ዘዴ የለም። ይህንን አማራጭ በ2019 መጀመሪያ ላይ የመረጥነው DLQ እንደማያስፈልግ በማመን (እድለኞች ነበርን እና ከበርካታ ወራት በኋላ መተግበሪያውን በእንደዚህ አይነት ዳግም ማቀናበሪያ ስርዓት ከሰራን በኋላ አያስፈልገንም)። ጊዜያዊ ስህተቶች SeekToCurrentErrorHandler እንዲቃጠል አድርገዋል። የተቀሩት ስህተቶች በምዝግብ ማስታወሻው ውስጥ ታትመዋል ፣ ይህም ማካካሻ አስከትሏል እና በሚቀጥለው መልእክት ሂደት ቀጥሏል።

የመጨረሻ ውሳኔ

በSeekToCurrentErrorHandler ላይ የተመሰረተው አተገባበር የራሳችንን መልእክት ለመላክ ዘዴ እንድናዘጋጅ አነሳሳን።

በመጀመሪያ ደረጃ, ያለውን ልምድ ለመጠቀም እና እንደ የመተግበሪያው አመክንዮ መጠን ለማስፋት እንፈልጋለን. ለመስመር አመክንዮ አፕሊኬሽን በድጋሚ ሙከራ ስልት ለተገለፀው አጭር ጊዜ አዳዲስ መልዕክቶችን ማንበብ ቢያቆም ጥሩ ነው። ለሌሎች አፕሊኬሽኖች የድጋሚ ሙከራ ስልቱን የሚያስፈጽም አንድ ነጥብ እንዲኖረኝ ፈልጌ ነበር። በተጨማሪም፣ ይህ ነጠላ ነጥብ ለሁለቱም አቀራረቦች የ DLQ ተግባር ሊኖረው ይገባል።

የድጋሚ ሙከራ ስልቱ ራሱ በመተግበሪያው ውስጥ መቀመጥ አለበት፣ ይህም ጊዜያዊ ስህተት በሚፈጠርበት ጊዜ ቀጣዩን የጊዜ ክፍተት ለማምጣት ሃላፊነት አለበት።

ለሊኒያር ሎጂክ መተግበሪያ ሸማቹን ማቆም

ከፀደይ-ካፍካ ጋር በሚሰሩበት ጊዜ ሸማቹን ለማቆም ኮድ እንደዚህ ያለ ነገር ሊመስል ይችላል-

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

ለምሳሌ፣ retryAt አሁንም እየሰራ ከሆነ የሜሴጅ ሊስተነር ኮንቴይነርን እንደገና ለማስጀመር ጊዜው ነው። ድጋሚ ማስጀመሪያው በታስክ ሼዱለር ውስጥ በተጀመረ የተለየ ክር ውስጥ ይከናወናል፣ አተገባበሩም በጸደይ ይሰጣል።

የድጋሚ ሙከራ ዋጋን በሚከተለው መንገድ እናገኛለን።

  1. የድጋሚ ጥሪ ቆጣሪ ዋጋ ወደላይ ይታያል።
  2. በቆጣሪው ዋጋ ላይ በመመስረት፣ በድጋሚ ሙከራ ስትራቴጂ ውስጥ ያለው የአሁኑ የዘገየ ጊዜ ይፈለጋል። ስልቱ በራሱ አፕሊኬሽኑ ውስጥ ተገልጿል፤ እሱን ለማስቀመጥ የJSON ፎርማትን መርጠናል።
  3. በJSON ድርድር ውስጥ የሚገኘው የጊዜ ክፍተት የሰከንዶች ብዛት ይይዛል ከዚያ በኋላ ሂደቱን መድገም ያስፈልጋል። ለድጋሚ ለመሞከር እሴቱን ለማዘጋጀት ይህ የሰከንዶች ብዛት አሁን ባለው ጊዜ ላይ ተጨምሯል።
  4. ክፍተቱ ካልተገኘ የ retryAt ዋጋ ዋጋ የለውም እና መልእክቱ በእጅ ለመተንተን ወደ DLQ ይላካል።

በዚህ አቀራረብ፣ የሚቀረው ነገር ቢኖር አሁን እየተሰራ ላለው እያንዳንዱ መልእክት ተደጋጋሚ ጥሪዎችን ቁጥር ማስቀመጥ ነው፣ ለምሳሌ በመተግበሪያ ማህደረ ትውስታ ውስጥ። የድጋሚ ሙከራ ቆጠራን በማህደረ ትውስታ ውስጥ ማቆየት ለዚህ አካሄድ ወሳኝ አይደለም፣ ምክንያቱም የመስመር ሎጂክ አፕሊኬሽኑ ሂደቱን በአጠቃላይ ማስተናገድ አይችልም። ከፀደይ-እንደገና ከመሞከር በተለየ, አፕሊኬሽኑን እንደገና ማስጀመር ሁሉም መልዕክቶች እንዲጠፉ አያደርግም, ነገር ግን በቀላሉ ስልቱን እንደገና ይጀምራል.

ይህ አቀራረብ ሸክሙን ከውጫዊው ስርዓት ላይ ለማንሳት ይረዳል, ይህም በጣም ከባድ በሆነ ጭነት ምክንያት ላይገኝ ይችላል. በሌላ አነጋገር፣ እንደገና ከማቀናበር በተጨማሪ የስርዓተ-ጥለት አተገባበር ላይ ደርሰናል። ቆጣሪ.

በእኛ ሁኔታ፣ የስህተት ደረጃው 1 ብቻ ነው፣ እና በጊዜያዊ የአውታረ መረብ መቆራረጥ ምክንያት የስርዓት መቋረጥ ጊዜን ለመቀነስ፣ በጣም ትንሽ የሆነ የድጋሚ ሙከራ ስትራቴጂ በትንሽ ጊዜ ቆይታ እንጠቀማለን። ይህ ለሁሉም የቡድን አፕሊኬሽኖች ተስማሚ ላይሆን ይችላል, ስለዚህ በስህተቱ ገደብ እና በጊዜ ልዩነት መካከል ያለው ግንኙነት በስርዓቱ ባህሪያት ላይ ተመርኩዞ መመረጥ አለበት.

ከመተግበሪያዎች የሚመጡ መልዕክቶችን ከማይወስን አመክንዮ ጋር ለማስኬድ የተለየ መተግበሪያ

የRETRY_AT ጊዜ ሲደርስ ወደ DESTINATION ርዕስ እንደገና ወደ እንደዚህ አይነት መተግበሪያ መልእክት የሚልክ ኮድ ምሳሌ ይኸውልዎት።


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

ምሳሌው ብዙ መረጃዎች በአርእስቶች ውስጥ እንደሚተላለፉ ያሳያል. የ RETRY_AT ዋጋ የሚገኘው በሸማች ማቆሚያ በኩል እንደገና ለመሞከር ዘዴው በተመሳሳይ መንገድ ነው። ከDESTINATION እና RETRY_AT በተጨማሪ እናልፋለን፡-

  • GROUP_ID፣ በእጅ ለመተንተን እና ለቀላል ፍለጋ መልእክቶችን የምንሰበስብበት።
  • ORIGINAL_PARTITION ተመሳሳዩን ሸማች ለድጋሚ ሂደት ለማቆየት መሞከር። ይህ ግቤት ባዶ ሊሆን ይችላል፣ በዚህ ጊዜ አዲሱ ክፍልፋይ የሚገኘው የዋናውን መልእክት መዝገብ ቁልፍ () ቁልፍ በመጠቀም ነው።
  • የዳግም ሙከራ ስልቱን ለመከተል የCOUNTER እሴት ተዘምኗል።
  • SEND_TO መልእክቱ RETRY_AT ሲደርስ እንደገና ለማቀናበር የተላከ መሆኑን ወይም በDLQ ውስጥ መቀመጡን የሚያመለክት ቋሚ ነው።
  • ምክንያት - የመልእክት ሂደት የተቋረጠበት ምክንያት።

ድጋሚ ሞክር ለዳግም መላክ እና በእጅ ለመተንተን በ PostgreSQL ውስጥ መልዕክቶችን ያከማቻል። ሰዓት ቆጣሪ በRETRY_AT መልዕክቶችን አግኝቶ ወደ ORIGINAL_PARTITION የDESTINATION ርዕስ ክፍልፍል ከቁልፍ መዝገብ() ጋር መልሶ ይልካል።

አንዴ ከተላኩ መልዕክቶች ከPostgreSQL ይሰረዛሉ። የመልእክቶችን በእጅ መተንተን የሚከሰተው ከሪትሪየር ጋር በREST API መስተጋብር በሚፈጥር ቀላል UI ነው። ዋና ባህሪያቱ ከ DLQ መልዕክቶችን እንደገና መላክ ወይም መሰረዝ፣ የስህተት መረጃዎችን መመልከት እና መልዕክቶችን መፈለግ፣ ለምሳሌ በስህተት ስም።

የመዳረሻ መቆጣጠሪያ በእኛ ዘለላዎች ላይ የነቃ ስለሆነ፣ እንደገና ፈታኙ የሚያዳምጠውን ርዕስ እንዲደርስ መጠየቅ እና እንደገና ሞክር ወደ DESTINATION ርዕስ እንዲጽፍ መፍቀድ ያስፈልጋል። ይህ የማይመች ነው፣ ነገር ግን፣ ከክፍለ ጊዜ ርዕስ አቀራረብ በተለየ፣ እሱን ለማስተዳደር ሙሉ DLQ እና UI አለን።

አንድ መጪ ርዕስ በተለያዩ የሸማቾች ቡድኖች ሲነበብ፣ አፕሊኬሽኑ የተለያዩ አመክንዮዎችን የሚተገበርባቸው አጋጣሚዎች አሉ። ከእነዚህ አፕሊኬሽኖች ውስጥ ለአንዱ በRetryer መልእክትን እንደገና ማካሄድ በሌላኛው ላይ ብዜት ያስከትላል። ይህንን ለመከላከል፣ እንደገና ለማቀናበር የተለየ ርዕስ እንፈጥራለን። መጪ እና እንደገና ሞክር ርዕሶች ያለ ምንም ገደብ በተመሳሳይ ሸማች ማንበብ ይችላሉ.

ከካፍካ የተቀበሉትን ክስተቶች እንደገና በማቀናበር ላይ

በነባሪ ይህ አካሄድ የወረዳ የሚላተም ተግባር አይሰጥም ነገር ግን በመጠቀም ወደ መተግበሪያ ሊታከል ይችላል ጸደይ-ደመና-netflix ወይም አዲስ የፀደይ ደመና ወረዳ ተላላፊ, የውጭ አገልግሎቶች የሚጠሩባቸውን ቦታዎች ወደ ተገቢ ማጠቃለያዎች መጠቅለል. በተጨማሪም, ለ ስትራቴጂ መምረጥ ይቻላል ጅምላ ስርዓተ-ጥለት, እሱም ደግሞ ጠቃሚ ሊሆን ይችላል. ለምሳሌ, በፀደይ-ክላውድ-ኔትፍሊክስ ውስጥ ይህ ክር ገንዳ ወይም ሴማፎር ሊሆን ይችላል.

መደምደሚያ

በውጤቱም, ማንኛውም ውጫዊ ስርዓት ለጊዜው የማይገኝ ከሆነ የመልዕክት ሂደትን ለመድገም የሚያስችል የተለየ መተግበሪያ አለን.

የአፕሊኬሽኑ ዋነኛ ጠቀሜታዎች በጎናቸው ላይ ጉልህ ማሻሻያ ሳይደረግላቸው በተመሳሳይ የካፍካ ክላስተር ላይ በሚሰሩ ውጫዊ ስርዓቶች መጠቀም ይቻላል! እንደዚህ አይነት አፕሊኬሽን የድጋሚ ሙከራውን ርዕስ መድረስ፣ ጥቂት የካፍካ ራስጌዎችን መሙላት እና ለዳግም ፈታኙ መልእክት መላክ ብቻ ያስፈልገዋል። ምንም ተጨማሪ መሠረተ ልማት ማሳደግ አያስፈልግም. እና ከመተግበሪያው ወደ Retryer እና ወደ ኋላ የሚተላለፉ መልዕክቶችን ብዛት ለመቀነስ፣ አፕሊኬሽኖችን በመስመራዊ አመክንዮ ለይተን በሸማቾች ማቆሚያ በኩል እንደገና አዘጋጅተናል።

ምንጭ: hab.com

አስተያየት ያክሉ