Кафка кантип чындыкка айланды

Кафка кантип чындыкка айланды

Эй Хабр!

Мен Tinkoff командасында иштейм, ал өзүнүн билдирүү борборун иштеп чыгууда. Мен көбүнчө Javaда Spring boot аркылуу иштеп чыгам жана долбоордо пайда болгон ар кандай техникалык маселелерди чечем.

Биздин микросервистердин көбү бири-бири менен асинхрондук түрдө билдирүү брокери аркылуу байланышат. Мурда биз брокер катары IBM MQ колдончубуз, ал мындан ары жүктү көтөрө албай калды, бирок ошол эле учурда жеткирүү кепилдиктери жогору болчу.

Алмаштыруу катары бизге Apache Kafka сунушталды, анын масштабдоо потенциалы жогору, бирок, тилекке каршы, ар кандай сценарийлер үчүн конфигурацияга дээрлик жеке мамилени талап кылат. Кошумчалай кетсек, Кафкада демейки боюнча иштеген жок дегенде бир жолу жеткирүү механизми талап кылынган ырааттуулук деңгээлин кутудан тышкары сактоого мүмкүндүк берген эмес. Кийинки, мен Кафка конфигурациясындагы тажрыйбабыз менен бөлүшөм, атап айтканда, кантип конфигурациялоону жана так бир жолу жеткирүү менен кантип жашоону айтып берем.

Кепилденген жеткирүү жана башкалар

Төмөндө талкууланган орнотуулар демейки байланыш орнотуулары менен бир катар көйгөйлөрдүн алдын алууга жардам берет. Бирок адегенде мен мүмкүн болгон мүчүлүштүктөрдү оңдоого жардам бере турган бир параметрге көңүл бургум келет.

Бул жардам берет client.id Өндүрүүчү жана Керектөөчү үчүн. Бир караганда, сиз маани катары колдонмонун атын колдоно аласыз жана көпчүлүк учурларда бул иштейт. Тиркеме бир нече Керектөөчүлөрдү колдонгон жана сиз аларга бир эле client.id берген жагдай болсо да, төмөнкү эскертүүгө алып келет:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Эгер сиз JMXти Кафка менен тиркемеде колдонгуңуз келсе, анда бул көйгөй болушу мүмкүн. Бул учурда, колдонмонун атын жана, мисалы, client.id мааниси катары теманын атын айкалыштыруу жакшы. Биздин конфигурациянын натыйжасын буйрук чыгаруудан көрүүгө болот кафка-керектөөчү топтор Confluentтин коммуналдык кызматтарынан:

Кафка кантип чындыкка айланды

Эми кепилденген кабарды жеткирүү сценарийин карап көрөлү. Кафка Продюсер параметри бар acks, бул кластердин лидери ийгиликтүү жазылган билдирүүнү карап чыгуу үчүн канча ырастагандан кийин конфигурациялоого мүмкүндүк берет. Бул параметр төмөнкү маанилерди кабыл алышы мүмкүн:

  • 0 — моюнга алуу каралбайт.
  • 1 - демейки параметр, моюнга алуу үчүн 1 гана реплика талап кылынат.
  • −1 — бардык синхрондоштурулган репликалардан ырастоо талап кылынат (кластерди орнотуу min.insync.replicas).

Көрсөтүлгөн баалуулуктардан көрүнүп тургандай, −1ге барабар болгон acks кабардын жоголбоосуна эң күчтүү кепилдик берет.

Баарыбызга белгилүү болгондой, бөлүштүрүлгөн системалар ишенимсиз. Убактылуу каталардан коргоо үчүн, Кафка Продюсер вариантты камсыз кылат кайра аракет кылатичинде кайра жөнөтүү аракеттеринин санын коюуга мүмкүндүк берет жеткирүү.тайм.м. Кайталоо параметринин демейки Integer.MAX_VALUE (2147483647) мааниси болгондуктан, билдирүүнү кайра аракет кылуунун санын бир гана delivery.timeout.ms өзгөртүү менен тууралоого болот.

Биз так бир жолу жеткирүү багытында баратабыз

Тизмеде көрсөтүлгөн орнотуулар биздин Продюсерге кабарларды жогорку кепилдик менен жеткирүүгө мүмкүндүк берет. Келгиле, Кафка темасына билдирүүнүн бир гана көчүрмөсү жазылышын кантип камсыз кылуу керектиги жөнүндө сүйлөшөлү? Эң жөнөкөй учурда, бул үчүн сиз Продюсерге параметрди коюшуңуз керек иштетүү.идемкүчтүүлүк чынга. Idempotency бир теманын белгилүү бир бөлүгүнө бир гана билдирүү жазылганына кепилдик берет. Идемпотенттүүлүккө мүмкүндүк берүүчү шарт бул баалуулуктар acks = баары, кайра аракет кылуу > 0, max.in.flight.requests.per.connection ≤ 5. Бул параметрлер иштеп чыгуучу тарабынан көрсөтүлбөсө, жогорудагы маанилер автоматтык түрдө орнотулат.

Идемпотенттүүлүк конфигурацияланганда, ошол эле билдирүүлөр ар бир жолу бир эле бөлүмдөрдө аякташын камсыз кылуу керек. Муну partitioner.class ачкычын жана параметрин Продюсерге коюу менен жасоого болот. Ачкычтан баштайлы. Бул ар бир тапшыруу үчүн бирдей болушу керек. Буга баштапкы посттогу бизнес идентификаторлордун каалаганын колдонуу менен оңой жетүүгө болот. partitioner.class параметринин демейки мааниси бар - DefaultPartitioner. Бул бөлүү стратегиясы менен демейки боюнча биз төмөнкүдөй иш-аракет кылабыз:

  • Бөлүм билдирүү жөнөтүүдө ачык көрсөтүлгөн болсо, анда биз аны колдонобуз.
  • Бөлүм көрсөтүлбөсө, бирок ачкыч көрсөтүлгөн болсо, бөлүмдү ачкычтын хэши боюнча тандаңыз.
  • Бөлүм жана ачкыч көрсөтүлбөсө, бөлүмдөрдү бирден тандаңыз (тегерек-робин).

Ошондой эле, бир параметр менен ачкыч жана idempotent жөнөтүү колдонуу max.in.flight.requests.per.connection = 1 сизге Керектөөчүдө жөнөкөйлөштүрүлгөн билдирүүнү иштетүүнү берет. Ошондой эле, эгерде сиздин кластериңизде кирүү башкаруусу конфигурацияланса, темага идемпотенттүү түрдө жазуу укугуңуз керек болорун эстен чыгарбоо керек.

Эгер күтүлбөгөн жерден сизде ачкыч аркылуу идемпотенттүү жөнөтүү мүмкүнчүлүгү болбосо же Продюсер тарабындагы логика ар кандай бөлүмдөрдүн ортосундагы маалыматтардын ырааттуулугун сактоону талап кылса, транзакциялар жардамга келет. Мындан тышкары, чынжыр транзакцияны колдонуу менен, Кафкадагы жазууну, мисалы, маалымат базасындагы жазуу менен шарттуу түрдө синхрондоштурууга болот. Продюсерге транзакциялык жөнөтүүнү иштетүү үчүн, ал идемпотенттүү жана кошумча орнотулган болушу керек transactional.id. Эгер сиздин Кафка кластериңизде кирүү башкаруусу конфигурацияланган болсо, анда транзакциялык жазууга, идемпотенттик жазуу сыяктуу, жазуу уруксаттары керек болот, алар transferal.id ичинде сакталган маанини колдонуу менен маска аркылуу берилиши мүмкүн.

Расмий түрдө, ар кандай сап, мисалы, колдонмонун аты, транзакция идентификатору катары колдонулушу мүмкүн. Бирок, эгер сиз бир эле транзакциянын бир нече инстанциясын бир эле transactional.id менен ишке киргизсеңиз, анда Кафка аны зомби процесси деп эсептегендиктен, биринчи ишке киргизилген инстанция ката менен токтотулат.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Бул көйгөйдү чечүү үчүн биз чөйрө өзгөрмөлөрүнөн алынган хост аты түрүндөгү тиркеме атына суффикс кошобуз.

Продюсер конфигурацияланган, бирок Кафкадагы транзакциялар билдирүүнүн көлөмүн гана көзөмөлдөйт. Транзакция статусуна карабастан, билдирүү дароо темага өтөт, бирок системанын кошумча атрибуттары бар.

Мындай билдирүүлөрдү Керектөөчү мөөнөтүнөн мурда окууга жол бербөө үчүн, ал параметрди коюу керек изоляция.деңгээл окуу_милдеттүү мааниге. Мындай Керектөөчү мурункудай транзакциялык эмес билдирүүлөрдү, ал эми транзакциялык билдирүүлөрдү милдеттенме алгандан кийин гана окуй алат.
Эгер сиз мурда саналып өткөн бардык орнотууларды орноткон болсоңуз, анда сиз так бир жолу жеткирүүнү конфигурациялагансыз. Куттуктайбыз!

Бирок дагы бир нюанс бар. Биз жогоруда конфигурациялаган Transactional.id чындыгында транзакциянын префикси болуп саналат. Транзакция менеджеринде ага катар номери кошулат. Алынган идентификатор берилет transactional.id.expiration.ms, Кафка кластеринде конфигурацияланган жана демейки "7 күн" маанисине ээ. Эгерде ушул убакыттын ичинде колдонмо эч кандай билдирүүлөрдү албаса, анда кийинки транзакциялык жөнөтүүнү аракет кылганыңызда сиз аласыз InvalidPidMappingException. Андан кийин транзакциянын координатору кийинки транзакция үчүн жаңы катар номерин берет. Бирок, InvalidPidMappingException туура иштетилбесе, билдирүү жоголуп кетиши мүмкүн.

Жалпы суммалардын ордуна

Көрүнүп тургандай, Кафкага жөн гана билдирүү жөнөтүү жетишсиз. Параметрлердин айкалышын тандап, тез өзгөрүүлөрдү жасоого даяр болушуңуз керек. Бул макалада мен так бир жолу жеткирүү жөндөөсүн майда-чүйдөсүнө чейин көрсөтүүгө аракет кылдым жана биз туш болгон client.id жана transactional.id конфигурациялары менен бир нече көйгөйлөрдү сүрөттөп бердим. Төмөндө Өндүрүүчүнүн жана Керектөөчүнүн жөндөөлөрүнүн корутундусу келтирилген.

Продюсер:

  1. acks = баары
  2. кайра аракет кылуу > 0
  3. enable.idempotence = чындык
  4. max.in.flight.requests.per.connection ≤ 5 (ырааттуу түрдө жөнөтүү үчүн 1)
  5. transactional.id = ${application-name}-${хост аты}

Керектөөчү:

  1. isolation.level = read_committed

Келечектеги тиркемелердеги каталарды азайтуу үчүн, биз жазгы конфигурациянын үстүнө өзүбүздүн таңгычты жасадык, мында саналып өткөн кээ бир параметрлер үчүн маанилер коюлган.

Бул жерде өз алдынча изилдөө үчүн бир нече материалдар бар:

Source: www.habr.com

Комментарий кошуу