Reprocessing event na natanggap mula sa Kafka

Reprocessing event na natanggap mula sa Kafka

Hoy Habr.

Kamakailan I nagbahagi ng kanyang karanasan tungkol sa kung anong mga parameter ang pinakamadalas naming ginagamit bilang isang team para sa Kafka Producer at Consumer upang mas mapalapit sa garantisadong paghahatid. Sa artikulong ito gusto kong sabihin sa iyo kung paano namin inayos ang muling pagpoproseso ng isang kaganapan na natanggap mula sa Kafka bilang resulta ng pansamantalang hindi magagamit ng panlabas na sistema.

Ang mga modernong aplikasyon ay gumagana sa isang napakakomplikadong kapaligiran. Logic ng negosyo na nakabalot sa isang modernong stack ng teknolohiya, tumatakbo sa isang imahe ng Docker na pinamamahalaan ng isang orkestra tulad ng Kubernetes o OpenShift, at nakikipag-ugnayan sa iba pang mga application o mga solusyon sa enterprise sa pamamagitan ng isang chain ng mga pisikal at virtual na router. Sa ganitong kapaligiran, maaaring palaging masira ang isang bagay, kaya ang muling pagpoproseso ng mga kaganapan kung hindi available ang isa sa mga external na system ay isang mahalagang bahagi ng aming mga proseso sa negosyo.

Paano ito bago si Kafka

Mas maaga sa proyekto, ginamit namin ang IBM MQ para sa asynchronous na paghahatid ng mensahe. Kung may anumang error na nangyari sa panahon ng pagpapatakbo ng serbisyo, ang natanggap na mensahe ay maaaring ilagay sa isang dead-letter-queue (DLQ) para sa karagdagang manu-manong pag-parse. Ang DLQ ay nilikha sa tabi ng papasok na pila, ang mensahe ay inilipat sa loob ng IBM MQ.

Kung pansamantala lang ang error at matutukoy namin ito (halimbawa, isang ResourceAccessException sa isang HTTP na tawag o isang MongoTimeoutException sa isang kahilingan sa MongoDb), magkakabisa ang diskarteng muling subukan. Anuman ang sumasanga na lohika ng application, ang orihinal na mensahe ay inilipat alinman sa system queue para sa naantalang pagpapadala, o sa isang hiwalay na application na matagal nang ginawa upang muling ipadala ang mga mensahe. Kabilang dito ang isang muling ipadalang numero sa header ng mensahe, na nakatali sa pagitan ng pagkaantala o pagtatapos ng diskarte sa antas ng aplikasyon. Kung naabot na natin ang dulo ng diskarte ngunit hindi pa rin magagamit ang panlabas na sistema, ilalagay ang mensahe sa DLQ para sa manu-manong pag-parse.

Maghanap ng isang solusyon

Paghahanap sa Internet, mahahanap mo ang sumusunod desisyon. Sa madaling salita, iminumungkahi na lumikha ng isang paksa para sa bawat agwat ng pagkaantala at ipatupad ang mga aplikasyon ng Consumer sa gilid, na magbabasa ng mga mensahe na may kinakailangang pagkaantala.

Reprocessing event na natanggap mula sa Kafka

Sa kabila ng malaking bilang ng mga positibong pagsusuri, tila sa akin ay hindi lubos na matagumpay. Una sa lahat, dahil ang developer, bilang karagdagan sa pagpapatupad ng mga kinakailangan sa negosyo, ay kailangang gumastos ng maraming oras sa pagpapatupad ng inilarawan na mekanismo.

Bilang karagdagan, kung ang kontrol sa pag-access ay pinagana sa kumpol ng Kafka, kakailanganin mong gumugol ng ilang oras sa paglikha ng mga paksa at pagbibigay ng kinakailangang access sa mga ito. Bilang karagdagan dito, kakailanganin mong piliin ang tamang parameter ng retention.ms para sa bawat isa sa mga paksang muling subukan upang ang mga mensahe ay magkaroon ng oras upang magalit at hindi mawala mula dito. Ang pagpapatupad at paghiling ng pag-access ay kailangang ulitin para sa bawat umiiral o bagong serbisyo.

Tingnan natin ngayon kung anong mga mekanismo ang tagsibol sa pangkalahatan at partikular na ibinibigay sa atin ng spring-kafka para sa muling pagproseso ng mensahe. Ang Spring-kafka ay may transitive dependency sa spring-retry, na nagbibigay ng mga abstraction para sa pamamahala ng iba't ibang BackOffPolicies. Ito ay isang medyo nababaluktot na tool, ngunit ang makabuluhang disbentaha nito ay ang pag-iimbak ng mga mensahe para sa muling pagpapadala sa memorya ng application. Nangangahulugan ito na ang pag-restart ng application dahil sa isang update o isang error sa pagpapatakbo ay magreresulta sa pagkawala ng lahat ng mga mensahe na nakabinbing muling pagproseso. Dahil kritikal ang puntong ito para sa aming system, hindi na namin ito pinag-isipan pa.

Ang spring-kafka mismo ay nagbibigay ng ilang mga pagpapatupad ng ContainerAwareErrorHandler, halimbawa SeekToCurrentErrorHandler, kung saan maaari mong iproseso ang mensahe sa ibang pagkakataon nang hindi inililipat ang offset kung sakaling magkaroon ng error. Simula sa bersyon ng spring-kafka 2.3, naging posible na itakda ang BackOffPolicy.

Ang diskarte na ito ay nagbibigay-daan sa mga reprocessed na mensahe na makaligtas sa pag-restart ng application, ngunit wala pa ring mekanismo ng DLQ. Pinili namin ang opsyong ito sa simula ng 2019, na may positibong paniniwala na hindi kakailanganin ang DLQ (maswerte kami at talagang hindi namin ito kailangan pagkatapos ng ilang buwan ng pagpapatakbo ng application gamit ang naturang reprocessing system). Ang mga pansamantalang error ay naging sanhi ng paggana ng SeekToCurrentErrorHandler. Ang natitirang mga error ay na-print sa log, na nagresulta sa isang offset, at ang pagproseso ay nagpatuloy sa susunod na mensahe.

Huling desisyon

Ang pagpapatupad batay sa SeekToCurrentErrorHandler ay nagtulak sa amin na bumuo ng aming sariling mekanismo para sa muling pagpapadala ng mga mensahe.

Una sa lahat, nais naming gamitin ang umiiral na karanasan at palawakin ito depende sa lohika ng aplikasyon. Para sa isang linear logic application, magiging pinakamainam na ihinto ang pagbabasa ng mga bagong mensahe sa loob ng maikling panahon na tinukoy ng diskarteng muling subukan. Para sa iba pang mga application, gusto kong magkaroon ng isang punto na magpapatupad ng diskarte sa muling pagsubok. Bilang karagdagan, ang nag-iisang puntong ito ay dapat na may DLQ functionality para sa parehong mga diskarte.

Ang diskarteng muling subukan mismo ay dapat na naka-imbak sa application, na responsable para sa pagkuha ng susunod na agwat kapag may nangyaring pansamantalang error.

Paghinto sa Consumer para sa isang Linear Logic Application

Kapag nagtatrabaho sa spring-kafka, ang code para ihinto ang Consumer ay maaaring magmukhang ganito:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Sa halimbawa, ang retryAt ay ang oras upang i-restart ang MessageListenerContainer kung ito ay tumatakbo pa rin. Ang muling paglulunsad ay magaganap sa isang hiwalay na thread na inilunsad sa TaskScheduler, ang pagpapatupad nito ay ibinibigay din ng tagsibol.

Nahanap namin ang halaga ng retryAt sa sumusunod na paraan:

  1. Ang halaga ng re-call counter ay hinahanap.
  2. Batay sa counter value, hinahanap ang kasalukuyang agwat ng pagkaantala sa diskarteng muling subukan. Ang diskarte ay idineklara sa application mismo; pinili namin ang JSON na format upang iimbak ito.
  3. Ang pagitan na makikita sa JSON array ay naglalaman ng bilang ng mga segundo pagkatapos ng pagproseso ay kailangang ulitin. Ang bilang ng mga segundong ito ay idinaragdag sa kasalukuyang oras upang mabuo ang halaga para sa retryAt.
  4. Kung ang pagitan ay hindi natagpuan, ang halaga ng retryAt ay null at ang mensahe ay ipapadala sa DLQ para sa manu-manong pag-parse.

Sa diskarteng ito, ang natitira na lang ay i-save ang bilang ng mga paulit-ulit na tawag para sa bawat mensahe na kasalukuyang pinoproseso, halimbawa sa memorya ng application. Ang pagpapanatili ng retry count sa memorya ay hindi kritikal para sa diskarteng ito, dahil ang isang linear logic application ay hindi maaaring mahawakan ang pagproseso sa kabuuan. Hindi tulad ng spring-retry, ang pag-restart ng application ay hindi magiging sanhi ng pagkawala ng lahat ng mga mensahe upang maproseso muli, ngunit i-restart lamang ang diskarte.

Nakakatulong ang diskarteng ito na alisin ang load sa external system, na maaaring hindi available dahil sa napakabigat na load. Sa madaling salita, bilang karagdagan sa muling pagproseso, nakamit namin ang pagpapatupad ng pattern circuit breaker.

Sa aming kaso, 1 lang ang threshold ng error, at para mabawasan ang downtime ng system dahil sa pansamantalang pagkawala ng network, gumagamit kami ng napakabutil na diskarte sa muling pagsubok na may maliliit na pagitan ng latency. Maaaring hindi ito angkop para sa lahat ng application ng grupo, kaya dapat piliin ang ugnayan sa pagitan ng threshold ng error at halaga ng pagitan batay sa mga katangian ng system.

Isang hiwalay na application para sa pagproseso ng mga mensahe mula sa mga application na may di-tiyak na lohika

Narito ang isang halimbawa ng code na nagpapadala ng mensahe sa naturang application (Retryer), na muling ipapadala sa paksang DESTINATION kapag naabot na ang RETRY_AT na oras:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Ipinapakita ng halimbawa na maraming impormasyon ang ipinadala sa mga header. Ang halaga ng RETRY_AT ay matatagpuan sa parehong paraan tulad ng para sa muling pagsubok na mekanismo sa pamamagitan ng Consumer stop. Bilang karagdagan sa DESTINATION at RETRY_AT pumasa kami:

  • GROUP_ID, kung saan pinapangkat namin ang mga mensahe para sa manu-manong pagsusuri at pinasimpleng paghahanap.
  • ORIGINAL_PARTITION upang subukang panatilihin ang parehong Consumer para sa muling pagproseso. Ang parameter na ito ay maaaring null, kung saan ang bagong partition ay makukuha gamit ang record.key() key ng orihinal na mensahe.
  • Na-update ang halaga ng COUNTER upang sundin ang diskarteng muling subukan.
  • Ang SEND_TO ay isang pare-parehong nagsasaad kung ang mensahe ay ipinadala para sa muling pagproseso kapag naabot ang RETRY_AT o inilagay sa DLQ.
  • DAHILAN - ang dahilan kung bakit naantala ang pagpoproseso ng mensahe.

Nag-iimbak ang Retryer ng mga mensahe para sa muling pagpapadala at manu-manong pag-parse sa PostgreSQL. Ang isang timer ay nagsisimula ng isang gawain na naghahanap ng mga mensahe na may RETRY_AT at ibabalik ang mga ito sa ORIGINAL_PARTITION partition ng DESTINATION na paksa na may key record.key().

Kapag naipadala na, ang mga mensahe ay tatanggalin mula sa PostgreSQL. Ang manu-manong pag-parse ng mga mensahe ay nangyayari sa isang simpleng UI na nakikipag-ugnayan sa Retryer sa pamamagitan ng REST API. Ang mga pangunahing tampok nito ay muling pagpapadala o pagtanggal ng mga mensahe mula sa DLQ, pagtingin sa impormasyon ng error at paghahanap ng mga mensahe, halimbawa sa pamamagitan ng pangalan ng error.

Dahil naka-enable ang kontrol sa pag-access sa aming mga cluster, kinakailangan din na humiling ng access sa paksang pinakikinggan ng Retryer, at payagan ang Retryer na sumulat sa paksang DESTINATION. Hindi ito maginhawa, ngunit, hindi tulad ng diskarte sa paksa ng agwat, mayroon kaming ganap na DLQ at UI upang pamahalaan ito.

May mga kaso kapag ang isang papasok na paksa ay binabasa ng maraming iba't ibang mga grupo ng consumer, na ang mga aplikasyon ay nagpapatupad ng iba't ibang lohika. Ang muling pagproseso ng mensahe sa pamamagitan ng Retryer para sa isa sa mga application na ito ay magreresulta sa isang duplicate sa isa pa. Upang maprotektahan laban dito, lumikha kami ng isang hiwalay na paksa para sa muling pagproseso. Ang mga papasok at subukang muli na paksa ay maaaring basahin ng parehong Consumer nang walang anumang mga paghihigpit.

Reprocessing event na natanggap mula sa Kafka

Bilang default ang diskarteng ito ay hindi nagbibigay ng paggana ng circuit breaker, gayunpaman maaari itong idagdag sa application na ginagamit spring-cloud-netflix o bago spring cloud circuit breaker, binabalot ang mga lugar kung saan ang mga panlabas na serbisyo ay tinatawag sa mga naaangkop na abstraction. Bilang karagdagan, nagiging posible na pumili ng isang diskarte para sa bulkhead pattern, na maaari ding maging kapaki-pakinabang. Halimbawa, sa spring-cloud-netflix ito ay maaaring isang thread pool o isang semaphore.

Pagbubuhos

Bilang resulta, mayroon kaming hiwalay na application na nagpapahintulot sa amin na ulitin ang pagpoproseso ng mensahe kung ang anumang panlabas na system ay pansamantalang hindi magagamit.

Ang isa sa mga pangunahing bentahe ng application ay maaari itong magamit ng mga panlabas na sistema na tumatakbo sa parehong kumpol ng Kafka, nang walang makabuluhang pagbabago sa kanilang panig! Kakailanganin lamang ng naturang application na i-access ang paksang muling subukan, punan ang ilang mga header ng Kafka at magpadala ng mensahe sa Retryer. Hindi na kailangang itaas ang anumang karagdagang imprastraktura. At upang mabawasan ang bilang ng mga mensaheng inilipat mula sa application patungo sa Retryer at pabalik, natukoy namin ang mga application na may linear logic at muling pinoproseso ang mga ito sa pamamagitan ng Consumer stop.

Pinagmulan: www.habr.com

Magdagdag ng komento