කෆ්කා වෙතින් ලැබුණු සිදුවීම් නැවත සැකසීම

කෆ්කා වෙතින් ලැබුණු සිදුවීම් නැවත සැකසීම

හේ හබ්ර්.

මෑතකදී අයි ඔහුගේ අත්දැකීම් බෙදා ගත්තේය අපි කණ්ඩායමක් ලෙස බොහෝ විට Kafka නිෂ්පාදකයා සහ පාරිභෝගිකයා සඳහා සහතික බෙදාහැරීමට සමීප වීමට භාවිතා කරන පරාමිති මොනවාද යන්න ගැන. මෙම ලිපියෙන් මට ඔබට කියන්නට අවශ්‍ය වන්නේ බාහිර පද්ධතිය තාවකාලිකව ලබා ගත නොහැකි වීමේ ප්‍රතිඵලයක් ලෙස Kafka වෙතින් ලැබුණු සිදුවීමක් නැවත සැකසීම අප විසින් සංවිධානය කළ ආකාරයයි.

නවීන යෙදුම් ඉතා සංකීර්ණ පරිසරයක් තුළ ක්රියාත්මක වේ. නවීන තාක්‍ෂණ තොගයකින් ඔතා ඇති ව්‍යාපාරික තර්කනය, Kubernetes හෝ OpenShift වැනි වාද්‍ය වෘන්දයක් විසින් කළමනාකරණය කරන ලද ඩොකර් රූපයක ධාවනය වන අතර භෞතික සහ අතථ්‍ය රවුටර දාමයක් හරහා වෙනත් යෙදුම් හෝ ව්‍යවසාය විසඳුම් සමඟ සන්නිවේදනය කරයි. එවැනි පරිසරයක් තුළ, යමක් සැමවිටම කැඩී යා හැක, එබැවින් එක් බාහිර පද්ධතියක් නොමැති නම් සිදුවීම් නැවත සැකසීම අපගේ ව්‍යාපාර ක්‍රියාවලීන්ගේ වැදගත් කොටසකි.

කෆ්කාට කලින් කොහොමද

මීට පෙර ව්‍යාපෘතියේ දී අපි අසමමුහුර්ත පණිවිඩ බෙදා හැරීම සඳහා IBM MQ භාවිතා කළෙමු. සේවාව ක්‍රියාත්මක කිරීමේදී කිසියම් දෝෂයක් සිදුවුවහොත්, ලැබුණු පණිවිඩය තවදුරටත් අතින් විග්‍රහ කිරීම සඳහා මළ ලිපි පෝලිමක (DLQ) තැබිය හැකිය. එන පෝලිමට යාබදව DLQ නිර්මාණය කරන ලදී, පණිවිඩය IBM MQ තුළට මාරු කරන ලදී.

දෝෂය තාවකාලික නම් සහ අපට එය තීරණය කළ හැකි නම් (උදාහරණයක් ලෙස, HTTP ඇමතුමක ResourceAccessException හෝ MongoTimeoutException එකක් MongoDb ඉල්ලීමක් මත), එවිට නැවත උත්සාහ කිරීමේ උපාය බලාත්මක වනු ඇත. යෙදුමේ ශාඛා තර්කය කුමක් වුවත්, මුල් පණිවිඩය ප්‍රමාද වූ යැවීම සඳහා පද්ධති පෝලිමට හෝ පණිවිඩ නැවත යැවීමට බොහෝ කලකට පෙර සාදන ලද වෙනම යෙදුමකට ගෙන යන ලදී. මෙයට පණිවිඩ ශීර්ෂයේ නැවත යැවීමේ අංකයක් ඇතුළත් වන අතර, එය ප්‍රමාද අන්තරයට හෝ යෙදුම් මට්ටමේ උපාය මාර්ගයේ අවසානයට බැඳී ඇත. අප උපායමාර්ගයේ අවසානයට පැමිණ ඇති නමුත් බාහිර පද්ධතිය තවමත් නොමැති නම්, එම පණිවිඩය අතින් විග්‍රහ කිරීම සඳහා DLQ තුළ තබනු ඇත.

විසඳුමක් සොයන්න

අන්තර්ජාලයේ සොයනවා, ඔබට පහත සඳහන් දේ සොයාගත හැකිය තීන්දුවයි. කෙටියෙන් කිවහොත්, එක් එක් ප්‍රමාද කාල සීමාව සඳහා මාතෘකාවක් නිර්මාණය කිරීමටත්, අවශ්‍ය ප්‍රමාදය සහිත පණිවිඩ කියවන පාරිභෝගික යෙදුම් පැත්තේ ක්‍රියාත්මක කිරීමටත් යෝජිතය.

කෆ්කා වෙතින් ලැබුණු සිදුවීම් නැවත සැකසීම

ධනාත්මක සමාලෝචන විශාල සංඛ්යාවක් තිබියදීත්, එය සම්පූර්ණයෙන්ම සාර්ථක නොවන බව මට පෙනේ. පළමුවෙන්ම, සංවර්ධකයාට, ව්යාපාර අවශ්යතා ක්රියාත්මක කිරීමට අමතරව, විස්තර කරන ලද යාන්ත්රණය ක්රියාත්මක කිරීම සඳහා බොහෝ කාලයක් ගත කිරීමට සිදුවනු ඇත.

මීට අමතරව, Kafka පොකුරේ ප්‍රවේශ පාලනය සක්‍රීය කර ඇත්නම්, ඔබට මාතෘකා නිර්මාණය කිරීමට සහ ඒවාට අවශ්‍ය ප්‍රවේශය ලබා දීමට යම් කාලයක් ගත කිරීමට සිදුවනු ඇත. මෙයට අමතරව, ඔබ එක් එක් නැවත උත්සාහ කරන මාතෘකා සඳහා නිවැරදි retention.ms පරාමිතිය තෝරා ගැනීමට අවශ්‍ය වනු ඇත, එවිට පණිවිඩ නැවත යැවීමට කාලය ඇති අතර එයින් අතුරුදහන් නොවේ. එක් එක් පවතින හෝ නව සේවාවක් සඳහා ක්‍රියාත්මක කිරීම සහ ප්‍රවේශ ඉල්ලීම නැවත නැවත කිරීමට සිදුවේ.

අපි දැන් බලමු සාමාන්‍යයෙන් වසන්තය සහ විශේෂයෙන් වසන්ත-කෆ්කා අපට පණිවිඩ නැවත සැකසීම සඳහා සපයන යාන්ත්‍රණ මොනවාද? Spring-kafka හට වසන්ත-නැවත උත්සාහ කිරීම මත සංක්‍රාන්ති යැපීමක් ඇත, එය විවිධ BackOffPolicies කළමනාකරණය කිරීම සඳහා වියුක්ත කිරීම් සපයයි. මෙය තරමක් නම්‍යශීලී මෙවලමකි, නමුත් එහි සැලකිය යුතු අඩුපාඩුවක් වන්නේ යෙදුම් මතකයේ නැවත යැවීම සඳහා පණිවිඩ ගබඩා කිරීමයි. මෙයින් අදහස් කරන්නේ යාවත්කාලීන කිරීමක් හෝ මෙහෙයුම් දෝෂයක් හේතුවෙන් යෙදුම නැවත ආරම්භ කිරීම නැවත සැකසීමට අපේක්ෂිත සියලුම පණිවිඩ නැති වීමට හේතු වන බවයි. මෙම කරුණ අපගේ පද්ධතියට තීරණාත්මක වන බැවින්, අපි එය තවදුරටත් සලකා බැලුවේ නැත.

උදාහරණයක් ලෙස, වසන්ත-කෆ්කා විසින්ම ContainerAwareErrorHandler ක්‍රියාත්මක කිරීම් කිහිපයක් සපයයි SeekToCurrentErrorHandler, දෝෂයක් ඇති වූ විට ඕෆ්සෙට් මාරු නොකර ඔබට පසුව පණිවිඩය සැකසිය හැක. වසන්ත-කෆ්කා 2.3 අනුවාදයෙන් පටන් ගෙන, BackOffPolicy සැකසීමට හැකි විය.

මෙම ප්‍රවේශය නැවත සකසන ලද පණිවිඩ වලට යෙදුම් නැවත ආරම්භ කිරීමෙන් බේරීමට ඉඩ සලසයි, නමුත් තවමත් DLQ යාන්ත්‍රණයක් නොමැත. DLQ අවශ්‍ය නොවන බව ශුභවාදීව විශ්වාස කරමින් අපි 2019 ආරම්භයේදී මෙම විකල්පය තෝරා ගත්තෙමු (අප වාසනාවන්ත වූ අතර ඇත්ත වශයෙන්ම එවැනි නැවත සැකසීමේ පද්ධතියක් සමඟ යෙදුම ක්‍රියාත්මක කිරීමෙන් මාස කිහිපයකට පසු එය අවශ්‍ය නොවීය). තාවකාලික දෝෂ නිසා SeekToCurrentErrorHandler වෙඩි තැබීමට හේතු විය. ඉතිරි දෝෂ ලොගයේ මුද්‍රණය කර ඇති අතර, එහි ප්‍රතිඵලයක් ලෙස ඕෆ්සෙට් එකක් ඇති වූ අතර ඊළඟ පණිවිඩය සමඟින් සැකසීම දිගටම සිදු විය.

අවසන් තීරණය

SeekToCurrentErrorHandler මත පදනම් වූ ක්‍රියාත්මක කිරීම පණිවිඩ නැවත යැවීම සඳහා අපගේම යාන්ත්‍රණයක් වර්ධනය කිරීමට අපව පොළඹවන ලදී.

පළමුවෙන්ම, අපට අවශ්‍ය වූයේ පවතින අත්දැකීම් භාවිතා කර යෙදුම් තර්කනය අනුව එය පුළුල් කිරීමට ය. රේඛීය තාර්කික යෙදුමක් සඳහා, නැවත උත්සාහ කිරීමේ උපාය මාර්ගයෙන් නිශ්චිතව දක්වා ඇති කෙටි කාලයක් සඳහා නව පණිවිඩ කියවීම නැවැත්වීම ප්‍රශස්ත වනු ඇත. වෙනත් යෙදුම් සඳහා, මට නැවත උත්සාහ කිරීමේ උපාය මාර්ගය බලාත්මක කරන තනි කරුණක් ලබා ගැනීමට අවශ්‍ය විය. මීට අමතරව, මෙම තනි ලක්ෂ්‍යය ප්‍රවේශයන් දෙක සඳහාම DLQ ක්‍රියාකාරීත්වය තිබිය යුතුය.

නැවත උත්සාහ කිරීමේ උපාය මාර්ගයම යෙදුම තුළ ගබඩා කළ යුතු අතර, එය තාවකාලික දෝෂයක් සිදු වූ විට ඊළඟ විරාමය ලබා ගැනීම සඳහා වගකිව යුතුය.

රේඛීය තාර්කික යෙදුමක් සඳහා පාරිභෝගිකයා නැවැත්වීම

වසන්ත-කෆ්කා සමඟ වැඩ කරන විට, පාරිභෝගිකයා නැවැත්වීමේ කේතය මේ වගේ දෙයක් විය හැකිය:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

උදාහරණයේ, RetryAt යනු MessageListenerContainer තවමත් ක්‍රියාත්මක වන්නේ නම් එය නැවත ආරම්භ කිරීමට කාලයයි. නැවත දියත් කිරීම TaskScheduler හි දියත් කරන ලද වෙනම නූලක සිදුවනු ඇත, එය ක්‍රියාත්මක කිරීම වසන්තය විසින් ද සපයනු ලැබේ.

අපි පහත ආකාරයට retryAt අගය සොයා ගනිමු:

  1. නැවත ඇමතුම් කවුන්ටරයේ වටිනාකම සොයා බලයි.
  2. කවුන්ටර අගය මත පදනම්ව, නැවත උත්සාහ කිරීමේ උපාය මාර්ගයේ වත්මන් ප්‍රමාද විරාමය සොයනු ලැබේ. උපායමාර්ගය යෙදුම තුළම ප්‍රකාශ කර ඇත; අපි එය ගබඩා කිරීමට JSON ආකෘතිය තෝරා ගත්තෙමු.
  3. JSON අරාවේ ඇති විරාමයෙහි සැකසුම් නැවත නැවත කිරීමට අවශ්‍ය තත්පර ගණන අඩංගු වේ. retryAt සඳහා අගය සැකසීමට වත්මන් වේලාවට මෙම තත්පර ගණන එකතු වේ.
  4. විරාමය සොයාගත නොහැකි නම්, retryAt හි අගය ශුන්‍ය වන අතර අතින් විග්‍රහ කිරීම සඳහා පණිවිඩය DLQ වෙත යවනු ලැබේ.

මෙම ප්‍රවේශය සමඟින්, ඉතිරිව ඇත්තේ දැනට සකසනු ලබන සෑම පණිවිඩයක් සඳහාම නැවත නැවත ඇමතුම් ගණන සුරැකීම පමණි, උදාහරණයක් ලෙස යෙදුම් මතකයේ. රේඛීය තාර්කික යෙදුමකට සමස්තයක් ලෙස සැකසීම හැසිරවිය නොහැකි බැවින්, නැවත උත්සාහ ගණන මතකයේ තබා ගැනීම මෙම ප්‍රවේශය සඳහා තීරණාත්මක නොවේ. වසන්ත-නැවත උත්සාහ කිරීම මෙන් නොව, යෙදුම නැවත ආරම්භ කිරීම සියලු පණිවිඩ නැවත සැකසීමට නැති වීමට හේතු නොවනු ඇත, නමුත් උපාය මාර්ගය නැවත ආරම්භ කරනු ඇත.

මෙම ප්‍රවේශය බාහිර පද්ධතියෙන් බර ඉවත් කිරීමට උපකාරී වේ, එය ඉතා අධික බරක් හේතුවෙන් ලබා ගත නොහැක. වෙනත් වචන වලින් කිවහොත්, නැවත සැකසීමට අමතරව, අපි රටාව ක්රියාත්මක කිරීම සාක්ෂාත් කර ගත්තෙමු පරිපථ බිඳිනය.

අපගේ නඩුවේදී, දෝෂ සීමාව 1 පමණක් වන අතර, තාවකාලික ජාල ඇනහිටීම් හේතුවෙන් පද්ධතියේ අක්‍රිය කාලය අවම කිරීම සඳහා, අපි කුඩා ප්‍රමාද කාල අන්තරයන් සහිත ඉතා කැටිති නැවත උත්සාහ කිරීමේ උපාය මාර්ගයක් භාවිතා කරමු. මෙය සියලුම කණ්ඩායම් යෙදුම් සඳහා සුදුසු නොවිය හැක, එබැවින් පද්ධතියේ ලක්ෂණ මත පදනම්ව දෝෂ සීමාව සහ අන්තර අගය අතර සම්බන්ධතාවය තෝරා ගත යුතුය.

නිර්ණය නොවන තර්කයක් සහිත යෙදුම් වලින් පණිවිඩ සැකසීම සඳහා වෙනම යෙදුමක්

මෙන්න එවැනි යෙදුමකට (නැවත උත්සාහ කරන්නා) පණිවිඩයක් යවන කේතයක උදාහරණයක් වන අතර එය RETRY_AT වේලාවට ළඟා වූ විට DESTINATION මාතෘකාවට යවනු ලැබේ:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

උදාහරණයෙන් පෙන්නුම් කරන්නේ බොහෝ තොරතුරු ශීර්ෂයන් තුළ සම්ප්රේෂණය වන බවයි. RETRY_AT හි අගය පාරිභෝගික නැවතුම හරහා නැවත උත්සාහ කිරීමේ යාන්ත්‍රණය සඳහා වන ආකාරයටම සොයා ගැනේ. DESTINATION සහ RETRY_AT වලට අමතරව අපි සමත්:

  • GROUP_ID, අපි අතින් විශ්ලේෂණය සහ සරල සෙවුම් සඳහා පණිවිඩ සමූහගත කරයි.
  • ORIGINAL_PARTITION නැවත සැකසීම සඳහා එකම පාරිභෝගිකයා තබා ගැනීමට උත්සාහ කරන්න. මෙම පරාමිතිය ශුන්‍ය විය හැක, මෙම අවස්ථාවේදී මුල් පණිවිඩයේ Record.key() යතුර භාවිතයෙන් නව කොටස ලබා ගනී.
  • නැවත උත්සාහ කිරීමේ උපාය මාර්ගය අනුගමනය කිරීමට COUNTER අගය යාවත්කාලීන කරන ලදී.
  • SEND_TO යනු පණිවිඩය RETRY_AT වෙත ළඟා වූ පසු නැවත සැකසීම සඳහා යවනු ලබන්නේද නැතහොත් DLQ හි තබා තිබේද යන්න පෙන්නුම් කරන නියතයකි.
  • හේතුව - පණිවිඩ සැකසීමට බාධා ඇති වීමට හේතුව.

Retryer PostgreSQL හි නැවත යැවීම සහ අතින් විග්‍රහ කිරීම සඳහා පණිවිඩ ගබඩා කරයි. ටයිමරයක් මඟින් RETRY_AT සමඟ පණිවිඩ සොයා ගන්නා කාර්යයක් ආරම්භ කර ඒවා යතුර Record.key() සමඟ DESTINATION මාතෘකාවේ ORIGINAL_PARTITION කොටස වෙත ආපසු යවයි.

යැවූ පසු, PostgreSQL වෙතින් පණිවිඩ මකා දමනු ලැබේ. REST API හරහා Retryer සමඟ අන්තර්ක්‍රියා කරන සරල UI එකක පණිවිඩ අතින් විග්‍රහ කිරීම සිදු වේ. එහි ප්‍රධාන ලක්ෂණ වන්නේ DLQ වෙතින් පණිවිඩ නැවත යැවීම හෝ මකා දැමීම, දෝෂ තොරතුරු බැලීම සහ පණිවිඩ සෙවීම, උදාහරණයක් ලෙස දෝෂ නාමයෙන්.

අපගේ පොකුරු මත ප්‍රවේශ පාලනය සබල කර ඇති බැවින්, නැවත උත්සාහ කරන්නා සවන් දෙන මාතෘකාවට අමතර ප්‍රවේශය ඉල්ලා සිටීම අවශ්‍ය වන අතර, නැවත උත්සාහ කරන්නාට DESTINATION මාතෘකාවට ලිවීමට ඉඩ දෙන්න. මෙය අපහසුයි, නමුත්, විරාම මාතෘකා ප්‍රවේශය මෙන් නොව, එය කළමනාකරණය කිරීමට අපට සම්පූර්ණ DLQ සහ UI ඇත.

විවිධ පාරිභෝගික කණ්ඩායම් කිහිපයක් විසින් එන මාතෘකාවක් කියවන අවස්ථා තිබේ, ඒවායේ යෙදුම් විවිධ තර්ක ක්‍රියාත්මක කරයි. මෙම යෙදුම් වලින් එකක් සඳහා Retryer හරහා පණිවිඩයක් නැවත සැකසීමෙන් අනෙක් එකෙහි අනුපිටපතක් ඇති වේ. මෙයින් ආරක්ෂා වීමට, අපි නැවත සැකසීම සඳහා වෙනම මාතෘකාවක් නිර්මාණය කරමු. පැමිණෙන සහ නැවත උත්සාහ කරන මාතෘකා කිසිදු සීමාවකින් තොරව එකම පාරිභෝගිකයාට කියවිය හැක.

කෆ්කා වෙතින් ලැබුණු සිදුවීම් නැවත සැකසීම

පෙරනිමියෙන් මෙම ප්‍රවේශය පරිපථ කඩන ක්‍රියාකාරිත්වය සපයන්නේ නැත, කෙසේ වෙතත් එය භාවිතයෙන් යෙදුමට එක් කළ හැක වසන්ත-වලාකුළු-netflix හෝ අලුත් වසන්ත වලාකුළු පරිපථ කඩනය, බාහිර සේවා කැඳවා ඇති ස්ථාන සුදුසු වියුක්ත කිරීම් වලට එතීම. ඊට අමතරව, උපාය මාර්ගයක් තෝරා ගැනීමට හැකි වේ තොග ශීර්ෂය රටාව, ද ප්රයෝජනවත් විය හැක. උදාහරණයක් ලෙස, වසන්ත-වලාකුළු-නෙට්ෆ්ලික්ස් හි මෙය නූල් තටාකයක් හෝ සෙමාෆෝරයක් විය හැකිය.

නිගමනය

එහි ප්‍රතිඵලයක් වශයෙන්, කිසියම් බාහිර පද්ධතියක් තාවකාලිකව නොමැති නම්, පණිවිඩ සැකසීම නැවත කිරීමට අපට ඉඩ සලසන වෙනම යෙදුමක් අප සතුව ඇත.

යෙදුමේ එක් ප්‍රධාන වාසියක් නම්, එය එකම කෆ්කා පොකුරේ ක්‍රියාත්මක වන බාහිර පද්ධති මගින් ඔවුන්ගේ පැත්තෙන් සැලකිය යුතු වෙනස් කිරීම් නොමැතිව භාවිතා කළ හැකි වීමයි! එවැනි යෙදුමකට අවශ්‍ය වන්නේ නැවත උත්සාහ කිරීමේ මාතෘකාවට ප්‍රවේශ වීම, කෆ්කා ශීර්ෂ කිහිපයක් පුරවා නැවත උත්සාහ කරන්නා වෙත පණිවිඩයක් යැවීම පමණි. කිසිදු අමතර යටිතල පහසුකම් නැංවීමට අවශ්‍ය නැත. තවද යෙදුමෙන් Retryer වෙත සහ ආපසු මාරු කරන පණිවිඩ ගණන අඩු කිරීම සඳහා, අපි රේඛීය තර්කනය සහිත යෙදුම් හඳුනාගෙන පාරිභෝගික නැවතුම හරහා ඒවා නැවත සකස් කළෙමු.

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න