Přepracování událostí přijatých od Kafky

Přepracování událostí přijatých od Kafky

Čau Habr.

Nedávno I podělil o své zkušenosti o tom, jaké parametry jako tým nejčastěji používáme pro výrobce a spotřebitele Kafka, abychom se přiblížili garantovanému doručení. V tomto článku vám chci říci, jak jsme zorganizovali přepracování události přijaté od Kafky v důsledku dočasné nedostupnosti externího systému.

Moderní aplikace fungují ve velmi složitém prostředí. Obchodní logika zabalená do zásobníku moderních technologií, běžící v obrazu Dockeru spravovaném orchestrátorem, jako je Kubernetes nebo OpenShift, a komunikující s jinými aplikacemi nebo podnikovými řešeními prostřednictvím řetězce fyzických a virtuálních směrovačů. V takovém prostředí se vždy může něco pokazit, takže přepracování událostí, pokud je některý z externích systémů nedostupný, je důležitou součástí našich obchodních procesů.

Jak to bylo před Kafkou

Dříve jsme v projektu používali IBM MQ pro asynchronní doručování zpráv. Pokud během provozu služby došlo k nějaké chybě, přijatá zpráva by mohla být umístěna do fronty nedoručených dopisů (DLQ) pro další ruční analýzu. DLQ byl vytvořen vedle příchozí fronty, zpráva byla přenesena uvnitř IBM MQ.

Pokud byla chyba dočasná a mohli bychom ji určit (například ResourceAccessException u HTTP volání nebo MongoTimeoutException u požadavku MongoDb), pak by se strategie opakování projevila. Bez ohledu na logiku větvení aplikace byla původní zpráva přesunuta buď do systémové fronty pro zpožděné odeslání, nebo do samostatné aplikace, která byla vytvořena již dávno, aby zprávy znovu odeslala. To zahrnuje číslo pro opětovné odeslání v hlavičce zprávy, které je svázáno s intervalem zpoždění nebo koncem strategie na úrovni aplikace. Pokud jsme dosáhli konce strategie, ale externí systém je stále nedostupný, bude zpráva umístěna do DLQ pro ruční analýzu.

Hledání řešení

Hledání na internetu, můžete najít následující rozhodnutí. Stručně řečeno, navrhuje se vytvořit téma pro každý interval zpoždění a na straně implementovat spotřebitelské aplikace, které budou číst zprávy s požadovaným zpožděním.

Přepracování událostí přijatých od Kafky

I přes velké množství kladných recenzí se mi zdá ne úplně povedený. Za prvé proto, že vývojář bude muset kromě implementace obchodních požadavků strávit spoustu času implementací popsaného mechanismu.

Pokud je navíc v clusteru Kafka povoleno řízení přístupu, budete muset strávit nějaký čas vytvářením témat a poskytováním potřebného přístupu k nim. Kromě toho budete muset pro každé téma opakování vybrat správný parametr retence.ms, aby zprávy měly čas na opětovné odeslání a nezmizely z něj. Implementace a žádost o přístup se bude muset opakovat pro každou stávající nebo novou službu.

Podívejme se nyní, jaké mechanismy nám pružina obecně a spring-kafka konkrétně poskytují pro přepracování zpráv. Spring-kafka má přechodnou závislost na spring-retry, která poskytuje abstrakce pro správu různých BackOffPolicies. Jedná se o poměrně flexibilní nástroj, ale jeho významnou nevýhodou je ukládání zpráv pro opětovné odeslání do paměti aplikace. To znamená, že restartování aplikace z důvodu aktualizace nebo provozní chyby bude mít za následek ztrátu všech zpráv čekajících na opětovné zpracování. Protože je tento bod pro náš systém kritický, dále jsme ho nezvažovali.

Spring-kafka sama o sobě poskytuje například několik implementací ContainerAwareErrorHandler SeekToCurrentErrorHandler, pomocí kterého můžete zprávu zpracovat později bez posunutí offsetu v případě chyby. Počínaje verzí spring-kafka 2.3 bylo možné nastavit BackOffPolicy.

Tento přístup umožňuje, aby přepracované zprávy přežily restartování aplikace, ale stále neexistuje mechanismus DLQ. Tuto možnost jsme zvolili na začátku roku 2019 v optimistickém přesvědčení, že DLQ nebude potřeba (měli jsme štěstí a vlastně jsme ho po několika měsících provozu aplikace s takovým systémem přepracování nepotřebovali). Dočasné chyby způsobily spuštění funkce SeekToCurrentErrorHandler. Zbývající chyby byly vytištěny v protokolu, což vedlo k posunu, a zpracování pokračovalo další zprávou.

Konečné rozhodnutí

Implementace založená na SeekToCurrentErrorHandler nás přiměla k vývoji vlastního mechanismu pro opětovné zasílání zpráv.

V první řadě jsme chtěli využít stávající zkušenosti a rozšířit je v závislosti na aplikační logice. Pro aplikaci lineární logiky by bylo optimální zastavit čtení nových zpráv na krátkou dobu určenou strategií opakování. Pro ostatní aplikace jsem chtěl mít jeden bod, který by prosadil strategii opakování. Navíc tento jediný bod musí mít funkci DLQ pro oba přístupy.

Samotná strategie opakování musí být uložena v aplikaci, která je zodpovědná za načtení dalšího intervalu, když dojde k dočasné chybě.

Zastavení spotřebitele pro aplikaci lineární logiky

Při práci s spring-kafka může kód pro zastavení spotřebitele vypadat nějak takto:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

V tomto příkladu je retryAt čas restartovat MessageListenerContainer, pokud je stále spuštěn. K opětovnému spuštění dojde v samostatném vláknu spuštěném v TaskScheduler, jehož implementaci zajišťuje také jaro.

Hodnotu retryAt zjistíme následujícím způsobem:

  1. Vyhledá se hodnota počítadla opětovného volání.
  2. Na základě hodnoty čítače se vyhledá aktuální interval zpoždění ve strategii opakování. Strategie je deklarována v samotné aplikaci, pro její uložení jsme zvolili formát JSON.
  3. Interval nalezený v poli JSON obsahuje počet sekund, po kterých bude nutné zpracování opakovat. Tento počet sekund se přičte k aktuálnímu času a vytvoří hodnotu pro retryAt.
  4. Pokud interval není nalezen, je hodnota retryAt nulová a zpráva bude odeslána do DLQ k ruční analýze.

Při tomto přístupu zbývá pouze uložit počet opakovaných volání pro každou zprávu, která je právě zpracovávána, například do paměti aplikace. Udržování počtu opakování v paměti není pro tento přístup kritické, protože aplikace lineární logiky nemůže zvládnout zpracování jako celek. Na rozdíl od jarního opakování restartování aplikace nezpůsobí ztrátu všech zpráv, které budou znovu zpracovány, ale jednoduše restartuje strategii.

Tento přístup pomáhá snižovat zátěž externího systému, který může být nedostupný kvůli velmi velkému zatížení. Jinými slovy, kromě přepracování jsme dosáhli implementace vzoru jistič.

V našem případě je práh chyby pouze 1 a abychom minimalizovali výpadky systému kvůli dočasným výpadkům sítě, používáme velmi granulární strategii opakování s malými intervaly latence. To nemusí být vhodné pro všechny skupinové aplikace, takže vztah mezi prahovou hodnotou chyby a hodnotou intervalu musí být zvolen na základě vlastností systému.

Samostatná aplikace pro zpracování zpráv z aplikací s nedeterministickou logikou

Zde je příklad kódu, který takové aplikaci (Retryer) odešle zprávu, která se po dosažení času RETRY_AT znovu odešle do tématu DESTINATION:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Příklad ukazuje, že mnoho informací se přenáší v hlavičkách. Hodnota RETRY_AT se zjistí stejným způsobem jako u mechanismu opakování prostřednictvím zastavení spotřebitele. Kromě DESTINATION a RETRY_AT míjíme:

  • GROUP_ID, pomocí kterého seskupujeme zprávy pro ruční analýzu a zjednodušené vyhledávání.
  • ORIGINAL_PARTITION, abyste se pokusili zachovat stejného spotřebitele pro opětovné zpracování. Tento parametr může mít hodnotu null, v takovém případě bude nový oddíl získán pomocí klíče record.key() původní zprávy.
  • Byla aktualizována hodnota COUNTER, aby se řídila strategií opakování.
  • SEND_TO je konstanta udávající, zda je zpráva odeslána k přepracování po dosažení RETRY_AT nebo umístěna do DLQ.
  • REASON - důvod, proč bylo zpracování zprávy přerušeno.

Retryer ukládá zprávy pro opětovné odeslání a ruční analýzu v PostgreSQL. Časovač spustí úlohu, která najde zprávy s RETRY_AT a odešle je zpět do oddílu ORIGINAL_PARTITION tématu DESTINATION s klíčem record.key().

Po odeslání jsou zprávy z PostgreSQL smazány. Ruční analýza zpráv probíhá v jednoduchém uživatelském rozhraní, které interaguje s Retryer přes REST API. Jeho hlavními funkcemi jsou přeposílání nebo mazání zpráv z DLQ, prohlížení chybových informací a vyhledávání zpráv, například podle názvu chyby.

Protože je na našich clusterech povoleno řízení přístupu, je nutné dodatečně požádat o přístup k tématu, které Retryer poslouchá, a umožnit Retryerovi zapisovat do tématu CÍL. To je nepohodlné, ale na rozdíl od přístupu s intervalovým tématem máme plnohodnotné DLQ a uživatelské rozhraní, které to zvládne.

Existují případy, kdy příchozí téma čte několik různých skupin spotřebitelů, jejichž aplikace implementují odlišnou logiku. Opětovné zpracování zprávy prostřednictvím Retryer pro jednu z těchto aplikací bude mít za následek duplikát ve druhé. Abychom tomu zabránili, vytváříme samostatné téma pro opětovné zpracování. Příchozí témata a témata opakování může číst stejný spotřebitel bez jakýchkoli omezení.

Přepracování událostí přijatých od Kafky

Ve výchozím nastavení tento přístup neposkytuje funkci jističe, lze jej však do aplikace přidat pomocí spring-cloud-netflix nebo nové jistič jarního mraku, obalující místa, kde jsou externí služby volány, do vhodných abstrakcí. Navíc je možné zvolit strategii přepážky vzor, ​​který může být také užitečný. Například v spring-cloud-netflix to může být fond vláken nebo semafor.

Výkon

V důsledku toho máme samostatnou aplikaci, která nám umožňuje opakovat zpracování zpráv, pokud je dočasně nedostupný jakýkoli externí systém.

Jednou z hlavních výhod aplikace je, že ji mohou používat externí systémy běžící na stejném clusteru Kafka, a to bez výrazných úprav na jejich straně! Taková aplikace bude potřebovat pouze přístup k tématu opakování, vyplnit několik hlaviček Kafka a odeslat zprávu Retryerovi. Není potřeba budovat žádnou další infrastrukturu. A abychom snížili počet zpráv přenášených z aplikace do Retryeru a zpět, identifikovali jsme aplikace s lineární logikou a znovu je zpracovali prostřednictvím zákaznické zastávky.

Zdroj: www.habr.com

Přidat komentář