Opätovné spracovanie udalostí prijatých od Kafku

Opätovné spracovanie udalostí prijatých od Kafku

Ahoj Habr.

Nedávno I podelil o svoje skúsenosti o tom, aké parametre ako tím najčastejšie používame pre výrobcu a spotrebiteľa Kafka, aby sme sa priblížili k garantovanému doručeniu. V tomto článku vám chcem priblížiť, ako sme organizovali opätovné spracovanie udalosti prijatej od Kafku v dôsledku dočasnej nedostupnosti externého systému.

Moderné aplikácie fungujú vo veľmi zložitom prostredí. Obchodná logika zabalená do balíka moderných technológií, spustená v obraze Docker spravovaného orchestrátorom ako Kubernetes alebo OpenShift a komunikujúca s inými aplikáciami alebo podnikovými riešeniami prostredníctvom reťazca fyzických a virtuálnych smerovačov. V takomto prostredí sa vždy môže niečo pokaziť, takže opätovné spracovanie udalostí, ak je niektorý z externých systémov nedostupný, je dôležitou súčasťou našich obchodných procesov.

Ako to bolo pred Kafkom

V minulosti sme v projekte používali IBM MQ na asynchrónne doručovanie správ. Ak sa počas prevádzky služby vyskytla nejaká chyba, prijatá správa sa mohla umiestniť do frontu nedoručených listov (DLQ) na ďalšiu manuálnu analýzu. DLQ bol vytvorený vedľa prichádzajúceho frontu, správa bola prenesená do IBM MQ.

Ak bola chyba dočasná a mohli by sme ju určiť (napríklad výnimka ResourceAccessException pri HTTP volaní alebo MongoTimeoutException pri požiadavke MongoDb), potom by sa uplatnila stratégia opakovania. Bez ohľadu na logiku vetvenia aplikácie bola pôvodná správa presunutá buď do systémového frontu na oneskorené odoslanie, alebo do samostatnej aplikácie, ktorá bola vytvorená už dávno na opätovné odoslanie správ. To zahŕňa číslo opätovného odoslania v hlavičke správy, ktoré je viazané na interval oneskorenia alebo koniec stratégie na úrovni aplikácie. Ak sme dosiahli koniec stratégie, ale externý systém je stále nedostupný, správa sa umiestni do DLQ na manuálnu analýzu.

Vyhľadajte riešenie

Hľadanie na internete, nájdete nasledovné rozhodnutie. Stručne povedané, navrhuje sa vytvoriť tému pre každý interval oneskorenia a implementovať spotrebiteľské aplikácie na strane, ktoré budú čítať správy s požadovaným oneskorením.

Opätovné spracovanie udalostí prijatých od Kafku

Napriek veľkému množstvu pozitívnych recenzií sa mi zdá nie celkom vydarená. Po prvé, pretože vývojár bude musieť okrem implementácie obchodných požiadaviek stráviť veľa času implementáciou opísaného mechanizmu.

Okrem toho, ak je v klastri Kafka povolená kontrola prístupu, budete musieť stráviť nejaký čas vytváraním tém a poskytovaním potrebného prístupu k nim. Okrem toho budete musieť pre každú z tém zopakovať výber správneho parametra keep.ms, aby správy mali čas na opätovné odoslanie a nezmizli z nej. Implementácia a žiadosť o prístup sa bude musieť opakovať pre každú existujúcu alebo novú službu.

Pozrime sa teraz, aké mechanizmy nám pružina vo všeobecnosti a najmä spring-kafka poskytuje na opätovné spracovanie správ. Spring-kafka má prechodnú závislosť na spring-retry, ktorá poskytuje abstrakcie pre správu rôznych BackOffPolicies. Ide o pomerne flexibilný nástroj, no jeho významnou nevýhodou je ukladanie správ na opätovné odoslanie do pamäte aplikácie. To znamená, že reštartovanie aplikácie z dôvodu aktualizácie alebo prevádzkovej chyby bude mať za následok stratu všetkých správ čakajúcich na opätovné spracovanie. Keďže tento bod je pre náš systém kritický, ďalej sme sa ním nezaoberali.

Samotná spring-kafka poskytuje napríklad niekoľko implementácií ContainerAwareErrorHandler SeekToCurrentErrorHandler, s ktorým môžete správu spracovať neskôr bez posunutia offsetu v prípade chyby. Počnúc verziou spring-kafka 2.3 bolo možné nastaviť BackOffPolicy.

Tento prístup umožňuje, aby prepracované správy prežili reštarty aplikácií, ale stále neexistuje mechanizmus DLQ. Túto možnosť sme zvolili začiatkom roka 2019, optimisticky veriac, že ​​DLQ nebude potrebný (mali sme šťastie a po niekoľkých mesiacoch prevádzky aplikácie s takýmto systémom prepracovania sme ho vlastne ani nepotrebovali). Dočasné chyby spôsobili spustenie funkcie SeekToCurrentErrorHandler. Zostávajúce chyby boli vytlačené do denníka, čo malo za následok posun a spracovanie pokračovalo ďalšou správou.

Konečné rozhodnutie

Implementácia založená na SeekToCurrentErrorHandler nás podnietila k vyvinutiu vlastného mechanizmu na opätovné odosielanie správ.

V prvom rade sme chceli využiť existujúce skúsenosti a rozšíriť ich v závislosti od aplikačnej logiky. Pre aplikáciu lineárnej logiky by bolo optimálne zastaviť čítanie nových správ na krátky čas špecifikovaný stratégiou opakovania. Pre ostatné aplikácie som chcel mať jeden bod, ktorý by presadzoval stratégiu opakovania. Navyše tento jediný bod musí mať funkciu DLQ pre oba prístupy.

Samotná stratégia opakovania musí byť uložená v aplikácii, ktorá je zodpovedná za načítanie nasledujúceho intervalu pri výskyte dočasnej chyby.

Zastavenie spotrebiteľa pre aplikáciu lineárnej logiky

Pri práci s spring-kafkou môže kód na zastavenie spotrebiteľa vyzerať takto:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

V tomto príklade je retryAt čas na reštartovanie kontajnera MessageListenerContainer, ak je stále spustený. K opätovnému spusteniu dôjde v samostatnom vlákne spustenom v TaskScheduler, ktorého implementáciu zabezpečuje aj jar.

Hodnotu retryAt nájdeme nasledujúcim spôsobom:

  1. Vyhľadá sa hodnota počítadla opakovaných hovorov.
  2. Na základe hodnoty počítadla sa vyhľadá aktuálny interval oneskorenia v stratégii opakovania. Stratégia je deklarovaná v samotnej aplikácii, na jej uloženie sme zvolili formát JSON.
  3. Interval nájdený v poli JSON obsahuje počet sekúnd, po ktorých bude potrebné spracovanie zopakovať. Tento počet sekúnd sa pripočíta k aktuálnemu času a vytvorí sa hodnota pre retryAt.
  4. Ak sa interval nenájde, potom je hodnota retryAt nulová a správa sa odošle do DLQ na manuálnu analýzu.

Pri tomto prístupe zostáva len uložiť počet opakovaných hovorov pre každú správu, ktorá sa práve spracováva, napríklad do pamäte aplikácie. Udržiavanie počítadla opakovania v pamäti nie je pre tento prístup kritické, pretože aplikácia lineárnej logiky nemôže zvládnuť spracovanie ako celok. Na rozdiel od jarného opakovania reštart aplikácie nespôsobí stratu všetkých správ, ktoré budú opätovne spracované, ale jednoducho reštartuje stratégiu.

Tento prístup pomáha odbremeniť externý systém, ktorý môže byť nedostupný z dôvodu veľmi veľkého zaťaženia. Inými slovami, okrem opätovného spracovania sme dosiahli implementáciu vzoru istič.

V našom prípade je prah chyby iba 1 a na minimalizáciu prestojov systému v dôsledku dočasných výpadkov siete používame veľmi podrobnú stratégiu opakovania s malými intervalmi latencie. Toto nemusí byť vhodné pre všetky skupinové aplikácie, takže vzťah medzi prahovou hodnotou chyby a hodnotou intervalu musí byť zvolený na základe charakteristík systému.

Samostatná aplikácia na spracovanie správ z aplikácií s nedeterministickou logikou

Tu je príklad kódu, ktorý odošle správu takejto aplikácii (Retryer), ktorá po dosiahnutí času RETRY_AT znova odošle tému DESTINATION:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Príklad ukazuje, že veľa informácií sa prenáša v hlavičkách. Hodnota RETRY_AT sa zistí rovnakým spôsobom ako pre mechanizmus opakovania prostredníctvom zastavenia spotrebiteľa. Okrem DESTINATION a RETRY_AT míňame:

  • GROUP_ID, pomocou ktorého zoskupujeme správy na manuálnu analýzu a zjednodušené vyhľadávanie.
  • ORIGINAL_PARTITION, aby ste sa pokúsili zachovať rovnakého spotrebiteľa na opätovné spracovanie. Tento parameter môže mať hodnotu null, v takom prípade sa nový oddiel získa pomocou kľúča record.key() pôvodnej správy.
  • Aktualizovaná hodnota COUNTER, aby ste mohli postupovať podľa stratégie opätovného pokusu.
  • SEND_TO je konštanta označujúca, či sa správa odošle na opätovné spracovanie po dosiahnutí RETRY_AT alebo sa umiestni do DLQ.
  • REASON - dôvod, prečo bolo spracovanie správy prerušené.

Retryer ukladá správy na opätovné odoslanie a manuálnu analýzu v PostgreSQL. Časovač spustí úlohu, ktorá nájde správy s RETRY_AT a odošle ich späť do oddielu ORIGINAL_PARTITION témy DESTINATION s kľúčom record.key().

Po odoslaní sa správy z PostgreSQL vymažú. Manuálna analýza správ prebieha v jednoduchom používateľskom rozhraní, ktoré interaguje s Retryerom cez REST API. Jeho hlavnými funkciami sú opätovné odosielanie alebo mazanie správ z DLQ, prezeranie informácií o chybách a vyhľadávanie správ, napríklad podľa názvu chyby.

Keďže je v našich klastroch povolená kontrola prístupu, je potrebné dodatočne požiadať o prístup k téme, ktorú Retryer počúva, a umožniť Retryerovi písať do témy CIEĽ. Je to nepohodlné, ale na rozdiel od prístupu s intervalovými témami máme na to plnohodnotné DLQ a používateľské rozhranie.

Existujú prípady, keď prichádzajúcu tému číta niekoľko rôznych skupín spotrebiteľov, ktorých aplikácie implementujú odlišnú logiku. Opätovné spracovanie správy cez Retryer pre jednu z týchto aplikácií bude mať za následok duplikát v druhej. Na ochranu pred týmto vytvárame samostatnú tému na opätovné spracovanie. Témy prichádzajúcich a opakovaných pokusov môže čítať ten istý spotrebiteľ bez akýchkoľvek obmedzení.

Opätovné spracovanie udalostí prijatých od Kafku

Štandardne tento prístup neposkytuje funkčnosť ističa, ale môže byť pridaný do aplikácie pomocou spring-cloud-netflix alebo nové pružinový mrakový istič, zabaľovanie miest, kde sa volajú externé služby, do vhodných abstrakcií. Okrem toho je možné zvoliť stratégiu prepážka vzor, ​​ktorý môže byť tiež užitočný. Napríklad v spring-cloud-netflix to môže byť fond vlákien alebo semafor.

Výkon

V dôsledku toho máme samostatnú aplikáciu, ktorá nám umožňuje zopakovať spracovanie správ, ak je dočasne nedostupný akýkoľvek externý systém.

Jednou z hlavných výhod aplikácie je, že ju môžu používať externé systémy bežiace na rovnakom klastri Kafka, bez výrazných úprav na ich strane! Takáto aplikácia bude potrebovať iba prístup k téme opätovného pokusu, vyplniť niekoľko hlavičiek Kafka a odoslať správu používateľovi Retryer. Nie je potrebné budovať žiadnu ďalšiu infraštruktúru. A aby sme znížili počet správ prenášaných z aplikácie do Retryeru a späť, identifikovali sme aplikácie s lineárnou logikou a opätovne sme ich spracovali cez Consumer stop.

Zdroj: hab.com

Pridať komentár