Eventi di rielaborazione ricevuti da Kafka

Eventi di rielaborazione ricevuti da Kafka

Ciao Habr.

Recentemente io ha condiviso la sua esperienza su quali parametri utilizziamo più spesso come team affinché il produttore e il consumatore Kafka si avvicinino alla consegna garantita. In questo articolo voglio raccontarti come abbiamo organizzato la rielaborazione di un evento ricevuto da Kafka a seguito di indisponibilità temporanea del sistema esterno.

Le applicazioni moderne operano in un ambiente molto complesso. Logica aziendale racchiusa in uno stack tecnologico moderno, in esecuzione in un'immagine Docker gestita da un orchestratore come Kubernetes o OpenShift e comunicante con altre applicazioni o soluzioni aziendali attraverso una catena di router fisici e virtuali. In un ambiente di questo tipo, qualcosa può sempre rompersi, quindi la rielaborazione degli eventi se uno dei sistemi esterni non è disponibile è una parte importante dei nostri processi aziendali.

Com'era prima di Kafka

All'inizio del progetto abbiamo utilizzato IBM MQ per la consegna asincrona dei messaggi. Se si verifica un errore durante il funzionamento del servizio, il messaggio ricevuto potrebbe essere inserito in una coda di lettere morte (DLQ) per un'ulteriore analisi manuale. Il DLQ è stato creato accanto alla coda in entrata, il messaggio è stato trasferito all'interno di IBM MQ.

Se l'errore fosse temporaneo e potessimo determinarlo (ad esempio, una ResourceAccessException su una chiamata HTTP o una MongoTimeoutException su una richiesta MongoDb), la strategia di nuovo tentativo avrebbe effetto. Indipendentemente dalla logica di ramificazione dell'applicazione, il messaggio originale è stato spostato nella coda di sistema per l'invio ritardato oppure in un'applicazione separata creata molto tempo fa per inviare nuovamente i messaggi. Ciò include un numero di reinvio nell'intestazione del messaggio, che è legato all'intervallo di ritardo o alla fine della strategia a livello di applicazione. Se abbiamo raggiunto la fine della strategia ma il sistema esterno non è ancora disponibile, il messaggio verrà inserito nel DLQ per l'analisi manuale.

soluzioni di ricerca

Ricerca su Internet, puoi trovare quanto segue decisione. In breve, si propone di creare un argomento per ogni intervallo di ritardo e di implementare lateralmente le applicazioni Consumer, che leggeranno i messaggi con il ritardo richiesto.

Eventi di rielaborazione ricevuti da Kafka

Nonostante il gran numero di recensioni positive, mi sembra non del tutto riuscito. Innanzitutto perché lo sviluppatore, oltre a implementare i requisiti aziendali, dovrà dedicare molto tempo all'implementazione del meccanismo descritto.

Inoltre, se il controllo degli accessi è abilitato sul cluster Kafka, dovrai dedicare del tempo alla creazione degli argomenti e alla fornitura dell'accesso necessario agli stessi. Oltre a ciò, dovrai selezionare il parametro di ritenzione.ms corretto per ciascuno degli argomenti di ripetizione in modo che i messaggi abbiano il tempo di essere reinviati e non scomparire da esso. L'implementazione e la richiesta di accesso dovranno essere ripetute per ogni servizio esistente o nuovo.

Vediamo ora quali meccanismi spring in generale e spring-kafka in particolare ci mettono a disposizione per la rielaborazione dei messaggi. Spring-kafka ha una dipendenza transitiva da spring-retry, che fornisce astrazioni per la gestione di diverse BackOffPolicies. Si tratta di uno strumento abbastanza flessibile, ma il suo svantaggio significativo è la memorizzazione dei messaggi per il reinvio nella memoria dell'applicazione. Ciò significa che il riavvio dell'applicazione a causa di un aggiornamento o di un errore operativo comporterà la perdita di tutti i messaggi in attesa di rielaborazione. Poiché questo punto è fondamentale per il nostro sistema, non lo abbiamo considerato ulteriormente.

spring-kafka stesso fornisce diverse implementazioni di ContainerAwareErrorHandler, ad esempio SeekToCurrentErrorHandler, con il quale è possibile elaborare il messaggio in un secondo momento senza spostare l'offset in caso di errore. A partire dalla versione di spring-kafka 2.3, è diventato possibile impostare BackOffPolicy.

Questo approccio consente ai messaggi rielaborati di sopravvivere al riavvio dell'applicazione, ma non esiste ancora un meccanismo DLQ. Abbiamo scelto questa opzione all'inizio del 2019, credendo ottimisticamente che DLQ non sarebbe stato necessario (siamo stati fortunati e in realtà non ne abbiamo avuto bisogno dopo diversi mesi di utilizzo dell'applicazione con un tale sistema di rielaborazione). Errori temporanei hanno causato l'attivazione di SeekToCurrentErrorHandler. Gli errori rimanenti sono stati stampati nel registro, determinando un offset e l'elaborazione è continuata con il messaggio successivo.

Decisione finale

L'implementazione basata su SeekToCurrentErrorHandler ci ha spinto a sviluppare il nostro meccanismo per inviare nuovamente i messaggi.

Innanzitutto abbiamo voluto sfruttare l'esperienza esistente ed espanderla a seconda della logica applicativa. Per un'applicazione a logica lineare, sarebbe ottimale interrompere la lettura dei nuovi messaggi per un breve periodo di tempo specificato dalla strategia di ripetizione. Per altre applicazioni, volevo avere un unico punto che applicasse la strategia dei nuovi tentativi. Inoltre, questo singolo punto deve avere funzionalità DLQ per entrambi gli approcci.

La strategia di ripetizione stessa deve essere archiviata nell'applicazione, che è responsabile del recupero dell'intervallo successivo quando si verifica un errore temporaneo.

Arresto del consumatore per un'applicazione di logica lineare

Quando si lavora con spring-kafka, il codice per fermare il Consumatore potrebbe assomigliare a questo:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Nell'esempio, retryAt è il momento in cui riavviare MessageListenerContainer se è ancora in esecuzione. Il rilancio avverrà in un thread separato lanciato in TaskScheduler, la cui implementazione è prevista anch'essa entro la primavera.

Troviamo il valore retryAt nel modo seguente:

  1. Viene cercato il valore del contatore delle richiamate.
  2. In base al valore del contatore, viene ricercato l'intervallo di ritardo corrente nella strategia di ripetizione. La strategia è dichiarata nell'applicazione stessa; abbiamo scelto il formato JSON per memorizzarla.
  3. L'intervallo trovato nell'array JSON contiene il numero di secondi dopo i quali sarà necessario ripetere l'elaborazione. Questo numero di secondi viene aggiunto all'ora corrente per formare il valore di retryAt.
  4. Se l'intervallo non viene trovato, il valore di retryAt è null e il messaggio verrà inviato a DLQ per l'analisi manuale.

Con questo approccio non resta che salvare il numero di chiamate ripetute per ciascun messaggio attualmente in elaborazione, ad esempio nella memoria dell'applicazione. Mantenere il numero dei tentativi in ​​memoria non è fondamentale per questo approccio, poiché un'applicazione di logica lineare non è in grado di gestire l'elaborazione nel suo complesso. A differenza del nuovo tentativo di primavera, il riavvio dell'applicazione non causerà la rielaborazione di tutti i messaggi persi, ma riavvierà semplicemente la strategia.

Questo approccio aiuta a scaricare il carico dal sistema esterno, che potrebbe non essere disponibile a causa di un carico molto pesante. In altre parole, oltre alla rielaborazione, abbiamo ottenuto l'implementazione del modello interruttore.

Nel nostro caso, la soglia di errore è solo 1 e per ridurre al minimo i tempi di inattività del sistema dovuti a interruzioni temporanee della rete, utilizziamo una strategia di tentativi molto granulare con intervalli di latenza ridotti. Questo potrebbe non essere adatto a tutte le applicazioni del gruppo, quindi il rapporto tra la soglia di errore e il valore dell'intervallo deve essere scelto in base alle caratteristiche dell'impianto.

Un'applicazione separata per l'elaborazione di messaggi da applicazioni con logica non deterministica

Ecco un esempio di codice che invia un messaggio a tale applicazione (Retryer), che verrà inviato nuovamente all'argomento DESTINATION quando viene raggiunto il tempo RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

L'esempio mostra che molte informazioni vengono trasmesse nelle intestazioni. Il valore di RETRY_AT si trova allo stesso modo del meccanismo di nuovo tentativo tramite l'arresto del Consumer. Oltre a DESTINATION e RETRY_AT passiamo:

  • GRUPPO_ID, in base al quale raggruppiamo i messaggi per l'analisi manuale e la ricerca semplificata.
  • ORIGINAL_PARTITION per provare a mantenere lo stesso consumatore per la rielaborazione. Questo parametro può essere nullo, nel qual caso la nuova partizione verrà ottenuta utilizzando la chiave record.key() del messaggio originale.
  • Valore COUNTER aggiornato per seguire la strategia dei nuovi tentativi.
  • SEND_TO è una costante che indica se il messaggio viene inviato per la rielaborazione una volta raggiunto RETRY_AT o inserito in DLQ.
  • MOTIVO: il motivo per cui l'elaborazione del messaggio è stata interrotta.

Retryer memorizza i messaggi per il reinvio e l'analisi manuale in PostgreSQL. Un timer avvia un'attività che trova i messaggi con RETRY_AT e li rimanda alla partizione ORIGINAL_PARTITION dell'argomento DESTINATION con la chiave record.key().

Una volta inviati, i messaggi vengono eliminati da PostgreSQL. L'analisi manuale dei messaggi avviene in una semplice interfaccia utente che interagisce con Retryer tramite API REST. Le sue caratteristiche principali sono il reinvio o l'eliminazione dei messaggi da DLQ, la visualizzazione delle informazioni sugli errori e la ricerca dei messaggi, ad esempio in base al nome dell'errore.

Poiché il controllo degli accessi è abilitato sui nostri cluster, è necessario richiedere inoltre l'accesso all'argomento che Retryer sta ascoltando e consentire a Retryer di scrivere sull'argomento DESTINATION. Questo è scomodo ma, a differenza dell'approccio con argomenti a intervalli, disponiamo di un DLQ e di un'interfaccia utente completi per gestirlo.

Ci sono casi in cui un argomento in entrata viene letto da diversi gruppi di consumatori, le cui applicazioni implementano logiche diverse. La rielaborazione di un messaggio tramite Retryer per una di queste applicazioni risulterà in un duplicato sull'altra. Per proteggersi da ciò, creiamo un argomento separato per la rielaborazione. Gli argomenti in entrata e di nuovo tentativo possono essere letti dallo stesso Consumatore senza alcuna restrizione.

Eventi di rielaborazione ricevuti da Kafka

Per impostazione predefinita, questo approccio non fornisce la funzionalità dell'interruttore, tuttavia può essere aggiunto all'applicazione utilizzando primavera-nuvola-netflix o nuovo interruttore automatico primaverile, avvolgendo in opportune astrazioni i luoghi in cui i servizi esterni sono chiamati in causa. Inoltre, diventa possibile scegliere una strategia per paratia modello, che può anche essere utile. Ad esempio, in spring-cloud-netflix potrebbe trattarsi di un pool di thread o di un semaforo.

conclusione

Di conseguenza, disponiamo di un'applicazione separata che ci consente di ripetere l'elaborazione dei messaggi se un sistema esterno è temporaneamente non disponibile.

Uno dei principali vantaggi dell'applicazione è che può essere utilizzata da sistemi esterni in esecuzione sullo stesso cluster Kafka, senza modifiche significative da parte loro! Tale applicazione dovrà solo accedere all'argomento del nuovo tentativo, compilare alcune intestazioni Kafka e inviare un messaggio al Retryer. Non è necessario realizzare alcuna infrastruttura aggiuntiva. E per ridurre il numero di messaggi trasferiti dall'applicazione a Retryer e ritorno, abbiamo individuato le applicazioni con logica lineare e le abbiamo rielaborate attraverso lo stop Consumer.

Fonte: habr.com

Aggiungi un commento