Reprocessing eveneminten ûntfongen fan Kafka

Reprocessing eveneminten ûntfongen fan Kafka

Hoi Habr.

Koartlyn I dielde syn ûnderfining oer hokker parameters wy as team meast brûke foar Kafka Producer en Consumer om tichter by garandearre levering te kommen. Yn dit artikel wol ik jo fertelle hoe't wy de werferwurking organisearre hawwe fan in evenemint ûntfongen fan Kafka as gefolch fan tydlike net-beskikberens fan it eksterne systeem.

Moderne applikaasjes wurkje yn in heul komplekse omjouwing. Bedriuwslogika ferpakt yn in moderne technologystapel, rint yn in Docker-ôfbylding beheard troch in orkestrator lykas Kubernetes of OpenShift, en kommunisearje mei oare applikaasjes as bedriuwsoplossingen fia in keatling fan fysike en firtuele routers. Yn sa'n omjouwing kin altyd wat brekke, dus it opnij ferwurkjen fan eveneminten as ien fan 'e eksterne systemen net beskikber is, is in wichtich ûnderdiel fan ús saaklike prosessen.

Hoe wie it foar Kafka

Earder yn it projekt brûkten wy IBM MQ foar asynchrone berjochtlevering. As der in flater barde tidens de operaasje fan 'e tsjinst, koe it ûntfongen berjocht yn in dead-letter-wachtrige (DLQ) pleatst wurde foar fierdere hantlieding. De DLQ is oanmakke neist de ynkommende wachtrige, it berjocht waard oerbrocht binnen IBM MQ.

As de flater tydlik wie en wy koene it bepale (bygelyks in ResourceAccessException op in HTTP-oprop of in MongoTimeoutException op in MongoDb-fersyk), dan soe de opnij probearje strategy effekt hawwe. Nettsjinsteande de fertakkingslogika fan 'e applikaasje, waard it orizjinele berjocht ferpleatst of nei de systeemwachtrige foar fertrage ferstjoering, of nei in aparte applikaasje dy't lang lyn makke is om berjochten opnij te ferstjoeren. Dit omfettet in opnij ferstjoernûmer yn 'e berjochtkop, dat is bûn oan it fertragingsynterval as it ein fan' e strategy op applikaasjenivo. As wy it ein fan 'e strategy berikt hawwe, mar it eksterne systeem is noch net beskikber, dan sil it berjocht yn' e DLQ pleatst wurde foar hânmjittich parsing.

In oplossing fine

Sykje op it ynternet, kinne jo fine de folgjende решение. Koartsein, it wurdt foarsteld om in ûnderwerp te meitsjen foar elke fertraging ynterval en ymplemintearje Consumer applikaasjes oan 'e kant, dy't sil lêze berjochten mei de fereaske fertraging.

Reprocessing eveneminten ûntfongen fan Kafka

Nettsjinsteande it grutte tal positive resinsjes liket it my net hielendal slagge. Earst fan alles, om't de ûntwikkelder, neist it útfieren fan saaklike easken, in protte tiid sil moatte besteegje oan it útfieren fan it beskreaune meganisme.

Derneist, as tagongskontrôle ynskeakele is op it Kafka-kluster, sille jo wat tiid moatte besteegje oan it meitsjen fan ûnderwerpen en it jaan fan de nedige tagong ta har. Dêrnjonken moatte jo de juste retention.ms-parameter selektearje foar elk fan 'e opnij probearje ûnderwerpen, sadat berjochten tiid hawwe om opnij te stjoeren en der net út ferdwine. De ymplemintaasje en fersyk fan tagong sil moatte wurde werhelle foar elke besteande of nije tsjinst.

Litte wy no sjen hokker meganismen spring yn 't algemien en spring-kafka yn it bysûnder ús jouwe foar it ferwurkjen fan berjochten. Spring-kafka hat in transitive ôfhinklikens fan spring-opnij besykjen, wat abstraksjes leveret foar it behearen fan ferskate BackOffPolicies. Dit is in frij fleksibel ark, mar it wichtige nadeel is it opslaan fan berjochten foar opnij ferstjoeren yn applikaasjeûnthâld. Dit betsjut dat it opnij starte fan 'e applikaasje fanwege in fernijing of in operaasjeflater sil resultearje yn it ferlies fan alle berjochten yn ôfwachting fan opnij ferwurking. Om't dit punt kritysk is foar ús systeem, hawwe wy it net fierder beskôge.

spring-kafka sels leveret ferskate ymplemintaasjes fan ContainerAwareErrorHandler, bygelyks SeekToCurrentErrorHandler, wêrmei jo it berjocht letter ferwurkje kinne sûnder offset te ferskowen yn gefal fan in flater. Begjin mei ferzje fan spring-kafka 2.3, waard it mooglik om BackOffPolicy yn te stellen.

Dizze oanpak lit opnij ferwurke berjochten oerlibje om applikaasje opnij te starten, mar d'r is noch gjin DLQ-meganisme. Wy keas dizze opsje oan it begjin fan 2019, optimistysk te leauwen dat DLQ net nedich wêze soe (wy hiene gelok en hienen it eins net nedich nei ferskate moannen fan it operearjen fan de applikaasje mei sa'n ferwurkingssysteem). Tydlike flaters feroarsake SeekToCurrentErrorHandler te brânen. De oerbleaune flaters waarden printe yn it log, resultearre yn in offset, en ferwurking fierder mei it folgjende berjocht.

Einbeslút

De ymplemintaasje basearre op SeekToCurrentErrorHandler hat ús frege om ús eigen meganisme te ûntwikkeljen foar it opnij ferstjoeren fan berjochten.

Alderearst woenen wy de besteande ûnderfining brûke en it útwreidzje ôfhinklik fan 'e applikaasjelogika. Foar in lineêre logika-applikaasje soe it optimaal wêze om te stopjen mei it lêzen fan nije berjochten foar in koarte perioade fan tiid oantsjutte troch de opnij probearje strategy. Foar oare tapassingen woe ik ien punt hawwe dat de strategy foar opnij besykje soe. Derneist moat dit ienige punt DLQ-funksjonaliteit hawwe foar beide oanpakken.

De werbesykstrategy sels moat wurde opslein yn 'e applikaasje, dy't ferantwurdlik is foar it opheljen fan it folgjende ynterval as in tydlike flater optreedt.

De konsumint stopje foar in lineêre logikaapplikaasje

By it wurkjen mei spring-kafka kin de koade om de konsumint te stopjen der sa útsjen:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Yn it foarbyld is retryAt de tiid om de MessageListenerContainer opnij te starten as it noch rint. De werstart sil plakfine yn in aparte thread lansearre yn TaskScheduler, wêrfan de ymplemintaasje ek wurdt levere troch maitiid.

Wy fine de retryAt-wearde op 'e folgjende manier:

  1. De wearde fan de weropropteller wurdt opsocht.
  2. Op grûn fan de tellerwearde wurdt it aktuele fertragingsynterval yn 'e opnij probearje strategy socht. De strategy wurdt ferklearre yn 'e applikaasje sels; wy hawwe it JSON-formaat keazen om it op te slaan.
  3. It ynterval fûn yn 'e JSON-array befettet it oantal sekonden wêrnei't ferwurking werhelle wurde moat. Dit oantal sekonden wurdt tafoege oan de aktuele tiid om de wearde te foarmjen foar retryAt.
  4. As it ynterval net fûn is, dan is de wearde fan retryAt nul en sil it berjocht nei DLQ stjoerd wurde foar hânmjittich parsing.

Mei dizze oanpak bliuwt it allinich om it oantal werhelle oproppen op te slaan foar elk berjocht dat op it stuit ferwurke wurdt, bygelyks yn it applikaasjeûnthâld. It hâlden fan de werhelling yn it ûnthâld is net kritysk foar dizze oanpak, om't in lineêre logika-applikaasje de ferwurking as gehiel kin net omgean. Oars as spring-opnij, sil it opnij starte fan 'e applikaasje net soargje dat alle berjochten ferlern gean wurde opnij ferwurke, mar sil de strategy gewoan opnij starte.

Dizze oanpak helpt om de lading fan it eksterne systeem ôf te nimmen, dat kin net beskikber wêze fanwege in heul swiere lading. Mei oare wurden, neist it ferwurkjen hawwe wy de ymplemintaasje fan it patroan berikt sekering.

Yn ús gefal is de flaterdrompel mar 1, en om systeemûnderbrekking te minimalisearjen fanwege tydlike netwurkûnderbrekkingen, brûke wy in heul granulêre werhellingsstrategy mei lytse latency-yntervallen. Dit kin net geskikt foar alle groep applikaasjes, sadat de relaasje tusken de flater drompel en de ynterval wearde moat wurde selektearre basearre op de skaaimerken fan it systeem.

In aparte applikaasje foar it ferwurkjen fan berjochten fan applikaasjes mei net-deterministyske logika

Hjir is in foarbyld fan koade dy't in berjocht stjoert nei sa'n applikaasje (Retryer), dy't opnij ferstjoerd wurdt nei it DESTINATION-ûnderwerp as de RETRY_AT-tiid wurdt berikt:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

It foarbyld lit sjen dat in protte ynformaasje wurdt oerdroegen yn kopteksten. De wearde fan RETRY_AT wurdt fûn op deselde wize as foar it opnij meganisme fia de Consumer stop. Neist DESTINATION en RETRY_AT passe wy:

  • GROUP_ID, wêrmei wy berjochten groepearje foar hânmjittich analyze en ferienfâldige sykjen.
  • ORIGINAL_PARTITION om te besykjen deselde konsumint te hâlden foar opnij ferwurking. Dizze parameter kin nul wêze, yn dat gefal sil de nije partysje krije mei de record.key () kaai fan it orizjinele berjocht.
  • COUNTER-wearde bywurke om de strategy foar opnij besykjen te folgjen.
  • SEND_TO is in konstante dy't oanjout oft it berjocht wurdt ferstjoerd foar werferwurking by it berikken fan RETRY_AT of pleatst yn DLQ.
  • REASON - de reden wêrom't berjochtferwurking ûnderbrutsen waard.

Retryer bewarret berjochten foar opnij ferstjoeren en hantlieding yn PostgreSQL. In timer begjint in taak dy't fynt berjochten mei RETRY_AT en stjoert se werom nei de ORIGINAL_PARTITION partition fan it DESTINATION ûnderwerp mei de kaai record.key ().

Ienris ferstjoerd wurde berjochten wiske fan PostgreSQL. Hânlieding parsing fan berjochten komt foar yn in ienfâldige UI dy't ynteraksje mei Retryer fia REST API. De wichtichste skaaimerken binne it opnij ferstjoeren of wiskjen fan berjochten fan DLQ, besjen fan flaterynformaasje en sykjen nei berjochten, bygelyks troch flaternamme.

Sûnt tagongskontrôle is ynskeakele op ús klusters, is it nedich om ekstra tagong te freegjen ta it ûnderwerp dat Retryer harket, en Retryer tastean om te skriuwen nei it DESTINATION-ûnderwerp. Dit is ûngemaklik, mar, yn tsjinstelling ta de oanpak fan yntervalûnderwerp, hawwe wy in folweardige DLQ en UI om it te behearjen.

D'r binne gefallen as in ynkommende ûnderwerp wurdt lêzen troch ferskate ferskillende konsumintegroepen, waans applikaasjes ferskate logika implementearje. It opnij ferwurkjen fan in berjocht fia Retryer foar ien fan dizze applikaasjes sil resultearje yn in duplikaat op 'e oare. Om dit te beskermjen, meitsje wy in apart ûnderwerp foar opnij ferwurkjen. De ynkommende en opnij probearje ûnderwerpen kinne wurde lêzen troch deselde konsumint sûnder beheiningen.

Reprocessing eveneminten ûntfongen fan Kafka

Standert jout dizze oanpak gjin circuit breaker-funksjonaliteit, it kin lykwols wurde tafoege oan 'e applikaasje mei help fan spring-wolk-netflix of nij spring wolk circuit breaker, wrapping de plakken dêr't eksterne tsjinsten wurde neamd yn passende abstraksjes. Dêrneist wurdt it mooglik om te kiezen foar in strategy foar skot patroan, dat kin ek nuttich. Bygelyks, yn spring-cloud-netflix kin dit in threadpool of in semafoar wêze.

konklúzje

As gefolch hawwe wy in aparte applikaasje wêrmei wy berjochtferwurking kinne werhelje as in ekstern systeem tydlik net beskikber is.

Ien fan 'e wichtichste foardielen fan' e applikaasje is dat it kin wurde brûkt troch eksterne systemen dy't rinne op deselde Kafka-kluster, sûnder signifikante oanpassingen oan har kant! Sa'n applikaasje sil allinich tagong krije ta it opnij besykjen ûnderwerp, in pear Kafka-koppen ynfolje en in berjocht stjoere nei de Retryer. Der is gjin needsaak om ekstra ynfrastruktuer te ferheegjen. En om it oantal berjochten oerdroegen fan 'e applikaasje nei Retryer en werom te ferminderjen, identifisearren wy applikaasjes mei lineêre logika en ferwurke se opnij fia de Consumer-stop.

Boarne: www.habr.com

Add a comment