Հե՜յ Հաբր։
Վերջերս ես
Ժամանակակից հավելվածները գործում են շատ բարդ միջավայրում: Բիզնես տրամաբանությունը փաթաթված ժամանակակից տեխնոլոգիաների կույտով, որն աշխատում է Docker պատկերով, որը կառավարվում է Kubernetes-ի կամ OpenShift-ի նման նվագախմբի կողմից և հաղորդակցվում է այլ հավելվածների կամ ձեռնարկությունների լուծումների հետ ֆիզիկական և վիրտուալ երթուղիչների շղթայի միջոցով: Նման միջավայրում ինչ-որ բան միշտ կարող է կոտրվել, ուստի իրադարձությունների վերամշակումը, եթե արտաքին համակարգերից որևէ մեկը անհասանելի է, մեր բիզնես գործընթացների կարևոր մասն է:
Ինչպես էր Կաֆկան առաջ
Ավելի վաղ նախագծում մենք օգտագործում էինք IBM MQ-ն ասինխրոն հաղորդագրությունների առաքման համար: Եթե ծառայության շահագործման ընթացքում որևէ սխալ է տեղի ունեցել, ստացված հաղորդագրությունը կարող է տեղադրվել մեռյալ տառերի հերթում (DLQ) հետագա ձեռքով վերլուծության համար: DLQ-ն ստեղծվել է մուտքային հերթի կողքին, հաղորդագրությունը փոխանցվել է IBM MQ-ի ներսում:
Եթե սխալը ժամանակավոր էր, և մենք կարողանայինք որոշել այն (օրինակ՝ ResourceAccessException HTTP զանգի կամ MongoTimeoutException MongoDb հարցման դեպքում), ապա նորից փորձելու ռազմավարությունը կգործի: Անկախ հավելվածի ճյուղավորման տրամաբանությունից՝ սկզբնական հաղորդագրությունը տեղափոխվել է կա՛մ համակարգային հերթ՝ ուշ ուղարկելու համար, կա՛մ առանձին հավելված, որը վաղուց պատրաստվել է հաղորդագրությունները նորից ուղարկելու համար: Սա ներառում է հաղորդագրության վերնագրում նորից ուղարկելու համար, որը կապված է ուշացման միջակայքի կամ ծրագրի մակարդակի ռազմավարության ավարտի հետ: Եթե մենք հասել ենք ռազմավարության ավարտին, բայց արտաքին համակարգը դեռևս անհասանելի է, ապա հաղորդագրությունը կտեղադրվի DLQ-ում՝ ձեռքով վերլուծելու համար:
Լուծում գտնելը
Չնայած դրական ակնարկների մեծ քանակին, ինձ թվում է, որ այն ամբողջովին հաջողակ չէ: Նախ այն պատճառով, որ մշակողը, բացի բիզնեսի պահանջները կյանքի կոչելուց, ստիպված է լինելու շատ ժամանակ ծախսել նկարագրված մեխանիզմի ներդրման վրա։
Բացի այդ, եթե Kafka կլաստերի վրա միացված է մուտքի կառավարումը, դուք ստիպված կլինեք որոշակի ժամանակ ծախսել թեմաների ստեղծման և դրանց անհրաժեշտ մուտքի ապահովման վրա: Ի հավելումն սրան, դուք պետք է ընտրեք ճիշտ retention.ms պարամետրը կրկնվող թեմաներից յուրաքանչյուրի համար, որպեսզի հաղորդագրությունները ժամանակ ունենան նորից ուղարկելու և չանհետանալ դրանցից: Մուտքի ներդրումը և պահանջը պետք է կրկնվեն յուրաքանչյուր գոյություն ունեցող կամ նոր ծառայության համար:
Հիմա տեսնենք, թե ինչ մեխանիզմներ են տալիս մեզ հաղորդագրության վերամշակման համար գարունն ընդհանրապես և գարուն-կաֆկան մասնավորապես: Spring-kafka-ն անցումային կախվածություն ունի գարուն-կրկին փորձից, որն ապահովում է աբստրակցիաներ տարբեր BackOffPolicies-ի կառավարման համար: Սա բավականին ճկուն գործիք է, սակայն դրա էական թերությունը հավելվածի հիշողության մեջ հաղորդագրությունների պահպանումն է: Սա նշանակում է, որ թարմացման կամ գործառնական սխալի պատճառով հավելվածի վերագործարկումը կհանգեցնի վերամշակման սպասող բոլոր հաղորդագրությունների կորստի: Քանի որ այս կետը չափազանց կարևոր է մեր համակարգի համար, մենք այն հետագայում չդիտարկեցինք:
Spring-kafka-ն ինքն է ապահովում ContainerAwareErrorHandler-ի մի քանի իրականացում, օրինակ
Այս մոտեցումը թույլ է տալիս վերամշակված հաղորդագրություններին գոյատևել հավելվածի վերագործարկումը, բայց դեռևս չկա DLQ մեխանիզմ: Մենք ընտրեցինք այս տարբերակը 2019-ի սկզբին՝ լավատեսորեն հավատալով, որ DLQ-ի կարիքը չի լինի (մեզ հաջողակ էր և իրականում դրա կարիքը չունեինք հավելվածը նման վերամշակման համակարգով մի քանի ամիս աշխատելուց հետո): Ժամանակավոր սխալների պատճառով SeekToCurrentErrorHandler-ը գործարկվեց: Մնացած սխալները տպագրվել են մատյանում, որի արդյունքում առաջացել է օֆսեթ, և մշակումը շարունակվել է հաջորդ հաղորդագրությամբ:
Վերջնական որոշում
SeekToCurrentErrorHandler-ի վրա հիմնված իրականացումը մեզ հուշեց մշակել հաղորդագրությունների վերաուղարկման մեր սեփական մեխանիզմը:
Առաջին հերթին մենք ցանկանում էինք օգտագործել առկա փորձը և ընդլայնել այն՝ կախված հավելվածի տրամաբանությունից։ Գծային տրամաբանության կիրառման համար օպտիմալ կլինի դադարեցնել նոր հաղորդագրությունների ընթերցումը կարճ ժամանակով, որը նշված է կրկնակի ռազմավարությամբ: Այլ ծրագրերի համար ես ուզում էի ունենալ մեկ կետ, որը կկիրառի կրկնակի փորձի ռազմավարությունը: Բացի այդ, այս մեկ կետը պետք է ունենա DLQ ֆունկցիոնալություն երկու մոտեցումների համար:
Կրկնելու ռազմավարությունն ինքնին պետք է պահվի հավելվածում, որը պատասխանատու է ժամանակավոր սխալի դեպքում հաջորդ ինտերվալը ստանալու համար:
Սպառողի դադարեցում գծային տրամաբանական կիրառման համար
Spring-kafka-ի հետ աշխատելիս Սպառողին կանգնեցնելու կոդը կարող է այսպիսի տեսք ունենալ.
public void pauseListenerContainer(MessageListenerContainer listenerContainer,
Instant retryAt) {
if (nonNull(retryAt) && listenerContainer.isRunning()) {
listenerContainer.stop();
taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
return;
}
// to DLQ
}
Օրինակ, retryAt-ը MessageListenerContainer-ը վերագործարկելու ժամանակն է, եթե այն դեռ աշխատում է: Վերագործարկումը տեղի կունենա TaskScheduler-ում գործարկված առանձին թեմայում, որի իրականացումը նույնպես ապահովված է մինչև գարուն:
Մենք գտնում ենք retryAt արժեքը հետևյալ կերպ.
- Վերականգնման հաշվիչի արժեքը փնտրվում է:
- Հաշվիչի հիման վրա որոնվում է կրկնակի ռազմավարության հետաձգման ընթացիկ միջակայքը: Ռազմավարությունը հայտարարված է հենց հավելվածում, այն պահելու համար մենք ընտրել ենք JSON ձևաչափը:
- JSON զանգվածում հայտնաբերված ինտերվալը պարունակում է վայրկյանների քանակը, որից հետո մշակումը պետք է կրկնվի: Այս վայրկյանների քանակը ավելացվում է ընթացիկ ժամանակին՝ retryAt արժեքը ձևավորելու համար:
- Եթե միջակայքը չի գտնվել, ապա retryAt-ի արժեքը null է, և հաղորդագրությունը կուղարկվի DLQ՝ ձեռքով վերլուծելու համար:
Այս մոտեցմամբ, մնում է միայն պահպանել կրկնվող զանգերի քանակը ներկայումս մշակվող յուրաքանչյուր հաղորդագրության համար, օրինակ՝ հավելվածի հիշողության մեջ: Կրկնակի հաշվումը հիշողության մեջ պահելը կարևոր չէ այս մոտեցման համար, քանի որ գծային տրամաբանական հավելվածը չի կարող մշակել որպես ամբողջություն: Ի տարբերություն գարնանային կրկնակի փորձի, հավելվածի վերագործարկումը չի հանգեցնի բոլոր հաղորդագրությունների կորստի վերամշակմանը, այլ պարզապես կվերագործարկի ռազմավարությունը:
Այս մոտեցումը օգնում է հեռացնել բեռը արտաքին համակարգից, որը կարող է անհասանելի լինել շատ ծանր բեռի պատճառով: Այսինքն՝ բացի վերամշակումից, մենք հասանք օրինաչափության իրականացմանը
Մեր դեպքում սխալի շեմը ընդամենը 1 է, և ցանցի ժամանակավոր խափանումների պատճառով համակարգի խափանումները նվազագույնի հասցնելու համար մենք օգտագործում ենք շատ հատիկավոր կրկնվող ռազմավարություն՝ փոքր ուշացման ընդմիջումներով: Սա կարող է հարմար չլինել բոլոր խմբային հավելվածների համար, ուստի սխալի շեմի և միջակայքի արժեքի միջև կապը պետք է ընտրվի՝ ելնելով համակարգի բնութագրերից:
Ոչ դետերմինիստական տրամաբանությամբ հավելվածներից հաղորդագրությունների մշակման առանձին հավելված
Ահա կոդի օրինակ, որը հաղորդագրություն է ուղարկում նման հավելվածին (Retriver), որը նորից կուղարկվի DESTINATION թեմային, երբ հասնի RETRY_AT ժամը.
public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic,
Instant retryAt, String counter, String groupId, Exception e) {
Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
List<Header> arrayOfHeaders =
new ArrayList<>(Arrays.asList(headers.toArray()));
updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
updateHeader(arrayOfHeaders, ORIGINAL_PARTITION,
() -> Integer.toString(record.partition()).getBytes());
if (nonNull(retryAt)) {
updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
} else {
updateHeader(arrayOfHeaders, REASON,
ExceptionUtils.getStackTrace(e)::getBytes);
updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
}
ProducerRecord<K, V> messageToSend =
new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
kafkaTemplate.send(messageToSend);
}
Օրինակը ցույց է տալիս, որ շատ տեղեկություններ փոխանցվում են վերնագրերով: RETRY_AT-ի արժեքը հայտնաբերվում է այնպես, ինչպես կրկնվող մեխանիզմի դեպքում՝ սպառողի կանգառի միջոցով: Բացի DESTINATION-ից և RETRY_AT-ից, մենք անցնում ենք՝
- GROUP_ID, որով մենք խմբավորում ենք հաղորդագրությունները ձեռքով վերլուծության և պարզեցված որոնման համար:
- ORIGINAL_PARTITION՝ փորձելու նույն Սպառողին պահել վերամշակման համար: Այս պարամետրը կարող է լինել զրոյական, որի դեպքում նոր բաժինը կստացվի սկզբնական հաղորդագրության record.key() ստեղնով։
- Թարմացվել է COUNTER արժեքը՝ կրկին փորձելու ռազմավարությանը հետևելու համար:
- SEND_TO-ը հաստատուն է, որը ցույց է տալիս, թե հաղորդագրությունն ուղարկվում է վերամշակման RETRY_AT-ին հասնելուց հետո, թե տեղադրվում է DLQ-ում:
- REASON - հաղորդագրությունների մշակման ընդհատման պատճառ:
Retryer-ը պահում է հաղորդագրությունները՝ PostgreSQL-ում նորից ուղարկելու և ձեռքով վերլուծելու համար: Ժամաչափը սկսում է առաջադրանք, որը գտնում է հաղորդագրությունները RETRY_AT-ով և դրանք հետ է ուղարկում DESTINATION թեմայի ORIGINAL_PARTITION բաժին՝ record.key() բանալինով:
Ուղարկվելուց հետո հաղորդագրությունները ջնջվում են PostgreSQL-ից: Հաղորդագրությունների ձեռքով վերլուծությունը տեղի է ունենում պարզ UI-ում, որը փոխազդում է Retryer-ի հետ REST API-ի միջոցով: Դրա հիմնական առանձնահատկություններն են DLQ-ից հաղորդագրությունների վերաուղարկումը կամ ջնջումը, սխալի մասին տեղեկատվության դիտումը և հաղորդագրությունների որոնումը, օրինակ՝ սխալի անունով:
Քանի որ մուտքի կառավարումը միացված է մեր կլաստերներում, անհրաժեշտ է լրացուցիչ պահանջել մուտքի թույլտվություն դեպի այն թեման, որը Retrier-ը լսում է, և թույլ տալ Retrier-ին գրել DESTINATION թեմային: Սա անհարմար է, բայց, ի տարբերություն միջակայքային թեմայի մոտեցման, մենք ունենք լիարժեք DLQ և UI՝ այն կառավարելու համար:
Լինում են դեպքեր, երբ մուտքային թեման կարդում են սպառողների մի քանի տարբեր խմբեր, որոնց հավելվածները տարբեր տրամաբանություն են իրականացնում։ Այս հավելվածներից մեկի համար Retrier-ի միջոցով հաղորդագրություն վերամշակելը կհանգեցնի մյուսի կրկնօրինակին: Սրանից պաշտպանվելու համար մենք առանձին թեմա ենք ստեղծում վերամշակման համար։ Մուտքային և նորից փորձարկվող թեմաները կարող են կարդալ նույն Սպառողը առանց որևէ սահմանափակումների:
Լռելյայն այս մոտեցումը չի ապահովում անջատիչի ֆունկցիոնալությունը, սակայն այն կարող է ավելացվել հավելվածին՝ օգտագործելով
Արտադրողականություն
Արդյունքում մենք ունենք առանձին հավելված, որը թույլ է տալիս կրկնել հաղորդագրությունների մշակումը, եթե որևէ արտաքին համակարգ ժամանակավորապես անհասանելի է:
Հավելվածի հիմնական առավելություններից մեկն այն է, որ այն կարող է օգտագործվել արտաքին համակարգերի կողմից, որոնք աշխատում են նույն Կաֆկա կլաստերի վրա՝ առանց իրենց կողմից էական փոփոխությունների: Նման հավելվածին անհրաժեշտ կլինի մուտք գործել միայն կրկնակի փորձի թեմա, լրացնել Կաֆկայի մի քանի վերնագրեր և հաղորդագրություն ուղարկել Կրկնվողին: Լրացուցիչ ենթակառուցվածք ստեղծելու կարիք չկա։ Եվ որպեսզի նվազեցնենք հավելվածից Retrier և հակառակ ուղղությամբ փոխանցվող հաղորդագրությունների քանակը, մենք նույնականացրեցինք գծային տրամաբանությամբ հավելվածները և վերամշակեցինք դրանք Consumer stop-ի միջոցով:
Source: www.habr.com