Avvenimenti ta' riproċessar riċevuti mingħand Kafka

Avvenimenti ta' riproċessar riċevuti mingħand Kafka

Ħej Habr.

Riċentement I qasam l-esperjenza tiegħu dwar liema parametri aħna bħala tim l-aktar spiss nużaw għall-Produttur u l-Konsumatur Kafka biex jersqu eqreb lejn kunsinna garantita. F'dan l-artikolu nixtieq ngħidlek kif organizzajna l-ipproċessar mill-ġdid ta 'avveniment li wasal mingħand Kafka bħala riżultat ta' indisponibbiltà temporanja tas-sistema esterna.

Applikazzjonijiet moderni joperaw f'ambjent kumpless ħafna. Loġika tan-negozju mgeżwra f'munzell ta 'teknoloġija moderna, taħdem f'immaġni Docker ġestita minn orkestratur bħal Kubernetes jew OpenShift, u tikkomunika ma' applikazzjonijiet oħra jew soluzzjonijiet ta 'intrapriża permezz ta' katina ta 'routers fiżiċi u virtwali. F'ambjent bħal dan, xi ħaġa dejjem tista 'tinkisser, għalhekk l-ipproċessar mill-ġdid ta' avvenimenti jekk waħda mis-sistemi esterni ma tkunx disponibbli hija parti importanti mill-proċessi tan-negozju tagħna.

Kif kien qabel Kafka

Aktar kmieni fil-proġett użajna l-IBM MQ għall-kunsinna asinkronika tal-messaġġi. Jekk seħħ xi żball waqt it-tħaddim tas-servizz, il-messaġġ riċevut jista' jitqiegħed f'dead-letter-queue (DLQ) għal aktar parsing manwali. Id-DLQ inħoloq ħdejn il-kju deħlin, il-messaġġ ġie trasferit ġewwa IBM MQ.

Jekk l-iżball kien temporanju u nistgħu niddeterminawh (pereżempju, ResourceAccessException fuq sejħa HTTP jew MongoTimeoutException fuq talba MongoDb), allura l-istrateġija tal-prova mill-ġdid tidħol fis-seħħ. Irrispettivament mill-loġika tal-fergħat tal-applikazzjoni, il-messaġġ oriġinali ġie mċaqlaq jew fil-kju tas-sistema biex jintbagħat ittardjat, jew għal applikazzjoni separata li saret ħafna ilu biex terġa 'tibgħat il-messaġġi. Dan jinkludi numru mill-ġdid fl-header tal-messaġġ, li huwa marbut mal-intervall tad-dewmien jew it-tmiem tal-istrateġija fil-livell tal-applikazzjoni. Jekk wasalna fit-tmiem tal-istrateġija iżda s-sistema esterna għadha mhux disponibbli, allura l-messaġġ jitqiegħed fid-DLQ għal parsing manwali.

Tiftix tas-soluzzjoni

Tiftix fuq l-Internet, tista' ssib dan li ġej deċiżjoni. Fil-qosor, huwa propost li jinħoloq suġġett għal kull intervall ta 'dewmien u jiġu implimentati applikazzjonijiet tal-Konsumatur fuq in-naħa, li se jaqraw messaġġi bid-dewmien meħtieġ.

Avvenimenti ta' riproċessar riċevuti mingħand Kafka

Minkejja n-numru kbir ta 'reviżjonijiet pożittivi, jidhirli li mhux għal kollox suċċess. L-ewwelnett, minħabba li l-iżviluppatur, minbarra li jimplimenta r-rekwiżiti tan-negozju, ikollu jqatta 'ħafna ħin jimplimenta l-mekkaniżmu deskritt.

Barra minn hekk, jekk il-kontroll tal-aċċess ikun attivat fuq il-cluster Kafka, ikollok tqatta’ ftit ħin toħloq suġġetti u tipprovdi l-aċċess meħtieġ għalihom. Minbarra dan, ser ikollok bżonn tagħżel il-parametru retention.ms korrett għal kull wieħed mis-suġġetti mill-ġdid sabiex il-messaġġi jkollhom ħin biex jintbagħtu mill-ġdid u ma jisparixxux minnu. L-implimentazzjoni u t-talba għall-aċċess se jkollhom jiġu ripetuti għal kull servizz eżistenti jew ġdid.

Ejja issa naraw liema mekkaniżmi spring in ġenerali u spring-kafka b'mod partikolari jipprovdulna għall-ipproċessar mill-ġdid tal-messaġġi. Spring-kafka għandu dipendenza tranżittiva fuq spring-retry, li jipprovdi astrazzjonijiet għall-ġestjoni ta' BackOffPolicies differenti. Din hija għodda pjuttost flessibbli, iżda l-iżvantaġġ sinifikanti tagħha huwa li taħżen messaġġi biex jintbagħtu mill-ġdid fil-memorja tal-applikazzjoni. Dan ifisser li l-istartjar mill-ġdid tal-applikazzjoni minħabba aġġornament jew żball operattiv jirriżulta fit-telf tal-messaġġi kollha sakemm jiġu pproċessati mill-ġdid. Peress li dan il-punt huwa kritiku għas-sistema tagħna, aħna ma kkunsidrawx aktar.

spring-kafka innifsu jipprovdi diversi implimentazzjonijiet ta' ContainerAwareErrorHandler, pereżempju SeekToCurrentErrorHandler, li biha tista' tipproċessa l-messaġġ aktar tard mingħajr ma tbiddel l-offset f'każ ta' żball. Nibdew bil-verżjoni tar-rebbiegħa-kafka 2.3, sar possibbli li jiġi stabbilit BackOffPolicy.

Dan l-approċċ jippermetti messaġġi pproċessati mill-ġdid biex jibqgħu ħajjin mill-ġdid tal-applikazzjoni, iżda għad m'hemm l-ebda mekkaniżmu DLQ. Għażilna din l-għażla fil-bidu tal-2019, b’mod ottimist li nemmnu li DLQ ma kienx se jkun meħtieġ (konna xxurtjati u fil-fatt ma kellniex bżonnha wara diversi xhur ta’ tħaddim tal-applikazzjoni b’sistema ta’ riproċessar bħal din). Żbalji temporanji kkawżaw li SeekToCurrentErrorHandler jispara. L-iżbalji li kien fadal ġew stampati fir-reġistru, li rriżultaw f'offset, u l-ipproċessar kompla bil-messaġġ li jmiss.

Deċiżjoni finali

L-implimentazzjoni bbażata fuq SeekToCurrentErrorHandler wasslitna biex niżviluppaw il-mekkaniżmu tagħna stess biex nibagħtu mill-ġdid il-messaġġi.

L-ewwelnett, ridna nużaw l-esperjenza eżistenti u nespanduha skont il-loġika tal-applikazzjoni. Għal applikazzjoni ta' loġika lineari, ikun ottimali li tieqaf taqra messaġġi ġodda għal perjodu qasir ta' żmien speċifikat mill-istrateġija tal-prova mill-ġdid. Għal applikazzjonijiet oħra, ridt li jkolli punt wieħed li jinforza l-istrateġija mill-ġdid. Barra minn hekk, dan il-punt uniku għandu jkollu funzjonalità DLQ għaż-żewġ approċċi.

L-istrateġija tal-prova mill-ġdid innifisha għandha tinħażen fl-applikazzjoni, li hija responsabbli għall-irkupru tal-intervall li jmiss meta jseħħ żball temporanju.

Waqfien tal-Konsumatur għal Applikazzjoni Loġika Lineari

Meta taħdem ma 'spring-kafka, il-kodiċi biex iwaqqaf lill-Konsumatur jista' jidher xi ħaġa bħal din:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Fl-eżempju, retryAt huwa l-ħin biex terġa 'tibda l-MessageListenerContainer jekk ikun għadu għaddej. It-tnedija mill-ġdid se sseħħ f'ħajt separat imniedi f'TaskScheduler, li l-implimentazzjoni tiegħu hija pprovduta wkoll mir-rebbiegħa.

Insibu l-valur retryAt bil-mod li ġej:

  1. Il-valur tal-counter tas-sejħa mill-ġdid huwa mħares 'il fuq.
  2. Ibbażat fuq il-valur tal-counter, jitfittex l-intervall tad-dewmien attwali fl-istrateġija tal-prova mill-ġdid. L-istrateġija hija ddikjarata fl-applikazzjoni nnifisha għażilna l-format JSON biex naħżnuha.
  3. L-intervall misjub fl-array JSON fih in-numru ta' sekondi li warajhom l-ipproċessar ikun jeħtieġ li jiġi ripetut. Dan in-numru ta' sekondi huwa miżjud mal-ħin kurrenti biex jifforma l-valur għal retryAt.
  4. Jekk l-intervall ma jinstabx, allura l-valur ta 'retryAt huwa null u l-messaġġ jintbagħat lil DLQ għal parsing manwali.

B'dan l-approċċ, kulma jibqa 'huwa li jiġi salvat in-numru ta' sejħiet ripetuti għal kull messaġġ li bħalissa qed jiġi pproċessat, pereżempju fil-memorja tal-applikazzjoni. Iż-żamma tal-għadd ta' tentattivi mill-ġdid fil-memorja mhix kritika għal dan l-approċċ, peress li applikazzjoni ta' loġika lineari ma tistax tieħu ħsieb l-ipproċessar kollu kemm hu. B'differenza mill-ġdid tar-rebbiegħa, il-bidu mill-ġdid tal-applikazzjoni mhux se jikkawża li l-messaġġi kollha jintilfu jiġu pproċessati mill-ġdid, iżda sempliċement jerġa 'jibda l-istrateġija.

Dan l-approċċ jgħin biex titneħħa t-tagħbija mis-sistema esterna, li tista 'ma tkunx disponibbli minħabba tagħbija tqila ħafna. Fi kliem ieħor, minbarra l-ipproċessar mill-ġdid, ksibna l-implimentazzjoni tal-mudell circuit breaker.

Fil-każ tagħna, il-limitu ta 'żball huwa biss 1, u biex jimminimizzaw il-perijodi ta' waqfien tas-sistema minħabba qtugħ temporanju tan-netwerk, nużaw strateġija ta 'prova mill-ġdid granulari ħafna b'intervalli ta' latenza żgħar. Dan jista 'ma jkunx adattat għall-applikazzjonijiet kollha tal-grupp, għalhekk ir-relazzjoni bejn il-limitu ta' żball u l-valur tal-intervall għandha tintgħażel abbażi tal-karatteristiċi tas-sistema.

Applikazzjoni separata għall-ipproċessar ta' messaġġi minn applikazzjonijiet b'loġika mhux deterministika

Hawn eżempju ta’ kodiċi li jibgħat messaġġ lil tali applikazzjoni (Retryer), li jerġa’ jibgħat lis-suġġett DESTINAZZJONI meta jintlaħaq il-ħin RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

L-eżempju juri li ħafna informazzjoni hija trażmessa f'headers. Il-valur ta' RETRY_AT jinstab bl-istess mod bħall-mekkaniżmu tal-prova mill-ġdid permezz tal-waqfien tal-Konsumatur. Minbarra DESTINATION u RETRY_AT ngħaddu:

  • GROUP_ID, li permezz tiegħu niġbru messaġġi għal analiżi manwali u tfittxija simplifikata.
  • ORIGINAL_PARTITION biex tipprova żżomm l-istess Konsumatur għall-ipproċessar mill-ġdid. Dan il-parametru jista' jkun null, f'liema każ il-partizzjoni l-ġdida tinkiseb bl-użu taċ-ċavetta record.key() tal-messaġġ oriġinali.
  • Aġġorna l-valur COUNTER biex issegwi l-istrateġija pprova mill-ġdid.
  • SEND_TO hija kostanti li tindika jekk il-messaġġ jintbagħatx għall-ipproċessar mill-ġdid malli jilħaq RETRY_AT jew jitqiegħed f'DLQ.
  • RAĠUNI - ir-raġuni għaliex l-ipproċessar tal-messaġġi ġie interrott.

Retryer jaħżen messaġġi biex jerġgħu jintbagħtu u parsing manwali f'PostgreSQL. Timer jibda kompitu li jsib messaġġi b'RETRY_AT u jibgħathom lura lejn il-partizzjoni ORIGINAL_PARTITION tas-suġġett DESTINAZZJONI biċ-ċavetta record.key().

Ladarba jintbagħtu, il-messaġġi jitħassru minn PostgreSQL. L-analiżi manwali tal-messaġġi sseħħ f'UI sempliċi li jinteraġixxi ma' Retryer permezz tal-API REST. Il-karatteristiċi ewlenin tagħha huma li terġa 'tibgħat jew tħassar messaġġi minn DLQ, tara informazzjoni dwar żball u tfittex messaġġi, pereżempju bl-isem ta' żball.

Peress li l-kontroll tal-aċċess huwa attivat fuq il-clusters tagħna, huwa meħtieġ li addizzjonalment titlob aċċess għas-suġġett li Retryer qed jisma ', u jippermetti lil Retryer jikteb fis-suġġett DESTINAZZJONI. Dan huwa inkonvenjenti, iżda, b'differenza mill-approċċ tas-suġġett tal-intervall, għandna DLQ u UI sħiħ biex jimmaniġġjawha.

Hemm każijiet meta suġġett li jkun dieħel jinqara minn diversi gruppi ta' konsumaturi differenti, li l-applikazzjonijiet tagħhom jimplimentaw loġika differenti. L-ipproċessar mill-ġdid ta' messaġġ permezz ta' Retryer għal waħda minn dawn l-applikazzjonijiet jirriżulta f'duplikat fuq l-oħra. Biex nipproteġu minn dan, noħolqu suġġett separat għall-ipproċessar mill-ġdid. Is-suġġetti li jidħlu u li jerġgħu jippruvaw jistgħu jinqraw mill-istess Konsumatur mingħajr ebda restrizzjoni.

Avvenimenti ta' riproċessar riċevuti mingħand Kafka

B'mod awtomatiku dan l-approċċ ma jipprovdix funzjonalità ta 'circuit breaker, madankollu jista' jiġi miżjud mal-applikazzjoni bl-użu sħaba tar-rebbiegħa-netflix jew ġodda sħaba tar-rebbiegħa circuit breaker, tgeżwir tal-postijiet fejn is-servizzi esterni jissejħu fi astrazzjonijiet xierqa. Barra minn hekk, isir possibbli li tagħżel strateġija għal paratija mudell, li jista 'jkun utli wkoll. Pereżempju, fir-rebbiegħa-cloud-netflix dan jista 'jkun pool ta' ħajt jew semaforu.

Output

Bħala riżultat, għandna applikazzjoni separata li tippermettilna nirrepetu l-ipproċessar tal-messaġġi jekk xi sistema esterna ma tkunx temporanjament disponibbli.

Wieħed mill-vantaġġi ewlenin tal-applikazzjoni huwa li tista 'tintuża minn sistemi esterni li jaħdmu fuq l-istess cluster Kafka, mingħajr modifiki sinifikanti min-naħa tagħhom! Applikazzjoni bħal din tkun teħtieġ biss aċċess għas-suġġett mill-ġdid, timla ftit headers Kafka u tibgħat messaġġ lill-Retryer. M'hemmx bżonn li titqajjem xi infrastruttura addizzjonali. U sabiex innaqqsu n-numru ta 'messaġġi trasferiti mill-applikazzjoni għal Retryer u lura, identifikajna applikazzjonijiet b'loġika lineari u pproċessajnahom mill-ġdid permezz tal-waqfien tal-Konsumatur.

Sors: www.habr.com

Żid kumment