Кафкадан алынған оқиғаларды қайта өңдеу

Кафкадан алынған оқиғаларды қайта өңдеу

Эй Хабр.

Жақында мен тәжірибесімен бөлісті Кепілдендірілген жеткізуге жақындау үшін біз команда ретінде Кафка өндірушісі мен тұтынушысы үшін қандай параметрлерді жиі қолданатынымыз туралы. Бұл мақалада мен сізге сыртқы жүйенің уақытша қолжетімсіздігі нәтижесінде Кафкадан алынған оқиғаны қайта өңдеуді қалай ұйымдастырғанымызды айтқым келеді.

Қазіргі заманғы қолданбалар өте күрделі ортада жұмыс істейді. Kubernetes немесе OpenShift сияқты оркестр басқаратын Docker кескінінде жұмыс істейтін және физикалық және виртуалды маршрутизаторлар тізбегі арқылы басқа қолданбалармен немесе кәсіпорын шешімдерімен байланысатын заманауи технологиялық стекке оралған бизнес логикасы. Мұндай ортада бірдеңе әрқашан үзілуі мүмкін, сондықтан сыртқы жүйелердің бірі қолжетімсіз болса, оқиғаларды қайта өңдеу біздің бизнес-процестердің маңызды бөлігі болып табылады.

Кафкаға дейін қалай болды

Бұрын жобада біз IBM MQ асинхронды хабарламаны жеткізу үшін пайдаландық. Қызметтің жұмысы кезінде қандай да бір қате орын алса, алынған хабарды әрі қарай қолмен талдау үшін өлі әріптер кезегіне (DLQ) қоюға болады. DLQ кіріс кезегінің жанында құрылды, хабар IBM MQ ішінде тасымалданды.

Егер қате уақытша болса және біз оны анықтай алсақ (мысалы, HTTP қоңырауындағы ResourceAccessException немесе MongoDb сұрауындағы MongoTimeoutException), онда қайталау стратегиясы күшіне енеді. Қолданбаның тармақталған логикасына қарамастан, бастапқы хабарлама не кешіктірілген жіберу үшін жүйелік кезекке немесе хабарламаларды қайта жіберу үшін бұрыннан жасалған бөлек қолданбаға жылжытылды. Бұл кешігу аралығына немесе қолданба деңгейіндегі стратегияның соңына байланысты хабар тақырыбын қайта жіберу нөмірін қамтиды. Егер біз стратегияның соңына жеткен болсақ, бірақ сыртқы жүйе әлі қолжетімсіз болса, хабарлама қолмен талдау үшін DLQ ішіне орналастырылады.

Шешім іздеу

Интернетте іздеу, келесіні таба аласыз шешім. Бір сөзбен айтқанда, әрбір кідіріс аралығы үшін тақырып құру және қажетті кідіріспен хабарламаларды оқитын жағында Тұтынушы қосымшаларын енгізу ұсынылады.

Кафкадан алынған оқиғаларды қайта өңдеу

Көптеген оң пікірлерге қарамастан, менің ойымша, бұл мүлдем сәтті емес. Ең алдымен, әзірлеушіге бизнес талаптарын жүзеге асырумен қатар, сипатталған механизмді енгізуге көп уақыт жұмсауға тура келеді.

Сонымен қатар, егер Кафка кластерінде рұқсатты басқару қосылған болса, тақырыптарды жасауға және оларға қажетті қатынасты қамтамасыз етуге біраз уақыт жұмсауға тура келеді. Бұған қоса, хабарлардың қайта жіберілетін уақыты болуы және одан жоғалып кетпеуі үшін қайталап көру тақырыптарының әрқайсысы үшін дұрыс retention.ms параметрін таңдау қажет. Қол жеткізуді іске асыру және сұрау әрбір бар немесе жаңа қызмет үшін қайталануы керек.

Енді хабарларды қайта өңдеу үшін жалпы көктемгі және атап айтқанда көктем-кафка қандай механизмдер беретінін көрейік. Spring-kafka әртүрлі BackOffPolicies басқару үшін абстракцияларды қамтамасыз ететін көктемгі қайталау әрекетіне өтпелі тәуелділікке ие. Бұл өте икемді құрал, бірақ оның маңызды кемшілігі хабарларды қолданба жадында қайта жіберу үшін сақтау болып табылады. Бұл жаңарту немесе операциялық қатеге байланысты қолданбаны қайта іске қосу қайта өңдеуді күткен барлық хабарлардың жоғалуына әкелетінін білдіреді. Бұл тармақ біздің жүйеміз үшін маңызды болғандықтан, біз оны әрі қарай қарастырмадық.

Spring-kafka өзі, мысалы, ContainerAwareErrorHandler бірнеше іске асыруларын қамтамасыз етеді SeekToCurrentErrorHandler, оның көмегімен қате болған жағдайда офсетті жылжытпай хабарламаны кейінірек өңдеуге болады. Spring-kafka 2.3 нұсқасынан бастап BackOffPolicy орнату мүмкін болды.

Бұл тәсіл қайта өңделген хабарларға қолданбаны қайта іске қосу кезінде аман қалуға мүмкіндік береді, бірақ әлі де DLQ механизмі жоқ. Біз бұл опцияны 2019 жылдың басында таңдадық, оптимистік түрде DLQ қажет емес деп сендік (бізге сәттілік болды және қолданбаны осындай қайта өңдеу жүйесімен бірнеше ай жұмыс істегеннен кейін іс жүзінде қажет болмады). Уақытша қателер SeekToCurrentErrorHandler іске қосылды. Қалған қателер журналда басып шығарылды, нәтижесінде офсет пайда болды және өңдеу келесі хабарламамен жалғасты.

Соңғы шешім

SeekToCurrentErrorHandler негізінде іске асыру бізді хабарламаларды қайта жіберудің жеке механизмін әзірлеуге итермеледі.

Ең алдымен, біз бар тәжірибені қолданып, оны қолданбалы логикаға байланысты кеңейткіміз келді. Сызықтық логикалық қолданба үшін қайталау стратегиясымен көрсетілген қысқа уақыт аралығында жаңа хабарларды оқуды тоқтату оңтайлы болады. Басқа қолданбалар үшін мен қайталау стратегиясын орындайтын жалғыз нүктеге ие болғым келді. Бұған қоса, бұл жалғыз нүкте екі тәсіл үшін де DLQ функциясына ие болуы керек.

Қайталау стратегиясының өзі қолданбада сақталуы керек, ол уақытша қате орын алған кезде келесі аралықты шығарып алуға жауапты.

Сызықтық логикалық қолданба үшін тұтынушыны тоқтату

Spring-kafka-мен жұмыс істегенде, Тұтынушыны тоқтату коды келесідей болуы мүмкін:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Мысалда retryAt - MessageListenerContainer әлі жұмыс істеп тұрса, оны қайта іске қосу уақыты. Қайта іске қосу TaskScheduler бағдарламасында іске қосылған бөлек ағында болады, оның орындалуы көктемде де қамтамасыз етіледі.

retryAt мәнін келесі жолмен табамыз:

  1. Қайта шақыру есептегішінің мәні ізделеді.
  2. Есептегіш мәнге сүйене отырып, қайталау стратегиясындағы ағымдағы кешігу аралығы ізделеді. Стратегия қолданбаның өзінде жарияланған; біз оны сақтау үшін JSON пішімін таңдадық.
  3. JSON массивінде табылған аралық өңдеуді қайталау қажет болатын секундтар санын қамтиды. Бұл секунд саны retryAt мәнін қалыптастыру үшін ағымдағы уақытқа қосылады.
  4. Егер аралық табылмаса, retryAt мәні нөлге тең болады және хабарлама қолмен талдау үшін DLQ-ге жіберіледі.

Бұл тәсілмен қазіргі уақытта өңделіп жатқан әрбір хабар үшін қайталанатын қоңыраулар санын сақтау ғана қалады, мысалы, қолданба жадында. Қайталау санын жадта сақтау бұл тәсіл үшін маңызды емес, өйткені сызықтық логикалық қолданба өңдеуді тұтастай өңдей алмайды. Көктемгі қайталаудан айырмашылығы, қолданбаны қайта іске қосу барлық хабарлардың қайта өңделуіне жол бермейді, тек стратегияны қайта іске қосады.

Бұл тәсіл өте ауыр жүктемеге байланысты қол жетімсіз болуы мүмкін сыртқы жүйе жүктемесін алып тастауға көмектеседі. Яғни, қайта өңдеумен қатар, үлгіні жүзеге асыруға қол жеткіздік ажыратқыш.

Біздің жағдайда қате шегі тек 1 болып табылады және желінің уақытша үзілуіне байланысты жүйенің тоқтап қалу уақытын азайту үшін біз аз кідіріс аралықтары бар өте түйіршікті қайталау стратегиясын қолданамыз. Бұл барлық топтық қолданбалар үшін сәйкес келмеуі мүмкін, сондықтан қате шегі мен интервал мәні арасындағы қатынас жүйенің сипаттамалары негізінде таңдалуы керек.

Детерминирленбеген логикасы бар қолданбалардан хабарларды өңдеуге арналған бөлек қолданба

Міне, RETRY_AT уақытына жеткенде DESTINATION тақырыбына қайта жіберілетін осындай қолданбаға (Қайталаушы) хабарлама жіберетін кодтың мысалы:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Мысал көптеген ақпараттың тақырыптарда берілетінін көрсетеді. RETRY_AT мәні тұтынушы аялдамасы арқылы қайталау механизмі сияқты табылады. DESTINATION және RETRY_AT жолдарынан басқа біз өтеміз:

  • GROUP_ID, ол арқылы қолмен талдау және оңайлатылған іздеу үшін хабарларды топтаймыз.
  • ORIGINAL_PARTITION қайта өңдеу үшін бірдей Тұтынушыны сақтауға тырысу үшін. Бұл параметр нөл болуы мүмкін, бұл жағдайда жаңа бөлім бастапқы хабарламаның record.key() пернесі арқылы алынады.
  • Қайталау стратегиясын орындау үшін COUNTER мәні жаңартылды.
  • SEND_TO – хабардың RETRY_AT жеткенде қайта өңдеуге жіберілгенін немесе DLQ ішіне орналастырылғанын көрсететін тұрақты.
  • СЕБЕП - хабарламаны өңдеудің үзілу себебі.

Retryer PostgreSQL жүйесінде қайта жіберу және қолмен талдау үшін хабарларды сақтайды. Таймер RETRY_AT көмегімен хабарларды табатын тапсырманы бастайды және оларды record.key() кілтімен DESTINATION тақырыбының ORIGINAL_PARTITION бөліміне қайта жібереді.

Жіберілгеннен кейін хабарламалар PostgreSQL жүйесінен жойылады. Хабарларды қолмен талдау REST API арқылы Retryer-мен әрекеттесетін қарапайым UI-де орын алады. Оның негізгі мүмкіндіктері хабарларды DLQ жүйесінен қайта жіберу немесе жою, қате туралы ақпаратты қарау және хабарларды іздеу, мысалы қате аты бойынша.

Кластерлерімізде қол жеткізуді басқару қосылғандықтан, Retryer тыңдап отырған тақырыпқа рұқсатты қосымша сұрау керек және Retryer бағдарламасына DESTINATION тақырыбына жазуға рұқсат беру керек. Бұл ыңғайсыз, бірақ интервалдық тақырып тәсілінен айырмашылығы, бізде оны басқару үшін толыққанды DLQ және UI бар.

Кіріс тақырыпты қолданбалары әртүрлі логиканы жүзеге асыратын бірнеше әртүрлі тұтынушылар топтары оқитын жағдайлар бар. Осы қолданбалардың біреуі үшін Retryer арқылы хабарды қайта өңдеу екіншісінде көшірмеге әкеледі. Бұдан қорғау үшін біз қайта өңдеу үшін бөлек тақырып жасаймыз. Кіріс және қайталау тақырыптарын бір Тұтынушы ешбір шектеусіз оқи алады.

Кафкадан алынған оқиғаларды қайта өңдеу

Әдепкі бойынша бұл тәсіл автоматты ажыратқыштың функционалдығын қамтамасыз етпейді, бірақ оны қолдану арқылы қолданбаға қосуға болады көктемгі бұлт-netflix немесе жаңа серіппелі бұлтты ажыратқыш, сыртқы қызметтер шақырылатын орындарды сәйкес абстракцияларға орау. Сонымен қатар, стратегияны таңдауға болады қалқан үлгі, ол да пайдалы болуы мүмкін. Мысалы, spring-cloud-netflix-те бұл ағындық пул немесе семафор болуы мүмкін.

қорытынды

Нәтижесінде бізде кез келген сыртқы жүйе уақытша қолжетімсіз болған жағдайда хабарды өңдеуді қайталауға мүмкіндік беретін бөлек қолданба бар.

Қолданбаның басты артықшылықтарының бірі - оны бір Кафка кластерінде жұмыс істейтін сыртқы жүйелер, олардың жағында айтарлықтай өзгерістерсіз пайдалана алады! Мұндай қолданбаға тек қайталау тақырыбына қатынасу, бірнеше Кафка тақырыптарын толтыру және Қайталаушыға хабарлама жіберу қажет болады. Ешқандай қосымша инфрақұрылымды көтерудің қажеті жоқ. Ал қосымшадан Retryer-ге және кері жіберілетін хабарламалар санын азайту үшін біз сызықтық логикасы бар қолданбаларды анықтадық және оларды Consumer stop арқылы қайта өңдедік.

Ақпарат көзі: www.habr.com

пікір қалдыру