Ponovno obrađivanje događaja primljenih od Kafke

Ponovno obrađivanje događaja primljenih od Kafke

Hej Habr.

Nedavno sam podijelio svoje iskustvo o tome koje parametre mi kao tim najčešće koristimo za Kafka Producer i Consumer kako bismo se približili zajamčenoj isporuci. U ovom članku želim vam reći kako smo organizirali ponovnu obradu događaja primljenog od Kafke kao rezultat privremene nedostupnosti vanjskog sustava.

Moderne aplikacije rade u vrlo složenom okruženju. Poslovna logika umotana u moderni tehnološki skup, izvodi se u Docker slici kojom upravlja orkestrator poput Kubernetesa ili OpenShifta i komunicira s drugim aplikacijama ili poslovnim rješenjima kroz lanac fizičkih i virtualnih usmjerivača. U takvom okruženju uvijek se nešto može pokvariti, stoga je ponovna obrada događaja u slučaju nedostupnosti nekog od vanjskih sustava važan dio naših poslovnih procesa.

Kako je bilo prije Kafke

Ranije u projektu koristili smo IBM MQ za asinkronu isporuku poruka. Ako se tijekom rada usluge dogodi bilo kakva pogreška, primljena poruka se može staviti u red čekanja za mrtva pisma (DLQ) za daljnje ručno analiziranje. DLQ je kreiran pored dolaznog reda, poruka je prebačena unutar IBM MQ.

Ako je pogreška bila privremena i mogli bismo je utvrditi (na primjer, ResourceAccessException na HTTP pozivu ili MongoTimeoutException na MongoDb zahtjevu), tada bi strategija ponovnog pokušaja stupila na snagu. Bez obzira na logiku grananja aplikacije, originalna poruka je premještena ili u red čekanja sustava za odgođeno slanje ili u zasebnu aplikaciju koja je odavno napravljena za ponovno slanje poruka. To uključuje broj ponovnog slanja u zaglavlju poruke, koji je vezan uz interval odgode ili kraj strategije na razini aplikacije. Ako smo došli do kraja strategije, ali vanjski sustav još uvijek nije dostupan, tada će poruka biti smještena u DLQ za ručno analiziranje.

Traženje rješenja

Pretraživanje na Internetu, možete pronaći sljedeće odluka. Ukratko, predlaže se kreiranje teme za svaki interval odgode i implementacija Consumer aplikacija sa strane, koje će čitati poruke sa traženom odgodom.

Ponovno obrađivanje događaja primljenih od Kafke

Unatoč velikom broju pozitivnih recenzija, čini mi se da nije sasvim uspješan. Prije svega zato što će programer, osim implementacije poslovnih zahtjeva, morati potrošiti dosta vremena na implementaciju opisanog mehanizma.

Osim toga, ako je kontrola pristupa omogućena na Kafka klasteru, morat ćete potrošiti neko vrijeme na stvaranje tema i pružanje potrebnog pristupa njima. Uz ovo, morat ćete odabrati ispravan parametar retention.ms za svaku od tema za ponovni pokušaj tako da poruke imaju vremena za ponovno slanje i da ne nestanu iz njih. Implementacija i zahtjev za pristup morat će se ponoviti za svaku postojeću ili novu uslugu.

Pogledajmo sada koje mehanizme spring općenito, a posebno spring-kafka, pružaju za ponovnu obradu poruka. Spring-kafka ima tranzitivnu ovisnost o spring-retry, koja pruža apstrakcije za upravljanje različitim BackOffPolicies. Ovo je prilično fleksibilan alat, ali njegov značajan nedostatak je pohranjivanje poruka za ponovno slanje u memoriju aplikacije. To znači da će ponovno pokretanje aplikacije zbog ažuriranja ili operativne pogreške rezultirati gubitkom svih poruka koje čekaju ponovnu obradu. Budući da je ova točka kritična za naš sustav, nismo je dalje razmatrali.

sama spring-kafka nudi nekoliko implementacija ContainerAwareErrorHandlera, na primjer SeekToCurrentErrorHandler, s kojim kasnije možete obraditi poruku bez pomaka offseta u slučaju pogreške. Počevši od verzije spring-kafka 2.3, postalo je moguće postaviti BackOffPolicy.

Ovaj pristup omogućuje da ponovno obrađene poruke prežive ponovno pokretanje aplikacije, ali još uvijek ne postoji DLQ mehanizam. Ovu smo opciju odabrali početkom 2019. optimistično vjerujući da DLQ neće biti potreban (imali smo sreće pa nam zapravo nije trebao nakon nekoliko mjeseci rada aplikacije s ovakvim sustavom reprocesiranja). Privremene pogreške uzrokovale su aktiviranje SeekToCurrentErrorHandlera. Preostale pogreške ispisane su u dnevniku, što je rezultiralo pomakom, a obrada je nastavljena sa sljedećom porukom.

Konačna odluka

Implementacija temeljena na SeekToCurrentErrorHandleru potaknula nas je da razvijemo vlastiti mehanizam za ponovno slanje poruka.

Prije svega, željeli smo iskoristiti postojeće iskustvo i proširiti ga ovisno o logici aplikacije. Za aplikaciju linearne logike bilo bi optimalno zaustaviti čitanje novih poruka na kratko vremensko razdoblje određeno strategijom ponovnog pokušaja. Za druge aplikacije, želio sam imati jednu točku koja bi nametnula strategiju ponovnog pokušaja. Osim toga, ova jedna točka mora imati DLQ funkcionalnost za oba pristupa.

Sama strategija ponovnog pokušaja mora biti pohranjena u aplikaciji koja je odgovorna za dohvaćanje sljedećeg intervala kada se pojavi privremena pogreška.

Zaustavljanje potrošača za aplikaciju linearne logike

Kada radite sa spring-kafkom, kod za zaustavljanje potrošača može izgledati otprilike ovako:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

U primjeru, retryAt je vrijeme za ponovno pokretanje MessageListenerContainera ako još uvijek radi. Ponovno pokretanje dogodit će se u zasebnoj niti pokrenutoj u TaskScheduleru, čija je implementacija također predviđena do proljeća.

Vrijednost retryAt nalazimo na sljedeći način:

  1. Traži se vrijednost brojača ponovnog poziva.
  2. Na temelju vrijednosti brojača pretražuje se trenutni interval odgode u strategiji ponovnog pokušaja. Strategija je deklarirana u samoj aplikaciji, a za pohranu smo odabrali JSON format.
  3. Interval koji se nalazi u JSON polju sadrži broj sekundi nakon kojih će se obrada morati ponoviti. Taj se broj sekundi dodaje trenutnom vremenu kako bi se formirala vrijednost za retryAt.
  4. Ako interval nije pronađen, vrijednost retryAt je null i poruka će biti poslana DLQ-u na ručno analiziranje.

Ovim pristupom preostaje samo pohraniti broj ponovljenih poziva za svaku poruku koja se trenutno obrađuje, primjerice u memoriju aplikacije. Održavanje broja ponovnih pokušaja u memoriji nije kritično za ovaj pristup, jer aplikacija linearne logike ne može podnijeti obradu u cjelini. Za razliku od proljetnog ponovnog pokušaja, ponovno pokretanje aplikacije neće uzrokovati gubitak svih poruka i ponovnu obradu, već će jednostavno ponovno pokrenuti strategiju.

Ovaj pristup pomaže smanjiti opterećenje vanjskog sustava, koji može biti nedostupan zbog vrlo velikog opterećenja. Drugim riječima, osim ponovne obrade, postigli smo implementaciju uzorka osigurač.

U našem slučaju, prag pogreške je samo 1, a kako bismo smanjili vrijeme prekida rada sustava zbog privremenih prekida rada mreže, koristimo vrlo detaljnu strategiju ponovnog pokušaja s malim intervalima latencije. Ovo možda neće biti prikladno za sve grupne aplikacije, tako da se odnos između praga pogreške i vrijednosti intervala mora odabrati na temelju karakteristika sustava.

Zasebna aplikacija za obradu poruka iz aplikacija s nedeterminističkom logikom

Ovdje je primjer koda koji šalje poruku takvoj aplikaciji (Retryer), koja će se ponovno poslati u temu DESTINATION kada se postigne vrijeme RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Primjer pokazuje da se puno informacija prenosi u zaglavljima. Vrijednost RETRY_AT pronalazi se na isti način kao i za mehanizam ponovnog pokušaja putem zaustavljanja potrošača. Uz DESTINATION i RETRY_AT prolazimo:

  • GROUP_ID, prema kojem grupiramo poruke za ručnu analizu i pojednostavljeno pretraživanje.
  • ORIGINAL_PARTITION kako biste pokušali zadržati istog potrošača za ponovnu obradu. Ovaj parametar može biti null, u kojem će slučaju nova particija biti dobivena pomoću ključa record.key() izvorne poruke.
  • Ažurirana vrijednost COUNTER za praćenje strategije ponovnog pokušaja.
  • SEND_TO je konstanta koja pokazuje da li se poruka šalje na ponovnu obradu nakon što dosegne RETRY_AT ili se stavlja u DLQ.
  • RAZLOG - razlog zbog kojeg je obrada poruke prekinuta.

Retryer pohranjuje poruke za ponovno slanje i ručno analiziranje u PostgreSQL. Mjerač vremena pokreće zadatak koji pronalazi poruke s RETRY_AT i šalje ih natrag na particiju ORIGINAL_PARTITION teme DESTINATION s ključem record.key().

Nakon slanja, poruke se brišu iz PostgreSQL-a. Ručno analiziranje poruka odvija se u jednostavnom korisničkom sučelju koje komunicira s Retryerom putem REST API-ja. Njegove glavne značajke su ponovno slanje ili brisanje poruka iz DLQ-a, pregled informacija o pogrešci i traženje poruka, na primjer prema nazivu pogreške.

Budući da je kontrola pristupa omogućena na našim klasterima, potrebno je dodatno zatražiti pristup temi koju Retryer sluša, te dopustiti Retryeru da piše u DESTINACIJU topicu. Ovo je nezgodno, ali, za razliku od pristupa temi intervala, imamo potpuni DLQ i korisničko sučelje za upravljanje.

Postoje slučajevi kada dolaznu temu čita nekoliko različitih skupina potrošača, čije aplikacije implementiraju različitu logiku. Ponovna obrada poruke kroz Retryer za jednu od ovih aplikacija rezultirat će duplikatom druge. Kako bismo se zaštitili od toga, stvaramo zasebnu temu za ponovnu obradu. Isti potrošač može čitati dolazne i ponovne teme bez ikakvih ograničenja.

Ponovno obrađivanje događaja primljenih od Kafke

Prema zadanim postavkama ovaj pristup ne pruža funkciju prekidača, no može se dodati u aplikaciju pomoću proljeće-oblak-netflix ili novi proljetni prekidač oblaka, omotavajući mjesta gdje se pozivaju vanjske usluge u odgovarajuće apstrakcije. Osim toga, postaje moguće odabrati strategiju za pregrada uzorak, koji također može biti koristan. Na primjer, u spring-cloud-netflixu to može biti skup niti ili semafor.

Izlaz

Kao rezultat toga, imamo zasebnu aplikaciju koja nam omogućuje ponavljanje obrade poruka ako je bilo koji vanjski sustav privremeno nedostupan.

Jedna od glavnih prednosti aplikacije je da je mogu koristiti vanjski sustavi koji rade na istom Kafka klasteru, bez značajnih izmjena na njihovoj strani! Takva će aplikacija trebati samo pristupiti temi ponovnog pokušaja, ispuniti nekoliko Kafka zaglavlja i poslati poruku Retrieru. Nema potrebe za podizanjem dodatne infrastrukture. A kako bismo smanjili broj poruka koje se prenose iz aplikacije u Retryer i natrag, identificirali smo aplikacije s linearnom logikom i ponovno ih obradili putem Consumer stop.

Izvor: www.habr.com

Dodajte komentar