چگونه کافکا به واقعیت تبدیل شد

چگونه کافکا به واقعیت تبدیل شد

هی هابر!

من روی تیم Tinkoff کار می کنم که در حال توسعه مرکز اطلاع رسانی خود است. من بیشتر در جاوا با استفاده از Spring boot توسعه می دهم و مشکلات فنی مختلفی را که در یک پروژه ایجاد می شود حل می کنم.

بیشتر ریزسرویس‌های ما از طریق یک کارگزار پیام به صورت ناهمزمان با یکدیگر ارتباط برقرار می‌کنند. قبلاً از IBM MQ به عنوان کارگزار استفاده می کردیم که دیگر نمی توانست با این بار مقابله کند اما در عین حال ضمانت تحویل بالایی داشت.

به عنوان جایگزین، آپاچی کافکا به ما پیشنهاد شد که پتانسیل مقیاس پذیری بالایی دارد، اما، متأسفانه، به یک رویکرد تقریباً فردی برای پیکربندی برای سناریوهای مختلف نیاز دارد. علاوه بر این، مکانیسم تحویل حداقل یک بار که به طور پیش‌فرض در کافکا کار می‌کند، اجازه نمی‌دهد تا سطح مورد نیاز از سازگاری خارج از جعبه حفظ شود. در مرحله بعد، من تجربه خود را در پیکربندی کافکا به اشتراک می‌گذارم، به ویژه به شما می‌گویم که چگونه دقیقاً یک بار تحویل را پیکربندی کنید و زندگی کنید.

تحویل تضمینی و موارد دیگر

تنظیمات مورد بحث در زیر به جلوگیری از تعدادی از مشکلات مربوط به تنظیمات اتصال پیش فرض کمک می کند. اما ابتدا می خواهم به یک پارامتر توجه کنم که اشکال زدایی احتمالی را تسهیل می کند.

این کمک خواهد کرد شناسه مشتری برای تولید کننده و مصرف کننده در نگاه اول، می توانید از نام برنامه به عنوان مقدار استفاده کنید، و در بیشتر موارد این کار می کند. اگر چه زمانی که یک برنامه از چندین Consumer استفاده می کند و شما به آنها یک client.id می دهید، هشدار زیر را به همراه دارد:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

اگر می‌خواهید از JMX در برنامه‌ای با کافکا استفاده کنید، این می‌تواند یک مشکل باشد. برای این مورد، بهتر است از ترکیبی از نام برنامه و به عنوان مثال، نام موضوع به عنوان مقدار client.id استفاده کنید. نتیجه پیکربندی ما در خروجی فرمان قابل مشاهده است گروه های مصرف کننده کافکا از برنامه های کاربردی از Confluent:

چگونه کافکا به واقعیت تبدیل شد

حال بیایید به سناریوی تحویل تضمینی پیام نگاه کنیم. تهیه کننده کافکا یک پارامتر دارد آکس، که به شما امکان می دهد پس از تعداد تأییدهایی که رهبر خوشه برای در نظر گرفتن پیام با موفقیت نوشته شده نیاز دارد پیکربندی کنید. این پارامتر می تواند مقادیر زیر را بگیرد:

  • 0 - تصدیق در نظر گرفته نخواهد شد.
  • 1 پارامتر پیش فرض است، فقط 1 ماکت برای تایید لازم است.
  • -1 - تأیید از همه کپی های همگام سازی شده مورد نیاز است (راه اندازی خوشه min.insync.replicas).

از مقادیر ذکر شده مشخص است که اک های برابر با -1 قوی ترین تضمین را برای از بین نرفتن پیام می دهد.

همانطور که همه ما می دانیم، سیستم های توزیع شده قابل اعتماد نیستند. برای محافظت در برابر خطاهای گذرا، تولید کننده کافکا این گزینه را فراهم می کند دوباره تلاش می کند، که به شما امکان می دهد تعداد دفعات ارسال مجدد را در داخل تنظیم کنید delivery.timeout.ms. از آنجایی که پارامتر retries مقدار پیش‌فرض Integer.MAX_VALUE (2147483647) دارد، تعداد دفعات تکرار پیام را می‌توان تنها با تغییر delivery.timeout.ms تنظیم کرد.

ما به سمت تحویل دقیقا یک بار حرکت می کنیم

تنظیمات فهرست شده به تولیدکننده ما امکان می دهد پیام ها را با ضمانت بالا تحویل دهد. بیایید اکنون در مورد چگونگی اطمینان از اینکه فقط یک کپی از یک پیام در یک موضوع کافکا نوشته شده است صحبت کنیم؟ در ساده ترین حالت، برای انجام این کار، باید پارامتر را روی Producer تنظیم کنید فعال کردن.ناتوانی به درستی Idempotency تضمین می کند که فقط یک پیام در یک پارتیشن خاص از یک موضوع نوشته می شود. پیش شرط برای توانمندسازی ناتوانی، ارزش ها هستند acks = همه، سعی مجدد > 0، max.in.flight.requests.per.connection ≤ 5. اگر این پارامترها توسط توسعه دهنده مشخص نشده باشد، مقادیر بالا به طور خودکار تنظیم می شوند.

هنگامی که idempotency پیکربندی می شود، لازم است اطمینان حاصل شود که هر بار همان پیام ها در پارتیشن های یکسانی قرار می گیرند. این را می توان با تنظیم کلید partitioner.class و پارامتر روی Producer انجام داد. بیایید با کلید شروع کنیم. برای هر ارسال باید یکسان باشد. با استفاده از هر یک از شناسه های تجاری از پست اصلی می توان به راحتی به این امر دست یافت. پارامتر partitioner.class دارای یک مقدار پیش فرض - است DefaultPartitioner. با این استراتژی پارتیشن بندی، به طور پیش فرض به این صورت عمل می کنیم:

  • اگر پارتیشن به طور صریح در هنگام ارسال پیام مشخص شده باشد، از آن استفاده می کنیم.
  • اگر پارتیشن مشخص نیست، اما کلید مشخص است، پارتیشن را با هش کلید انتخاب کنید.
  • در صورتی که پارتیشن و کلید مشخص نشده اند، پارتیشن ها را یکی یکی انتخاب کنید (round-robin).

همچنین با استفاده از کلید و ارسال غیر توانمند با پارامتر max.in.flight.requests.per.connection = 1 به شما امکان پردازش ساده پیام در مصرف کننده را می دهد. همچنین لازم به یادآوری است که اگر کنترل دسترسی روی خوشه شما پیکربندی شده باشد، به حقوقی نیاز خواهید داشت که به صورت ناتوان در یک موضوع بنویسید.

اگر به طور ناگهانی فاقد قابلیت‌های ارسال بی‌توان با کلید هستید یا منطق طرف سازنده مستلزم حفظ ثبات داده‌ها بین پارتیشن‌های مختلف است، تراکنش‌ها به کمک خواهند آمد. علاوه بر این، با استفاده از یک تراکنش زنجیره ای، می توانید به طور مشروط یک رکورد در کافکا را با یک رکورد در پایگاه داده همگام سازی کنید. برای فعال کردن ارسال تراکنشی به تولید کننده، باید فاقد قدرت و تنظیم اضافی باشد Transactional.id. اگر خوشه کافکا شما دارای کنترل دسترسی پیکربندی شده باشد، یک رکورد تراکنشی، مانند یک رکورد بی توان، به مجوزهای نوشتن نیاز دارد، که می تواند با استفاده از مقدار ذخیره شده درtransactional.id توسط ماسک داده شود.

به طور رسمی، هر رشته ای، مانند نام برنامه، می تواند به عنوان شناسه تراکنش استفاده شود. اما اگر چندین نمونه از یک برنامه کاربردی را با همان Transactional.id راه اندازی کنید، اولین نمونه راه اندازی شده با خطا متوقف می شود، زیرا کافکا آن را یک فرآیند زامبی در نظر می گیرد.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

برای حل این مشکل یک پسوند به نام برنامه به صورت نام میزبان اضافه می کنیم که از متغیرهای محیطی بدست می آوریم.

سازنده پیکربندی شده است، اما تراکنش های روی کافکا فقط دامنه پیام را کنترل می کند. صرف نظر از وضعیت تراکنش، پیام بلافاصله به موضوع می رود، اما دارای ویژگی های اضافی سیستم است.

برای جلوگیری از خواندن پیش از موعد چنین پیام هایی توسط مصرف کننده، باید پارامتر را تنظیم کند انزوا.سطح به مقدار read_committed. چنین مصرف‌کننده‌ای می‌تواند پیام‌های غیرمعمولی را مانند قبل بخواند و پیام‌های تراکنشی را فقط پس از انجام تعهد بخواند.
اگر تمام تنظیمات لیست شده را قبلاً تنظیم کرده اید، دقیقاً یک بار تحویل را پیکربندی کرده اید. تبریک می گویم!

اما یک نکته ظریف دیگر وجود دارد. Transactional.id که در بالا پیکربندی کردیم، در واقع پیشوند تراکنش است. در مدیر تراکنش، یک شماره ترتیبی به آن اضافه می شود. شناسه دریافتی صادر می شود transactional.id.expiration.ms، که روی یک خوشه کافکا پیکربندی شده است و مقدار پیش فرض آن "7 روز" است. اگر در این مدت برنامه هیچ پیامی دریافت نکرده باشد، پس از ارسال تراکنشی بعدی، دریافت خواهید کرد InvalidPidMappingException. سپس هماهنگ کننده تراکنش یک شماره توالی جدید برای تراکنش بعدی صادر می کند. با این حال، اگر InvalidPidMappingException به درستی مدیریت نشود، ممکن است پیام از بین برود.

به جای کل

همانطور که می بینید، فقط ارسال پیام به کافکا کافی نیست. شما باید ترکیبی از پارامترها را انتخاب کنید و برای ایجاد تغییرات سریع آماده باشید. در این مقاله، من سعی کردم به طور دقیق تنظیمات یک بار تحویل را نشان دهم و چندین مشکل را در مورد پیکربندی های client.id وtransactional.id که با آنها مواجه شدیم شرح دادم. در زیر خلاصه ای از تنظیمات تولید کننده و مصرف کننده آورده شده است.

تهيه كننده:

  1. آکس = همه
  2. تلاش های مجدد > 0
  3. enable.idempotence = درست است
  4. max.in.flight.requests.per.connection ≤ 5 (1 برای ارسال منظم)
  5. transactional.id = ${application-name}-${hostname}

مصرف كننده:

  1. isolation.level = read_committed

برای به حداقل رساندن خطاها در برنامه های آینده، ما پوشش مخصوص خود را روی پیکربندی فنری ساختیم، جایی که مقادیر برخی از پارامترهای فهرست شده قبلاً تنظیم شده است.

در اینجا چند ماده برای خودآموزی آورده شده است:

منبع: www.habr.com

اضافه کردن نظر