Avvenimenti di riprocessazione ricevuti da Kafka

Avvenimenti di riprocessazione ricevuti da Kafka

Hey Habr.

Recentemente I hà spartutu a so sperienza circa i paràmetri chì noi cum'è una squadra usemu più spessu per Kafka Producer and Consumer per avvicinassi à a consegna garantita. In questu articulu, vogliu dì cumu avemu urganizatu a riprocessazione di un avvenimentu ricevutu da Kafka per via di a indisponibilità temporale di u sistema esternu.

L'applicazioni muderne operanu in un ambiente assai cumplessu. A logica cummerciale impannillata in una pila di tecnulugia muderna, in esecuzione in una maghjina Docker gestita da un orchestratore cum'è Kubernetes o OpenShift, è cumunicà cù altre applicazioni o soluzioni d'impresa attraversu una catena di routers fisici è virtuali. In un tali ambiente, qualcosa pò sempre rompe, cusì a riprocessazione di l'avvenimenti se unu di i sistemi esterni ùn hè micca dispunibule hè una parte impurtante di i nostri prucessi di cummerciale.

Cume era prima di Kafka

Nanzu à u prugettu avemu usatu IBM MQ per a spedizione di missaghju asincrona. Se un errore hè accadutu durante l'operazione di u serviziu, u messagiu ricevutu puderia esse piazzatu in una fila di lettere morte (DLQ) per una ulteriore analisi manuale. U DLQ hè statu creatu accantu à a fila in entrata, u messagiu hè statu trasferitu in IBM MQ.

Se l'errore era tempuranee è pudemu determinà lu (per esempiu, una ResourceAccessException nantu à una chjama HTTP o una MongoTimeoutException nantu à una dumanda MongoDb), allora a strategia di ricuperazione hà da esse efficace. Indipendentemente da a logica di ramificazione di l'applicazione, u missaghju originale hè stata spustata sia in a fila di u sistema per l'invio ritardatu, sia in una applicazione separata chì hè stata fatta assai fà per rinvià i missaghji. Questu includenu un numeru di resend in l'intestazione di u messagiu, chì hè ligatu à l'intervallu di ritardu o à a fine di a strategia di u livellu di l'applicazione. Se avemu ghjuntu à a fine di a strategia, ma u sistema esternu ùn hè ancu dispunibile, allora u messagiu serà postu in u DLQ per l'analisi manuale.

Cerca una suluzione

Ricerca in Internet, pudete truvà i seguenti решение. In breve, hè prupostu di creà un tema per ogni intervallu di ritardu è implementà l'applicazioni Consumer à u latu, chì leghje i missaghji cù u ritardu necessariu.

Avvenimenti di riprocessazione ricevuti da Kafka

Malgradu u gran numaru di recensioni pusitivi, mi pare micca sanu successu. Prima di tuttu, perchè u sviluppatore, in più di implementà i bisogni di l'affari, duverà passà assai tempu per implementà u mecanismu descrittu.

Inoltre, se u cuntrollu di l'accessu hè attivatu nantu à u cluster Kafka, avete da passà un pocu di tempu per creà temi è furnisce l'accessu necessariu per elli. In più di questu, avete bisognu di selezziunà u paràmetru di retention.ms currettu per ognunu di i sughjetti di ritruvà in modu chì i missaghji anu u tempu per esse risentutu è ùn sparisce micca da ellu. L'implementazione è a dumanda di accessu duverà esse ripetuta per ogni serviziu esistente o novu.

Videmu avà chì i meccanismi Spring in generale è Spring-kafka in particulare ci furniscenu per a rielaborazione di i missaghji. Spring-kafka hà una dependenza transitiva di spring-retry, chì furnisce astrazioni per gestisce diverse BackOffPolicies. Questu hè un strumentu abbastanza flessibile, ma u so svantaghju significativu hè l'almacenamiento di missaghji per rinvià in a memoria di l'applicazione. Questu significa chì u riavviu di l'applicazione per un aghjurnamentu o un errore operativu hà da risultatu in a perdita di tutti i missaghji in attesa di riprocessazione. Siccomu questu puntu hè criticu per u nostru sistema, ùn avemu micca cunsideratu più.

Spring-kafka stessu furnisce parechje implementazioni di ContainerAwareErrorHandler, per esempiu SeekToCurrentErrorHandler, cù quale pudete processà u missaghju più tardi senza cambià l'offset in casu d'errore. Partendu cù a versione di primavera-kafka 2.3, hè diventatu pussibule di stabilisce BackOffPolicy.

Stu approcciu permette à i missaghji riprocessati per sopravvive à i reinicii di l'applicazioni, ma ùn ci hè ancu micca miccanisimu DLQ. Avemu sceltu sta opzione à l'iniziu di u 2019, cridendu ottimisimu chì DLQ ùn saria micca necessariu (avemu furtunati è in realtà ùn l'avemu micca bisognu dopu à parechji mesi di uperazione di l'applicazione cù un tali sistema di riprocessazione). Errori tempuranee anu causatu à u focu SeekToCurrentErrorHandler. L'errori rimanenti sò stati stampati in u logu, risultatu in un offset, è u processamentu cuntinuau cù u missaghju dopu.

Decisione finale

L'implementazione basata nantu à SeekToCurrentErrorHandler ci hà incitatu à sviluppà u nostru propiu mecanismu per rinvià i missaghji.

Prima di tuttu, avemu vulsutu aduprà l'esperienza esistente è espansione secondu a logica di l'applicazione. Per una applicazione di logica lineale, seria ottimali di piantà di leghje novi messagi per un cortu periodu di tempu specificatu da a strategia di ricuperazione. Per altre applicazioni, vulia avè un puntu unicu chì rinforza a strategia di ritruvà. Inoltre, stu puntu unicu deve avè a funziunalità DLQ per i dui approcci.

A strategia di ritruvà stessu deve esse guardatu in l'applicazione, chì hè rispunsevuli di ricuperà u prossimu intervallu quandu si trova un errore tempurale.

Arresta u Consumatore per una Applicazione Logica Lineale

Quandu u travagliu cù spring-kafka, u codice per piantà u Consumatore puderia vede qualcosa cusì:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

In l'esempiu, retryAt hè u tempu di riavvia u MessageListenerContainer s'ellu hè sempre in esecuzione. U rilanciamentu sarà in un filu separatu lanciatu in TaskScheduler, l'implementazione di quale hè ancu furnita da a primavera.

Truvemu u valore retryAt in a manera seguente:

  1. U valore di u contatore di ricumpensa hè cercatu.
  2. Basatu annantu à u valore di u contatore, l'intervallu di ritardu attuale in a strategia di riprova hè cercatu. A strategia hè dichjarata in l'applicazione stessa; avemu sceltu u formatu JSON per almacenà.
  3. L'intervallu chì si trova in a matrice JSON cuntene u numeru di sicondi dopu à quale u prucessu deve esse ripetutu. Stu numeru di seconde hè aghjuntu à u tempu attuale per furmà u valore per retryAt.
  4. Se l'intervallu ùn hè micca truvatu, u valore di retryAt hè nulu è u messagiu serà mandatu à DLQ per l'analisi manuale.

Cù questu approcciu, tuttu ciò chì resta hè di salvà u numeru di chjama ripetuta per ogni missaghju chì hè attualmente processatu, per esempiu in a memoria di l'applicazione. Mantene u conte di retry in memoria ùn hè micca criticu per questu approcciu, postu chì una applicazione di logica lineale ùn pò micca trattà u prucessu in tuttu. A cuntrariu di primavera-retry, riavvia l'applicazione ùn pruvucarà micca tutti i missaghji persi per esse riprocessati, ma solu riavvia a strategia.

Stu approcciu aiuta à piglià a carica di u sistema esternu, chì pò esse indisponibile per via di una carica assai pisanti. In altri palori, in più di riprocessà, avemu ottinutu l'implementazione di u mudellu interruttore.

In u nostru casu, u limitu di l'errore hè solu 1, è per minimizzà i tempi di inattività di u sistema per via di l'interruzioni temporali di a rete, usemu una strategia di retry assai granulare cù intervalli di latenza chjuchi. Questu pò esse micca adattatu per tutte l'applicazioni di u gruppu, cusì a relazione trà u limitu di errore è u valore di l'intervallu deve esse sceltu nantu à e caratteristiche di u sistema.

Una applicazione separata per processà i missaghji da l'applicazioni cù una logica non deterministica

Eccu un esempiu di codice chì mandate un missaghju à una tale applicazione (Retryer), chì rimandarà à u tema DESTINATION quandu u tempu RETRY_AT hè ghjuntu:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

L'esempiu mostra chì assai infurmazione hè trasmessa in headers. U valore di RETRY_AT si trova in u listessu modu cum'è per u mecanismu di retry through the Consumer stop. In più di DESTINATION è RETRY_AT passemu:

  • GROUP_ID, da quale raggruppemu i missaghji per l'analisi manuale è a ricerca simplificata.
  • ORIGINAL_PARTITION per pruvà à mantene u listessu Consumatore per riprocessà. Stu paràmetru pò esse nulu, in quale casu a nova partizione serà ottenuta cù a chjave record.key() di u messagiu originale.
  • Valore COUNTER aghjurnatu per seguità a strategia di riprova.
  • SEND_TO hè una constante chì indica se u missaghju hè mandatu per riprocessà quandu ghjunghje à RETRY_AT o postu in DLQ.
  • REASON - u mutivu perchè u prucessu di missaghju hè stata interrotta.

Retryer guarda i missaghji per rinvià è analisi manuale in PostgreSQL. Un timer principia un compitu chì trova i missaghji cù RETRY_AT è li manda torna à a partizione ORIGINAL_PARTITION di u tema DESTINATION cù a chjave record.key().

Una volta mandati, i missaghji sò sguassati da PostgreSQL. L'analisi manuale di i missaghji si trova in una UI simplice chì interagisce cù Retryer via REST API. E so funziunalità principali sò risentà o sguassate i missaghji da DLQ, vede l'infurmazioni d'errore è circà i missaghji, per esempiu per nome di errore.

Siccomu u cuntrollu di l'accessu hè attivatu nantu à i nostri clusters, hè necessariu ancu dumandà l'accessu à u tema chì Retryer sta à sente, è permette à Retryer di scrive à u tema di DESTINAZIONE. Questu hè inconveniente, ma, à u cuntrariu di l'approcciu di u tema di intervallu, avemu un DLQ è UI cumpletu per gestisce.

Ci sò casi quandu un tema entrante hè lettu da parechji gruppi di cunsumatori diffirenti, chì l'applicazioni implementanu una logica diversa. Riprocessà un missaghju attraversu Retryer per una di queste applicazioni hà da esse un duplicatu in l'altru. Per pruteggiri contru à questu, creamu un tema separatu per a re-processazione. I temi entranti è ripruvati ponu esse leghje da u stessu Consumatore senza alcuna restrizioni.

Avvenimenti di riprocessazione ricevuti da Kafka

Per automaticamente, stu approcciu ùn furnisce micca funziunalità di circuit breaker, ma pò esse aghjuntu à l'applicazione utilizendu primavera-nuvola-netflix o novu interruttore di nuvola di primavera, imballendu i lochi induve i servizii esterni sò chjamati in astrazioni appropritate. Inoltre, diventa pussibule di sceglie una strategia per paratia mudellu, chì pò ancu esse utile. Per esempiu, in spring-cloud-netflix questu puderia esse un pool di filu o un semaforu.

cunchiusioni

In u risultatu, avemu una applicazione separata chì ci permette di ripetiri u prucessu di missaghju se un sistema esternu hè temporaneamente indisponibile.

Unu di i vantaghji principali di l'applicazione hè chì pò esse aduprata da i sistemi esterni chì funzionanu nantu à u stessu cluster Kafka, senza mudificazioni significativu da u so latu! Una tale applicazione averà solu bisognu di accede à u tema di riprova, riempie uni pochi intestazioni Kafka è mandà un missaghju à u Retryer. Ùn ci hè bisognu di crià alcuna infrastruttura supplementaria. È per riduce u nùmeru di missaghji trasferiti da l'applicazione à Retryer è torna, avemu identificatu l'applicazioni cù logica lineale è riprocessatu à traversu u stop Consumer.

Source: www.habr.com

Add a comment