Reprocessament d'esdeveniments rebuts de Kafka

Reprocessament d'esdeveniments rebuts de Kafka

Hola Habr.

Recentment jo va compartir la seva experiència sobre quins paràmetres, com a equip, utilitzem amb més freqüència perquè el productor i el consumidor de Kafka s'apropin al lliurament garantit. En aquest article vull explicar-vos com hem organitzat el reprocessament d'un esdeveniment rebut de Kafka com a conseqüència de la indisponibilitat temporal del sistema extern.

Les aplicacions modernes funcionen en un entorn molt complex. Lògica empresarial embolicada en una pila de tecnologia moderna, que s'executa en una imatge de Docker gestionada per un orquestrador com Kubernetes o OpenShift, i es comunica amb altres aplicacions o solucions empresarials mitjançant una cadena d'encaminadors físics i virtuals. En aquest entorn, alguna cosa sempre es pot trencar, de manera que el reprocessament d'esdeveniments si un dels sistemes externs no està disponible és una part important dels nostres processos de negoci.

Com era abans de Kafka

Al principi del projecte vam utilitzar IBM MQ per al lliurament de missatges asíncrons. Si es produïa algun error durant l'operació del servei, el missatge rebut es podria col·locar en una cua de cartes inèdites (DLQ) per a una posterior anàlisi manual. El DLQ es va crear al costat de la cua entrant, el missatge es va transferir dins d'IBM MQ.

Si l'error era temporal i podríem determinar-lo (per exemple, una ResourceAccessException en una trucada HTTP o una MongoTimeoutException en una sol·licitud MongoDb), llavors l'estratègia de reintent tindria efecte. Independentment de la lògica de ramificació de l'aplicació, el missatge original es va traslladar a la cua del sistema per a l'enviament retardat o a una aplicació independent que es va fer fa temps per tornar a enviar missatges. Això inclou un número de reenviament a la capçalera del missatge, que està lligat a l'interval de retard o al final de l'estratègia a nivell d'aplicació. Si hem arribat al final de l'estratègia però el sistema extern encara no està disponible, llavors el missatge es col·locarà al DLQ per a l'anàlisi manual.

Trobar una solució

Cercant a Internet, podeu trobar el següent la decisió. En resum, es proposa crear un tema per a cada interval de retard i implementar aplicacions de consum al costat, que llegiran missatges amb el retard necessari.

Reprocessament d'esdeveniments rebuts de Kafka

Tot i la gran quantitat de crítiques positives, em sembla que no és del tot exitosa. En primer lloc, perquè el desenvolupador, a més d'implementar els requisits empresarials, haurà de dedicar molt de temps a implementar el mecanisme descrit.

A més, si el control d'accés està habilitat al clúster Kafka, haureu de dedicar una estona a crear temes i proporcionar-hi l'accés necessari. A més d'això, haureu de seleccionar el paràmetre retention.ms correcte per a cadascun dels temes de reintent perquè els missatges tinguin temps de tornar-se a enviar i no desapareixin d'ell. La implantació i sol·licitud d'accés s'haurà de repetir per a cada servei existent o nou.

Vegem ara quins mecanismes ens proporcionen spring en general i spring-kafka en particular per al reprocessament de missatges. Spring-kafka té una dependència transitiva de spring-retry, que proporciona abstraccions per gestionar diferents BackOffPolicies. Aquesta és una eina bastant flexible, però el seu inconvenient important és emmagatzemar missatges per reenviar-los a la memòria de l'aplicació. Això vol dir que reiniciar l'aplicació a causa d'una actualització o d'un error operatiu provocarà la pèrdua de tots els missatges pendents de reprocessament. Com que aquest punt és crític per al nostre sistema, no ho hem considerat més.

Spring-kafka proporciona diverses implementacions de ContainerAwareErrorHandler, per exemple SeekToCurrentErrorHandler, amb el qual podeu processar el missatge més tard sense desplaçar el desplaçament en cas d'error. A partir de la versió de spring-kafka 2.3, es va fer possible configurar BackOffPolicy.

Aquest enfocament permet que els missatges reprocessats sobrevisquin als reinicis de l'aplicació, però encara no hi ha cap mecanisme DLQ. Vam triar aquesta opció a principis de 2019, creient amb optimisme que DLQ no seria necessari (vam tenir sort i, de fet, no en vam necessitar després de diversos mesos d'operar l'aplicació amb aquest sistema de reprocessament). Errors temporals van provocar que SeekToCurrentErrorHandler s'activés. Els errors restants es van imprimir al registre, donant lloc a un desplaçament i el processament va continuar amb el missatge següent.

Decisió final

La implementació basada en SeekToCurrentErrorHandler ens va impulsar a desenvolupar el nostre propi mecanisme per reenviar missatges.

En primer lloc, hem volgut utilitzar l'experiència existent i ampliar-la en funció de la lògica de l'aplicació. Per a una aplicació de lògica lineal, seria òptim deixar de llegir missatges nous durant un període curt de temps especificat per l'estratègia de reintent. Per a altres aplicacions, volia tenir un únic punt que fes complir l'estratègia de reintent. A més, aquest únic punt ha de tenir la funcionalitat DLQ per a tots dos enfocaments.

La pròpia estratègia de reintent s'ha d'emmagatzemar a l'aplicació, que s'encarrega de recuperar el següent interval quan es produeix un error temporal.

Aturar el consumidor per a una aplicació de lògica lineal

Quan es treballa amb spring-kafka, el codi per aturar el consumidor podria semblar a això:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

A l'exemple, retryAt és el moment de reiniciar el MessageListenerContainer si encara s'està executant. El rellançament es produirà en un fil separat llançat a TaskScheduler, la implementació del qual també es proporciona a la primavera.

Trobem el valor retryAt de la manera següent:

  1. Es consulta el valor del comptador de recuperacions.
  2. En funció del valor del comptador, es cerca l'interval de retard actual a l'estratègia de reintent. L'estratègia es declara a la pròpia aplicació; hem escollit el format JSON per emmagatzemar-la.
  3. L'interval que es troba a la matriu JSON conté el nombre de segons després dels quals s'haurà de repetir el processament. Aquest nombre de segons s'afegeix al temps actual per formar el valor de retryAt.
  4. Si no es troba l'interval, aleshores el valor de retryAt és nul i el missatge s'enviarà a DLQ per a l'anàlisi manual.

Amb aquest enfocament, només queda guardar el nombre de trucades repetides per a cada missatge que s'està processant actualment, per exemple a la memòria de l'aplicació. Mantenir el recompte de reintents a la memòria no és crític per a aquest enfocament, ja que una aplicació de lògica lineal no pot gestionar el processament en conjunt. A diferència del spring-retry, el reinici de l'aplicació no farà que es tornin a processar tots els missatges, sinó que simplement reiniciarà l'estratègia.

Aquest enfocament ajuda a treure la càrrega del sistema extern, que pot no estar disponible a causa d'una càrrega molt pesada. És a dir, a més del reprocessament, hem aconseguit la implantació del patró tallacircuits.

En el nostre cas, el llindar d'error és només 1 i per minimitzar el temps d'inactivitat del sistema a causa d'interrupcions temporals de la xarxa, utilitzem una estratègia de reintent molt granular amb intervals de latència petits. Pot ser que això no sigui adequat per a totes les aplicacions de grup, de manera que la relació entre el llindar d'error i el valor de l'interval s'ha de seleccionar en funció de les característiques del sistema.

Una aplicació independent per processar missatges d'aplicacions amb lògica no determinista

Aquí teniu un exemple de codi que envia un missatge a una aplicació d'aquest tipus (Reintent), que es tornarà a enviar al tema DESTINATION quan s'arribi a l'hora RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

L'exemple mostra que es transmet molta informació a les capçaleres. El valor de RETRY_AT es troba de la mateixa manera que per al mecanisme de reintent mitjançant l'aturada del consumidor. A més de DESTINATION i RETRY_AT passem:

  • GROUP_ID, mitjançant el qual agrupem missatges per a l'anàlisi manual i la cerca simplificada.
  • ORIGINAL_PARTITION per intentar mantenir el mateix consumidor per tornar-lo a processar. Aquest paràmetre pot ser nul, en aquest cas la nova partició s'obtindrà mitjançant la clau record.key() del missatge original.
  • S'ha actualitzat el valor COUNTER per seguir l'estratègia de reintent.
  • SEND_TO és una constant que indica si el missatge s'envia per reprocessar en arribar a RETRY_AT o si es col·loca a DLQ.
  • REASON: el motiu pel qual s'ha interromput el processament del missatge.

Retryer emmagatzema missatges per tornar-los a enviar i analitzar manualment a PostgreSQL. Un temporitzador inicia una tasca que troba missatges amb RETRY_AT i els torna a enviar a la partició ORIGINAL_PARTITION del tema DESTINATION amb la clau record.key().

Un cop enviats, els missatges s'eliminen de PostgreSQL. L'anàlisi manual dels missatges es produeix en una interfície d'usuari senzilla que interactua amb Retryer mitjançant l'API REST. Les seves principals característiques són reenviar o esborrar missatges de DLQ, visualitzar informació d'error i cercar missatges, per exemple pel nom de l'error.

Com que el control d'accés està habilitat als nostres clústers, cal sol·licitar, addicionalment, accés al tema que el Retryer està escoltant i permetre que Retryer escrigui al tema DESTINACIÓ. Això és incòmode, però, a diferència de l'enfocament del tema d'interval, tenim un DLQ i una interfície d'usuari complets per gestionar-ho.

Hi ha casos en què un tema entrant és llegit per diversos grups de consumidors diferents, les aplicacions dels quals implementen una lògica diferent. El reprocessament d'un missatge mitjançant Retryer per a una d'aquestes aplicacions donarà lloc a un duplicat de l'altra. Per protegir-nos d'això, creem un tema separat per tornar-lo a processar. El mateix consumidor pot llegir els temes entrants i reintents sense cap restricció.

Reprocessament d'esdeveniments rebuts de Kafka

Per defecte, aquest enfocament no proporciona la funcionalitat d'interruptor, però es pot afegir a l'aplicació mitjançant Spring-cloud-netflix o nou interruptor de circuit de núvol de primavera, embolicant els llocs on es demanen serveis externs en abstraccions adequades. A més, és possible triar una estratègia per mampara patró, que també pot ser útil. Per exemple, a spring-cloud-netflix pot ser un grup de fils o un semàfor.

Sortida

Com a resultat, tenim una aplicació independent que ens permet repetir el processament de missatges si algun sistema extern no està disponible temporalment.

Un dels principals avantatges de l'aplicació és que pot ser utilitzada per sistemes externs que s'executen al mateix clúster Kafka, sense modificacions significatives per la seva banda! Aquesta aplicació només haurà d'accedir al tema de reintentar, omplir unes quantes capçaleres de Kafka i enviar un missatge al reintent. No cal augmentar cap infraestructura addicional. I per tal de reduir el nombre de missatges transferits de l'aplicació a Retryer i viceversa, vam identificar aplicacions amb lògica lineal i les vam tornar a processar a través de la parada del consumidor.

Font: www.habr.com

Afegeix comentari