Kafkalt saadud sündmuste ümbertöötlemine

Kafkalt saadud sündmuste ümbertöötlemine

Tere Habr.

Hiljuti ma jagas oma kogemust milliseid parameetreid me meeskonnana kõige sagedamini kasutame, et Kafka tootja ja tarbija jõuaks garanteeritud tarnele lähemale. Selles artiklis tahan teile rääkida, kuidas korraldasime välissüsteemi ajutise kättesaamatuse tõttu Kafkalt saadud sündmuse uuesti töötlemise.

Kaasaegsed rakendused töötavad väga keerulises keskkonnas. Äriloogika on mähitud moodsasse tehnoloogiavirna, mis töötab Dockeri kujutises, mida haldab orkestrant, nagu Kubernetes või OpenShift, ning suhtleb teiste rakenduste või ettevõttelahendustega füüsiliste ja virtuaalsete ruuterite ahela kaudu. Sellises keskkonnas võib alati midagi puruneda, seega on sündmuste ümbertöötlemine, kui mõni välistest süsteemidest pole saadaval, meie äriprotsesside oluline osa.

Kuidas oli enne Kafkat

Projekti alguses kasutasime asünkroonseks sõnumiedastuseks IBM MQ-d. Kui teenuse töötamise ajal ilmnes tõrge, võib vastuvõetud sõnumi paigutada edasiseks käsitsi sõelumiseks surnud kirjade järjekorda (DLQ). DLQ loodi sissetuleva järjekorra kõrvale, sõnum kanti üle IBM MQ sees.

Kui viga oli ajutine ja saaksime selle kindlaks teha (nt HTTP-kõne ResourceAccessException või MongoDb päringu MongoTimeoutException), jõustub uuesti proovimise strateegia. Sõltumata rakenduse hargnemisloogikast viidi algne sõnum kas hilinenud saatmise süsteemi järjekorda või eraldi rakendusse, mis on ammu loodud sõnumite uuesti saatmiseks. See hõlmab sõnumi päises uuesti saatmise numbrit, mis on seotud viivitusintervalli või rakendusetaseme strateegia lõpuga. Kui oleme jõudnud strateegia lõpuni, kuid väline süsteem pole ikka veel saadaval, paigutatakse teade käsitsi sõelumiseks DLQ-sse.

Lahenduse otsimine

Internetist otsimine, leiate järgmise otsus. Lühidalt, tehakse ettepanek luua iga viivitusintervalli jaoks teema ja juurutada küljele tarbijarakendused, mis loevad sõnumeid vajaliku viivitusega.

Kafkalt saadud sündmuste ümbertöötlemine

Vaatamata suurele arvule positiivsetele arvustustele ei tundu see mulle päris edukas. Esiteks seetõttu, et arendaja peab lisaks ärinõuete rakendamisele kulutama palju aega kirjeldatud mehhanismi rakendamisele.

Lisaks, kui juurdepääsukontroll on Kafka klastris lubatud, peate kulutama veidi aega teemade loomisele ja neile vajaliku juurdepääsu tagamisele. Lisaks peate valima iga korduskatsete teema jaoks õige retention.ms parameetri, et sõnumitel oleks aega uuesti saata ja need ei kaoks sealt. Rakendust ja juurdepääsu taotlemist tuleb korrata iga olemasoleva või uue teenuse puhul.

Vaatame nüüd, milliseid mehhanisme vedru üldiselt ja eriti vedru-kafka meile sõnumite ümbertöötlemiseks pakuvad. Spring-kafkal on transitiivne sõltuvus vedru korduskatsest, mis pakub abstraktsioone erinevate BackOffPolicies'ide haldamiseks. See on üsna paindlik tööriist, kuid selle oluliseks puuduseks on sõnumite salvestamine rakenduse mällu uuesti saatmiseks. See tähendab, et rakenduse taaskäivitamine värskenduse või talitlusvea tõttu kaotab kõik ümbertöötlemise ootel olevad sõnumid. Kuna see punkt on meie süsteemi jaoks kriitiline, ei käsitlenud me seda rohkem.

spring-kafka ise pakub näiteks mitmeid ContainerAwareErrorHandleri rakendusi SeekToCurrentErrorHandler, millega saate hiljem tõrke korral sõnumit töödelda ilma nihke nihketa. Alates Spring-kafka versioonist 2.3 sai võimalikuks BackOffPolicy seadistamine.

See lähenemine võimaldab ümbertöödeldud sõnumitel rakenduste taaskäivitamise korral ellu jääda, kuid DLQ-mehhanismi siiski pole. Valisime selle võimaluse 2019. aasta alguses, uskudes optimistlikult, et DLQ-d pole vaja (meil vedas ja tegelikult ei läinudki seda vaja pärast mitu kuud rakenduse sellise ümbertöötlussüsteemiga töötamist). Ajutised vead põhjustasid SeekToCurrentErrorHandleri käivitumise. Ülejäänud vead trükiti logisse, mille tulemuseks oli nihe ja töötlemine jätkus järgmise teatega.

Lõplik otsus

SeekToCurrentErrorHandleril põhinev rakendamine ajendas meid välja töötama oma mehhanismi sõnumite uuesti saatmiseks.

Esiteks soovisime kasutada olemasolevat kogemust ja laiendada seda olenevalt rakendusloogikast. Lineaarse loogika rakenduse puhul oleks optimaalne lõpetada uute sõnumite lugemine lühikeseks perioodiks, mis on määratud uuesti proovimise strateegias. Teiste rakenduste jaoks soovisin, et oleks üks punkt, mis jõustaks uuesti proovimise strateegia. Lisaks peab sellel üksikul punktil olema mõlema lähenemisviisi jaoks DLQ-funktsioon.

Uuesti proovimise strateegia ise tuleb salvestada rakendusse, mis vastutab ajutise vea ilmnemisel järgmise intervalli allalaadimise eest.

Tarbija peatamine lineaarse loogika rakenduse jaoks

Spring-kafkaga töötades võib tarbija peatamise kood välja näha umbes selline:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Näites on retryAt aeg MessageListenerContaineri taaskäivitamiseks, kui see veel töötab. Taaskäivitamine toimub TaskScheduleris käivitatud eraldi lõimes, mille juurutamist pakub samuti kevad.

Leiame väärtuse retryAt järgmisel viisil:

  1. Otsitakse uuesti kõne loenduri väärtust.
  2. Loenduri väärtuse põhjal otsitakse korduskatse strateegia praegust viivitusintervalli. Strateegia on deklareeritud rakenduses endas; valisime selle salvestamiseks JSON-vormingu.
  3. JSON-i massiivist leitud intervall sisaldab sekundite arvu, pärast mida tuleb töötlemist korrata. See sekundite arv lisatakse praegusele ajale, et moodustada väärtus uuesti proovimiseks.
  4. Kui intervalli ei leita, on retryAt väärtus null ja sõnum saadetakse käsitsi sõelumiseks DLQ-le.

Selle lähenemisviisi puhul jääb üle vaid salvestada korduvate kõnede arv iga parajasti töödeldava sõnumi kohta, näiteks rakenduse mällu. Korduskatsete arvu mällu hoidmine ei ole selle lähenemisviisi jaoks kriitilise tähtsusega, kuna lineaarse loogika rakendus ei saa töötlemist tervikuna käsitleda. Erinevalt kevadisest uuesti proovimisest ei põhjusta rakenduse taaskäivitamine kõigi sõnumite kadumist, vaid taaskäivitab strateegia.

See lähenemine aitab eemaldada koormuse välissüsteemist, mis ei pruugi olla väga suure koormuse tõttu saadaval. Ehk siis lisaks ümbertöötlemisele saavutasime mustri juurutamise kaitselüliti.

Meie puhul on vealävi vaid 1 ja ajutisest võrgukatkestusest tingitud süsteemi seisakuaja minimeerimiseks kasutame väga detailset väikeste latentsusintervallidega uuesti proovimise strateegiat. See ei pruugi sobida kõigi rühmarakenduste jaoks, mistõttu tuleb vealäve ja intervalli väärtuse suhe valida süsteemi omadustest lähtuvalt.

Eraldi rakendus mittedeterministliku loogikaga rakenduste sõnumite töötlemiseks

Siin on näide koodist, mis saadab sellisele rakendusele (kordusproovija) sõnumi, mis saadetakse uuesti teemasse SIHT, kui aeg RETRY_AT on käes:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Näide näitab, et palju teavet edastatakse päistes. RETRY_AT väärtus leitakse samamoodi nagu uuesti proovimise mehhanismi puhul tarbija peatuse kaudu. Lisaks sihtkohtadele DESTINATION ja RETRY_AT läbime:

  • GROUP_ID, mille järgi rühmitame sõnumid käsitsi analüüsi ja lihtsustatud otsingu jaoks.
  • ORIGINAL_PARTITION, et proovida säilitada sama tarbija uuesti töötlemiseks. See parameeter võib olla null, sel juhul saadakse uus partitsioon algse sõnumi klahvi record.key() abil.
  • Uuesti proovimise strateegia järgimiseks värskendati väärtust COUNTER.
  • SEND_TO on konstant, mis näitab, kas sõnum saadetakse RETRY_AT-i jõudmisel uuesti töötlemiseks või paigutatakse DLQ-sse.
  • REASON – põhjus, miks sõnumite töötlemine katkestati.

Uuesti proovija salvestab PostgreSQL-is sõnumid uuesti saatmiseks ja käsitsi sõelumiseks. Taimer käivitab ülesande, mis otsib teateid funktsiooniga RETRY_AT ja saadab need võtmega record.key() teema SIHTKOHT partitsiooni ORIGINAL_PARTITION tagasi.

Pärast saatmist kustutatakse kirjad PostgreSQL-ist. Sõnumite käsitsi sõelumine toimub lihtsas kasutajaliideses, mis suhtleb uuesti proovijaga REST API kaudu. Selle põhifunktsioonid on sõnumite uuesti saatmine või kustutamine DLQ-st, veateabe vaatamine ja teadete otsimine näiteks vea nime järgi.

Kuna juurdepääsu kontroll on meie klastrites lubatud, on vaja lisaks taotleda juurdepääsu teemale, mida Retryer kuulab, ja lubada uuesti proovijal kirjutada teemasse SIHT. See on ebamugav, kuid erinevalt intervallide teemade lähenemisest on meil selle haldamiseks täisväärtuslik DLQ ja kasutajaliides.

On juhtumeid, kus saabuvat teemat loevad mitu erinevat tarbijagruppi, kelle rakendused rakendavad erinevat loogikat. Sõnumi ümbertöötlemine ühe rakenduse kordusproovi kaudu toob kaasa teise duplikaadi. Selle eest kaitsmiseks loome uuesti töötlemiseks eraldi teema. Sissetulevaid ja uuesti proovitavaid teemasid saab sama Tarbija ilma piiranguteta lugeda.

Kafkalt saadud sündmuste ümbertöötlemine

Vaikimisi see lähenemisviis kaitselüliti funktsiooni ei paku, kuid selle saab rakendusele lisada kevad-pilv-netflix või uus kevadpilve kaitselüliti, mähkides kohad, kus välisteenuseid kutsutakse, sobivateks abstraktsioonideks. Lisaks on võimalik valida strateegia vahesein muster, mis võib samuti olla kasulik. Näiteks Spring-cloud-netflixis võib see olla niidikogu või semafor.

Väljund

Sellest tulenevalt on meil eraldi rakendus, mis võimaldab meil sõnumite töötlemist korrata, kui mõni väline süsteem on ajutiselt kättesaamatu.

Rakenduse üks peamisi eeliseid on see, et seda saavad kasutada samas Kafka klastris töötavad välised süsteemid, ilma nende poolel oluliste muudatusteta! Selline rakendus peab pääsema juurde ainult uuesti proovimise teemale, täitma mõned Kafka päised ja saatma uuesti proovijale sõnumi. Täiendavat infrastruktuuri pole vaja tõsta. Ja selleks, et vähendada rakendusest uuesti proovijasse ja tagasi edastatavate sõnumite arvu, tuvastasime lineaarse loogikaga rakendused ja töötlesime need uuesti läbi Tarbijapeatuse.

Allikas: www.habr.com

Lisa kommentaar