Կաֆկայից ստացված իրադարձությունների վերամշակում

Կաֆկայից ստացված իրադարձությունների վերամշակում

Հե՜յ Հաբր։

Վերջերս ես կիսվել է իր փորձով այն մասին, թե ինչ պարամետրեր ենք մենք որպես թիմ առավել հաճախ օգտագործում Kafka արտադրողի և սպառողի համար՝ երաշխավորված առաքմանը մոտենալու համար: Այս հոդվածում ուզում եմ պատմել ձեզ, թե ինչպես ենք մենք կազմակերպել Կաֆկայից ստացված իրադարձության վերամշակումը արտաքին համակարգի ժամանակավոր անհասանելիության արդյունքում։

Ժամանակակից հավելվածները գործում են շատ բարդ միջավայրում: Բիզնես տրամաբանությունը փաթաթված ժամանակակից տեխնոլոգիաների կույտով, որն աշխատում է Docker պատկերով, որը կառավարվում է Kubernetes-ի կամ OpenShift-ի նման նվագախմբի կողմից և հաղորդակցվում է այլ հավելվածների կամ ձեռնարկությունների լուծումների հետ ֆիզիկական և վիրտուալ երթուղիչների շղթայի միջոցով: Նման միջավայրում ինչ-որ բան միշտ կարող է կոտրվել, ուստի իրադարձությունների վերամշակումը, եթե արտաքին համակարգերից որևէ մեկը անհասանելի է, մեր բիզնես գործընթացների կարևոր մասն է:

Ինչպես էր Կաֆկան առաջ

Ավելի վաղ նախագծում մենք օգտագործում էինք IBM MQ-ն ասինխրոն հաղորդագրությունների առաքման համար: Եթե ​​ծառայության շահագործման ընթացքում որևէ սխալ է տեղի ունեցել, ստացված հաղորդագրությունը կարող է տեղադրվել մեռյալ տառերի հերթում (DLQ) հետագա ձեռքով վերլուծության համար: DLQ-ն ստեղծվել է մուտքային հերթի կողքին, հաղորդագրությունը փոխանցվել է IBM MQ-ի ներսում:

Եթե ​​սխալը ժամանակավոր էր, և մենք կարողանայինք որոշել այն (օրինակ՝ ResourceAccessException HTTP զանգի կամ MongoTimeoutException MongoDb հարցման դեպքում), ապա նորից փորձելու ռազմավարությունը կգործի: Անկախ հավելվածի ճյուղավորման տրամաբանությունից՝ սկզբնական հաղորդագրությունը տեղափոխվել է կա՛մ համակարգային հերթ՝ ուշ ուղարկելու համար, կա՛մ առանձին հավելված, որը վաղուց պատրաստվել է հաղորդագրությունները նորից ուղարկելու համար: Սա ներառում է հաղորդագրության վերնագրում նորից ուղարկելու համար, որը կապված է ուշացման միջակայքի կամ ծրագրի մակարդակի ռազմավարության ավարտի հետ: Եթե ​​մենք հասել ենք ռազմավարության ավարտին, բայց արտաքին համակարգը դեռևս անհասանելի է, ապա հաղորդագրությունը կտեղադրվի DLQ-ում՝ ձեռքով վերլուծելու համար:

Լուծում գտնելը

Որոնում ինտերնետում, կարող եք գտնել հետևյալը որոշում. Մի խոսքով, առաջարկվում է յուրաքանչյուր ուշացման միջակայքի համար ստեղծել թեմա և կողքից ներդնել Սպառողական հավելվածներ, որոնք կկարդան հաղորդագրությունները անհրաժեշտ ուշացումով։

Կաֆկայից ստացված իրադարձությունների վերամշակում

Չնայած դրական ակնարկների մեծ քանակին, ինձ թվում է, որ այն ամբողջովին հաջողակ չէ: Նախ այն պատճառով, որ մշակողը, բացի բիզնեսի պահանջները կյանքի կոչելուց, ստիպված է լինելու շատ ժամանակ ծախսել նկարագրված մեխանիզմի ներդրման վրա։

Բացի այդ, եթե Kafka կլաստերի վրա միացված է մուտքի կառավարումը, դուք ստիպված կլինեք որոշակի ժամանակ ծախսել թեմաների ստեղծման և դրանց անհրաժեշտ մուտքի ապահովման վրա: Ի հավելումն սրան, դուք պետք է ընտրեք ճիշտ retention.ms պարամետրը կրկնվող թեմաներից յուրաքանչյուրի համար, որպեսզի հաղորդագրությունները ժամանակ ունենան նորից ուղարկելու և չանհետանալ դրանցից: Մուտքի ներդրումը և պահանջը պետք է կրկնվեն յուրաքանչյուր գոյություն ունեցող կամ նոր ծառայության համար:

Հիմա տեսնենք, թե ինչ մեխանիզմներ են տալիս մեզ հաղորդագրության վերամշակման համար գարունն ընդհանրապես և գարուն-կաֆկան մասնավորապես: Spring-kafka-ն անցումային կախվածություն ունի գարուն-կրկին փորձից, որն ապահովում է աբստրակցիաներ տարբեր BackOffPolicies-ի կառավարման համար: Սա բավականին ճկուն գործիք է, սակայն դրա էական թերությունը հավելվածի հիշողության մեջ հաղորդագրությունների պահպանումն է: Սա նշանակում է, որ թարմացման կամ գործառնական սխալի պատճառով հավելվածի վերագործարկումը կհանգեցնի վերամշակման սպասող բոլոր հաղորդագրությունների կորստի: Քանի որ այս կետը չափազանց կարևոր է մեր համակարգի համար, մենք այն հետագայում չդիտարկեցինք:

Spring-kafka-ն ինքն է ապահովում ContainerAwareErrorHandler-ի մի քանի իրականացում, օրինակ SeekToCurrentError Handler, որով կարող եք հետագայում մշակել հաղորդագրությունը՝ սխալի դեպքում առանց օֆսեթ տեղափոխելու։ Սկսած Spring-kafka 2.3 տարբերակից՝ հնարավոր դարձավ սահմանել BackOffPolicy-ը:

Այս մոտեցումը թույլ է տալիս վերամշակված հաղորդագրություններին գոյատևել հավելվածի վերագործարկումը, բայց դեռևս չկա DLQ մեխանիզմ: Մենք ընտրեցինք այս տարբերակը 2019-ի սկզբին՝ լավատեսորեն հավատալով, որ DLQ-ի կարիքը չի լինի (մեզ հաջողակ էր և իրականում դրա կարիքը չունեինք հավելվածը նման վերամշակման համակարգով մի քանի ամիս աշխատելուց հետո): Ժամանակավոր սխալների պատճառով SeekToCurrentErrorHandler-ը գործարկվեց: Մնացած սխալները տպագրվել են մատյանում, որի արդյունքում առաջացել է օֆսեթ, և մշակումը շարունակվել է հաջորդ հաղորդագրությամբ:

Վերջնական որոշում

SeekToCurrentErrorHandler-ի վրա հիմնված իրականացումը մեզ հուշեց մշակել հաղորդագրությունների վերաուղարկման մեր սեփական մեխանիզմը:

Առաջին հերթին մենք ցանկանում էինք օգտագործել առկա փորձը և ընդլայնել այն՝ կախված հավելվածի տրամաբանությունից։ Գծային տրամաբանության կիրառման համար օպտիմալ կլինի դադարեցնել նոր հաղորդագրությունների ընթերցումը կարճ ժամանակով, որը նշված է կրկնակի ռազմավարությամբ: Այլ ծրագրերի համար ես ուզում էի ունենալ մեկ կետ, որը կկիրառի կրկնակի փորձի ռազմավարությունը: Բացի այդ, այս մեկ կետը պետք է ունենա DLQ ֆունկցիոնալություն երկու մոտեցումների համար:

Կրկնելու ռազմավարությունն ինքնին պետք է պահվի հավելվածում, որը պատասխանատու է ժամանակավոր սխալի դեպքում հաջորդ ինտերվալը ստանալու համար:

Սպառողի դադարեցում գծային տրամաբանական կիրառման համար

Spring-kafka-ի հետ աշխատելիս Սպառողին կանգնեցնելու կոդը կարող է այսպիսի տեսք ունենալ.

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Օրինակ, retryAt-ը MessageListenerContainer-ը վերագործարկելու ժամանակն է, եթե այն դեռ աշխատում է: Վերագործարկումը տեղի կունենա TaskScheduler-ում գործարկված առանձին թեմայում, որի իրականացումը նույնպես ապահովված է մինչև գարուն:

Մենք գտնում ենք retryAt արժեքը հետևյալ կերպ.

  1. Վերականգնման հաշվիչի արժեքը փնտրվում է:
  2. Հաշվիչի հիման վրա որոնվում է կրկնակի ռազմավարության հետաձգման ընթացիկ միջակայքը: Ռազմավարությունը հայտարարված է հենց հավելվածում, այն պահելու համար մենք ընտրել ենք JSON ձևաչափը:
  3. JSON զանգվածում հայտնաբերված ինտերվալը պարունակում է վայրկյանների քանակը, որից հետո մշակումը պետք է կրկնվի: Այս վայրկյանների քանակը ավելացվում է ընթացիկ ժամանակին՝ retryAt արժեքը ձևավորելու համար:
  4. Եթե ​​միջակայքը չի գտնվել, ապա retryAt-ի արժեքը null է, և հաղորդագրությունը կուղարկվի DLQ՝ ձեռքով վերլուծելու համար:

Այս մոտեցմամբ, մնում է միայն պահպանել կրկնվող զանգերի քանակը ներկայումս մշակվող յուրաքանչյուր հաղորդագրության համար, օրինակ՝ հավելվածի հիշողության մեջ: Կրկնակի հաշվումը հիշողության մեջ պահելը կարևոր չէ այս մոտեցման համար, քանի որ գծային տրամաբանական հավելվածը չի կարող մշակել որպես ամբողջություն: Ի տարբերություն գարնանային կրկնակի փորձի, հավելվածի վերագործարկումը չի հանգեցնի բոլոր հաղորդագրությունների կորստի վերամշակմանը, այլ պարզապես կվերագործարկի ռազմավարությունը:

Այս մոտեցումը օգնում է հեռացնել բեռը արտաքին համակարգից, որը կարող է անհասանելի լինել շատ ծանր բեռի պատճառով: Այսինքն՝ բացի վերամշակումից, մենք հասանք օրինաչափության իրականացմանը անջատիչ.

Մեր դեպքում սխալի շեմը ընդամենը 1 է, և ցանցի ժամանակավոր խափանումների պատճառով համակարգի խափանումները նվազագույնի հասցնելու համար մենք օգտագործում ենք շատ հատիկավոր կրկնվող ռազմավարություն՝ փոքր ուշացման ընդմիջումներով: Սա կարող է հարմար չլինել բոլոր խմբային հավելվածների համար, ուստի սխալի շեմի և միջակայքի արժեքի միջև կապը պետք է ընտրվի՝ ելնելով համակարգի բնութագրերից:

Ոչ դետերմինիստական ​​տրամաբանությամբ հավելվածներից հաղորդագրությունների մշակման առանձին հավելված

Ահա կոդի օրինակ, որը հաղորդագրություն է ուղարկում նման հավելվածին (Retriver), որը նորից կուղարկվի DESTINATION թեմային, երբ հասնի RETRY_AT ժամը.


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Օրինակը ցույց է տալիս, որ շատ տեղեկություններ փոխանցվում են վերնագրերով: RETRY_AT-ի արժեքը հայտնաբերվում է այնպես, ինչպես կրկնվող մեխանիզմի դեպքում՝ սպառողի կանգառի միջոցով: Բացի DESTINATION-ից և RETRY_AT-ից, մենք անցնում ենք՝

  • GROUP_ID, որով մենք խմբավորում ենք հաղորդագրությունները ձեռքով վերլուծության և պարզեցված որոնման համար:
  • ORIGINAL_PARTITION՝ փորձելու նույն Սպառողին պահել վերամշակման համար: Այս պարամետրը կարող է լինել զրոյական, որի դեպքում նոր բաժինը կստացվի սկզբնական հաղորդագրության record.key() ստեղնով։
  • Թարմացվել է COUNTER արժեքը՝ կրկին փորձելու ռազմավարությանը հետևելու համար:
  • SEND_TO-ը հաստատուն է, որը ցույց է տալիս, թե հաղորդագրությունն ուղարկվում է վերամշակման RETRY_AT-ին հասնելուց հետո, թե տեղադրվում է DLQ-ում:
  • REASON - հաղորդագրությունների մշակման ընդհատման պատճառ:

Retryer-ը պահում է հաղորդագրությունները՝ PostgreSQL-ում նորից ուղարկելու և ձեռքով վերլուծելու համար: Ժամաչափը սկսում է առաջադրանք, որը գտնում է հաղորդագրությունները RETRY_AT-ով և դրանք հետ է ուղարկում DESTINATION թեմայի ORIGINAL_PARTITION բաժին՝ record.key() բանալինով:

Ուղարկվելուց հետո հաղորդագրությունները ջնջվում են PostgreSQL-ից: Հաղորդագրությունների ձեռքով վերլուծությունը տեղի է ունենում պարզ UI-ում, որը փոխազդում է Retryer-ի հետ REST API-ի միջոցով: Դրա հիմնական առանձնահատկություններն են DLQ-ից հաղորդագրությունների վերաուղարկումը կամ ջնջումը, սխալի մասին տեղեկատվության դիտումը և հաղորդագրությունների որոնումը, օրինակ՝ սխալի անունով:

Քանի որ մուտքի կառավարումը միացված է մեր կլաստերներում, անհրաժեշտ է լրացուցիչ պահանջել մուտքի թույլտվություն դեպի այն թեման, որը Retrier-ը լսում է, և թույլ տալ Retrier-ին գրել DESTINATION թեմային: Սա անհարմար է, բայց, ի տարբերություն միջակայքային թեմայի մոտեցման, մենք ունենք լիարժեք DLQ և UI՝ այն կառավարելու համար:

Լինում են դեպքեր, երբ մուտքային թեման կարդում են սպառողների մի քանի տարբեր խմբեր, որոնց հավելվածները տարբեր տրամաբանություն են իրականացնում։ Այս հավելվածներից մեկի համար Retrier-ի միջոցով հաղորդագրություն վերամշակելը կհանգեցնի մյուսի կրկնօրինակին: Սրանից պաշտպանվելու համար մենք առանձին թեմա ենք ստեղծում վերամշակման համար։ Մուտքային և նորից փորձարկվող թեմաները կարող են կարդալ նույն Սպառողը առանց որևէ սահմանափակումների:

Կաֆկայից ստացված իրադարձությունների վերամշակում

Լռելյայն այս մոտեցումը չի ապահովում անջատիչի ֆունկցիոնալությունը, սակայն այն կարող է ավելացվել հավելվածին՝ օգտագործելով Spring-Cud-Netflix կամ նոր գարնանային ամպի անջատիչ, համապատասխան աբստրակցիաների մեջ փաթաթելով այն վայրերը, որտեղ արտաքին ծառայությունները կոչվում են։ Բացի այդ, հնարավոր է դառնում ընտրել ռազմավարություն միջնորմ օրինակ, որը նույնպես կարող է օգտակար լինել: Օրինակ, spring-cloud-netflix-ում սա կարող է լինել թելի լողավազան կամ սեմաֆոր:

Արտադրողականություն

Արդյունքում մենք ունենք առանձին հավելված, որը թույլ է տալիս կրկնել հաղորդագրությունների մշակումը, եթե որևէ արտաքին համակարգ ժամանակավորապես անհասանելի է:

Հավելվածի հիմնական առավելություններից մեկն այն է, որ այն կարող է օգտագործվել արտաքին համակարգերի կողմից, որոնք աշխատում են նույն Կաֆկա կլաստերի վրա՝ առանց իրենց կողմից էական փոփոխությունների: Նման հավելվածին անհրաժեշտ կլինի մուտք գործել միայն կրկնակի փորձի թեմա, լրացնել Կաֆկայի մի քանի վերնագրեր և հաղորդագրություն ուղարկել Կրկնվողին: Լրացուցիչ ենթակառուցվածք ստեղծելու կարիք չկա։ Եվ որպեսզի նվազեցնենք հավելվածից Retrier և հակառակ ուղղությամբ փոխանցվող հաղորդագրությունների քանակը, մենք նույնականացրեցինք գծային տրամաբանությամբ հավելվածները և վերամշակեցինք դրանք Consumer stop-ի միջոցով:

Source: www.habr.com

Добавить комментарий