Bearbeiding av hendelser mottatt fra Kafka

Bearbeiding av hendelser mottatt fra Kafka

Hei Habr.

Nylig jeg delte sin erfaring om hvilke parametere vi som team oftest bruker for at Kafka Produsent og Consumer skal komme nærmere garantert levering. I denne artikkelen vil jeg fortelle deg hvordan vi organiserte re-prosesseringen av en hendelse mottatt fra Kafka som et resultat av midlertidig utilgjengelighet av det eksterne systemet.

Moderne applikasjoner opererer i et svært komplekst miljø. Forretningslogikk pakket inn i en moderne teknologistabel, kjører i et Docker-bilde administrert av en orkestrator som Kubernetes eller OpenShift, og kommuniserer med andre applikasjoner eller bedriftsløsninger gjennom en kjede av fysiske og virtuelle rutere. I et slikt miljø kan alltid noe gå i stykker, så reprosessering av hendelser hvis et av de eksterne systemene er utilgjengelig er en viktig del av forretningsprosessene våre.

Slik var det før Kafka

Tidligere i prosjektet brukte vi IBM MQ for asynkron meldingslevering. Hvis det oppstod en feil under driften av tjenesten, kan den mottatte meldingen plasseres i en dødbokstavkø (DLQ) for videre manuell analyse. DLQ ble opprettet ved siden av den innkommende køen, meldingen ble overført i IBM MQ.

Hvis feilen var midlertidig og vi kunne fastslå den (for eksempel et ResourceAccessException på et HTTP-kall eller et MongoTimeoutException på en MongoDb-forespørsel), vil forsøksstrategien tre i kraft. Uavhengig av forgreningslogikken til applikasjonen, ble den opprinnelige meldingen flyttet enten til systemkøen for forsinket sending, eller til en egen applikasjon som ble laget for lenge siden for å sende meldinger på nytt. Dette inkluderer et gjensendingsnummer i meldingshodet, som er knyttet til forsinkelsesintervallet eller slutten av strategien på applikasjonsnivå. Hvis vi har nådd slutten av strategien, men det eksterne systemet fortsatt er utilgjengelig, vil meldingen bli plassert i DLQ for manuell parsing.

Løsningssøk

Søker på Internett, kan du finne følgende avgjørelse. Kort fortalt foreslås det å opprette et emne for hvert forsinkelsesintervall og implementere forbrukerapplikasjoner på siden, som vil lese meldinger med den nødvendige forsinkelsen.

Bearbeiding av hendelser mottatt fra Kafka

Til tross for det store antallet positive anmeldelser, virker det for meg ikke helt vellykket. Først av alt, fordi utvikleren, i tillegg til å implementere forretningskrav, må bruke mye tid på å implementere den beskrevne mekanismen.

I tillegg, hvis tilgangskontroll er aktivert på Kafka-klyngen, må du bruke litt tid på å lage emner og gi nødvendig tilgang til dem. I tillegg til dette må du velge riktig retention.ms-parameter for hvert av emnene på nytt, slik at meldinger har tid til å sendes på nytt og ikke forsvinner fra den. Implementeringen og forespørselen om tilgang vil måtte gjentas for hver eksisterende eller ny tjeneste.

La oss nå se hvilke mekanismer spring generelt og spring-kafka spesielt gir oss for meldingsbehandling. Spring-kafka har en transitiv avhengighet av spring-retry, som gir abstraksjoner for å administrere forskjellige BackOffPolicies. Dette er et ganske fleksibelt verktøy, men dets betydelige ulempe er å lagre meldinger for omsending i applikasjonsminnet. Dette betyr at omstart av applikasjonen på grunn av en oppdatering eller en driftsfeil vil føre til tap av alle meldinger i påvente av rebehandling. Siden dette punktet er kritisk for systemet vårt, har vi ikke vurdert det videre.

spring-kafka selv tilbyr flere implementeringer av ContainerAwareErrorHandler, for eksempel SeekToCurrentErrorHandler, som du kan behandle meldingen med senere uten å skifte offset i tilfelle feil. Fra og med versjon av spring-kafka 2.3 ble det mulig å sette BackOffPolicy.

Denne tilnærmingen lar ombehandlede meldinger overleve omstart av applikasjoner, men det er fortsatt ingen DLQ-mekanisme. Vi valgte dette alternativet i begynnelsen av 2019, optimistisk i troen på at DLQ ikke ville være nødvendig (vi var heldige og trengte det faktisk ikke etter flere måneder med bruk av applikasjonen med et slikt reprosesseringssystem). Midlertidige feil fikk SeekToCurrentErrorHandler til å utløses. De resterende feilene ble skrevet ut i loggen, noe som resulterte i en offset, og behandlingen fortsatte med neste melding.

Siste avgjørelse

Implementeringen basert på SeekToCurrentErrorHandler fikk oss til å utvikle vår egen mekanisme for å sende meldinger på nytt.

Først av alt ønsket vi å bruke den eksisterende opplevelsen og utvide den avhengig av applikasjonslogikken. For en lineær logikkapplikasjon vil det være optimalt å slutte å lese nye meldinger i en kort periode spesifisert av strategien for å prøve på nytt. For andre applikasjoner ønsket jeg å ha et enkelt punkt som ville håndheve strategien for å prøve på nytt. I tillegg må dette enkeltpunktet ha DLQ-funksjonalitet for begge tilnærmingene.

Selve forsøksstrategien må lagres i applikasjonen, som er ansvarlig for å hente neste intervall når det oppstår en midlertidig feil.

Stoppe forbrukeren for en lineær logikkapplikasjon

Når du arbeider med spring-kafka, kan koden for å stoppe forbrukeren se omtrent slik ut:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

I eksemplet er retryAt tiden for å starte MessageListenerContainer på nytt hvis den fortsatt kjører. Relanseringen vil skje i en egen tråd lansert i TaskScheduler, og implementeringen av denne er også levert av våren.

Vi finner retryAt-verdien på følgende måte:

  1. Verdien av gjenoppringingstelleren slås opp.
  2. Basert på tellerverdien søkes det gjeldende forsinkelsesintervallet i nyforsøksstrategien. Strategien er deklarert i selve applikasjonen; vi valgte JSON-formatet for å lagre den.
  3. Intervallet funnet i JSON-matrisen inneholder antall sekunder som behandlingen må gjentas etter. Dette antallet sekunder legges til gjeldende tid for å danne verdien for retryAt.
  4. Hvis intervallet ikke blir funnet, er verdien av retryAt null og meldingen vil bli sendt til DLQ for manuell analyse.

Med denne tilnærmingen gjenstår det bare å lagre antall gjentatte anrop for hver melding som for øyeblikket behandles, for eksempel i applikasjonsminnet. Det er ikke kritisk for denne tilnærmingen å beholde antallet forsøk på nytt i minnet, siden en lineær logikkapplikasjon ikke kan håndtere behandlingen som helhet. I motsetning til vår-forsøk, vil ikke omstart av applikasjonen føre til at alle meldinger går tapt blir behandlet på nytt, men vil ganske enkelt starte strategien på nytt.

Denne tilnærmingen hjelper til med å ta belastningen av det eksterne systemet, som kan være utilgjengelig på grunn av en veldig tung belastning. Med andre ord, i tillegg til reprosessering, oppnådde vi implementeringen av mønsteret effektbryter.

I vårt tilfelle er feilterskelen kun 1, og for å minimere nedetid på systemet på grunn av midlertidige nettverksbrudd, bruker vi en veldig granulær strategi for gjenforsøk med små latensintervaller. Dette er kanskje ikke egnet for alle gruppeapplikasjoner, så forholdet mellom feilterskelen og intervallverdien må velges basert på egenskapene til systemet.

En egen applikasjon for behandling av meldinger fra applikasjoner med ikke-deterministisk logikk

Her er et eksempel på kode som sender en melding til en slik applikasjon (Retryer), som vil sendes på nytt til DESTINASJON-emnet når RETRY_AT-tiden er nådd:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Eksemplet viser at mye informasjon overføres i overskrifter. Verdien til RETRY_AT finnes på samme måte som for prøvemekanismen på nytt gjennom forbrukerstoppen. I tillegg til DESTINATION og RETRY_AT passerer vi:

  • GROUP_ID, der vi grupperer meldinger for manuell analyse og forenklet søk.
  • ORIGINAL_PARTITION for å prøve å beholde den samme forbrukeren for ny behandling. Denne parameteren kan være null, i så fall vil den nye partisjonen fås ved å bruke record.key()-nøkkelen til den opprinnelige meldingen.
  • Oppdatert COUNTER-verdi for å følge strategien for forsøk på nytt.
  • SEND_TO er en konstant som indikerer om meldingen sendes for reprosessering når den når RETRY_AT eller plasseres i DLQ.
  • REASON - årsaken til at meldingsbehandlingen ble avbrutt.

Retryer lagrer meldinger for resending og manuell analyse i PostgreSQL. En tidtaker starter en oppgave som finner meldinger med RETRY_AT og sender dem tilbake til ORIGINAL_PARTITION-partisjonen til DESTINATION-emnet med nøkkelen record.key().

Når de er sendt, slettes meldinger fra PostgreSQL. Manuell analyse av meldinger skjer i et enkelt brukergrensesnitt som samhandler med Retryer via REST API. Hovedfunksjonene er å sende eller slette meldinger fra DLQ, vise feilinformasjon og søke etter meldinger, for eksempel etter feilnavn.

Siden tilgangskontroll er aktivert på våre klynger, er det nødvendig å i tillegg be om tilgang til emnet som Retryer lytter til, og la Retryer skrive til DESTINASJON-emnet. Dette er upraktisk, men i motsetning til intervalltematilnærmingen har vi en fullverdig DLQ og brukergrensesnitt for å administrere det.

Det er tilfeller når et innkommende emne leses av flere forskjellige forbrukergrupper, hvis applikasjoner implementerer forskjellig logikk. Behandling av en melding gjennom Retryer for ett av disse programmene vil resultere i en duplikat på den andre. For å beskytte oss mot dette oppretter vi et eget emne for re-behandling. Emnene for innkommende og gjentatte forsøk kan leses av samme forbruker uten noen begrensninger.

Bearbeiding av hendelser mottatt fra Kafka

Som standard gir ikke denne tilnærmingen effektbryterfunksjonalitet, men den kan legges til applikasjonen ved hjelp av vår-sky-nettflix eller ny fjærskybryter, pakke inn stedene der eksterne tjenester kalles inn i passende abstraksjoner. I tillegg blir det mulig å velge strategi for skott mønster, som også kan være nyttig. For eksempel, i spring-cloud-netflix kan dette være en trådbasseng eller en semafor.

Utgang

Som et resultat har vi en egen applikasjon som lar oss gjenta meldingsbehandling hvis et eksternt system er midlertidig utilgjengelig.

En av hovedfordelene med applikasjonen er at den kan brukes av eksterne systemer som kjører på samme Kafka-klynge, uten vesentlige modifikasjoner på deres side! En slik applikasjon trenger bare å få tilgang til emnet for å prøve på nytt, fylle ut noen få Kafka-overskrifter og sende en melding til Retryer. Det er ikke nødvendig å bygge opp ytterligere infrastruktur. Og for å redusere antallet meldinger som overføres fra applikasjonen til Retryer og tilbake, identifiserte vi applikasjoner med lineær logikk og behandlet dem på nytt gjennom Consumer Stop.

Kilde: www.habr.com

Legg til en kommentar