Inachakata tena matukio yaliyopokelewa kutoka Kafka

Inachakata tena matukio yaliyopokelewa kutoka Kafka

Habari Habr.

Hivi majuzi mimi alishiriki uzoefu wake kuhusu ni vigezo gani sisi kama timu hutumia mara nyingi kwa Mtayarishaji na Mtumiaji wa Kafka ili kukaribia uwasilishaji wa uhakika. Katika makala hii nataka kukuambia jinsi tulivyopanga uchakataji upya wa tukio lililopokelewa kutoka Kafka kutokana na kutopatikana kwa mfumo wa nje kwa muda.

Maombi ya kisasa yanafanya kazi katika mazingira magumu sana. Mantiki ya biashara iliyofungwa kwenye rundo la teknolojia ya kisasa, inayoendeshwa katika picha ya Doka inayodhibitiwa na mwimbaji kama Kubernetes au OpenShift, na kuwasiliana na programu zingine au suluhu za biashara kupitia msururu wa vipanga njia halisi na pepe. Katika mazingira kama haya, kitu kinaweza kuvunjika kila wakati, kwa hivyo kuchakata tena matukio ikiwa moja ya mifumo ya nje haipatikani ni sehemu muhimu ya michakato yetu ya biashara.

Ilikuwaje kabla ya Kafka

Hapo awali katika mradi tulitumia IBM MQ kwa uwasilishaji wa ujumbe usiolingana. Ikiwa hitilafu yoyote ilitokea wakati wa uendeshaji wa huduma, ujumbe uliopokelewa unaweza kuwekwa kwenye foleni ya herufi-kufa (DLQ) kwa uchanganuzi zaidi wa mwongozo. DLQ iliundwa karibu na foleni inayoingia, ujumbe ulihamishwa ndani ya IBM MQ.

Ikiwa hitilafu ilikuwa ya muda na tungeweza kuibainisha (kwa mfano, ResourceAccessException kwenye simu ya HTTP au MongoTimeoutException kwenye ombi la MongoDb), basi mkakati wa kujaribu tena utaanza kutumika. Bila kujali mantiki ya matawi ya programu, ujumbe asili ulihamishwa hadi kwenye foleni ya mfumo kwa kuchelewa kutuma, au kwa programu tofauti ambayo ilitumwa zamani kutuma tena ujumbe. Hii inajumuisha nambari ya kutuma tena katika kichwa cha ujumbe, ambayo inaambatana na muda wa kuchelewa au mwisho wa mkakati wa kiwango cha programu. Ikiwa tumefika mwisho wa mkakati lakini mfumo wa nje bado haupatikani, basi ujumbe utawekwa kwenye DLQ kwa uchanganuzi wa mwongozo.

Kutafuta suluhu

Kutafuta kwenye mtandao, unaweza kupata zifuatazo uamuzi. Kwa kifupi, inapendekezwa kuunda mada kwa kila muda wa kuchelewa na kutekeleza maombi ya Mtumiaji kwa upande, ambayo itasoma ujumbe na ucheleweshaji unaohitajika.

Inachakata tena matukio yaliyopokelewa kutoka Kafka

Licha ya idadi kubwa ya hakiki nzuri, inaonekana kwangu haijafanikiwa kabisa. Kwanza kabisa, kwa sababu msanidi programu, pamoja na kutekeleza mahitaji ya biashara, atalazimika kutumia muda mwingi kutekeleza utaratibu ulioelezewa.

Kwa kuongeza, ikiwa udhibiti wa ufikiaji umewezeshwa kwenye nguzo ya Kafka, utahitaji kutumia muda kuunda mada na kutoa ufikiaji muhimu kwao. Kwa kuongeza hii, utahitaji kuchagua kigezo sahihi cha retention.ms kwa kila mada ya kujaribu tena ili ujumbe uwe na wakati wa kutumwa tena na usipotee kutoka kwake. Utekelezaji na ombi la ufikiaji itabidi kurudiwa kwa kila huduma iliyopo au mpya.

Hebu sasa tuone ni njia gani za spring kwa ujumla na spring-kafka hasa hutupatia kwa ajili ya kuchakata ujumbe. Spring-kafka ina utegemezi wa mpito wa kujaribu tena majira ya kuchipua, ambayo hutoa muhtasari wa kudhibiti BackOffPolicies tofauti. Hiki ni zana inayoweza kunyumbulika, lakini kikwazo chake kikubwa ni kuhifadhi ujumbe wa kutuma tena kwenye kumbukumbu ya programu. Hii inamaanisha kuwa kuanzisha upya programu kwa sababu ya sasisho au hitilafu ya uendeshaji kutasababisha upotevu wa ujumbe wote unaosubiri kuchakatwa tena. Kwa kuwa hatua hii ni muhimu kwa mfumo wetu, hatukuzingatia zaidi.

spring-kafka yenyewe hutoa utekelezaji kadhaa wa ContainerAwareErrorHandler, kwa mfano SeekToCurrentErrorHandler, ambayo unaweza kuchakata ujumbe baadaye bila kubadilisha kifaa ikiwa kuna hitilafu. Kuanzia na toleo la spring-kafka 2.3, iliwezekana kuweka BackOffPolicy.

Mbinu hii huruhusu ujumbe uliochakatwa ili uendelee kusalimishwa na uanzishaji upya wa programu, lakini bado hakuna utaratibu wa DLQ. Tulichagua chaguo hili mwanzoni mwa 2019, tukiamini kwa matumaini kuwa DLQ haitahitajika (tulikuwa na bahati na kwa kweli hatukuihitaji baada ya miezi kadhaa ya kutumia programu na mfumo kama huo wa kuchakata tena). Hitilafu za muda zilisababisha SeekToCurrentErrorHandler kuwaka. Makosa yaliyosalia yalichapishwa kwenye logi, na kusababisha kukomesha, na usindikaji uliendelea na ujumbe unaofuata.

Uamuzi wa mwisho

Utekelezaji kulingana na SeekToCurrentErrorHandler ulituhimiza kuunda utaratibu wetu wa kutuma ujumbe tena.

Kwanza kabisa, tulitaka kutumia uzoefu uliopo na kuupanua kulingana na mantiki ya programu. Kwa utumizi wa mantiki ya mstari, itakuwa vyema kuacha kusoma ujumbe mpya kwa muda mfupi uliobainishwa na mkakati wa kujaribu tena. Kwa maombi mengine, nilitaka kuwa na sehemu moja ambayo ingetekeleza mkakati wa kujaribu tena. Kwa kuongeza, hatua hii moja lazima iwe na utendaji wa DLQ kwa mbinu zote mbili.

Mkakati wa kujaribu tena lazima uhifadhiwe katika programu, ambayo inawajibika kwa kurejesha muda unaofuata wakati hitilafu ya muda inatokea.

Kusimamisha Mtumiaji kwa Maombi ya Mantiki ya Linear

Wakati wa kufanya kazi na spring-kafka, nambari ya kusimamisha Mtumiaji inaweza kuonekana kama hii:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Katika mfano, retryAt ni wakati wa kuanzisha upya MessageListenerContainer ikiwa bado inafanya kazi. Uzinduzi upya utatokea katika thread tofauti iliyozinduliwa katika TaskScheduler, utekelezaji ambao pia hutolewa na spring.

Tunapata thamani ya tryAt kwa njia ifuatayo:

  1. Thamani ya kaunta ya kupiga tena inatazamwa.
  2. Kulingana na thamani ya kaunta, muda wa sasa wa kuchelewa katika mkakati wa kujaribu tena hutafutwa. Mkakati umetangazwa katika programu yenyewe; tulichagua umbizo la JSON ili kuihifadhi.
  3. Muda unaopatikana katika safu ya JSON una idadi ya sekunde ambapo uchakataji utahitaji kurudiwa. Idadi hii ya sekunde huongezwa kwa wakati wa sasa ili kuunda thamani ya kujaribu tena.
  4. Ikiwa muda haupatikani, basi thamani ya retryAt ni batili na ujumbe utatumwa kwa DLQ kwa uchanganuzi wa mwongozo.

Kwa mbinu hii, kilichobaki ni kuokoa idadi ya simu zinazorudiwa kwa kila ujumbe ambao unashughulikiwa kwa sasa, kwa mfano katika kumbukumbu ya programu. Kuweka hesabu ya kujaribu tena kwenye kumbukumbu sio muhimu kwa mbinu hii, kwani programu tumizi ya mantiki ya mstari haiwezi kushughulikia uchakataji kwa ujumla. Tofauti na kujaribu tena majira ya kuchipua, kuanzisha upya programu hakutasababisha ujumbe wote kupotea kuchakatwa, lakini kutaanza upya mkakati.

Njia hii husaidia kuondoa mzigo kwenye mfumo wa nje, ambao unaweza kuwa haupatikani kwa sababu ya mzigo mkubwa sana. Kwa maneno mengine, pamoja na kuchakata tena, tulipata utekelezaji wa muundo mvunjaji wa mzunguko.

Kwa upande wetu, kiwango cha juu cha hitilafu ni 1 pekee, na ili kupunguza muda wa kukatika kwa mfumo kwa sababu ya kukatika kwa mtandao kwa muda, tunatumia mkakati wa kujaribu tena punjepunje na vipindi vidogo vya kusubiri. Hii inaweza kuwa haifai kwa programu zote za kikundi, kwa hivyo uhusiano kati ya kizingiti cha makosa na thamani ya muda lazima ichaguliwe kulingana na sifa za mfumo.

Programu tofauti ya kuchakata ujumbe kutoka kwa programu zisizo na mantiki isiyoamua

Huu hapa ni mfano wa msimbo unaotuma ujumbe kwa programu kama hiyo (Retryer), ambayo itatuma tena kwa mada ya DESTINATION wakati wa RETRY_AT utakapofikiwa:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Mfano unaonyesha kuwa habari nyingi hupitishwa kwa vichwa. Thamani ya RETRY_AT inapatikana kwa njia sawa na ya utaratibu wa kujaribu tena kupitia kituo cha Mtumiaji. Mbali na DESTINATION na RETRY_AT tunapita:

  • GROUP_ID, ambayo kwayo tunaweka pamoja jumbe kwa uchanganuzi wa mikono na utafutaji uliorahisishwa.
  • ORIGINAL_PARTITION ili kujaribu kuweka Mtumiaji sawa kwa kuchakata tena. Kigezo hiki kinaweza kuwa batili, katika hali ambayo kizigeu kipya kitapatikana kwa kutumia kitufe cha record.key() cha ujumbe asili.
  • Imesasisha thamani COUNTER ili kufuata mkakati wa kujaribu tena.
  • SEND_TO ni ishara ya mara kwa mara ikiwa ujumbe unatumwa kwa kuchakatwa unapofika RETRY_AT au kuwekwa kwenye DLQ.
  • REASON - sababu kwa nini uchakataji wa ujumbe ulikatizwa.

Kijaribu tena huhifadhi jumbe za kutuma tena na kuchanganua mwenyewe katika PostgreSQL. Kipima muda huanzisha kazi inayopata ujumbe kwenye RETRY_AT na kuzituma tena kwenye sehemu ya ORIGINAL_PARTITION ya mada ya DESTINATION kwa ufunguo wa kumbukumbu.key().

Mara baada ya kutumwa, ujumbe hufutwa kutoka kwa PostgreSQL. Uchanganuzi wa ujumbe wenyewe hutokea katika Kiolesura rahisi ambacho hutangamana na Retryer kupitia REST API. Vipengele vyake kuu ni kutuma tena au kufuta ujumbe kutoka kwa DLQ, kutazama habari za hitilafu na kutafuta ujumbe, kwa mfano kwa jina la hitilafu.

Kwa kuwa udhibiti wa ufikiaji umewezeshwa kwenye makundi yetu, ni muhimu kuomba zaidi ufikiaji wa mada ambayo Retryer inasikiliza, na kuruhusu Retryer kuandika kwa mada ya DESTINATION. Hili ni jambo lisilofaa, lakini, tofauti na mbinu ya mada ya muda, tuna DLQ kamili na UI ya kuidhibiti.

Kuna matukio wakati mada inayoingia inasomwa na makundi mbalimbali ya watumiaji, ambao maombi yao yanatekeleza mantiki tofauti. Kuchakata tena ujumbe kupitia Retryer kwa mojawapo ya programu hizi kutasababisha nakala kwa upande mwingine. Ili kulinda dhidi ya hili, tunaunda mada tofauti kwa usindikaji upya. Mada zinazoingia na kujaribu tena zinaweza kusomwa na Mtumiaji sawa bila vikwazo vyovyote.

Inachakata tena matukio yaliyopokelewa kutoka Kafka

Kwa chaguo-msingi mbinu hii haitoi utendakazi wa kivunja mzunguko, hata hivyo inaweza kuongezwa kwa programu kwa kutumia spring-cloud-netflix au mpya spring wingu mzunguko mhalifu, kufunika mahali ambapo huduma za nje huitwa katika vifupisho vinavyofaa. Kwa kuongeza, inawezekana kuchagua mkakati kichwa cha habari muundo, ambayo inaweza pia kuwa na manufaa. Kwa mfano, katika spring-cloud-netflix hii inaweza kuwa bwawa la thread au semaphore.

Pato

Kwa hivyo, tuna programu tofauti inayoturuhusu kurudia uchakataji wa ujumbe ikiwa mfumo wowote wa nje haupatikani kwa muda.

Moja ya faida kuu za programu ni kwamba inaweza kutumika na mifumo ya nje inayoendesha kwenye nguzo moja ya Kafka, bila marekebisho makubwa kwa upande wao! Programu kama hiyo itahitaji tu kufikia mada ya kujaribu tena, jaza vichwa vichache vya Kafka na utume ujumbe kwa Kijaribu tena. Hakuna haja ya kuongeza miundombinu yoyote ya ziada. Na ili kupunguza idadi ya barua pepe zinazohamishwa kutoka kwa programu hadi kwa Retryer na kurudi, tulitambua programu kwa mantiki ya mstari na kuzichakata tena kupitia kituo cha Mtumiaji.

Chanzo: mapenzi.com

Kuongeza maoni