Кафка қалай шындыққа айналды

Кафка қалай шындыққа айналды

Эй Хабр!

Мен Tinkoff командасында жұмыс істеймін, ол өзінің хабарландыру орталығын дамытады. Мен негізінен Spring boot көмегімен Java-да дамып, жобада туындайтын әртүрлі техникалық мәселелерді шешемін.

Біздің микросервистердің көпшілігі хабарлама брокері арқылы бір-бірімен асинхронды түрде байланысады. Бұрын біз брокер ретінде IBM MQ пайдаландық, ол енді жүктемені көтере алмады, бірақ сонымен бірге жеткізу кепілдігі жоғары болды.

Ауыстыру ретінде бізге масштабтау әлеуеті жоғары Apache Kafka ұсынылды, бірақ, өкінішке орай, әртүрлі сценарийлер үшін конфигурацияға дерлік жеке көзқарасты талап етеді. Сонымен қатар, әдепкі бойынша Кафкада жұмыс істейтін кем дегенде бір рет жеткізу механизмі қораптан тыс консистенцияның қажетті деңгейін сақтауға мүмкіндік бермеді. Әрі қарай, мен Кафка конфигурациясындағы тәжірибемізбен бөлісемін, атап айтқанда, мен бір рет жеткізіліммен қалай конфигурациялау және өмір сүру керектігін айтамын.

Кепілдендірілген жеткізу және т.б

Төменде талқыланған параметрлер әдепкі қосылым параметрлеріне қатысты бірқатар мәселелердің алдын алуға көмектеседі. Бірақ алдымен ықтимал жөндеуді жеңілдететін бір параметрге назар аударғым келеді.

Бұл көмектеседі client.id Өндіруші мен тұтынушыға арналған. Бір қарағанда, мән ретінде қолданба атын пайдалануға болады және көп жағдайда бұл жұмыс істейді. Қолданба бірнеше Тұтынушыларды пайдаланатын және сіз оларға бірдей client.id берген жағдай келесі ескертуге әкеледі:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

JMX-ті Кафка бар қолданбада пайдаланғыңыз келсе, бұл мәселе болуы мүмкін. Бұл жағдайда клиент.id мәні ретінде қолданба атауының және, мысалы, тақырып атауының тіркесімін қолданған дұрыс. Біздің конфигурацияның нәтижесін пәрмен шығысынан көруге болады кафка-тұтынушы топтары Confluent утилиталарынан:

Кафка қалай шындыққа айналды

Енді хабарламаны кепілдендірілген жеткізу сценарийін қарастырайық. Kafka Producer параметрі бар акс, бұл кластер жетекшісінің хабарды сәтті жазылғанын қарастыру үшін қанша растаудан кейін конфигурациялауға мүмкіндік береді. Бұл параметр келесі мәндерді қабылдай алады:

  • 0 — растау қарастырылмайды.
  • 1 әдепкі параметр, растау үшін тек 1 көшірме қажет.
  • −1 — барлық синхрондалған көшірмелерді растау қажет (кластерді орнату min.insync.replicas).

Көрсетілген мәндерден −1-ге тең acks хабардың жоғалмайтынына ең күшті кепілдік беретіні анық.

Барлығымыз білетіндей, бөлінген жүйелер сенімсіз. Өтпелі ақаулардан қорғау үшін Kafka Producer опциясын ұсынады қайталайды, ол ішіндегі қайта жіберу әрекеттерінің санын орнатуға мүмкіндік береді жеткізу.тайм-аут.мс. Қайталау параметрінде Integer.MAX_VALUE (2147483647) әдепкі мәні болғандықтан, хабарды қайталау әрекеттерінің санын тек delivery.timeout.ms өзгерту арқылы реттеуге болады.

Біз дәл бір рет жеткізуге көшеміз

Көрсетілген параметрлер біздің Продюсерге хабарларды жоғары кепілдікпен жеткізуге мүмкіндік береді. Енді хабардың бір ғана көшірмесі Кафка тақырыбына жазылғанын қалай қамтамасыз ету керектігі туралы сөйлесейік? Ең қарапайым жағдайда, бұл әрекетті орындау үшін параметрді Өндірушіге орнату керек enable.idempotence шындыққа. Idempotency бір тақырыптың белгілі бір бөлігіне тек бір хабарлама жазылуына кепілдік береді. Идемпотенттілікті қосудың алғы шарты мәндер болып табылады acks = барлығы, қайталау > 0, max.in.flight.requests.per.connection ≤ 5. Егер бұл параметрлерді әзірлеуші ​​көрсетпесе, жоғарыда көрсетілген мәндер автоматты түрде орнатылады.

Идемпотенттілік конфигурацияланған кезде, бірдей хабарлардың әр уақытта бірдей бөлімдерде аяқталуын қамтамасыз ету қажет. Мұны partitioner.class кілті мен параметрін Жасаушыға орнату арқылы жасауға болады. Кілттен бастайық. Ол әрбір жіберу үшін бірдей болуы керек. Бұған бастапқы жазбадағы кез келген бизнес идентификаторын пайдалану арқылы оңай қол жеткізуге болады. partitioner.class параметрінің әдепкі мәні бар - DefaultPartitioner. Осы бөлу стратегиясымен әдепкі бойынша біз келесідей әрекет етеміз:

  • Хабарламаны жіберу кезінде бөлім нақты көрсетілген болса, біз оны пайдаланамыз.
  • Бөлім көрсетілмесе, бірақ кілт көрсетілген болса, бөлімді кілттің хэші бойынша таңдаңыз.
  • Бөлім мен кілт көрсетілмесе, бөлімдерді бір-бірден таңдаңыз (айналмалы).

Сондай-ақ, параметрмен кілтті және идемпотентті жіберуді пайдалану max.in.flight.requests.per.connection = 1 Тұтынушыда хабарларды өңдеуді жеңілдетеді. Сондай-ақ, егер қол жеткізуді басқару кластеріңізде конфигурацияланса, тақырыпқа идемпотентті түрде жазу құқығы қажет болатынын есте ұстаған жөн.

Егер кенеттен кілт арқылы идемпотентті жіберу мүмкіндіктері болмаса немесе Продюсер жағындағы логика әртүрлі бөлімдер арасындағы деректер сәйкестігін сақтауды қажет етсе, транзакциялар көмекке келеді. Сонымен қатар, тізбекті транзакцияны пайдалана отырып, Кафкадағы жазбаны, мысалы, дерекқордағы жазбамен шартты түрде синхрондауға болады. Өндірушіге транзакциялық жіберуді қосу үшін ол идемпотентті және қосымша орнатылған болуы керек transactional.id. Егер сіздің Кафка кластеріңізде рұқсатты басқару конфигурацияланған болса, транзакциялық жазбаға, мысалы, идемпотентті жазбаға, transactional.id ішінде сақталған мәнді пайдаланып маска арқылы беруге болатын жазу рұқсаттары қажет болады.

Ресми түрде, қолданба атауы сияқты кез келген жолды транзакция идентификаторы ретінде пайдалануға болады. Бірақ егер бір қолданбаның бірнеше данасын бірдей transactional.id арқылы іске қоссаңыз, бірінші іске қосылған данасы қатемен тоқтатылады, өйткені Кафка оны зомби процесі деп санайды.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Бұл мәселені шешу үшін біз ортаның айнымалы мәндерінен алатын хост атауы түріндегі қолданба атына жұрнақты қосамыз.

Өндіруші конфигурацияланған, бірақ Кафкадағы транзакциялар тек хабардың ауқымын басқарады. Транзакция күйіне қарамастан, хабарлама бірден тақырыпқа өтеді, бірақ қосымша жүйе атрибуттары бар.

Мұндай хабарламаларды Тұтынушы уақытынан бұрын оқуын болдырмау үшін ол параметрді орнатуы керек оқшаулау.деңгей read_committed мәніне. Мұндай Тұтынушы транзакциялық емес хабарламаларды бұрынғыдай және транзакциялық хабарламаларды міндеттемеден кейін ғана оқи алады.
Егер сіз бұрын аталған барлық параметрлерді орнатқан болсаңыз, онда сіз жеткізуді дәл бір рет конфигурациялағансыз. Құттықтаймыз!

Бірақ тағы бір нюанс бар. Біз жоғарыда конфигурациялаған Transactional.id шын мәнінде транзакция префиксі болып табылады. Транзакция менеджерінде оған реттік нөмір қосылады. Алынған идентификатор беріледі tranzak.id.expiration.ms, ол Кафка кластерінде конфигурацияланған және әдепкі мәні “7 күн”. Егер осы уақыт ішінде қолданба ешқандай хабарлама алмаса, келесі транзакциялық жіберуді орындаған кезде сіз аласыз InvalidPidMappingException. Содан кейін транзакция үйлестірушісі келесі транзакция үшін жаңа реттік нөмір береді. Дегенмен, InvalidPidMappingException дұрыс өңделмесе, хабар жоғалуы мүмкін.

Қорытындылар орнына

Көріп отырғаныңыздай, Кафкаға жай ғана хабарлама жіберу жеткіліксіз. Параметрлердің комбинациясын таңдап, жылдам өзгертулер енгізуге дайын болу керек. Бұл мақалада мен дәл бір рет жеткізуді орнатуды егжей-тегжейлі көрсетуге тырыстым және біз кездескен client.id және transactional.id конфигурацияларына қатысты бірнеше мәселелерді сипаттадым. Төменде өндіруші және тұтынушы параметрлерінің қысқаша мазмұны берілген.

Өндіруші:

  1. acks = барлығы
  2. қайталау > 0
  3. enable.idempotence = шын
  4. max.in.flight.requests.per.connection ≤ 5 (ретімен жіберу үшін 1)
  5. transferal.id = ${application-name}-${хост аты}

Тұтынушы:

  1. isolation.level = оқылған_ орындалды

Болашақ қолданбалардағы қателерді азайту үшін біз серіппелі конфигурацияның үстінен өз орауышымызды жасадық, мұнда кейбір тізімделген параметрлердің мәндері орнатылған.

Міне, өздігінен оқуға арналған бірнеше материалдар:

Ақпарат көзі: www.habr.com

пікір қалдыру