پردازش مجدد رویدادهای دریافت شده از کافکا

پردازش مجدد رویدادهای دریافت شده از کافکا

هی هابر

اخیرا من تجربه خود را به اشتراک گذاشت در مورد اینکه ما به عنوان یک تیم اغلب از چه پارامترهایی برای تولید کننده و مصرف کننده کافکا استفاده می کنیم تا به تحویل تضمینی نزدیکتر شویم. در این مقاله می‌خواهم به شما بگویم که چگونه پردازش مجدد یک رویداد دریافتی از کافکا را در نتیجه در دسترس نبودن موقت سیستم خارجی سازماندهی کردیم.

برنامه های کاربردی مدرن در یک محیط بسیار پیچیده عمل می کنند. منطق کسب و کار در یک پشته فناوری مدرن پیچیده شده است، در یک تصویر Docker که توسط ارکستراتوری مانند Kubernetes یا OpenShift مدیریت می شود اجرا می شود و از طریق زنجیره ای از روترهای فیزیکی و مجازی با سایر برنامه ها یا راه حل های سازمانی ارتباط برقرار می کند. در چنین محیطی، چیزی همیشه ممکن است خراب شود، بنابراین پردازش مجدد رویدادها در صورتی که یکی از سیستم های خارجی در دسترس نباشد، بخش مهمی از فرآیندهای تجاری ما است.

قبل از کافکا چطور بود

پیش از این در پروژه از IBM MQ برای تحویل پیام ناهمزمان استفاده می کردیم. اگر در حین کار سرویس خطایی رخ دهد، پیام دریافتی را می توان در یک صف خط مرده (DLQ) برای تجزیه دستی بیشتر قرار داد. DLQ در کنار صف ورودی ایجاد شد، پیام به داخل IBM MQ منتقل شد.

اگر خطا موقتی بود و می‌توانستیم آن را تعیین کنیم (به عنوان مثال، یک ResourceAccessException در یک تماس HTTP یا یک MongoTimeoutException در یک درخواست MongoDb)، پس استراتژی امتحان مجدد اعمال می‌شود. صرف نظر از منطق انشعاب برنامه، پیام اصلی یا به صف سیستم برای ارسال تاخیری یا به یک برنامه جداگانه که مدت ها پیش برای ارسال مجدد پیام ها ساخته شده بود منتقل شد. این شامل یک شماره ارسال مجدد در سرصفحه پیام است که به فاصله تاخیر یا پایان استراتژی در سطح برنامه مرتبط است. اگر به پایان استراتژی رسیده باشیم اما سیستم خارجی هنوز در دسترس نباشد، پیام برای تجزیه دستی در DLQ قرار می گیرد.

پیدا کردن یک راه حل

جستجو در اینترنت، می توانید موارد زیر را بیابید تصمیم. به طور خلاصه پیشنهاد می شود برای هر بازه تاخیر یک موضوع ایجاد کنید و برنامه های Consumer را در کنار پیاده سازی کنید که پیام ها را با تاخیر لازم بخواند.

پردازش مجدد رویدادهای دریافت شده از کافکا

با وجود تعداد زیادی از بررسی های مثبت، به نظر من کاملاً موفق نیست. اول از همه، به این دلیل که توسعه دهنده، علاوه بر اجرای الزامات تجاری، باید زمان زیادی را برای پیاده سازی مکانیسم توصیف شده صرف کند.

علاوه بر این، اگر کنترل دسترسی در خوشه کافکا فعال باشد، باید مدتی را صرف ایجاد موضوعات و ارائه دسترسی لازم به آنها کنید. علاوه بر این، باید پارامتر retain.ms صحیح را برای هر یک از موضوعات امتحان مجدد انتخاب کنید تا پیام‌ها زمان ارسال مجدد داشته باشند و از آن ناپدید نشوند. پیاده سازی و درخواست دسترسی باید برای هر سرویس موجود یا جدید تکرار شود.

اکنون ببینیم که فنر به طور کلی و فنر-کافکا به طور خاص چه مکانیسم هایی را برای پردازش مجدد پیام در اختیار ما قرار می دهند. Spring-kafka وابستگی گذرا به Spring-Retry دارد که انتزاعاتی را برای مدیریت BackOffPolicies مختلف ارائه می دهد. این یک ابزار نسبتاً انعطاف پذیر است، اما اشکال مهم آن ذخیره کردن پیام ها برای ارسال مجدد در حافظه برنامه است. این بدان معنی است که راه اندازی مجدد برنامه به دلیل به روز رسانی یا یک خطای عملیاتی منجر به از دست رفتن همه پیام های در انتظار پردازش مجدد می شود. از آنجایی که این نکته برای سیستم ما حیاتی است، ما آن را بیشتر در نظر نگرفتیم.

به عنوان مثال Spring-kafka چندین پیاده سازی از ContainerAwareErrorHandler را ارائه می دهد SeekToCurrentError Handler، که با آن می توانید پیام را بعداً بدون تغییر افست در صورت بروز خطا پردازش کنید. با شروع نسخه Spring-kafka 2.3، امکان تنظیم BackOffPolicy وجود داشت.

این رویکرد به پیام های پردازش مجدد اجازه می دهد تا از راه اندازی مجدد برنامه زنده بمانند، اما هنوز مکانیزم DLQ وجود ندارد. ما این گزینه را در ابتدای سال 2019 انتخاب کردیم و خوش بینانه معتقد بودیم که DLQ مورد نیاز نخواهد بود (ما خوش شانس بودیم و در واقع پس از چندین ماه کارکرد برنامه با چنین سیستم پردازش مجدد به آن نیاز نداشتیم). خطاهای موقت باعث شد SeekToCurrentErrorHandler فعال شود. خطاهای باقیمانده در گزارش چاپ شد و منجر به یک افست شد و پردازش با پیام بعدی ادامه یافت.

تصمیم نهایی

پیاده سازی مبتنی بر SeekToCurrentErrorHandler ما را بر آن داشت تا مکانیزم خود را برای ارسال مجدد پیام ها توسعه دهیم.

اول از همه، می خواستیم از تجربه موجود استفاده کنیم و بسته به منطق برنامه آن را گسترش دهیم. برای یک کاربرد منطق خطی، بهینه است که خواندن پیام های جدید را برای مدت کوتاهی که توسط استراتژی امتحان مجدد مشخص شده است متوقف کنید. برای برنامه های دیگر، من می خواستم یک نقطه واحد داشته باشم که استراتژی تلاش مجدد را اجرا کند. علاوه بر این، این نقطه واحد باید قابلیت DLQ را برای هر دو رویکرد داشته باشد.

خود استراتژی تلاش مجدد باید در برنامه ذخیره شود، برنامه ای که مسئول بازیابی بازه بعدی زمانی است که یک خطای موقت رخ می دهد.

توقف مصرف کننده برای یک برنامه منطق خطی

هنگام کار با Spring-kafka، کد توقف مصرف کننده ممکن است چیزی شبیه به این باشد:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

در مثال، retryAt زمان راه‌اندازی مجدد MessageListenerContainer است، اگر هنوز در حال اجرا است. راه اندازی مجدد در یک موضوع جداگانه که در TaskScheduler راه اندازی شده است، که اجرای آن نیز تا بهار ارائه شده است، رخ خواهد داد.

مقدار retryAt را به روش زیر پیدا می کنیم:

  1. مقدار شمارنده تماس مجدد بررسی می شود.
  2. بر اساس مقدار شمارنده، فاصله تاخیر فعلی در استراتژی تلاش مجدد جستجو می شود. استراتژی در خود برنامه اعلام شده است؛ ما فرمت JSON را برای ذخیره آن انتخاب کردیم.
  3. فاصله یافت شده در آرایه JSON حاوی تعداد ثانیه هایی است که پس از آن پردازش باید تکرار شود. این تعداد ثانیه به زمان فعلی اضافه می شود تا مقدار retryAt تشکیل شود.
  4. اگر بازه پیدا نشد، مقدار retryAt null است و پیام برای تجزیه دستی به DLQ ارسال می شود.

با این رویکرد، تنها چیزی که باقی می‌ماند ذخیره تعداد تماس‌های مکرر برای هر پیامی است که در حال حاضر پردازش می‌شود، مثلاً در حافظه برنامه. نگه داشتن تعداد تلاش مجدد در حافظه برای این رویکرد حیاتی نیست، زیرا یک برنامه منطقی خطی نمی تواند پردازش را به عنوان یک کل اداره کند. برخلاف تلاش مجدد بهار، راه اندازی مجدد برنامه باعث از بین رفتن همه پیام ها و پردازش مجدد نمی شود، بلکه به سادگی استراتژی را مجدداً راه اندازی می کند.

این رویکرد به برداشتن بار از سیستم خارجی کمک می کند، که ممکن است به دلیل بار بسیار سنگین در دسترس نباشد. به عبارت دیگر، علاوه بر پردازش مجدد، به اجرای الگو نیز دست یافتیم قطع کننده مدار.

در مورد ما، آستانه خطا فقط 1 است و برای به حداقل رساندن خرابی سیستم به دلیل قطع موقت شبکه، از یک استراتژی مجدد بسیار دقیق با فواصل تأخیر کوچک استفاده می کنیم. این ممکن است برای همه برنامه های گروهی مناسب نباشد، بنابراین رابطه بین آستانه خطا و مقدار فاصله باید بر اساس ویژگی های سیستم انتخاب شود.

یک برنامه جداگانه برای پردازش پیام ها از برنامه های کاربردی با منطق غیر قطعی

در اینجا یک مثال از کدی است که پیامی را به چنین برنامه‌ای ارسال می‌کند (Retryer)، که پس از رسیدن به زمان RETRY_AT، مجدداً به مبحث DESTINATION ارسال می‌شود:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

مثال نشان می دهد که اطلاعات زیادی در هدرها منتقل می شود. مقدار RETRY_AT مانند مکانیسم امتحان مجدد از طریق توقف مصرف کننده یافت می شود. ما علاوه بر DESTINATION و RETRY_AT:

  • GROUP_ID، که به وسیله آن پیام ها را برای تجزیه و تحلیل دستی و جستجوی ساده گروه بندی می کنیم.
  • ORIGINAL_PARTITION سعی کنید همان مصرف کننده را برای پردازش مجدد نگه دارید. این پارامتر می تواند null باشد، در این صورت پارتیشن جدید با استفاده از کلید record.key() پیام اصلی به دست می آید.
  • برای پیروی از استراتژی تلاش مجدد، مقدار COUNTER به‌روزرسانی شد.
  • SEND_TO یک ثابت است که نشان می دهد آیا پیام پس از رسیدن به RETRY_AT برای پردازش مجدد ارسال می شود یا در DLQ قرار می گیرد.
  • REASON - دلیل قطع شدن پردازش پیام.

Retryer پیام ها را برای ارسال مجدد و تجزیه دستی در PostgreSQL ذخیره می کند. یک تایمر کاری را شروع می‌کند که پیام‌ها را با RETRY_AT پیدا می‌کند و با کلید record.key() به پارتیشن ORIGINAL_PARTITION مبحث DESTINATION برمی‌گرداند.

پس از ارسال، پیام ها از PostgreSQL حذف می شوند. تجزیه دستی پیام ها در یک رابط کاربری ساده انجام می شود که با Retryer از طریق REST API در تعامل است. ویژگی های اصلی آن ارسال مجدد یا حذف پیام ها از DLQ، مشاهده اطلاعات خطا و جستجوی پیام ها، به عنوان مثال با نام خطا است.

از آنجایی که کنترل دسترسی در خوشه‌های ما فعال است، لازم است علاوه بر این درخواست دسترسی به موضوعی را که Retryer به آن گوش می‌دهد، داده شود و به Retryer اجازه دهید تا در موضوع DESTINATION بنویسد. این ناخوشایند است، اما، بر خلاف رویکرد موضوع فاصله، ما یک DLQ و UI کامل برای مدیریت آن داریم.

مواردی وجود دارد که یک موضوع ورودی توسط چندین گروه مصرف کننده مختلف خوانده می شود که برنامه های آنها منطق متفاوتی را پیاده سازی می کنند. پردازش مجدد یک پیام از طریق Retryer برای یکی از این برنامه ها منجر به تکراری در دیگری می شود. برای محافظت در برابر این موضوع، یک موضوع جداگانه برای پردازش مجدد ایجاد می کنیم. موضوعات ورودی و امتحان مجدد توسط همان مصرف کننده بدون هیچ محدودیتی قابل خواندن است.

پردازش مجدد رویدادهای دریافت شده از کافکا

به‌طور پیش‌فرض این رویکرد عملکرد قطع‌کننده مدار را ارائه نمی‌کند، با این حال می‌توان آن را با استفاده از برنامه به برنامه اضافه کرد Spring-Cloud-Netflix یا جدید مدار شکن ابر فنری، قرار دادن مکان هایی که در آن خدمات خارجی فراخوانی می شوند در انتزاع های مناسب. علاوه بر این، امکان انتخاب یک استراتژی برای سنگر الگو، که می تواند مفید نیز باشد. به عنوان مثال، در Spring-Cloud-Netflix این می تواند یک Thread Pool یا یک سمافور باشد.

نتیجه

در نتیجه، ما یک برنامه جداگانه داریم که به ما امکان می دهد در صورت عدم دسترسی موقت هر سیستم خارجی، پردازش پیام را تکرار کنیم.

یکی از مزیت های اصلی برنامه این است که می توان آن را توسط سیستم های خارجی در حال اجرا در همان خوشه کافکا، بدون تغییرات قابل توجه در سمت خود، استفاده کرد! چنین برنامه‌ای فقط نیاز به دسترسی به موضوع امتحان مجدد، پر کردن چند سرصفحه کافکا و ارسال پیام به تکرارکننده دارد. نیازی به ایجاد زیرساخت های اضافی نیست. و به منظور کاهش تعداد پیام‌های منتقل شده از برنامه به Retrier و برگشت، برنامه‌هایی را با منطق خطی شناسایی کرده و از طریق Consumer stop مجدداً پردازش کردیم.

منبع: www.habr.com

اضافه کردن نظر