Endurvinnsla atburða sem berast frá Kafka

Endurvinnsla atburða sem berast frá Kafka

Hæ Habr.

Nýlega I deildi reynslu sinni um hvaða færibreytur við sem teymi notum oftast fyrir Kafka framleiðanda og neytenda til að komast nær tryggðri afhendingu. Í þessari grein vil ég segja þér hvernig við skipulögðum endurvinnslu á atburði sem barst frá Kafka vegna tímabundins óaðgengis ytra kerfisins.

Nútíma forrit starfa í mjög flóknu umhverfi. Viðskiptarökfræði vafin inn í nútíma tæknistafla, keyrandi í Docker mynd sem stjórnað er af hljómsveitarstjóra eins og Kubernetes eða OpenShift, og hefur samskipti við önnur forrit eða fyrirtækjalausnir í gegnum keðju líkamlegra og sýndarbeina. Í slíku umhverfi getur alltaf eitthvað bilað, þannig að endurvinnsla atburða ef eitt af ytri kerfum er ekki tiltækt er mikilvægur hluti af viðskiptaferlum okkar.

Hvernig það var fyrir Kafka

Fyrr í verkefninu notuðum við IBM MQ fyrir ósamstillta sendingu skilaboða. Ef einhver villa kom upp við notkun þjónustunnar gæti móttekið skeyti verið sett í dauðabókstafröð (DLQ) til frekari handvirkrar þáttunar. DLQ var búið til við hliðina á komandi biðröð, skilaboðin voru flutt inn í IBM MQ.

Ef villan var tímabundin og við gætum ákvarðað hana (til dæmis ResourceAccessException á HTTP símtali eða MongoTimeoutException á MongoDb beiðni), þá myndi endurreyna stefnan taka gildi. Burtséð frá greiningarrökfræði forritsins voru upprunalegu skilaboðin færð annað hvort í kerfisröðina fyrir seinkun á sendingu eða í sérstakt forrit sem var búið til fyrir löngu til að endursenda skilaboð. Þetta felur í sér endursendu númer í skilaboðahausnum, sem er bundið við seinkunabilið eða lok stefnu á forritastigi. Ef við höfum náð enda á stefnunni en ytra kerfið er enn ekki tiltækt, þá verða skilaboðin sett í DLQ fyrir handvirka þáttun.

Lausnaleit

Leitað á netinu, þú getur fundið eftirfarandi ákvörðun. Í stuttu máli er lagt til að búið verði til umræðuefni fyrir hvert tafatímabil og innleiða neytendaforrit til hliðar sem munu lesa skilaboð með tilskildum töfum.

Endurvinnsla atburða sem berast frá Kafka

Þrátt fyrir fjöldann allan af jákvæðum umsögnum sýnist mér það ekki alveg heppnast. Fyrst af öllu, vegna þess að verktaki, auk þess að innleiða viðskiptakröfur, verður að eyða miklum tíma í að innleiða lýst kerfi.

Að auki, ef aðgangsstýring er virkjuð á Kafka þyrpingunni, verður þú að eyða tíma í að búa til efni og veita nauðsynlegan aðgang að þeim. Til viðbótar við þetta þarftu að velja rétta retention.ms færibreytu fyrir hvert endurreyna efni svo að skilaboð fái tíma til að sendast aftur og hverfi ekki úr þeim. Innleiðing og beiðni um aðgang þarf að endurtaka fyrir hverja núverandi eða nýja þjónustu.

Við skulum nú sjá hvaða kerfi vor almennt og vorkafka sérstaklega veita okkur fyrir endurvinnslu skilaboða. Spring-kafka er tímabundið háð spring-rery, sem veitir útdrætti til að stjórna mismunandi BackOffPolicies. Þetta er nokkuð sveigjanlegt tól, en verulegur galli þess er að geyma skilaboð til endursendingar í minni forrita. Þetta þýðir að endurræsing forritsins vegna uppfærslu eða rekstrarvillu mun leiða til þess að öll skilaboð sem bíða endurvinnslu tapast. Þar sem þetta atriði er mikilvægt fyrir kerfið okkar, skoðuðum við það ekki frekar.

spring-kafka sjálft býður upp á nokkrar útfærslur á ContainerAwareErrorHandler, til dæmis SeekToCurrentErrorHandler, sem þú getur afgreitt skilaboðin síðar án þess að skipta á móti ef villur koma upp. Frá og með útgáfu af spring-kafka 2.3 varð mögulegt að stilla BackOffPolicy.

Þessi nálgun gerir endurunnnum skilaboðum kleift að lifa af endurræsingu forrits, en það er enn engin DLQ vélbúnaður. Við völdum þennan valkost í byrjun árs 2019 og trúðum því bjartsýn að DLQ yrði ekki þörf (við vorum heppin og þurftum þess reyndar ekki eftir nokkra mánuði af rekstri forritsins með slíku endurvinnslukerfi). Tímabundnar villur urðu til þess að SeekToCurrentErrorHandler kviknaði. Villurnar sem eftir voru voru prentaðar í annálinn, sem leiddi til offset, og vinnsla hélt áfram með næstu skilaboð.

Endanleg ákvörðun

Innleiðingin sem byggði á SeekToCurrentErrorHandler varð til þess að við þróuðum okkar eigin kerfi til að endursenda skilaboð.

Í fyrsta lagi vildum við nota núverandi upplifun og auka hana eftir rökfræði forritsins. Fyrir línulega rökfræði forrit væri ákjósanlegt að hætta að lesa ný skilaboð í stuttan tíma sem tilgreindur er af endurreynslustefnunni. Fyrir önnur forrit vildi ég hafa einn punkt sem myndi framfylgja endurreynslustefnunni. Að auki verður þessi eini punktur að hafa DLQ virkni fyrir báðar aðferðir.

Reynsluaðferðin sjálf verður að vera geymd í forritinu, sem ber ábyrgð á að sækja næsta bil þegar tímabundin villa kemur upp.

Að stöðva neytandann fyrir línulega rökfræðiforrit

Þegar unnið er með spring-kafka gæti kóðinn til að stöðva neytandann litið svona út:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Í dæminu er retryAt tíminn til að endurræsa MessageListenerContainer ef hann er enn í gangi. Endurræsingin mun eiga sér stað í sérstökum þræði sem er hleypt af stokkunum í TaskScheduler, en útfærslan á honum er einnig veitt fyrir vorið.

Við finnum retryAt gildið á eftirfarandi hátt:

  1. Gildi endurkallateljarans er flett upp.
  2. Byggt á teljaragildinu er leitað í núverandi seinkabili í endurreynslustefnunni. Stefnan er lýst yfir í forritinu sjálfu; við völdum JSON sniðið til að geyma það.
  3. Tímabilið sem finnast í JSON fylkinu inniheldur fjölda sekúndna sem þarf að endurtaka vinnsluna eftir. Þessum sekúndufjölda er bætt við núverandi tíma til að mynda gildið fyrir retryAt.
  4. Ef bilið finnst ekki, þá er gildi retryAt núll og skilaboðin verða send til DLQ til handvirkrar þáttunar.

Með þessari nálgun er ekki annað eftir en að vista fjölda endurtekinna hringinga fyrir hvert skeyti sem er í vinnslu, til dæmis í minni forrita. Það er ekki mikilvægt fyrir þessa nálgun að halda afturtalningunni í minni þar sem línuleg rökfræðiforrit getur ekki séð um vinnsluna í heild sinni. Ólíkt vor-endurreyndu, mun endurræsing forritsins ekki valda því að öll skilaboð glatast aftur, heldur mun það einfaldlega endurræsa stefnuna.

Þessi aðferð hjálpar til við að taka álagið af ytra kerfinu, sem gæti verið ófáanlegt vegna mjög mikið álags. Með öðrum orðum, auk endurvinnslu, náðum við innleiðingu mynstrsins aflrofa.

Í okkar tilviki er villuþröskuldurinn aðeins 1, og til að lágmarka niður í miðbæ kerfisins vegna tímabundinnar netrofs, notum við mjög nákvæma endurreynslustefnu með litlum leynd millibili. Þetta hentar kannski ekki öllum hópforritum, þannig að sambandið milli villuþröskulds og bilsgildis verður að velja út frá eiginleikum kerfisins.

Sérstakt forrit til að vinna úr skilaboðum frá forritum með óákveðna rökfræði

Hér er dæmi um kóða sem sendir skilaboð til slíks forrits (Retryer), sem mun senda aftur til DESTINATION efnisins þegar RETRY_AT tímanum er náð:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Dæmið sýnir að mikið af upplýsingum er sent í hausum. Gildi RETRY_AT er fundið á sama hátt og fyrir endurreynslukerfið í gegnum neytendastöðvun. Auk DESTINATION og RETRY_AT förum við framhjá:

  • GROUP_ID, þar sem við flokkum skilaboð fyrir handvirka greiningu og einfaldaða leit.
  • ORIGINAL_PARTITION til að reyna að halda sama neytanda til endurvinnslu. Þessi færibreyta getur verið núll, í því tilviki verður nýja skiptingin fengin með því að nota record.key() lykilinn í upprunalegu skilaboðunum.
  • COUNTER gildi uppfært til að fylgja áætluninni um að reyna aftur.
  • SEND_TO er stöðugur sem gefur til kynna hvort skilaboðin séu send til endurvinnslu þegar hún er komin í RETRY_AT eða sett í DLQ.
  • Ástæða - ástæðan fyrir því að vinnsla skilaboða var trufluð.

Retryer geymir skilaboð til endursendingar og handvirkrar þáttunar í PostgreSQL. Tímamælir ræsir verkefni sem finnur skilaboð með RETRY_AT og sendir þau aftur á ORIGINAL_PARTITION skiptinguna í DESTINATION efninu með lyklinum record.key().

Eftir sendingu er skilaboðum eytt úr PostgreSQL. Handvirk þáttun skilaboða á sér stað í einföldu notendaviðmóti sem hefur samskipti við Retryer í gegnum REST API. Helstu eiginleikar þess eru að endursenda eða eyða skilaboðum frá DLQ, skoða villuupplýsingar og leita að skilaboðum, til dæmis eftir villanafni.

Þar sem aðgangsstýring er virkjuð á klösunum okkar, er nauðsynlegt að biðja um aðgang að efninu sem Retryer er að hlusta á og leyfa Retryer að skrifa á DESTINATION efnið. Þetta er óþægilegt, en ólíkt interval topic nálguninni, höfum við fullgild DLQ og notendaviðmót til að stjórna því.

Það eru tilvik þar sem komandi efni er lesið af nokkrum mismunandi neytendahópum, þar sem forritin útfæra mismunandi rökfræði. Endurvinnsla skilaboða í gegnum Retryer fyrir eitt af þessum forritum mun leiða til tvítekningar á hinu. Til að verjast þessu búum við til sérstakt efni til endurvinnslu. Sami neytandi getur lesið efni sem kemur og reynir aftur án nokkurra takmarkana.

Endurvinnsla atburða sem berast frá Kafka

Sjálfgefið er að þessi aðferð veitir ekki aflrofavirkni, en þó er hægt að bæta henni við forritið með því að nota vor-ský-netflix eða ný gormaskýjarofi, pakka þeim stöðum þar sem ytri þjónusta er kölluð inn í viðeigandi abstrakt. Að auki verður hægt að velja stefnu fyrir þil mynstur, sem getur líka verið gagnlegt. Til dæmis, í vorský-netflix gæti þetta verið þráðalaug eða semafór.

Output

Fyrir vikið erum við með sérstakt forrit sem gerir okkur kleift að endurtaka vinnslu skilaboða ef eitthvað utanaðkomandi kerfi er tímabundið ekki tiltækt.

Einn af helstu kostum forritsins er að það er hægt að nota það af ytri kerfum sem keyra á sama Kafka þyrpingunni, án teljandi breytinga á þeirra hlið! Slíkt forrit þarf aðeins að fá aðgang að reyndu efninu, fylla út nokkra Kafka-hausa og senda skilaboð til Retryer. Það er engin þörf á að koma upp neinum viðbótarinnviðum. Og til þess að fækka skilaboðum sem fluttir voru úr forritinu yfir í Retryer og til baka, auðkenndum við forrit með línulegri rökfræði og endurunnið þau í gegnum neytendastöðina.

Heimild: www.habr.com

Bæta við athugasemd