Genbehandling af begivenheder modtaget fra Kafka

Genbehandling af begivenheder modtaget fra Kafka

Hej Habr.

For nylig I delte sin oplevelse om hvilke parametre vi som team oftest bruger for at Kafka Producer og Consumer kommer tættere på garanteret levering. I denne artikel vil jeg fortælle dig, hvordan vi organiserede genbehandlingen af ​​en begivenhed modtaget fra Kafka som følge af midlertidig utilgængelighed af det eksterne system.

Moderne applikationer fungerer i et meget komplekst miljø. Forretningslogik pakket ind i en moderne teknologistak, der kører i et Docker-image, der administreres af en orkestrator som Kubernetes eller OpenShift, og kommunikerer med andre applikationer eller virksomhedsløsninger gennem en kæde af fysiske og virtuelle routere. I sådan et miljø kan noget altid gå i stykker, så genbehandling af hændelser, hvis et af de eksterne systemer er utilgængeligt, er en vigtig del af vores forretningsprocesser.

Sådan var det før Kafka

Tidligere i projektet brugte vi IBM MQ til asynkron meddelelseslevering. Hvis der opstod en fejl under driften af ​​tjenesten, kunne den modtagne besked placeres i en dødbogstav-kø (DLQ) til yderligere manuel parsing. DLQ'en blev oprettet ved siden af ​​den indgående kø, meddelelsen blev overført inde i IBM MQ.

Hvis fejlen var midlertidig, og vi kunne bestemme den (for eksempel en ResourceAccessException på et HTTP-kald eller en MongoTimeoutException på en MongoDb-anmodning), så ville genforsøgsstrategien træde i kraft. Uanset applikationens forgreningslogik blev den oprindelige meddelelse flyttet enten til systemkøen for forsinket afsendelse eller til en separat applikation, der blev lavet for længe siden til at sende meddelelser igen. Dette inkluderer et gensend-nummer i meddelelseshovedet, som er knyttet til forsinkelsesintervallet eller slutningen af ​​strategien på applikationsniveau. Hvis vi er nået til slutningen af ​​strategien, men det eksterne system stadig ikke er tilgængeligt, vil meddelelsen blive placeret i DLQ'en til manuel parsing.

Løsningssøgning

Søgning på internettet, kan du finde følgende beslutning. Kort sagt foreslås det at oprette et emne for hvert forsinkelsesinterval og implementere forbrugerapplikationer på siden, som vil læse beskeder med den nødvendige forsinkelse.

Genbehandling af begivenheder modtaget fra Kafka

På trods af det store antal positive anmeldelser forekommer det mig ikke at være helt vellykket. Først og fremmest fordi udvikleren, ud over at implementere forretningskrav, skal bruge meget tid på at implementere den beskrevne mekanisme.

Derudover, hvis adgangskontrol er aktiveret på Kafka-klyngen, skal du bruge lidt tid på at oprette emner og give den nødvendige adgang til dem. Ud over dette skal du vælge den korrekte retention.ms-parameter for hvert af genforsøgsemnerne, så meddelelser har tid til at blive sendt igen og ikke forsvinder fra dem. Implementeringen og anmodningen om adgang skal gentages for hver eksisterende eller ny tjeneste.

Lad os nu se, hvilke mekanismer spring generelt og spring-kafka i særdeleshed giver os til genbehandling af beskeder. Spring-kafka har en transitiv afhængighed af spring-genry, som giver abstraktioner til styring af forskellige BackOffPolicies. Dette er et ret fleksibelt værktøj, men dets væsentlige ulempe er at gemme beskeder til genafsendelse i applikationshukommelsen. Det betyder, at genstart af applikationen på grund af en opdatering eller en driftsfejl vil resultere i tab af alle meddelelser, der afventer genbehandling. Da dette punkt er kritisk for vores system, har vi ikke overvejet det yderligere.

spring-kafka selv leverer flere implementeringer af ContainerAwareErrorHandler, for eksempel SeekToCurrentErrorHandler, hvormed du kan behandle beskeden senere uden at flytte offset i tilfælde af en fejl. Fra og med versionen af ​​spring-kafka 2.3 blev det muligt at indstille BackOffPolicy.

Denne tilgang tillader genbehandlede meddelelser at overleve genstart af applikationer, men der er stadig ingen DLQ-mekanisme. Vi valgte denne mulighed i begyndelsen af ​​2019, optimistisk i den tro, at DLQ ikke ville være nødvendig (vi var heldige og havde faktisk ikke brug for det efter flere måneders drift af applikationen med sådan et oparbejdningssystem). Midlertidige fejl fik SeekToCurrentErrorHandler til at udløses. De resterende fejl blev udskrevet i loggen, hvilket resulterede i en offset, og behandlingen fortsatte med den næste meddelelse.

Endelige beslutning

Implementeringen baseret på SeekToCurrentErrorHandler fik os til at udvikle vores egen mekanisme til at gensende meddelelser.

Først og fremmest ønskede vi at bruge den eksisterende oplevelse og udvide den afhængigt af applikationslogikken. For en lineær logikapplikation ville det være optimalt at stoppe med at læse nye meddelelser i en kort periode specificeret af genforsøgsstrategien. For andre applikationer ønskede jeg at have et enkelt punkt, der ville håndhæve genforsøgsstrategien. Derudover skal dette enkelte punkt have DLQ-funktionalitet for begge tilgange.

Selve genforsøgsstrategien skal gemmes i applikationen, som er ansvarlig for at hente det næste interval, når der opstår en midlertidig fejl.

Stopning af forbrugeren for en lineær logisk applikation

Når du arbejder med spring-kafka, kan koden til at stoppe forbrugeren se sådan ud:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

I eksemplet er retryAt tiden til at genstarte MessageListenerContainer, hvis den stadig kører. Genlanceringen vil ske i en separat tråd lanceret i TaskScheduler, hvis implementering også leveres af foråret.

Vi finder retryAt-værdien på følgende måde:

  1. Værdien af ​​genkaldstælleren slås op.
  2. Baseret på tællerværdien søges det aktuelle forsinkelsesinterval i genforsøgsstrategien. Strategien er deklareret i selve applikationen; vi valgte JSON-formatet til at gemme den.
  3. Intervallet fundet i JSON-arrayet indeholder antallet af sekunder, hvorefter behandlingen skal gentages. Dette antal sekunder lægges til det aktuelle klokkeslæt for at danne værdien for retryAt.
  4. Hvis intervallet ikke findes, er værdien af ​​retryAt null, og meddelelsen vil blive sendt til DLQ til manuel parsing.

Med denne fremgangsmåde er der kun tilbage at gemme antallet af gentagne opkald for hver besked, der i øjeblikket behandles, for eksempel i applikationshukommelsen. Det er ikke kritisk for denne tilgang at holde genforsøgstællingen i hukommelsen, da en lineær logikapplikation ikke kan håndtere behandlingen som helhed. I modsætning til forårsforsøg vil genstart af applikationen ikke medføre, at alle meddelelser, der går tabt, bliver behandlet igen, men vil blot genstarte strategien.

Denne tilgang hjælper med at tage belastningen af ​​det eksterne system, som kan være utilgængeligt på grund af en meget tung belastning. Med andre ord, udover oparbejdning opnåede vi implementeringen af ​​mønsteret afbryder.

I vores tilfælde er fejltærsklen kun 1, og for at minimere systemets nedetid på grund af midlertidige netværksudfald, bruger vi en meget granulær genforsøgsstrategi med små latensintervaller. Dette er muligvis ikke egnet til alle gruppeapplikationer, så forholdet mellem fejltærsklen og intervalværdien skal vælges baseret på systemets karakteristika.

En separat applikation til behandling af beskeder fra applikationer med ikke-deterministisk logik

Her er et eksempel på kode, der sender en besked til en sådan applikation (Retryer), som vil gensende til DESTINATION-emnet, når RETRY_AT-tiden er nået:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Eksemplet viser, at der overføres meget information i headers. Værdien af ​​RETRY_AT findes på samme måde som for genforsøgsmekanismen gennem forbrugerstoppet. Ud over DESTINATION og RETRY_AT passerer vi:

  • GROUP_ID, som vi grupperer beskeder med til manuel analyse og forenklet søgning.
  • ORIGINAL_PARTITION for at forsøge at beholde den samme forbruger til genbehandling. Denne parameter kan være null, i hvilket tilfælde den nye partition vil blive hentet ved hjælp af record.key()-nøglen i den oprindelige meddelelse.
  • Opdateret COUNTER-værdi for at følge genforsøgsstrategien.
  • SEND_TO er en konstant, der angiver, om meddelelsen sendes til genbehandling ved at nå RETRY_AT eller placeres i DLQ.
  • REASON - årsagen til, at meddelelsesbehandlingen blev afbrudt.

Retryer gemmer meddelelser til genafsendelse og manuel parsing i PostgreSQL. En timer starter en opgave, der finder meddelelser med RETRY_AT og sender dem tilbage til ORIGINAL_PARTITION-partitionen i DESTINATION-emnet med nøglen record.key().

Når de er sendt, slettes beskeder fra PostgreSQL. Manuel parsing af meddelelser sker i en simpel brugergrænseflade, der interagerer med Retryer via REST API. Dens hovedfunktioner er genafsendelse eller sletning af meddelelser fra DLQ, visning af fejlinformation og søgning efter meddelelser, for eksempel efter fejlnavn.

Da adgangskontrol er aktiveret på vores klynger, er det nødvendigt yderligere at anmode om adgang til det emne, som Retryer lytter til, og tillade Retryer at skrive til DESTINATION-emnet. Dette er ubelejligt, men i modsætning til interval-emnetilgangen har vi en fuldgyldig DLQ og brugergrænseflade til at administrere det.

Der er tilfælde, hvor et indgående emne læses af flere forskellige forbrugergrupper, hvis applikationer implementerer forskellig logik. Genbehandling af en besked gennem Retryer for en af ​​disse applikationer vil resultere i en duplikat på den anden. For at beskytte mod dette opretter vi et separat emne til genbehandling. De indgående og genforsøgte emner kan læses af den samme forbruger uden nogen begrænsninger.

Genbehandling af begivenheder modtaget fra Kafka

Som standard giver denne tilgang ikke afbryderfunktionalitet, men den kan føjes til applikationen vha spring-cloud-netflix eller ny fjederskyafbryder, indpakning af de steder, hvor eksterne tjenester kaldes ind i passende abstraktioner. Derudover bliver det muligt at vælge en strategi for skot mønster, som også kan være nyttigt. For eksempel, i spring-cloud-netflix kan dette være en trådpulje eller en semafor.

Output

Som et resultat har vi en separat applikation, der giver os mulighed for at gentage meddelelsesbehandlingen, hvis et eksternt system er midlertidigt utilgængeligt.

En af de vigtigste fordele ved applikationen er, at den kan bruges af eksterne systemer, der kører på den samme Kafka-klynge, uden væsentlige ændringer på deres side! En sådan applikation behøver kun at få adgang til genforsøg-emnet, udfylde et par Kafka-overskrifter og sende en besked til Retryer. Der er ingen grund til at rejse yderligere infrastruktur. Og for at reducere antallet af meddelelser, der overføres fra applikationen til Retryer og tilbage, identificerede vi applikationer med lineær logik og genbehandlede dem gennem forbrugerstoppet.

Kilde: www.habr.com

Tilføj en kommentar