Reprocessing events received from Kafka

Hi, Habr.

I recently shared my experience of which parameters we as a team most often use for the Kafka Producer and Consumer to get closer to guaranteed delivery. In this article I want to describe how we organized the reprocessing of an event received from Kafka when an external system is temporarily unavailable.

Modern applications run in a very complex environment. Business logic is wrapped in a modern technology stack, runs in a Docker image managed by an orchestrator such as Kubernetes or OpenShift, and communicates with other applications or enterprise solutions through a chain of physical and virtual routers. In such an environment something can always break, so reprocessing events when one of the external systems is unavailable is an important part of our business processes.

How it was before Kafka

Earlier in the project we used IBM MQ for asynchronous message delivery. If an error occurred while the service was running, the received message could be placed in a dead-letter queue (DLQ) for further manual parsing. The DLQ was created next to the incoming queue, and the message was moved within IBM MQ.

If the error was temporary and we could identify it (for example, a ResourceAccessException on an HTTP call or a MongoTimeoutException on a MongoDb request), the retry strategy took effect. Regardless of the application's branching logic, the original message was moved either to the system queue for delayed sending or to a separate application that had been written long ago to resend messages. In that case a resend number is written to the message header, and it is tied to the delay interval or to the end of the application-level strategy. If we reached the end of the strategy and the external system was still unavailable, the message was placed in the DLQ for manual parsing.

Searching for a solution

A search on the Internet turns up the following solution. In short, it proposes creating a topic for each delay interval and implementing Consumer applications on the side that read the messages with the required delay.

Despite the large number of positive reviews, it does not seem entirely successful to me. First of all, because the developer, in addition to implementing the business requirements, will have to spend a lot of time implementing the mechanism described.

In addition, if access control is enabled on the Kafka cluster, you will have to spend some time creating the topics and granting the necessary access to them. On top of that, you will need to choose the right retention.ms parameter for each of the retry topics so that messages have time to be resent and do not disappear from the topic first. The implementation and the access requests will have to be repeated for every existing or new service.

Let us now look at what mechanisms Spring in general and spring-kafka in particular give us for message reprocessing. Spring-kafka has a transitive dependency on spring-retry, which provides abstractions for managing different BackOffPolicies. It is a fairly flexible tool, but its significant drawback is that messages waiting to be resent are kept in application memory. This means that restarting the application because of an update or an operational error loses all messages pending reprocessing. Since this point is critical for our system, we did not consider it further.

spring-kafka itself provides several implementations of ContainerAwareErrorHandler, for example SeekToCurrentErrorHandler, with which you can process the message later without shifting the offset when an error occurs. Starting with spring-kafka 2.3, it became possible to set a back-off policy on it.

This approach lets messages awaiting reprocessing survive application restarts, but there is still no DLQ mechanism. We chose this option at the beginning of 2019, optimistically believing that a DLQ would not be needed (we were lucky and really did not need one during several months of running the application with this reprocessing scheme). Temporary errors caused SeekToCurrentErrorHandler to fire. All other errors were written to the log, the offset moved on, and processing continued with the next message.

Final decision

The implementation based on SeekToCurrentErrorHandler prompted us to develop our own mechanism for resending messages.

First of all, we wanted to reuse the experience we already had and extend it depending on the application logic. For an application with linear logic, it would be enough to stop reading new messages for the short period of time specified by the retry strategy. For other applications, we wanted a single point that would enforce the retry strategy. In addition, this single point should provide DLQ functionality for both approaches.

The retry strategy itself should be stored in the application, which is responsible for retrieving the next interval when a temporary error occurs.

Stopping the Consumer for an application with linear logic

When working with spring-kafka, the code for stopping the Consumer might look like this:

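public void pauseListenerContainer(MessageListenerContainer listenerContainer,
                                   Instant retryAt) {
    // retryAt is null when the retry strategy has no interval left
    if (nonNull(retryAt) && listenerContainer.isRunning()) {
        // stop reading new messages until the retry time arrives
        listenerContainer.stop();
        // restart the container on a scheduler thread at retryAt
        taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
        return;
    }
    // to DLQ
}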

In the example, retryAt is the time at which to restart the MessageListenerContainer, provided it is still running. The restart happens on a separate thread started by the TaskScheduler, an implementation of which is also provided by Spring.

We obtain the retryAt value as follows:

  1. The value of the retry counter is looked up.
  2. Based on the counter value, the current delay interval in the retry strategy is looked up. The strategy is declared in the application itself; we chose the JSON format to store it.
  3. The interval found in the JSON array holds the number of seconds after which processing must be repeated. This number of seconds is added to the current time to produce the retryAt value.
  4. If no interval is found, retryAt is null and the message is sent to the DLQ for manual parsing.
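
The four steps above can be sketched roughly as follows. This is only an illustration under assumptions, not our production code: the class name RetryAtCalculator, the method names, and the exact JSON shape (a flat array of whole seconds such as "[5, 30, 300]") are all hypothetical, and the tiny parser stands in for a real JSON library.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: derives retryAt from a retry counter and a JSON array of delays. */
final class RetryAtCalculator {

    private RetryAtCalculator() { }

    /** Parses a flat JSON array of whole seconds such as "[5, 30, 300]" (assumed format). */
    static List<Long> parseDelays(String json) {
        String body = json.trim();
        if (!body.startsWith("[") || !body.endsWith("]")) {
            throw new IllegalArgumentException("expected a JSON array: " + json);
        }
        body = body.substring(1, body.length() - 1).trim();
        List<Long> delays = new ArrayList<>();
        if (body.isEmpty()) {
            return delays;
        }
        for (String item : body.split(",")) {
            delays.add(Long.parseLong(item.trim()));
        }
        return delays;
    }

    /**
     * Returns the moment of the next attempt, or null when the strategy has no
     * interval for this counter, which the caller treats as "send to the DLQ".
     * counter is the number of retries already made (0 after the first failure).
     */
    static Instant nextRetryAt(String strategyJson, int counter, Instant now) {
        List<Long> delays = parseDelays(strategyJson);
        if (counter < 0 || counter >= delays.size()) {
            return null;
        }
        return now.plusSeconds(delays.get(counter));
    }
}
```

The null result is deliberate here: it matches the nonNull(retryAt) check in pauseListenerContainer, so an exhausted strategy falls through to the DLQ branch.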

With this approach, all that remains is to store the number of retries for each message being processed, for example in application memory. Keeping the retry count in memory is not critical for this approach, since an application with linear logic cannot carry on processing as a whole anyway. Unlike with spring-retry, restarting the application does not lose all the messages awaiting reprocessing; it simply restarts the strategy.
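
Keeping the counter in application memory could look like the sketch below. The class RetryCounters and the idea of keying the counter by topic, partition and offset are assumptions made for illustration; the article does not say how the counter is keyed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory store of retry counters, keyed by topic, partition and offset. */
final class RetryCounters {

    private final ConcurrentMap<String, Integer> counters = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    /** Returns the number of retries already made for this record, then increments it. */
    int getAndIncrement(String topic, int partition, long offset) {
        Integer updated = counters.merge(key(topic, partition, offset), 1, Integer::sum);
        return updated - 1;
    }

    /** Forgets the counter once the record has been processed or sent to the DLQ. */
    void clear(String topic, int partition, long offset) {
        counters.remove(key(topic, partition, offset));
    }
}
```

After a restart the map is empty, so the first failure of a record starts again from counter 0, which is the "restart the strategy" behaviour described above.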

This approach helps take load off the external system, which may be unavailable precisely because it is overloaded. In other words, in addition to reprocessing, we got an implementation of the circuit breaker pattern.

In our case the error threshold is just 1, and to minimize system downtime caused by temporary network outages we use a very granular retry strategy with small delay intervals. This may not suit all of the group's applications, so the ratio between the error threshold and the interval value should be chosen based on the characteristics of the system.

A separate application for processing messages from applications with non-deterministic logic

Here is an example of code that sends a message to such an application (Retryer), which will resend it to the DESTINATION topic when the RETRY_AT time is reached:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic,
                         Instant retryAt, String counter, String groupId, Exception e) {
    // copy the original headers so they travel with the message
    Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
    List<Header> arrayOfHeaders =
        new ArrayList<>(Arrays.asList(headers.toArray()));
    updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
    updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
    updateHeader(arrayOfHeaders, ORIGINAL_PARTITION,
                 () -> Integer.toString(record.partition()).getBytes());
    if (nonNull(retryAt)) {
        // the strategy still has an interval: ask Retryer to resend at retryAt
        updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
        updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
        updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
    } else {
        // the strategy is exhausted: send to the DLQ together with the stack trace
        updateHeader(arrayOfHeaders, REASON,
                     ExceptionUtils.getStackTrace(e)::getBytes);
        updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
    }
    // retryTopic is not the retryToTopic parameter; presumably it is a field
    // holding the topic that Retryer listens to
    ProducerRecord<K, V> messageToSend =
        new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
    kafkaTemplate.send(messageToSend);
}

The example shows that a lot of information travels in the headers. The RETRY_AT value is obtained in the same way as for the retry mechanism that stops the Consumer. Besides DESTINATION and RETRY_AT we pass:

  • GROUP_ID, by which we group messages for manual analysis and easier searching.
  • ORIGINAL_PARTITION, to try to keep the same Consumer for reprocessing. This parameter can be null, in which case the new partition is derived from the record.key() of the original message.
  • The updated COUNTER value, to follow the retry strategy.
  • SEND_TO, a constant that indicates whether the message is sent for reprocessing once RETRY_AT is reached or is placed in the DLQ.
  • REASON - the reason why message processing was interrupted.
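
The updateHeader helper used in the retry method is not shown in the article. One plausible reading, offered purely as an assumption, is "replace any header with this key, then add the new value". The sketch below uses a stand-in SimpleHeader class instead of Kafka's Header so that it compiles on its own; both class names are hypothetical.

```java
import java.util.List;
import java.util.function.Supplier;

/** Stand-in for Kafka's Header interface so that the sketch compiles on its own. */
final class SimpleHeader {
    final String key;
    final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical version of the updateHeader helper; the real one is not shown in the article. */
final class HeaderUtils {

    private HeaderUtils() { }

    /** Replaces every header with the given key by a single header carrying the new value. */
    static void updateHeader(List<SimpleHeader> headers, String key, Supplier<byte[]> value) {
        headers.removeIf(header -> header.key.equals(key));
        headers.add(new SimpleHeader(key, value.get()));
    }
}
```

Replacing rather than appending matters for COUNTER and RETRY_AT: a message that goes through several retries would otherwise accumulate stale copies of the same header.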

Retryer stores messages for resending and for manual parsing in PostgreSQL. A timer starts a task that finds messages whose RETRY_AT has been reached and sends them back to the ORIGINAL_PARTITION partition of the DESTINATION topic with the key record.key().

Once sent, messages are deleted from PostgreSQL. Manual parsing of messages takes place in a simple UI that interacts with Retryer through a REST API. Its main features are resending or deleting messages from the DLQ, viewing error information, and searching for messages, for example by error name.
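
The timer task and the delete-after-send step can be illustrated with the following sketch. It is not Retryer's actual code: an in-memory list stands in for the PostgreSQL table, the sender callback stands in for the Kafka producer, and the names StoredMessage, RetryerStore and resendDue are hypothetical.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stored row; the fields mirror the headers described above. */
final class StoredMessage {
    final String destination;        // DESTINATION: topic to send the message back to
    final Integer originalPartition; // ORIGINAL_PARTITION, may be null
    final String key;
    final String value;
    final Instant retryAt;           // RETRY_AT

    StoredMessage(String destination, Integer originalPartition,
                  String key, String value, Instant retryAt) {
        this.destination = destination;
        this.originalPartition = originalPartition;
        this.key = key;
        this.value = value;
        this.retryAt = retryAt;
    }
}

/** In-memory stand-in for the PostgreSQL-backed storage (assumption for illustration). */
final class RetryerStore {

    private final List<StoredMessage> rows = new ArrayList<>();

    synchronized void save(StoredMessage message) {
        rows.add(message);
    }

    synchronized int size() {
        return rows.size();
    }

    /** Sends every message whose retryAt has been reached and deletes it after sending. */
    synchronized int resendDue(Instant now, Consumer<StoredMessage> sender) {
        int sent = 0;
        Iterator<StoredMessage> it = rows.iterator();
        while (it.hasNext()) {
            StoredMessage message = it.next();
            if (!message.retryAt.isAfter(now)) {
                sender.accept(message); // if this throws, the row is kept for the next run
                it.remove();
                sent++;
            }
        }
        return sent;
    }
}
```

A scheduled job would call resendDue(Instant.now(), ...) at a fixed rate; deleting only after a successful send means a failed send leaves the message in place for the next run.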

Since access control is enabled on our clusters, we additionally have to request access to the topic that Retryer listens to, and allow Retryer to write to the DESTINATION topic. This is inconvenient, but, unlike the interval-topic approach, we get a full-fledged DLQ and a UI to manage it.

There are cases when an incoming topic is read by several different consumer groups whose applications implement different logic. Reprocessing a message through Retryer for one of these applications would produce a duplicate for the other. To protect against this, we create a separate topic for reprocessing. The incoming and retry topics can be read by the same Consumer without any restrictions.

By default this approach does not provide circuit breaker functionality, but it can be added to the application with spring-cloud-netflix or the newer Spring Cloud Circuit Breaker by wrapping the places where external services are called in the appropriate abstractions. It also becomes possible to choose a strategy for the bulkhead pattern, which can be useful too. For example, in spring-cloud-netflix this can be a thread pool or a semaphore.
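
To make the semaphore variant of the bulkhead pattern concrete, here is a minimal sketch using only java.util.concurrent. It does not use or imitate the spring-cloud API; the class SemaphoreBulkhead and its call method are hypothetical and only show the idea of capping concurrent calls to an external service.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Illustrative semaphore bulkhead; not the spring-cloud API. */
final class SemaphoreBulkhead {

    private final Semaphore permits;

    SemaphoreBulkhead(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    /** Runs the call if a permit is free, otherwise returns the fallback immediately. */
    <T> T call(Supplier<T> externalCall, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            return externalCall.get();
        } finally {
            permits.release();
        }
    }
}
```

With a limit of N permits, at most N threads can be waiting on the slow external system at once; the rest get the fallback right away instead of piling up.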

Conclusion

As a result, we have a separate application that lets us repeat message processing when any external system is temporarily unavailable.

One of the main advantages of the application is that it can be used by external systems running on the same Kafka cluster without significant changes on their side. Such an application only needs to get access to the retry topic, fill in a few Kafka headers and send the message to Retryer. There is no need to stand up any additional infrastructure. And to reduce the number of messages travelling from the application to Retryer and back, we singled out the applications with linear logic and handle their reprocessing by stopping the Consumer.

Source: www.habr.com
