Reprocesiranje događaja primljenih od Kafke

Reprocesiranje događaja primljenih od Kafke

Hej Habr.

Nedavno sam ja podijelio svoje iskustvo o tome koje parametre mi kao tim najčešće koristimo za Kafka proizvođača i potrošača da bismo se približili zagarantovanoj isporuci. U ovom članku želim da vam kažem kako smo organizovali ponovnu obradu događaja primljenog od Kafke kao rezultat privremene nedostupnosti eksternog sistema.

Moderne aplikacije rade u vrlo složenom okruženju. Poslovna logika umotana u moderni tehnološki stog, radi u Docker imidžu kojim upravlja orkestrator kao što je Kubernetes ili OpenShift, i komunicira s drugim aplikacijama ili poslovnim rješenjima kroz lanac fizičkih i virtuelnih rutera. U takvom okruženju uvijek se nešto može pokvariti, pa je ponovna obrada događaja ako je jedan od vanjskih sistema nedostupan važan dio naših poslovnih procesa.

Kako je bilo prije Kafke

Ranije u projektu koristili smo IBM MQ za asinkronu isporuku poruka. Ako je došlo do bilo kakve greške tokom rada usluge, primljena poruka bi mogla biti stavljena u red mrtvih poruka (DLQ) radi daljeg ručnog raščlanjivanja. DLQ je kreiran pored dolaznog reda, poruka je prebačena unutar IBM MQ.

Ako je greška bila privremena i mogli bismo je utvrditi (na primjer, ResourceAccessException na HTTP pozivu ili MongoTimeoutException na MongoDb zahtjevu), tada bi strategija ponovnog pokušaja stupila na snagu. Bez obzira na logiku grananja aplikacije, originalna poruka je premeštena ili u sistemski red za odloženo slanje, ili u posebnu aplikaciju koja je davno napravljena za ponovno slanje poruka. Ovo uključuje broj ponovnog slanja u zaglavlju poruke, koji je vezan za interval kašnjenja ili kraj strategije na nivou aplikacije. Ako smo došli do kraja strategije, ali vanjski sistem još uvijek nije dostupan, tada će poruka biti smještena u DLQ radi ručnog raščlanjivanja.

Pretraživanje rješenja

Pretraživanje na internetu, možete pronaći sljedeće odluka. Ukratko, predlaže se kreiranje teme za svaki interval kašnjenja i implementacija korisničkih aplikacija sa strane, koje će čitati poruke sa potrebnim kašnjenjem.

Reprocesiranje događaja primljenih od Kafke

Unatoč velikom broju pozitivnih kritika, čini mi se ne baš uspješnim. Prije svega, zato što će programer, pored implementacije poslovnih zahtjeva, morati potrošiti dosta vremena na implementaciju opisanog mehanizma.

Osim toga, ako je kontrola pristupa omogućena na Kafka klasteru, morat ćete potrošiti neko vrijeme na kreiranje tema i pružanje potrebnog pristupa njima. Osim toga, morat ćete odabrati ispravan parametar retention.ms za svaku temu ponovnog pokušaja kako bi poruke imale vremena za ponovno slanje i ne bi nestale iz nje. Implementacija i zahtjev za pristup će se morati ponoviti za svaku postojeću ili novu uslugu.

Hajde sada da vidimo koje nam mehanizme spring uopšte i spring-kafka posebno pružaju za ponovnu obradu poruka. Spring-kafka ima tranzitivnu zavisnost od spring-retry, koji obezbeđuje apstrakcije za upravljanje različitim BackOffPolicies. Ovo je prilično fleksibilan alat, ali njegov značajan nedostatak je pohranjivanje poruka za ponovno slanje u memoriju aplikacije. To znači da će ponovno pokretanje aplikacije zbog ažuriranja ili operativne greške rezultirati gubitkom svih poruka koje čekaju ponovnu obradu. Pošto je ova tačka kritična za naš sistem, nismo je dalje razmatrali.

Spring-kafka sama pruža nekoliko implementacija ContainerAwareErrorHandlera, na primjer SeekToCurrentErrorHandler, s kojim možete kasnije obraditi poruku bez pomicanja pomaka u slučaju greške. Počevši od verzije spring-kafke 2.3, postalo je moguće postaviti BackOffPolicy.

Ovaj pristup omogućava ponovno obrađenim porukama da prežive ponovno pokretanje aplikacije, ali još uvijek ne postoji DLQ mehanizam. Ovu opciju smo odabrali početkom 2019. godine, optimistično vjerujući da DLQ neće biti potreban (imali smo sreće i zapravo nam nije trebao nakon nekoliko mjeseci rada aplikacije sa ovakvim sistemom za obradu). Privremene greške dovele su do paljenja SeekToCurrentErrorHandlera. Preostale greške su ispisane u dnevniku, što je rezultiralo pomakom, a obrada je nastavljena sa sljedećom porukom.

Konačna odluka

Implementacija zasnovana na SeekToCurrentErrorHandleru navela nas je da razvijemo vlastiti mehanizam za ponovno slanje poruka.

Prije svega, željeli smo iskoristiti postojeće iskustvo i proširiti ga ovisno o logici aplikacije. Za primjenu linearne logike, bilo bi optimalno prestati čitati nove poruke na kratak vremenski period određen strategijom ponovnog pokušaja. Za druge aplikacije, želio sam imati jednu tačku koja bi nametnula strategiju ponovnog pokušaja. Osim toga, ova pojedinačna tačka mora imati DLQ funkcionalnost za oba pristupa.

Sama strategija ponovnog pokušaja mora biti pohranjena u aplikaciji, koja je odgovorna za preuzimanje sljedećeg intervala kada dođe do privremene greške.

Zaustavljanje potrošača za primjenu linearne logike

Kada radite sa spring-kafkom, kod za zaustavljanje potrošača može izgledati otprilike ovako:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

U primjeru, retryAt je vrijeme za ponovno pokretanje MessageListenerContainer ako je još uvijek pokrenut. Ponovno pokretanje će se desiti u posebnoj niti pokrenutoj u TaskScheduler-u, čija implementacija je također omogućena do proljeća.

RetryAt vrijednost pronalazimo na sljedeći način:

  1. Traži se vrijednost brojača ponovnog poziva.
  2. Na osnovu vrijednosti brojača, pretražuje se trenutni interval kašnjenja u strategiji ponovnog pokušaja. Strategija je deklarirana u samoj aplikaciji, a mi smo odabrali JSON format za pohranjivanje.
  3. Interval pronađen u JSON nizu sadrži broj sekundi nakon kojih će se obrada morati ponoviti. Ovaj broj sekundi se dodaje trenutnom vremenu kako bi se formirala vrijednost za retryAt.
  4. Ako interval nije pronađen, tada je vrijednost retryAt nulta i poruka će biti poslana u DLQ radi ručnog raščlanjivanja.

Uz ovaj pristup, ostaje samo da se broj ponovljenih poziva za svaku poruku koja se trenutno obrađuje, pohrani na primjer u memoriju aplikacije. Čuvanje broja ponovnih pokušaja u memoriji nije kritično za ovaj pristup, budući da aplikacija linearne logike ne može upravljati obradom u cjelini. Za razliku od proljetnog ponovnog pokušaja, ponovno pokretanje aplikacije neće uzrokovati da sve poruke budu izgubljene za ponovnu obradu, već će jednostavno ponovno pokrenuti strategiju.

Ovaj pristup pomaže da se skine opterećenje sa eksternog sistema, koji može biti nedostupan zbog veoma velikog opterećenja. Drugim riječima, pored ponovne obrade, postigli smo implementaciju šablona prekidač.

U našem slučaju, prag greške je samo 1, a da bismo minimizirali zastoje sistema zbog privremenih prekida mreže, koristimo vrlo detaljnu strategiju ponovnog pokušaja s malim intervalima kašnjenja. Ovo možda nije prikladno za sve grupne aplikacije, tako da se odnos između praga greške i vrijednosti intervala mora odabrati na osnovu karakteristika sistema.

Zasebna aplikacija za obradu poruka od aplikacija sa nedeterminističkom logikom

Evo primjera koda koji šalje poruku takvoj aplikaciji (Retryer), koja će ponovo poslati temu DESTINATION kada se dostigne vrijeme RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Primjer pokazuje da se puno informacija prenosi u zaglavljima. Vrijednost RETRY_AT nalazi se na isti način kao i za mehanizam ponovnog pokušaja kroz Consumer stop. Pored DESTINATION i RETRY_AT prolazimo:

  • GROUP_ID, po kojem grupišemo poruke za ručnu analizu i pojednostavljeno pretraživanje.
  • ORIGINAL_PARTITION da pokušate zadržati istog potrošača za ponovnu obradu. Ovaj parametar može biti null, u kom slučaju će se nova particija dobiti pomoću ključa record.key() originalne poruke.
  • Ažurirana vrijednost COUNTER za praćenje strategije ponovnog pokušaja.
  • SEND_TO je konstanta koja ukazuje na to da li se poruka šalje na ponovnu obradu kada dostigne RETRY_AT ili se stavlja u DLQ.
  • RAZLOG - razlog zbog kojeg je obrada poruke prekinuta.

Retryer pohranjuje poruke za ponovno slanje i ručno raščlanjivanje u PostgreSQL. Tajmer pokreće zadatak koji pronalazi poruke sa RETRY_AT i šalje ih nazad na ORIGINAL_PARTITION particiju teme DESTINATION sa ključem record.key().

Kada se pošalju, poruke se brišu iz PostgreSQL-a. Ručno raščlanjivanje poruka događa se u jednostavnom korisničkom sučelju koje je u interakciji s Retryer-om preko REST API-ja. Njegove glavne karakteristike su ponovno slanje ili brisanje poruka iz DLQ-a, pregled informacija o grešci i traženje poruka, na primjer po imenu greške.

Pošto je kontrola pristupa omogućena na našim klasterima, potrebno je dodatno zatražiti pristup temi koju Retryer sluša i dozvoliti Retryeru da piše u ODREDIŠNU temu. Ovo je nezgodno, ali, za razliku od pristupa intervalnim temama, imamo punopravni DLQ i korisničko sučelje za upravljanje.

Postoje slučajevi kada dolaznu temu čita nekoliko različitih grupa potrošača, čije aplikacije implementiraju različitu logiku. Ponovna obrada poruke putem Retryera za jednu od ovih aplikacija će rezultirati duplikat na drugoj. Da bismo se od toga zaštitili, kreiramo posebnu temu za ponovnu obradu. Teme o dolasku i ponovnom pokušaju može čitati isti potrošač bez ikakvih ograničenja.

Reprocesiranje događaja primljenih od Kafke

Prema zadanim postavkama, ovaj pristup ne pruža funkciju prekidača, ali se može dodati aplikaciji pomoću proljeće-cloud-netflix ili novi opružni prekidač za oblake, omotavajući mjesta na kojima se eksterne usluge pozivaju u odgovarajuće apstrakcije. Osim toga, postaje moguće odabrati strategiju za pregrada uzorak, koji takođe može biti koristan. Na primjer, u spring-cloud-netflix-u ovo može biti skup niti ili semafor.

zaključak

Kao rezultat toga, imamo zasebnu aplikaciju koja nam omogućava da ponovimo obradu poruke ako je bilo koji vanjski sistem privremeno nedostupan.

Jedna od glavnih prednosti aplikacije je da je mogu koristiti eksterni sistemi koji rade na istom Kafka klasteru, bez značajnijih modifikacija na njihovoj strani! Takva aplikacija će samo trebati pristupiti temi ponovnog pokušaja, popuniti nekoliko Kafka zaglavlja i poslati poruku Retryeru. Nema potrebe za podizanjem dodatne infrastrukture. A da bismo smanjili broj poruka koje se prenose iz aplikacije u Retryer i nazad, identifikovali smo aplikacije sa linearnom logikom i ponovo ih obrađivali kroz Consumer stop.

izvor: www.habr.com

Dodajte komentar