Reprocessing txheej xwm tau txais los ntawm Kafka

Reprocessing txheej xwm tau txais los ntawm Kafka

Hlo Habr.

Tsis ntev los no kuv qhia nws qhov kev paub hais txog dab tsi uas peb ua ib pab neeg feem ntau siv rau Kafka Tus tsim khoom thiab cov neeg siv khoom kom tau los ze zog rau cov khoom xa tuaj. Nyob rau hauv tsab xov xwm no kuv xav qhia rau koj seb peb tau teeb tsa qhov kev rov ua tiav ntawm qhov kev tshwm sim tau txais los ntawm Kafka vim qhov tsis muaj nyob rau ib ntus ntawm cov txheej txheem sab nraud.

Cov ntawv thov niaj hnub ua haujlwm hauv ib puag ncig nyuaj heev. Kev lag luam logic qhwv hauv cov txheej txheem thev naus laus zis niaj hnub, khiav hauv Docker duab tswj los ntawm tus kws tshaj lij xws li Kubernetes lossis OpenShift, thiab sib txuas lus nrog lwm cov ntawv thov lossis kev daws teeb meem kev lag luam los ntawm cov saw ntawm lub cev thiab virtual routers. Nyob rau hauv ib puag ncig zoo li no, ib yam dab tsi tuaj yeem ua rau tawg, yog li kev rov ua dua tshiab yog tias ib qho ntawm cov txheej txheem sab nraud tsis muaj yog ib feem tseem ceeb ntawm peb cov txheej txheem kev lag luam.

Yuav ua li cas nws yog ua ntej Kafka

Ua ntej hauv qhov project peb siv IBM MQ rau kev xa xov asynchronous. Yog tias muaj teeb meem tshwm sim thaum lub sijhawm ua haujlwm ntawm qhov kev pabcuam, cov lus tau txais tuaj yeem muab tso rau hauv kab ntawv tuag (DLQ) rau kev ntsuas phau ntawv ntxiv. Lub DLQ tau tsim nyob ib sab ntawm cov kab tuaj, cov lus tau hloov mus rau hauv IBM MQ.

Yog tias qhov ua yuam kev yog ib ntus thiab peb tuaj yeem txiav txim siab nws (piv txwv li ResourceAccessException ntawm HTTP hu lossis MongoTimeoutException ntawm MongoDb thov), tom qab ntawd cov tswv yim rov ua dua yuav siv tau. Txawm hais tias lub logic ceg ntawm daim ntawv thov, cov lus qub tau txav mus rau hauv kab ke rau kev xa tawm qeeb, lossis mus rau ib daim ntawv thov cais uas tau ua ntev dhau los kom rov xa cov lus. Qhov no suav nrog tus lej xa rov qab rau hauv cov lus header, uas yog khi rau lub sijhawm ncua lossis qhov kawg ntawm daim ntawv thov-theem lub tswv yim. Yog tias peb tau mus txog qhov kawg ntawm lub tswv yim tab sis cov txheej txheem sab nraud tseem tsis muaj, ces cov lus yuav muab tso rau hauv DLQ rau kev txheeb xyuas phau ntawv.

Nrhiav kev daws teeb meem

Nrhiav hauv Internet, koj tuaj yeem pom cov hauv qab no qhov kev txiav txim siab. Nyob rau hauv luv luv, nws yog npaj los tsim ib lub ntsiab lus rau txhua ncua ncua sij hawm thiab siv cov neeg siv daim ntawv thov nyob rau sab, uas yuav nyeem cov lus nrog rau qhov yuav tsum tau ncua.

Reprocessing txheej xwm tau txais los ntawm Kafka

Txawm hais tias muaj ntau qhov kev tshuaj xyuas zoo, nws zoo li kuv tsis ua tiav tag nrho. Ua ntej tshaj plaws, vim tias tus tsim tawm, ntxiv rau kev siv cov kev cai ua lag luam, yuav tsum siv sijhawm ntau los siv cov txheej txheem piav qhia.

Tsis tas li ntawd, yog tias kev tswj xyuas nkag tau qhib rau ntawm Kafka pawg, koj yuav tau siv qee lub sijhawm los tsim cov ntsiab lus thiab muab qhov tsim nyog nkag rau lawv. Ntxiv rau qhov no, koj yuav tsum xaiv qhov tseeb retention.ms parameter rau txhua qhov rov sim cov ntsiab lus kom cov lus muaj sij hawm los tawm tsam thiab tsis ploj ntawm nws. Kev siv thiab kev thov nkag yuav tsum rov ua dua rau txhua qhov kev pabcuam uas twb muaj lawm lossis tshiab.

Wb tam sim no saib dab tsi mechanisms caij nplooj ntoos hlav feem ntau thiab caij nplooj ntoos hlav-kafka tshwj xeeb muab rau peb rau cov lus reprocessing. Caij nplooj ntoos hlav-kafka muaj qhov hloov pauv hloov pauv ntawm lub caij nplooj ntoo hlav-retry, uas muab kev paub daws teeb meem rau kev tswj hwm BackOffPolicies sib txawv. Qhov no yog cov cuab yeej hloov tau yooj yim, tab sis nws qhov teeb meem tseem ceeb yog khaws cov lus rau resend hauv daim ntawv thov nco. Qhov no txhais tau hais tias rov pib dua daim ntawv thov vim muaj qhov hloov tshiab lossis kev ua haujlwm yuam kev yuav ua rau poob tag nrho cov lus tseem tab tom rov ua dua. Txij li cov ntsiab lus no tseem ceeb rau peb lub cev, peb tsis xav txog nws ntxiv.

caij nplooj ntoos hlav-kafka nws tus kheej muab ntau qhov kev siv ntawm ContainerAwareErrorHandler, piv txwv li SeekToCurrentErrorHandler, uas koj tuaj yeem ua cov lus tom qab tsis muaj kev hloov pauv thaum muaj kev ua yuam kev. Pib nrog version ntawm lub caij nplooj ntoos hlav-kafka 2.3, nws tau los ua teeb tsa BackOffPolicy.

Txoj hauv kev no tso cai rau cov lus rov ua dua kom muaj sia nyob daim ntawv thov rov pib dua, tab sis tseem tsis muaj DLQ mechanism. Peb tau xaiv qhov kev xaiv no thaum pib ntawm 2019, qhov zoo ntseeg tias DLQ yuav tsis xav tau (peb muaj hmoo thiab tiag tiag tsis xav tau nws tom qab ob peb lub hlis ntawm kev khiav haujlwm ntawm daim ntawv thov nrog cov txheej txheem rov ua dua). Qhov yuam kev ib ntus ua rau SeekToCurrentErrorHandler tua hluav taws. Cov kev ua yuam kev uas tseem tshuav tau luam tawm hauv lub cav, ua rau muaj kev cuam tshuam, thiab kev ua haujlwm txuas ntxiv nrog cov lus tom ntej.

Kev txiav txim zaum kawg

Qhov kev siv raws li SeekToCurrentErrorHandler tau ua rau peb tsim peb tus kheej cov txheej txheem rau rov xa cov lus.

Ua ntej tshaj plaws, peb xav siv qhov kev paub dhau los thiab nthuav dav nws nyob ntawm daim ntawv thov logic. Rau ib daim ntawv thov linear logic, nws yuav yog qhov zoo tshaj kom tsis txhob nyeem cov lus tshiab rau lub sijhawm luv luv uas tau teev tseg los ntawm kev rov ua lub tswv yim. Rau lwm daim ntawv thov, kuv xav kom muaj ib qho taw tes uas yuav tswj hwm lub tswv yim rov ua dua. Tsis tas li ntawd, ib qho taw tes no yuav tsum muaj DLQ ua haujlwm rau ob qho tib si.

Lub tswv yim rov ua dua nws tus kheej yuav tsum tau muab cia rau hauv daim ntawv thov, uas yog lub luag haujlwm rau kev rov qab mus rau lub sijhawm tom ntej thaum muaj qhov yuam kev ib ntus.

Kev txwv tus neeg siv khoom rau daim ntawv thov Linear Logic

Thaum ua hauj lwm nrog caij nplooj ntoos hlav-kafka, txoj cai kom txwv tus neeg siv khoom yuav zoo li no:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Hauv qhov piv txwv, retryAt yog lub sijhawm rov pib dua MessageListenerContainer yog tias nws tseem khiav. Qhov rov pib dua yuav tshwm sim nyob rau hauv ib qho kev sib cais tawm hauv TaskScheduler, qhov kev siv uas tseem muab los ntawm lub caij nplooj ntoo hlav.

Peb pom tus nqi retryAt raws li hauv qab no:

  1. Tus nqi ntawm tus nqi rov hu rov qab yog ntsia.
  2. Raws li tus nqi txee, qhov ncua sij hawm ncua sij hawm tam sim no hauv cov tswv yim rov qab tshawb nrhiav. Lub tswv yim tau tshaj tawm hauv daim ntawv thov nws tus kheej; peb xaiv JSON hom los khaws nws.
  3. Lub ncua sij hawm pom nyob rau hauv JSON array muaj pes tsawg lub vib nas this tom qab uas yuav tsum tau rov ua dua. Tus lej ntawm cov vib nas this ntxiv rau lub sijhawm tam sim no los tsim tus nqi rau retryAt.
  4. Yog tias lub sijhawm tsis pom, ces tus nqi ntawm retryAt yog null thiab cov lus yuav raug xa mus rau DLQ rau phau ntawv parsing.

Nrog rau txoj hauv kev no, txhua yam uas tseem tshuav yog khaws cov xov tooj rov qab hu rau txhua cov lus uas tam sim no tau ua tiav, piv txwv li hauv daim ntawv thov nco. Kev khaws cov kev suav rov qab rau hauv lub cim xeeb tsis yog qhov tseem ceeb rau txoj hauv kev no, vim tias daim ntawv thov kev siv logic tsis tuaj yeem ua haujlwm tag nrho. Tsis zoo li lub caij nplooj ntoos hlav-retry, rov pib dua daim ntawv thov yuav tsis ua rau tag nrho cov lus ploj kom rov ua dua, tab sis tsuas yog rov pib lub tswv yim.

Txoj hauv kev no yuav pab nqa lub nra tawm ntawm lub cev sab nraud, uas tej zaum yuav tsis muaj vim yog lub nra hnyav heev. Hauv lwm lo lus, ntxiv rau kev rov ua dua tshiab, peb ua tiav qhov kev siv ntawm tus qauv Circuit Court tej fais fab.

Hauv peb qhov xwm txheej, qhov kev ua yuam kev tsuas yog 1 xwb, thiab txhawm rau txo qis lub sijhawm kaw vim muaj kev cuam tshuam hauv network ib ntus, peb siv lub tswv yim rov ua dua tshiab nrog me me latency intervals. Qhov no tej zaum yuav tsis haum rau tag nrho cov kev siv pab pawg, yog li kev sib raug zoo ntawm qhov kev ua yuam kev thiab lub caij nyoog tus nqi yuav tsum raug xaiv raws li cov yam ntxwv ntawm lub system.

Ib daim ntawv thov cais rau kev ua cov lus los ntawm cov ntawv thov uas tsis yog kev txiav txim siab logic

Nov yog ib qho piv txwv ntawm cov lej uas xa cov lus mus rau ib daim ntawv thov no (Retryer), uas yuav rov xa mus rau DESTINATION cov ncauj lus thaum lub sijhawm RETRY_AT mus txog:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Qhov piv txwv qhia tau hais tias ntau cov ntaub ntawv raug xa mus rau hauv headers. Tus nqi ntawm RETRY_AT yog pom nyob rau hauv tib txoj kev raws li rau lub retry mechanism los ntawm Consumer nres. Ntxiv rau DESTINATION thiab RETRY_AT peb hla:

  • GROUP_ID, uas peb pab pawg lus rau kev txheeb xyuas phau ntawv thiab tshawb nrhiav yooj yim.
  • ORIGINAL_PARTITION kom sim ua kom tib neeg siv tau rov ua dua. Qhov kev ntsuas no tuaj yeem ua tsis tau, nyob rau hauv rooj plaub uas qhov kev faib tshiab yuav tau txais los ntawm kev siv tus yuam sij record.key() ntawm cov lus qub.
  • Hloov kho COUNTER tus nqi kom ua raws li cov tswv yim rov ua dua.
  • SEND_TO yog qhov qhia tsis tu ncua seb cov lus xa mus rau kev rov ua dua thaum ncav cuag RETRY_AT lossis muab tso rau hauv DLQ.
  • REASON - yog vim li cas vim li cas kev xa xov raug cuam tshuam.

Retryer khaws cov lus rau resend thiab phau ntawv txheeb xyuas hauv PostgreSQL. Lub timer pib ua haujlwm uas pom cov lus nrog RETRY_AT thiab xa lawv rov qab mus rau ORIGINAL_PARTITION muab faib ntawm DESTINATION cov ncauj lus nrog cov ntaub ntawv tseem ceeb.key().

Thaum xa, cov lus raug tshem tawm ntawm PostgreSQL. Kev piav qhia ntawm cov lus tshwm sim hauv UI yooj yim uas cuam tshuam nrog Retryer ntawm REST API. Nws cov yam ntxwv tseem ceeb yog rov xa lossis rho tawm cov lus los ntawm DLQ, saib cov ntaub ntawv yuam kev thiab tshawb nrhiav cov lus, piv txwv li los ntawm lub npe yuam kev.

Txij li kev tswj xyuas kev nkag tau qhib rau ntawm peb pawg, nws yog qhov yuav tsum tau thov nkag mus rau lub ncauj lus uas Retryer mloog, thiab tso cai rau Retryer sau rau lub ntsiab lus DESTINATION. Qhov no yog qhov tsis yooj yim, tab sis, tsis zoo li lub sijhawm sib tham, peb muaj DLQ thiab UI tag nrho los tswj nws.

Muaj cov xwm txheej thaum lub ntsiab lus nkag tau nyeem los ntawm ntau pawg neeg siv khoom sib txawv, uas nws cov ntawv thov siv cov logic sib txawv. Rov ua dua cov lus los ntawm Retryer rau ib qho ntawm cov ntawv thov no yuav ua rau muaj qhov sib npaug ntawm lwm qhov. Txhawm rau tiv thaiv qhov no, peb tsim ib lub ntsiab lus cais rau kev rov ua dua. Cov ntsiab lus tuaj thiab rov sim dua tuaj yeem nyeem los ntawm tib tus neeg siv khoom yam tsis muaj kev txwv.

Reprocessing txheej xwm tau txais los ntawm Kafka

Los ntawm lub neej ntawd txoj hauv kev no tsis muab kev ua haujlwm hauv Circuit Court breaker, txawm li cas los xij nws tuaj yeem ntxiv rau daim ntawv thov siv spring-cloud-netflix los yog tshiab caij nplooj ntoos hlav huab Circuit Court breaker, wrapping cov chaw uas cov kev pabcuam sab nraud raug hu ua cov kev xav tsim nyog. Tsis tas li ntawd, nws tuaj yeem xaiv lub tswv yim rau bulkhead qauv, uas kuj yuav pab tau. Piv txwv li, nyob rau lub caij nplooj ntoos hlav-huab-netflix qhov no yuav yog ib lub pas dej los yog ib tug semaphore.

xaus

Raws li qhov tshwm sim, peb muaj ib daim ntawv thov cais uas tso cai rau peb rov ua cov lus rov qab yog tias ib qho kev sab nraud tsis muaj nyob rau ib ntus.

Ib qho ntawm cov txiaj ntsig tseem ceeb ntawm daim ntawv thov yog tias nws tuaj yeem siv los ntawm cov tshuab sab nraud khiav ntawm tib Kafka pawg, tsis muaj kev hloov pauv tseem ceeb ntawm lawv sab! Ib daim ntawv thov no tsuas yog yuav tsum nkag mus rau lub ntsiab lus rov ua dua, sau ob peb Kafka headers thiab xa lus mus rau Retryer. Tsis tas yuav tsa tej infrastructure ntxiv. Thiab txhawm rau txo tus naj npawb ntawm cov lus xa tawm los ntawm daim ntawv thov mus rau Retryer thiab rov qab, peb tau txheeb xyuas cov ntawv thov nrog cov lus piav qhia thiab rov ua tiav lawv los ntawm Cov Neeg Siv Khoom nres.

Tau qhov twg los: www.hab.com

Ntxiv ib saib