ڪافڪا کان حاصل ڪيل واقعن کي ٻيهر پروسيسنگ

ڪافڪا کان حاصل ڪيل واقعن کي ٻيهر پروسيسنگ

هي حبر.

تازو آء پنهنجو تجربو شيئر ڪيو اسان ڪهڙن معيارن جي باري ۾ هڪ ٽيم جي طور تي گهڻو ڪري ڪفڪا پروڊيوسر ۽ صارفين لاءِ استعمال ڪندا آهيون ضمانت جي ترسيل جي ويجهو وڃڻ لاءِ. هن آرٽيڪل ۾ مان توهان کي ٻڌائڻ چاهيان ٿو ته اسان ڪافڪا کان حاصل ڪيل هڪ واقعي جي ٻيهر پروسيسنگ کي ڪيئن منظم ڪيو جنهن جي نتيجي ۾ خارجي نظام جي عارضي غير موجودگي جي نتيجي ۾.

جديد ايپليڪيشنون تمام پيچيده ماحول ۾ ڪم ڪن ٿيون. ڪاروباري منطق هڪ جديد ٽيڪنالاجي اسٽيڪ ۾ ويڙهيل آهي، هڪ ڊاکر تصوير ۾ هلندڙ آهي آرڪيسٽرٽر طرفان منظم ڪيل ڪبرنيٽس يا OpenShift، ۽ ٻين ايپليڪيشنن يا ڪاروباري حلن سان رابطي سان فزيڪل ۽ ورچوئل روٽرز جي زنجير ذريعي. اهڙي ماحول ۾، ڪا شيءِ هميشه ڀڃي سگهي ٿي، تنهن ڪري واقعن کي ٻيهر پروسيس ڪرڻ جيڪڏهن هڪ خارجي نظام موجود نه هجي ته اسان جي ڪاروباري عملن جو هڪ اهم حصو آهي.

ڪافڪا کان اڳ ڪيئن هو

اڳ ۾ پروجيڪٽ ۾ اسان استعمال ڪيو IBM MQ asynchronous پيغام پهچائڻ لاءِ. جيڪڏهن سروس جي آپريشن دوران ڪا به غلطي ٿي وئي، وصول ٿيل پيغام کي وڌيڪ دستي طور تي پارس ڪرڻ لاء مئل-ليٽر-قطار (DLQ) ۾ رکي سگهجي ٿو. ايندڙ قطار جي اڳيان DLQ ٺاهي وئي، پيغام IBM MQ اندر منتقل ڪيو ويو.

جيڪڏهن غلطي عارضي هئي ۽ اسان ان کي طئي ڪري سگهون ٿا (مثال طور، هڪ HTTP ڪال تي هڪ ResourceAccessException يا MongoDb درخواست تي MongoTimeoutException)، پوءِ ٻيهر ڪوشش جي حڪمت عملي اثر انداز ٿيندي. ايپليڪيشن جي برانچنگ منطق کان سواء، اصل پيغام يا ته دير سان موڪلڻ لاء سسٽم قطار ڏانهن منتقل ڪيو ويو، يا هڪ الڳ ايپليڪيشن ڏانهن جيڪو پيغام ٻيهر موڪلڻ لاء گهڻو اڳ ٺاهيو ويو هو. ھن ۾ شامل آھي ھڪڙو ٻيهر موڪليو نمبر پيغام جي ھيڊر ۾، جيڪو دير سان وقف يا ايپليڪيشن-سطح جي حڪمت عملي جي آخر سان ڳنڍيل آھي. جيڪڏهن اسان حڪمت عملي جي پڄاڻي تي پهچي چڪا آهيون پر خارجي نظام اڃا تائين دستياب ناهي، ته پيغام دستي پارسنگ لاءِ DLQ ۾ رکيو ويندو.

حل ڳولڻ

انٽرنيٽ تي ڳولهي رهيو آهي، توھان ھيٺ ڏنل ڳولي سگھو ٿا حل. مختصر ۾، اهو تجويز ڪيو ويو آهي ته هر دير جي وقفي لاءِ هڪ موضوع ٺاهيو ۽ پاسي تي صارفين جي ايپليڪيشنن کي لاڳو ڪيو وڃي، جيڪي گهربل دير سان پيغام پڙهي سگهندا.

ڪافڪا کان حاصل ڪيل واقعن کي ٻيهر پروسيسنگ

مثبت جائزو جي وڏي تعداد جي باوجود، مون کي لڳي ٿو ته مڪمل طور تي ڪامياب نه آهي. سڀ کان پهريان، ڇو ته ڊولپر، ڪاروباري گهرجن کي لاڳو ڪرڻ کان علاوه، بيان ڪيل ميکانيزم کي لاڳو ڪرڻ لاء گهڻو وقت خرچ ڪرڻو پوندو.

اضافي طور تي، جيڪڏهن ڪافڪا ڪلستر تي رسائي ڪنٽرول فعال آهي، توهان کي ڪجهه وقت گذارڻو پوندو موضوع ٺاهڻ ۽ انهن تائين ضروري رسائي فراهم ڪرڻ ۾. ان کان علاوه، توهان کي ٻيهر ڪوشش ڪرڻ جي هر موضوع لاءِ صحيح retention.ms پيٽرولر چونڊڻ جي ضرورت پوندي ته جيئن پيغامن کي ٻيهر موڪلڻ جو وقت هجي ۽ ان مان غائب نه ٿئي. لاڳو ڪرڻ ۽ رسائي جي درخواست کي هر موجوده يا نئين خدمت لاء بار بار ڪرڻو پوندو.

اچو ته ھاڻي ڏسون ته ڪھڙا ميکانيزم اسپرنگ عام طور تي ۽ اسپرنگ ڪافڪا خاص طور تي اسان کي پيغام جي ٻيهر پروسيسنگ لاءِ مهيا ڪن ٿا. Spring-kafka جو spring-retry تي هڪ عبوري انحصار آهي، جيڪو مختلف BackOffPolicies کي منظم ڪرڻ لاءِ خلاصيون مهيا ڪري ٿو. هي هڪ ڪافي لچڪدار اوزار آهي، پر ان جي اهم خرابي ايپليڪيشن ميموري ۾ ٻيهر موڪلڻ لاءِ پيغامن کي محفوظ ڪرڻ آهي. هن جو مطلب آهي ته ايپليڪيشن کي ٻيهر شروع ڪرڻ جي ڪري هڪ تازه ڪاري يا هڪ آپريشنل غلطي جي نتيجي ۾ سڀني پيغامن جي نقصان جي نتيجي ۾ ٻيهر پروسيسنگ جي انتظار ۾ آهي. جيئن ته هي نقطو اسان جي سسٽم لاءِ نازڪ آهي، ان ڪري اسان ان تي وڌيڪ غور نه ڪيو.

spring-kafka خود ContainerAwareErrorHandler جي ڪيترن ئي عملن کي مهيا ڪري ٿو، مثال طور ڳولھيو ڪرنٽ ايرر ھينڊلر، جنهن سان توهان ڪنهن غلطي جي صورت ۾ آفسيٽ کي شفٽ ڪرڻ کان سواءِ بعد ۾ پيغام تي عمل ڪري سگهو ٿا. اسپرنگ ڪافڪا 2.3 جي ورزن سان شروع ڪندي، بيڪ آف پاليسي کي سيٽ ڪرڻ ممڪن ٿي ويو.

اهو طريقو ٻيهر پروسيس ٿيل پيغامن کي ايپليڪيشن ٻيهر شروع ڪرڻ جي اجازت ڏئي ٿو، پر اڃا تائين ڪوبه DLQ ميڪانيزم ناهي. اسان هي آپشن 2019 جي شروعات ۾ چونڊيو، پراميد طور تي يقين رکون ٿا ته DLQ جي ضرورت نه هوندي (اسان خوش قسمت هئاسين ۽ حقيقت ۾ ان جي ضرورت نه هئي ڪيترن ئي مهينن کان پوءِ ايپليڪيشن کي اهڙي ري پروسيسنگ سسٽم سان هلائڻ). عارضي غلطين سبب SeekToCurrentErrorHandler کي فائر ڪيو ويو. باقي غلطيون لاگ ۾ ڇپيل هيون، نتيجي ۾ هڪ آفسٽ، ۽ پروسيسنگ ايندڙ پيغام سان جاري رهي.

آخري فيصلو

SeekToCurrentErrorHandler جي بنياد تي عمل درآمد اسان کي پيغامن کي ٻيهر موڪلڻ لاءِ پنهنجو ميڪانيزم تيار ڪرڻ لاءِ چيو.

سڀ کان پهريان، اسان چاهيون ٿا ته موجوده تجربي کي استعمال ڪريو ۽ ان کي وڌائڻ چاهيون ٿا ايپليڪيشن منطق جي لحاظ سان. هڪ لڪير منطق جي ايپليڪيشن لاءِ، اهو بهتر هوندو ته نئين پيغامن کي پڙهڻ کان روڪيو وڃي ٿوري عرصي لاءِ جيڪو بيان ڪيو ويو آهي ٻيهر ڪوشش جي حڪمت عملي. ٻين ايپليڪيشنن لاءِ، مان چاهيان ٿو ته هڪڙو نقطو هجي جيڪو ٻيهر ڪوشش جي حڪمت عملي کي لاڳو ڪري. ان کان علاوه، ھن ھڪڙي نقطي کي لازمي آھي DLQ ڪارڪردگي ٻنهي طريقن لاءِ.

ٻيهر ڪوشش جي حڪمت عملي پاڻ کي ايپليڪيشن ۾ ذخيرو ٿيڻ گهرجي، جيڪو ايندڙ وقفي کي ٻيهر حاصل ڪرڻ لاء ذميوار آهي جڏهن هڪ عارضي غلطي ٿئي ٿي.

هڪ لڪير منطق جي درخواست لاء صارفين کي روڪڻ

جڏهن اسپرنگ ڪافڪا سان ڪم ڪندي، صارف کي روڪڻ لاءِ ڪوڊ شايد ڪجهه هن طرح نظر ايندو:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

مثال طور، retryAt اهو وقت آهي جيڪو MessageListenerContainer کي ٻيهر شروع ڪرڻ لاءِ جيڪڏهن اهو اڃا تائين هلي رهيو آهي. ٻيهر لانچ ٿيندو هڪ الڳ سلسلي ۾ شروع ڪيو ويو ٽاسڪ شيڊولر، جنهن تي عملدرآمد بهار طرفان مهيا ڪيل آهي.

اسان هيٺ ڏنل طريقي سان retryAt قدر ڳوليندا آهيون:

  1. ري-ڪال ڪائونٽر جو قدر مٿي ڏٺو ويو آهي.
  2. جوابي قدر جي بنياد تي، ٻيهر ڪوشش جي حڪمت عملي ۾ موجوده دير جي وقفي جي ڳولا ڪئي وئي آهي. حڪمت عملي خود ايپليڪيشن ۾ اعلان ڪئي وئي آهي؛ اسان ان کي ذخيرو ڪرڻ لاء JSON فارميٽ چونڊيو آهي.
  3. JSON صف ۾ مليو وقفو سيڪنڊن جو تعداد تي مشتمل آهي جنهن کان پوء پروسيسنگ کي بار بار ڪرڻ جي ضرورت پوندي. ھن سيڪنڊن جو تعداد موجوده وقت ۾ شامل ڪيو ويو آھي retryAt لاءِ قدر ٺاھيو.
  4. جيڪڏهن وقفو نه مليو آهي، ته پوءِ ريٽري ايٽ جو قدر null آهي ۽ پيغام موڪليو ويندو DLQ ڏانهن دستي تجزيي لاءِ.

هن طريقي سان، باقي اهو آهي ته هر پيغام لاء بار بار ڪالن جو تعداد بچائڻ لاء جيڪو في الحال پروسيس ڪيو پيو وڃي، مثال طور ايپليڪيشن ميموري ۾. يادگيري ۾ ٻيهر ڪوشش جي ڳڻپ کي رکڻ هن طريقي جي لاءِ نازڪ نه آهي، ڇاڪاڻ ته هڪ لڪير منطق ايپليڪيشن مڪمل طور تي پروسيسنگ کي سنڀالي نٿو سگهي. بهار جي ٻيهر ڪوشش جي برعڪس، ايپليڪيشن کي ٻيهر شروع ڪرڻ سبب سڀني پيغامن کي ٻيهر پروسيس ٿيڻ جو سبب نه ٿيندو، پر صرف حڪمت عملي کي ٻيهر شروع ڪندو.

اهو طريقو ٻاهرين سسٽم کي لوڊ ڪرڻ ۾ مدد ڪري ٿو، جيڪو شايد تمام ڳري لوڊ جي ڪري دستياب نه هجي. ٻين لفظن ۾، ٻيهر پروسيسنگ کان علاوه، اسان نموني جي عمل کي حاصل ڪيو سرڪٽ برير.

اسان جي صورت ۾، غلطي جي حد صرف 1 آهي، ۽ عارضي نيٽ ورڪ جي بندش جي ڪري سسٽم جي گھٽتائي کي گھٽائڻ لاء، اسان ننڍي ويڪرائي وقفي سان گڏ هڪ تمام وڏي پيماني تي ٻيهر ڪوشش جي حڪمت عملي استعمال ڪندا آهيون. اهو ٿي سگهي ٿو سڀني گروپن جي ايپليڪيشنن لاءِ موزون نه هجي، تنهن ڪري نقص جي حد ۽ وقفي جي قيمت جي وچ ۾ تعلق کي سسٽم جي خاصيتن جي بنياد تي چونڊيو وڃي.

غير مقرراتي منطق سان ايپليڪيشنن مان پيغامن جي پروسيسنگ لاءِ الڳ ايپليڪيشن

هتي ڪوڊ جو هڪ مثال آهي جيڪو هڪ پيغام موڪلي ٿو اهڙي ايپليڪيشن (Retryer) ڏانهن، جيڪو ٻيهر موڪليندو DESTINATION موضوع تي جڏهن RETRY_AT وقت پهچي ويندو:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

مثال ڏيکاري ٿو ته تمام گهڻي معلومات هيڊرن ۾ منتقل ڪئي وئي آهي. RETRY_AT جي قيمت ساڳئي طرح ملي ٿي جيئن صارف اسٽاپ ذريعي ٻيهر ڪوشش ڪرڻ واري ميڪانيزم لاءِ. DESTINATION ۽ RETRY_AT کان علاوه اسين پاس ڪريون ٿا:

  • GROUP_ID، جنهن جي ذريعي اسان گروپ پيغامن کي دستي تجزيي ۽ آسان ڳولا لاءِ.
  • ORIGINAL_PARTITION کي ٻيهر پروسيسنگ لاءِ ساڳيو صارف رکڻ جي ڪوشش ڪرڻ لاءِ. هي پيٽرول نيل ٿي سگهي ٿو، انهي صورت ۾ نئين ورهاڱي کي استعمال ڪندي حاصل ڪيو ويندو record.key() اصلي پيغام جي چيڪ.
  • ٻيهر ڪوشش جي حڪمت عملي تي عمل ڪرڻ لاءِ COUNTER قدر کي اپڊيٽ ڪيو ويو.
  • SEND_TO هڪ مستقل اشارو آهي ته ڇا پيغام RETRY_AT تي پهچڻ تي ٻيهر پروسيسنگ لاءِ موڪليو ويو آهي يا DLQ ۾ رکيل آهي.
  • REASON - سبب ڇو پيغام پروسيسنگ ۾ مداخلت ڪئي وئي.

ٻيهر ڪوشش ڪندڙ پوسٽ گري ايس ايس ايل ۾ ٻيهر موڪلڻ ۽ دستي پارس ڪرڻ لاءِ پيغامن کي محفوظ ڪري ٿو. هڪ ٽائمر هڪ ڪم شروع ڪري ٿو جيڪو پيغام ڳولي ٿو RETRY_AT سان ۽ انهن کي واپس موڪلي ٿو DESTINATION موضوع جي ORIGINAL_PARTITION ورهاڱي تي ڪي record.key().

هڪ دفعو موڪليو ويو، پيغام پوسٽ گري ايس ايس ايل مان ڊهي ويندا آهن. پيغامن جي دستي تجزيه هڪ سادي UI ۾ ٿئي ٿي جيڪا REST API ذريعي Retryer سان رابطو ڪري ٿي. ان جون مکيه خاصيتون DLQ کان پيغامن کي ٻيهر موڪلڻ يا حذف ڪرڻ، غلطي جي معلومات ڏسڻ ۽ پيغام ڳولڻ، مثال طور غلطي جي نالي سان.

جيئن ته اسان جي ڪلسٽرز تي رسائي ڪنٽرول فعال ٿيل آهي، ان لاءِ ضروري آهي ته اضافي طور تي ان موضوع تائين رسائي جي درخواست ڪئي وڃي جيڪا ريٽريٽر ٻڌي رهيو آهي، ۽ ٻيهر ڪوشش ڪندڙ کي اجازت ڏيو ته هو DESTINATION موضوع تي لکن. اها تڪليف آهي، پر، وقفي واري موضوع جي نقطه نظر جي برعڪس، اسان وٽ ان کي منظم ڪرڻ لاءِ مڪمل ڊي ايل ق ۽ UI آهي.

اهڙا ڪيس آهن جڏهن هڪ ايندڙ موضوع ڪيترن ئي مختلف صارفين گروپن طرفان پڙهيو ويندو آهي، جن جون ايپليڪيشنون مختلف منطق لاڳو ڪن ٿيون. انهن ايپليڪيشنن مان هڪ لاءِ Retryer ذريعي هڪ پيغام کي ٻيهر پروسيس ڪرڻ جي نتيجي ۾ ٻئي تي هڪ نقل ٿيندو. ان کان بچاءَ لاءِ، اسان ٻيهر پروسيسنگ لاءِ الڳ موضوع ٺاهيندا آهيون. ايندڙ ۽ ٻيهر ڪوشش ڪرڻ وارا موضوع ساڳيا صارف بغير ڪنهن پابندي جي پڙهي سگهن ٿا.

ڪافڪا کان حاصل ڪيل واقعن کي ٻيهر پروسيسنگ

ڊفالٽ طور، هي طريقو سرڪٽ برڪر ڪارڪردگي مهيا نٿو ڪري، پر ان کي استعمال ڪندي ايپليڪيشن ۾ شامل ڪري سگهجي ٿو بهار-ڪلائوڊ-نيٽ فلڪس يا نئون بهار بادل سرڪٽ برڪر، جڳهن کي لپائڻ جتي ٻاهرين خدمتن کي مناسب خلاصن ۾ سڏيو وڃي ٿو. ان کان سواء، اهو ممڪن آهي ته هڪ حڪمت عملي چونڊڻ لاء بائيڪاٽ نموني، جيڪو پڻ مفيد ٿي سگهي ٿو. مثال طور، spring-cloud-netflix ۾ هي ٿي سگهي ٿو ٿريڊ پول يا سيمفور.

ٿڪل

نتيجي طور، اسان وٽ هڪ الڳ ايپليڪيشن آهي جيڪا اسان کي پيغام جي پروسيسنگ کي ٻيهر ڏيڻ جي اجازت ڏئي ٿي جيڪڏهن ڪو خارجي نظام عارضي طور تي دستياب ناهي.

ايپليڪيشن جي مکيه فائدن مان هڪ اهو آهي ته اهو هڪ ئي ڪافڪا ڪلستر تي هلندڙ خارجي سسٽم طرفان استعمال ڪري سگهجي ٿو، انهن جي پاسي ۾ اهم تبديلين کان سواء! اهڙي ايپليڪيشن کي صرف ٻيهر ڪوشش ڪرڻ واري موضوع تائين رسائي جي ضرورت پوندي، ڪجهه ڪافڪا هيڊر ڀريو ۽ ٻيهر ڪوشش ڪندڙ ڏانهن پيغام موڪليو. ڪنهن به اضافي زيربنا کي وڌائڻ جي ڪا ضرورت ناهي. ۽ ايپليڪيشن مان منتقل ٿيل پيغامن جي تعداد کي گھٽائڻ لاءِ Retryer ۽ واپس، اسان ايپليڪيشنن کي سڌريل منطق سان سڃاڻي ورتو ۽ انھن کي ڪنزيومر اسٽاپ ذريعي ٻيهر پروسيس ڪيو.

جو ذريعو: www.habr.com

تبصرو شامل ڪريو