کافکا سے موصول ہونے والے واقعات کی دوبارہ کارروائی

کافکا سے موصول ہونے والے واقعات کی دوبارہ کارروائی

ارے حبر۔

حال ہی میں میں اپنا تجربہ شیئر کیا۔ اس بارے میں کہ ہم بحیثیت ٹیم کافکا پروڈیوسر اور کنزیومر کے لیے گارنٹیڈ ڈیلیوری کے قریب جانے کے لیے اکثر کن پیرامیٹرز کا استعمال کرتے ہیں۔ اس مضمون میں میں آپ کو بتانا چاہتا ہوں کہ ہم نے بیرونی نظام کی عارضی عدم دستیابی کے نتیجے میں کافکا سے موصول ہونے والے ایک ایونٹ کی دوبارہ کارروائی کو کس طرح منظم کیا۔

جدید ایپلی کیشنز بہت پیچیدہ ماحول میں کام کرتی ہیں۔ کاروباری منطق جدید ٹیکنالوجی کے اسٹیک میں لپٹی ہوئی، ڈوکر امیج میں چلتی ہے جسے آرکیسٹریٹر جیسے Kubernetes یا OpenShift کے ذریعے منظم کیا جاتا ہے، اور فزیکل اور ورچوئل راؤٹرز کی ایک زنجیر کے ذریعے دیگر ایپلیکیشنز یا انٹرپرائز حل کے ساتھ بات چیت کرنا۔ ایسے ماحول میں، کوئی چیز ہمیشہ ٹوٹ سکتی ہے، لہذا اگر کوئی ایک بیرونی نظام دستیاب نہ ہو تو واقعات کو دوبارہ پروسیس کرنا ہمارے کاروباری عمل کا ایک اہم حصہ ہے۔

کافکا سے پہلے کیسا تھا۔

اس سے پہلے پروجیکٹ میں ہم نے غیر مطابقت پذیر پیغام کی ترسیل کے لیے IBM MQ استعمال کیا تھا۔ اگر سروس کے آپریشن کے دوران کوئی خرابی پیش آتی ہے، تو موصول ہونے والے پیغام کو مزید دستی تجزیہ کے لیے ڈیڈ لیٹر قطار (DLQ) میں رکھا جا سکتا ہے۔ DLQ آنے والی قطار کے آگے بنایا گیا تھا، پیغام کو IBM MQ کے اندر منتقل کیا گیا تھا۔

اگر غلطی عارضی تھی اور ہم اس کا تعین کر سکتے ہیں (مثال کے طور پر، HTTP کال پر ResourceAccessException یا MongoDb کی درخواست پر MongoTimeoutException)، تو دوبارہ کوشش کرنے کی حکمت عملی نافذ العمل ہوگی۔ ایپلیکیشن کی برانچنگ منطق سے قطع نظر، اصل پیغام کو یا تو تاخیر سے بھیجنے کے لیے سسٹم کی قطار میں منتقل کر دیا گیا تھا، یا پھر پیغامات کو دوبارہ بھیجنے کے لیے بہت پہلے بنائی گئی ایک علیحدہ ایپلیکیشن میں منتقل کر دیا گیا تھا۔ اس میں میسج ہیڈر میں دوبارہ بھیجنے کا نمبر شامل ہے، جو تاخیر کے وقفے یا ایپلیکیشن کی سطح کی حکمت عملی کے اختتام سے منسلک ہے۔ اگر ہم حکمت عملی کے اختتام پر پہنچ گئے ہیں لیکن بیرونی نظام ابھی تک دستیاب نہیں ہے، تو پیغام دستی تجزیہ کے لیے DLQ میں رکھا جائے گا۔

حل تلاش کریں

انٹرنیٹ پر تلاش کر رہے ہیں۔، آپ مندرجہ ذیل تلاش کرسکتے ہیں۔ فیصلہ. مختصراً، یہ تجویز ہے کہ ہر تاخیر کے وقفے کے لیے ایک موضوع بنایا جائے اور کنزیومر ایپلی کیشنز کو سائیڈ پر لاگو کیا جائے، جو مطلوبہ تاخیر کے ساتھ پیغامات کو پڑھے گا۔

کافکا سے موصول ہونے والے واقعات کی دوبارہ کارروائی

مثبت جائزوں کی ایک بڑی تعداد کے باوجود، مجھے لگتا ہے کہ یہ مکمل طور پر کامیاب نہیں ہے. سب سے پہلے، کیونکہ ڈویلپر، کاروباری ضروریات کو لاگو کرنے کے علاوہ، بیان کردہ طریقہ کار کو لاگو کرنے میں بہت زیادہ وقت خرچ کرنا پڑے گا.

اس کے علاوہ، اگر کافکا کلسٹر پر رسائی کنٹرول فعال ہے، تو آپ کو عنوانات بنانے اور ان تک ضروری رسائی فراہم کرنے میں کچھ وقت صرف کرنا پڑے گا۔ اس کے علاوہ، آپ کو دوبارہ کوشش کرنے والے ہر عنوان کے لیے صحیح retention.ms پیرامیٹر کو منتخب کرنے کی ضرورت ہوگی تاکہ پیغامات کو دوبارہ بھیجنے کا وقت ملے اور اس سے غائب نہ ہوں۔ رسائی کے نفاذ اور درخواست کو ہر موجودہ یا نئی سروس کے لیے دہرانا پڑے گا۔

آئیے اب دیکھتے ہیں کہ عام طور پر موسم بہار اور خاص طور پر اسپرنگ کافکا ہمیں پیغام کی دوبارہ پروسیسنگ کے لیے کون سے طریقہ کار فراہم کرتے ہیں۔ Spring-kafka کا spring-retry پر ایک عبوری انحصار ہے، جو مختلف BackOffPolicies کو منظم کرنے کے لیے تجرید فراہم کرتا ہے۔ یہ کافی حد تک لچکدار ٹول ہے، لیکن اس کی اہم خرابی ایپلی کیشن میموری میں دوبارہ بھیجنے کے لیے پیغامات کو محفوظ کرنا ہے۔ اس کا مطلب یہ ہے کہ اپ ڈیٹ یا آپریشنل خرابی کی وجہ سے ایپلیکیشن کو دوبارہ شروع کرنے کے نتیجے میں دوبارہ پروسیسنگ کے زیر التواء تمام پیغامات ضائع ہو جائیں گے۔ چونکہ یہ نکتہ ہمارے نظام کے لیے اہم ہے، اس لیے ہم نے اس پر مزید غور نہیں کیا۔

spring-kafka خود ContainerAwareErrorHandler کے کئی نفاذ فراہم کرتا ہے، مثال کے طور پر SeekToCurrentErrorHandler، جس کے ساتھ آپ کسی غلطی کی صورت میں آفسیٹ کو شفٹ کیے بغیر بعد میں پیغام پر کارروائی کر سکتے ہیں۔ spring-kafka 2.3 کے ورژن سے شروع کرتے ہوئے، BackOffPolicy کو سیٹ کرنا ممکن ہوا۔

یہ نقطہ نظر دوبارہ پروسیس شدہ پیغامات کو ایپلی کیشن کے دوبارہ شروع ہونے سے بچنے کی اجازت دیتا ہے، لیکن ابھی تک کوئی DLQ میکانزم موجود نہیں ہے۔ ہم نے 2019 کے آغاز میں اس آپشن کا انتخاب کیا، پر امید طور پر یقین رکھتے ہوئے کہ DLQ کی ضرورت نہیں ہوگی (ہم خوش قسمت تھے اور درحقیقت اس طرح کے ری پروسیسنگ سسٹم کے ساتھ ایپلیکیشن کو چلانے کے کئی ماہ بعد اس کی ضرورت نہیں تھی)۔ عارضی غلطیوں کی وجہ سے SeekToCurrentErrorHandler کو برطرف کر دیا گیا۔ بقیہ غلطیاں لاگ میں پرنٹ کی گئیں، جس کے نتیجے میں ایک آفسیٹ ہوا، اور اگلے پیغام کے ساتھ پروسیسنگ جاری رہی۔

آخری فیصلہ

SeekToCurrentErrorHandler پر مبنی نفاذ نے ہمیں پیغامات کو دوبارہ بھیجنے کے لیے اپنا طریقہ کار تیار کرنے پر آمادہ کیا۔

سب سے پہلے، ہم موجودہ تجربے کو استعمال کرنا چاہتے تھے اور ایپلیکیشن کی منطق کے مطابق اسے بڑھانا چاہتے تھے۔ لکیری منطق کے اطلاق کے لیے، دوبارہ کوشش کرنے کی حکمت عملی کے ذریعے مخصوص کردہ مختصر مدت کے لیے نئے پیغامات کو پڑھنا بند کرنا بہتر ہوگا۔ دیگر ایپلی کیشنز کے لیے، میں ایک نقطہ چاہتا ہوں جو دوبارہ کوشش کرنے کی حکمت عملی کو نافذ کرے۔ اس کے علاوہ، اس ایک پوائنٹ میں دونوں طریقوں کے لیے DLQ فعالیت ہونی چاہیے۔

دوبارہ کوشش کرنے کی حکمت عملی کو خود ایپلیکیشن میں محفوظ کیا جانا چاہیے، جو کہ ایک عارضی خرابی واقع ہونے پر اگلے وقفے کو بازیافت کرنے کے لیے ذمہ دار ہے۔

ایک لکیری منطق کی درخواست کے لیے صارفین کو روکنا

spring-kafka کے ساتھ کام کرتے وقت، صارف کو روکنے کا کوڈ کچھ اس طرح نظر آ سکتا ہے:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

مثال کے طور پر، retryAt میسج لسٹنر کنٹینر کو دوبارہ شروع کرنے کا وقت ہے اگر یہ اب بھی چل رہا ہے۔ دوبارہ لانچ ٹاسک شیڈیولر میں شروع کیے گئے ایک الگ تھریڈ میں ہوگا، جس کا نفاذ بھی موسم بہار کے ذریعے فراہم کیا جاتا ہے۔

ہمیں retryAt کی قدر درج ذیل طریقے سے ملتی ہے۔

  1. دوبارہ کال کاؤنٹر کی قیمت دیکھی جاتی ہے۔
  2. کاؤنٹر ویلیو کی بنیاد پر، دوبارہ کوشش کی حکمت عملی میں موجودہ تاخیر کا وقفہ تلاش کیا جاتا ہے۔ حکمت عملی کا اعلان درخواست میں ہی کیا گیا ہے؛ ہم نے اسے ذخیرہ کرنے کے لیے JSON فارمیٹ کا انتخاب کیا۔
  3. JSON صف میں پایا جانے والا وقفہ سیکنڈوں کی تعداد پر مشتمل ہے جس کے بعد پروسیسنگ کو دہرانے کی ضرورت ہوگی۔ سیکنڈ کی اس تعداد کو موجودہ وقت میں retryAt کی قدر بنانے کے لیے شامل کیا جاتا ہے۔
  4. اگر وقفہ نہیں ملتا ہے، تو retryAt کی قدر کالعدم ہے اور پیغام دستی تجزیہ کے لیے DLQ کو بھیجا جائے گا۔

اس نقطہ نظر کے ساتھ، جو باقی رہ جاتا ہے وہ ہر اس پیغام کے لیے دہرائی جانے والی کالوں کی تعداد کو محفوظ کرنا ہے جس پر فی الحال کارروائی ہو رہی ہے، مثال کے طور پر ایپلی کیشن میموری میں۔ یادداشت میں دوبارہ کوشش کی گنتی کو برقرار رکھنا اس نقطہ نظر کے لئے اہم نہیں ہے، کیونکہ ایک لکیری منطق کی درخواست مجموعی طور پر پروسیسنگ کو نہیں سنبھال سکتی ہے۔ spring-retry کے برعکس، ایپلیکیشن کو دوبارہ شروع کرنے سے تمام پیغامات کو دوبارہ پروسیس نہیں کیا جائے گا، لیکن یہ حکمت عملی کو دوبارہ شروع کر دے گی۔

یہ نقطہ نظر بیرونی نظام سے بوجھ اتارنے میں مدد کرتا ہے، جو بہت زیادہ بوجھ کی وجہ سے دستیاب نہیں ہوسکتا ہے۔ دوسرے الفاظ میں، ری پروسیسنگ کے علاوہ، ہم نے پیٹرن کے نفاذ کو حاصل کیا۔ سرکٹ بریکر.

ہمارے معاملے میں، خرابی کی حد صرف 1 ہے، اور نیٹ ورک کی عارضی بندش کی وجہ سے سسٹم کے ڈاؤن ٹائم کو کم سے کم کرنے کے لیے، ہم چھوٹے لیٹنسی وقفوں کے ساتھ ایک بہت ہی دانے دار دوبارہ کوشش کی حکمت عملی استعمال کرتے ہیں۔ ہو سکتا ہے کہ یہ تمام گروپ ایپلی کیشنز کے لیے موزوں نہ ہو، لہذا خرابی کی حد اور وقفہ کی قدر کے درمیان تعلق کو نظام کی خصوصیات کی بنیاد پر منتخب کیا جانا چاہیے۔

غیر متعین منطق کے ساتھ ایپلی کیشنز سے پیغامات پر کارروائی کرنے کے لیے ایک علیحدہ درخواست

یہاں کوڈ کی ایک مثال ہے جو ایسی ایپلیکیشن (ریٹریر) کو پیغام بھیجتا ہے، جو RETRY_AT کا وقت پورا ہونے پر DESTINATION موضوع پر دوبارہ بھیجا جائے گا:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

مثال سے پتہ چلتا ہے کہ بہت ساری معلومات ہیڈر میں منتقل ہوتی ہیں۔ RETRY_AT کی قدر اسی طرح پائی جاتی ہے جیسے کنزیومر سٹاپ کے ذریعے دوبارہ کوشش کرنے کے طریقہ کار کے لیے۔ DESTINATION اور RETRY_AT کے علاوہ ہم پاس کرتے ہیں:

  • GROUP_ID، جس کے ذریعے ہم دستی تجزیہ اور آسان تلاش کے لیے پیغامات کو گروپ کرتے ہیں۔
  • ORIGINAL_PARTITION دوبارہ پروسیسنگ کے لیے اسی صارف کو رکھنے کی کوشش کرنے کے لیے۔ یہ پیرامیٹر کالعدم ہو سکتا ہے، ایسی صورت میں اصل پیغام کی record.key() کلید کا استعمال کرتے ہوئے نیا پارٹیشن حاصل کیا جائے گا۔
  • دوبارہ کوشش کرنے کی حکمت عملی پر عمل کرنے کے لیے COUNTER قدر کو اپ ڈیٹ کیا گیا۔
  • SEND_TO ایک مستقل اشارہ کرتا ہے کہ آیا پیغام کو RETRY_AT تک پہنچنے پر دوبارہ پروسیسنگ کے لیے بھیجا گیا ہے یا DLQ میں رکھا گیا ہے۔
  • REASON - پیغام کی کارروائی میں خلل کی وجہ۔

Retryer پوسٹگری ایس کیو ایل میں دوبارہ بھیجنے اور دستی تجزیہ کرنے کے لیے پیغامات کو اسٹور کرتا ہے۔ ٹائمر ایک ایسا کام شروع کرتا ہے جو RETRY_AT کے ساتھ پیغامات تلاش کرتا ہے اور کلیدی record.key() کے ساتھ DESTINATION موضوع کے ORIGINAL_PARTITION پارٹیشن پر واپس بھیجتا ہے۔

بھیجے جانے کے بعد، پیغامات PostgreSQL سے حذف ہو جاتے ہیں۔ پیغامات کی دستی تجزیہ ایک سادہ UI میں ہوتی ہے جو REST API کے ذریعے Retryer کے ساتھ تعامل کرتا ہے۔ اس کی اہم خصوصیات DLQ سے پیغامات کو دوبارہ بھیجنا یا حذف کرنا، غلطی کی معلومات دیکھنا اور پیغامات کو تلاش کرنا، مثال کے طور پر غلطی کے نام سے۔

چونکہ ہمارے کلسٹرز پر رسائی کا کنٹرول فعال ہے، اس لیے اضافی طور پر اس موضوع تک رسائی کی درخواست کرنا ضروری ہے جسے Retryer سن رہا ہے، اور Retryer کو DESTINATION موضوع پر لکھنے کی اجازت دینا ضروری ہے۔ یہ تکلیف دہ ہے، لیکن، وقفہ کے موضوع کے نقطہ نظر کے برعکس، ہمارے پاس اسے منظم کرنے کے لیے ایک مکمل DLQ اور UI ہے۔

ایسے معاملات ہوتے ہیں جب آنے والے موضوع کو متعدد مختلف صارف گروپس پڑھتے ہیں، جن کی ایپلی کیشنز مختلف منطقوں کو نافذ کرتی ہیں۔ ان ایپلی کیشنز میں سے ایک کے لیے Retryer کے ذریعے پیغام کو دوبارہ پروسیس کرنے کے نتیجے میں دوسری پر ایک ڈپلیکیٹ ہو جائے گی۔ اس سے بچاؤ کے لیے، ہم ری پروسیسنگ کے لیے ایک الگ موضوع بناتے ہیں۔ آنے والے اور دوبارہ کوشش کرنے والے موضوعات کو ایک ہی صارف بغیر کسی پابندی کے پڑھ سکتا ہے۔

کافکا سے موصول ہونے والے واقعات کی دوبارہ کارروائی

پہلے سے طے شدہ طور پر یہ نقطہ نظر سرکٹ بریکر کی فعالیت فراہم نہیں کرتا ہے، تاہم اسے استعمال کرتے ہوئے ایپلیکیشن میں شامل کیا جا سکتا ہے۔ spring-Cloud-netflix یا نیا؟ موسم بہار کے بادل سرکٹ بریکر، ان جگہوں کو لپیٹنا جہاں بیرونی خدمات کو مناسب تجرید میں بلایا جاتا ہے۔ اس کے علاوہ، اس کے لیے حکمت عملی کا انتخاب کرنا ممکن ہو جاتا ہے۔ بلک ہیڈ پیٹرن، جو بھی مفید ہو سکتا ہے. مثال کے طور پر، spring-Cloud-netflix میں یہ تھریڈ پول یا سیمفور ہو سکتا ہے۔

آؤٹ پٹ

نتیجے کے طور پر، ہمارے پاس ایک علیحدہ ایپلی کیشن ہے جو ہمیں پیغام کی پروسیسنگ کو دوبارہ کرنے کی اجازت دیتی ہے اگر کوئی بیرونی نظام عارضی طور پر دستیاب نہ ہو۔

ایپلی کیشن کا ایک اہم فائدہ یہ ہے کہ اسے ایک ہی کافکا کلسٹر پر چلنے والے بیرونی سسٹمز کے ذریعے استعمال کیا جا سکتا ہے، بغیر کسی اہم ترمیم کے! ایسی ایپلیکیشن کو صرف دوبارہ کوشش کرنے والے عنوان تک رسائی حاصل کرنے کی ضرورت ہوگی، کچھ کافکا ہیڈر بھریں اور دوبارہ کوشش کرنے والے کو پیغام بھیجیں۔ کسی اضافی انفراسٹرکچر کو بڑھانے کی ضرورت نہیں ہے۔ اور ایپلی کیشن سے Retryer اور پیچھے منتقل کیے گئے پیغامات کی تعداد کو کم کرنے کے لیے، ہم نے لکیری منطق کے ساتھ ایپلی کیشنز کی نشاندہی کی اور کنزیومر اسٹاپ کے ذریعے ان پر دوبارہ کارروائی کی۔

ماخذ: www.habr.com

نیا تبصرہ شامل کریں