د کافکا څخه ترلاسه شوي پیښې بیا پروسس کول

د کافکا څخه ترلاسه شوي پیښې بیا پروسس کول

اې حبر

په دې وروستیو کې زه خپله تجربه شریکه کړه د کومو پیرامیټونو په اړه چې موږ د ټیم په توګه ډیری وختونه د کافکا تولید کونکي او مصرف کونکي لپاره کاروو ترڅو تضمین شوي تحویل ته نږدې شو. پدې مقاله کې زه غواړم تاسو ته ووایم چې څنګه موږ د کافکا څخه د بهرني سیسټم د موقتي نه شتون په پایله کې د یوې پیښې بیا پروسس تنظیم کړ.

عصري غوښتنلیکونه په خورا پیچلي چاپیریال کې کار کوي. د سوداګرۍ منطق په عصري ټیکنالوژۍ سټیک کې پوښل شوی، د ډاکر عکس کې روان دی چې د آرکیسټرټر لخوا اداره کیږي لکه کبرنیټس یا OpenShift، او د فزیکي او مجازی روټرونو سلسله له لارې د نورو غوښتنلیکونو یا تصدۍ حلونو سره اړیکه نیسي. په داسې چاپیریال کې ، یو څه تل ​​مات کیدی شي ، نو د پیښو بیا پروسس کول که چیرې یو بهرنی سیسټم شتون ونلري زموږ د سوداګرۍ پروسې یوه مهمه برخه ده.

د کافکا نه مخکې څنګه وو

د پروژې په پیل کې موږ د غیر متناسب پیغام رسولو لپاره IBM MQ کاروو. که چیرې د خدماتو د عملیاتو په جریان کې کومه تېروتنه رامنځ ته شي، ترلاسه شوی پیغام د لازیاتو لاسي تجزیه کولو لپاره په مړ شوي لیک کتار (DLQ) کې کیښودل کیدی شي. DLQ د راتلونکي کتار تر څنګ رامینځته شوی ، پیغام د IBM MQ دننه لیږدول شوی.

که تېروتنه لنډمهاله وه او موږ کولی شو دا وټاکو (د مثال په توګه، په HTTP کال کې د ResourceAccessException یا د MongoDb غوښتنې په اړه د MongoTimeoutException)، نو بیا د بیاکتنې ستراتیژي به اغیزمنه وي. د غوښتنلیک د شاخ کولو منطق ته په پام سره، اصلي پیغام یا د ځنډولو لیږلو لپاره د سیسټم کتار ته لیږدول شوی، یا یو جلا غوښتنلیک ته چې د پیغامونو بیا لیږلو لپاره ډیر وخت مخکې جوړ شوی و. پدې کې د پیغام سرلیک کې د بیا لیږلو شمیره شامله ده، کوم چې د ځنډ وقفې یا د غوښتنلیک کچې ستراتیژۍ پای پورې تړلی دی. که موږ د ستراتیژۍ پای ته رسیدلي یو مګر بهرنی سیسټم لاهم شتون نلري، نو پیغام به د لاسي تحلیل لپاره په DLQ کې ځای پرځای شي.

د حل لار موندنه

په انټرنیټ کې لټون کول، تاسو کولی شئ لاندې ومومئ پریکړه. په لنډه توګه، دا وړاندیز شوی چې د هر ځنډ وقفې لپاره یوه موضوع جوړه کړي او په اړخ کې د مصرف کونکي غوښتنلیکونه پلي کړي، کوم چې به د اړین ځنډ سره پیغامونه ولولي.

د کافکا څخه ترلاسه شوي پیښې بیا پروسس کول

د لوی شمیر مثبت بیاکتنو سره سره، دا زما په اند په بشپړه توګه بریالی نه دی. له هرڅه دمخه ، ځکه چې پراختیا کونکی ، د سوداګرۍ اړتیاو پلي کولو سربیره ، د بیان شوي میکانیزم پلي کولو کې به ډیر وخت مصرف کړي.

برسېره پردې، که د کافکا کلستر کې د لاسرسي کنټرول فعال شوی وي، تاسو باید یو څه وخت د موضوعاتو په جوړولو او دوی ته د اړتیا وړ لاسرسي چمتو کولو کې مصرف کړئ. برسېره پر دې، تاسو به اړتیا ولرئ چې د هرې بیاکتنې موضوع لپاره سم retention.ms پیرامیټر وټاکئ ترڅو پیغامونه د بیا لیږلو وخت ولري او له هغې څخه ورک نشي. د لاسرسي پلي کول او غوښتنه باید د هر موجوده یا نوي خدمت لپاره تکرار شي.

راځئ چې اوس وګورو چې کوم میکانیزمونه په عمومي ډول پسرلي او په ځانګړي ډول د پسرلي کافکا موږ ته د پیغام بیا پروسس کولو لپاره چمتو کوي. Spring-kafka په پسرلي-بیا هڅه کې انتقالي انحصار لري، کوم چې د بیالبیلو بیک آف پالیسي اداره کولو لپاره خلاصې وړاندې کوي. دا یو کافي انعطاف وړ وسیله ده ، مګر د دې مهم نیمګړتیا د غوښتنلیک حافظه کې د بیا لیږلو لپاره د پیغامونو ذخیره کول دي. دا پدې مانا ده چې د اپډیټ یا عملیاتي خطا له امله د غوښتنلیک بیا پیل کول به د بیا پروسس کولو پاتې ټول پیغامونه له لاسه ورکړي. څرنګه چې دا ټکی زموږ د سیسټم لپاره خورا مهم دی، موږ نور په پام کې نه نیسو.

spring-kafka پخپله د ContainerAwareErrorHandler ډیری تطبیقونه وړاندې کوي، د بیلګې په توګه SeekToCurrentErrorHandler، د کوم سره چې تاسو کولی شئ وروسته پیغام پروسس کړئ پرته لدې چې د غلطۍ په صورت کې آفسیټ بدل کړئ. د پسرلي کافکا 2.3 نسخه سره پیل کول، د بیک آف پالیسي تنظیم کول ممکن شو.

دا طریقه بیا پروسس شوي پیغامونو ته اجازه ورکوي چې د غوښتنلیک بیا پیل کولو ژوندي پاتې شي، مګر لاهم د DLQ میکانیزم شتون نلري. موږ دا اختیار د 2019 په پیل کې غوره کړ، په خوشبینانه توګه په دې باور وو چې DLQ ته به اړتیا ونلري (موږ نېکمرغه یو او په حقیقت کې د ورته بیا پروسس کولو سیسټم سره د غوښتنلیک د چلولو څو میاشتې وروسته اړتیا نه درلوده). لنډمهاله تیروتنې د SeekToCurrentErrorHandler د اوریدو لامل شوې. پاتې تېروتنې په لاګ کې چاپ شوي، په پایله کې د آفسټ، او پروسس د راتلونکي پیغام سره دوام لري.

وروستۍ پریکړه

د SeekToCurrentErrorHandler پر بنسټ پلي کول موږ ته وهڅول چې د پیغامونو بیا لیږلو لپاره خپل میکانیزم رامینځته کړو.

له هرڅه دمخه ، موږ غوښتل چې موجوده تجربه وکاروو او د غوښتنلیک منطق پورې اړه ولري. د خطي منطق غوښتنلیک لپاره، دا به غوره وي چې د لنډې مودې لپاره د نوي پیغامونو لوستل بند کړئ چې د بیاکتنې ستراتیژۍ لخوا مشخص شوي. د نورو غوښتنلیکونو لپاره، زه غواړم یو واحد ټکی ولرم چې د بیاکتنې ستراتیژي پلي کړي. برسېره پردې، دا واحد ټکی باید د دواړو طریقو لپاره د DLQ فعالیت ولري.

د بیا هڅه کولو ستراتیژي باید پخپله په غوښتنلیک کې زیرمه شي، کوم چې د راتلونکي وقفې بیرته ترلاسه کولو مسولیت لري کله چې لنډمهاله تېروتنه رامنځته شي.

د لینر منطق غوښتنلیک لپاره د مصرف کونکي ودرول

کله چې د پسرلي کافکا سره کار کول، د مصرف کونکي د بندولو کوډ ممکن داسې ښکاري:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

په مثال کې، retryAt هغه وخت دی چې د MessageListenerContainer بیا پیل کړئ که دا لاهم روان وي. بیا پیل به په ټاسک شیډولر کې په لاره اچول شوي جلا تار کې پیښ شي ، چې پلي کول یې د پسرلي لخوا هم چمتو شوي.

موږ د retryAt ارزښت په لاندې ډول پیدا کوو:

  1. د بیا زنګ کاونټر ارزښت لیدل کیږي.
  2. د مقابل ارزښت پراساس، د بیاکتنې ستراتیژۍ کې اوسنی ځنډ وقفه لټول کیږي. ستراتیژي پخپله په غوښتنلیک کې اعلان شوې؛ موږ د دې ذخیره کولو لپاره د JSON بڼه غوره کړه.
  3. په JSON صف کې موندل شوی وقفه د ثانیو شمیر لري چې وروسته به پروسس تکرار شي. د ثانیو دا شمیره په اوسني وخت کې اضافه کیږي ترڅو د retryAt ارزښت رامینځته کړي.
  4. که وقفه ونه موندل شي، نو د retryAt ارزښت ناپاک دی او پیغام به DLQ ته د لاسي تحلیل لپاره واستول شي.

د دې طریقې سره، ټول هغه څه چې پاتې دي د هر پیغام لپاره د تکراري تلیفونونو شمیر خوندي کول دي چې اوس مهال پروسس کیږي، د بیلګې په توګه د غوښتنلیک حافظه کې. په حافظه کې د بیاکتنې شمیرې ساتل د دې طریقې لپاره مهم ندي، ځکه چې د خطي منطق غوښتنلیک نشي کولی په ټوله توګه پروسس اداره کړي. د پسرلي بیا پیل کولو برعکس، د غوښتنلیک بیا پیل کول به د دې لامل نه شي چې ټول پیغامونه له لاسه ورکړي چې بیا پروسس شي، مګر په ساده ډول به ستراتیژي بیا پیل کړي.

دا طریقه د بهرني سیسټم څخه د بار وړلو کې مرسته کوي، کوم چې ممکن د ډیر دروند بار له امله شتون ونلري. په بل عبارت، د بیا پروسس کولو سربیره، موږ د نمونې پلي کول ترلاسه کړل سرکټ ماتونکی.

زموږ په قضیه کې ، د خطا حد یوازې 1 دی ، او د لنډمهاله شبکې بندیدو له امله د سیسټم ځنډیدو وخت کمولو لپاره ، موږ د کوچني ځنډ وقفې سره د بیاکتنې خورا پراخه ستراتیژي کاروو. دا ممکن د ټولو ګروپ غوښتنلیکونو لپاره مناسب نه وي، نو د خطا حد او د وقفې ارزښت ترمنځ اړیکه باید د سیسټم ځانګړتیاو پراساس وټاکل شي.

د غیر متمرکز منطق سره د غوښتنلیکونو څخه د پیغامونو پروسس کولو لپاره جلا غوښتنلیک

دلته د کوډ یوه بیلګه ده چې ورته غوښتنلیک (ریټرییر) ته پیغام لیږي ، کوم چې د RETRY_AT وخت ته رسیدو سره به د DESTINATION موضوع ته بیا لیږل کیږي:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

مثال ښیې چې ډیری معلومات په سرلیکونو کې لیږدول کیږي. د RETRY_AT ارزښت په ورته ډول موندل کیږي لکه څنګه چې د مصرف کونکي سټاپ له لارې د بیا هڅه کولو میکانیزم لپاره. د DESTINATION او RETRY_AT سربیره موږ تیریږي:

  • GROUP_ID، د کوم په واسطه موږ د لاسي تحلیل او ساده لټون لپاره پیغامونه ګروپ کوو.
  • ORIGINAL_PARTITION د بیا پروسس کولو لپاره ورته مصرف کونکي ساتلو هڅه کول. دا پیرامیټر کولی شي ناپاک وي، په دې حالت کې نوې برخه به د اصلي پیغام د record.key() کیلي په کارولو سره ترلاسه شي.
  • د بیا هڅه کولو ستراتیژی تعقیبولو لپاره د COUNTER ارزښت تازه شوی.
  • SEND_TO یو ثابت دی چې دا په ګوته کوي چې ایا پیغام RETRY_AT ته رسیدو وروسته د بیا پروسس کولو لپاره لیږل شوی یا په DLQ کې ځای پرځای شوی.
  • REASON - هغه دلیل چې ولې د پیغام پروسس کې خنډ شو.

Retryer په PostgreSQL کې د بیا لیږلو او لاسي پارس کولو لپاره پیغامونه ذخیره کوي. یو ټیمر یو کار پیل کوي چې د RETRY_AT سره پیغامونه لټوي او بیرته یې د اصلي record.key() سره د DESTINATION موضوع ORIGINAL_PARTITION برخې ته لیږي.

یوځل لیږل شوي ، پیغامونه د PostgreSQL څخه حذف کیږي. د پیغامونو لاسي تجزیه په ساده UI کې پیښیږي چې د REST API له لارې د ریټرییر سره تعامل کوي. د دې اصلي ځانګړتیاوې د DLQ څخه د پیغامونو بیا لیږل یا حذف کول، د خطا معلوماتو لیدل او د پیغامونو لټون کول، د بیلګې په توګه د غلط نوم په واسطه.

څنګه چې زموږ په کلسترونو کې د لاسرسي کنټرول فعال شوی ، نو اړینه ده چې اضافي موضوع ته د لاسرسي غوښتنه وکړئ چې ریټرییر یې اوري ، او ریټرییر ته اجازه ورکوي چې د DESTINATION موضوع ته ولیکي. دا ناشونی دی، مګر، د وقفې موضوع طریقې برعکس، موږ د دې اداره کولو لپاره بشپړ DLQ او UI لرو.

داسې قضیې شتون لري کله چې راتلونکی موضوع د ډیری مختلف مصرف کونکو ډلو لخوا لوستل کیږي ، چې غوښتنلیکونه مختلف منطق پلي کوي. د دې غوښتنلیکونو څخه یو لپاره د Retryer له لارې د پیغام بیا پروسس کول به په بل کې د نقل پایله وي. د دې په وړاندې د ساتنې لپاره، موږ د بیا پروسس لپاره جلا موضوع جوړه کوو. راتلونکی او بیا هڅه کولو موضوعات د ورته مصرف کونکي لخوا پرته له کوم محدودیت لوستل کیدی شي.

د کافکا څخه ترلاسه شوي پیښې بیا پروسس کول

د ډیفالټ په واسطه دا طریقه د سرکټ بریکر فعالیت نه وړاندې کوي، مګر دا په کارولو سره غوښتنلیک کې اضافه کیدی شي spring-Cloud-netflix یا نوی د پسرلي بادل سرکټ بریکر, هغه ځایونه وتړل چې بهرني خدمات په مناسب خلاصون کې ویل کیږي. برسېره پردې، دا ممکنه ده چې د دې لپاره یوه ستراتیژي غوره کړئ سر نمونه، کوم چې هم ګټور کیدی شي. د مثال په توګه، په پسرلي-کلاؤډ-نیټ فلکس کې دا کیدای شي د تار حوض یا سیمفور وي.

پایلې

د پایلې په توګه، موږ یو جلا غوښتنلیک لرو چې موږ ته اجازه راکوي چې د پیغام پروسس بیا تکرار کړو که چیرې کوم بهرنی سیسټم په لنډمهاله توګه شتون ونلري.

د غوښتنلیک یوه اصلي ګټه دا ده چې دا د بهرني سیسټمونو لخوا کارول کیدی شي چې په ورته کافکا کلستر کې روان وي ، پرته له دې چې د دوی اړخ کې د پام وړ بدلونونه راشي! دا ډول غوښتنلیک به یوازې د بیا هڅه کولو موضوع ته لاسرسی ته اړتیا ولري، د کافکا یو څو سرلیکونه ډک کړئ او بیا هڅه کونکي ته پیغام واستوئ. هیڅ اضافي زیربنا ته اړتیا نشته. او د دې لپاره چې د غوښتنلیک څخه Retryer او بیرته ته لیږدول شوي پیغامونو شمیر کم کړي، موږ غوښتنلیکونه د خطي منطق سره پیژندلي او د مصرف کونکي سټاپ له لارې یې بیا پروسس کړي.

سرچینه: www.habr.com

Add a comment