Gebeurtenissen opnieuw verwerken ontvangen van Kafka

Gebeurtenissen opnieuw verwerken ontvangen van Kafka

Hé Habr.

Onlangs heb ik deelde zijn ervaring over welke parameters wij als team het meest gebruiken voor Kafka Producent en Consument om dichter bij gegarandeerde levering te komen. In dit artikel wil ik je vertellen hoe we de herverwerking hebben georganiseerd van een gebeurtenis die we van Kafka hebben ontvangen als gevolg van de tijdelijke onbeschikbaarheid van het externe systeem.

Moderne applicaties opereren in een zeer complexe omgeving. Bedrijfslogica verpakt in een moderne technologiestapel, uitgevoerd in een Docker-image beheerd door een orkestrator zoals Kubernetes of OpenShift, en communiceert met andere applicaties of bedrijfsoplossingen via een keten van fysieke en virtuele routers. In een dergelijke omgeving kan er altijd iets kapot gaan, dus het opnieuw verwerken van gebeurtenissen als een van de externe systemen niet beschikbaar is, is een belangrijk onderdeel van onze bedrijfsprocessen.

Hoe het was vóór Kafka

Eerder in het project gebruikten we IBM MQ voor asynchrone berichtbezorging. Als er een fout optreedt tijdens de werking van de service, kan het ontvangen bericht in een dode-letter-wachtrij (DLQ) worden geplaatst voor verdere handmatige parsering. De DLQ werd naast de inkomende wachtrij aangemaakt, het bericht werd binnen IBM MQ overgebracht.

Als de fout tijdelijk was en we deze konden vaststellen (bijvoorbeeld een ResourceAccessException bij een HTTP-aanroep of een MongoTimeoutException bij een MongoDb-verzoek), zou de strategie voor opnieuw proberen van kracht worden. Ongeacht de vertakkingslogica van de applicatie werd het oorspronkelijke bericht verplaatst naar de systeemwachtrij voor vertraagde verzending, of naar een aparte applicatie die lang geleden was gemaakt om berichten opnieuw te verzenden. Dit omvat een nummer voor opnieuw verzenden in de berichtkop, dat is gekoppeld aan het vertragingsinterval of het einde van de strategie op applicatieniveau. Als we het einde van de strategie hebben bereikt, maar het externe systeem nog steeds niet beschikbaar is, wordt het bericht in de DLQ geplaatst voor handmatige parsering.

Oplossing zoeken

Zoeken op internet, kunt u het volgende vinden beslissing. Kortom, er wordt voorgesteld om voor elk vertragingsinterval een onderwerp te maken en daarnaast Consumer-applicaties te implementeren, die berichten met de vereiste vertraging zullen lezen.

Gebeurtenissen opnieuw verwerken ontvangen van Kafka

Ondanks het grote aantal positieve recensies lijkt het mij niet geheel geslaagd. Allereerst omdat de ontwikkelaar, naast het implementeren van zakelijke vereisten, veel tijd zal moeten besteden aan het implementeren van het beschreven mechanisme.

Als toegangscontrole is ingeschakeld op het Kafka-cluster, moet u bovendien enige tijd besteden aan het maken van onderwerpen en het verlenen van de nodige toegang daartoe. Daarnaast moet u voor elk van de onderwerpen voor opnieuw proberen de juiste retention.ms-parameter selecteren, zodat berichten de tijd hebben om opnieuw te worden verzonden en er niet uit verdwijnen. Voor elke bestaande of nieuwe dienst zal de implementatie en het aanvragen van toegang herhaald moeten worden.

Laten we nu eens kijken welke mechanismen Spring in het algemeen en Spring-Kafka in het bijzonder ons bieden voor het opnieuw verwerken van berichten. Spring-kafka heeft een transitieve afhankelijkheid van spring-retry, die abstracties biedt voor het beheren van verschillende BackOffPolicies. Dit is een redelijk flexibel hulpmiddel, maar het grote nadeel is dat berichten in het applicatiegeheugen worden opgeslagen om opnieuw te worden verzonden. Dit betekent dat het herstarten van de applicatie vanwege een update of een operationele fout zal resulteren in het verlies van alle berichten in afwachting van herverwerking. Omdat dit punt van cruciaal belang is voor ons systeem, hebben we er niet verder over nagedacht.

spring-kafka zelf biedt bijvoorbeeld verschillende implementaties van ContainerAwareErrorHandler SeekToCurrentErrorHandler, waarmee u het bericht later kunt verwerken zonder de offset te verschuiven bij een fout. Vanaf versie van spring-kafka 2.3 werd het mogelijk om BackOffPolicy in te stellen.

Deze aanpak maakt het mogelijk dat opnieuw verwerkte berichten het opnieuw opstarten van de applicatie overleven, maar er is nog steeds geen DLQ-mechanisme. We kozen begin 2019 voor deze optie, in de optimistische overtuiging dat DLQ niet nodig zou zijn (we hadden geluk en hadden het eigenlijk niet nodig na enkele maanden de applicatie met een dergelijk herverwerkingssysteem te hebben gebruikt). Tijdelijke fouten zorgden ervoor dat SeekToCurrentErrorHandler werd geactiveerd. De resterende fouten werden in het logboek afgedrukt, wat resulteerde in een verschuiving, en de verwerking werd voortgezet met het volgende bericht.

Laatste beslissing

De implementatie op basis van SeekToCurrentErrorHandler heeft ons ertoe aangezet ons eigen mechanisme voor het opnieuw verzenden van berichten te ontwikkelen.

Allereerst wilden we de bestaande ervaring gebruiken en uitbreiden afhankelijk van de applicatielogica. Voor een lineaire logische toepassing zou het optimaal zijn om te stoppen met het lezen van nieuwe berichten gedurende een korte periode die wordt gespecificeerd door de strategie voor opnieuw proberen. Voor andere toepassingen wilde ik één enkel punt hebben dat de strategie voor opnieuw proberen zou afdwingen. Bovendien moet dit enkele punt voor beide benaderingen DLQ-functionaliteit hebben.

De strategie voor opnieuw proberen zelf moet worden opgeslagen in de applicatie, die verantwoordelijk is voor het ophalen van het volgende interval wanneer er een tijdelijke fout optreedt.

De consument stoppen voor een lineaire logische toepassing

Als je met spring-kafka werkt, kan de code om de consument te stoppen er ongeveer zo uitzien:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

In het voorbeeld is retryAt het moment om de MessageListenerContainer opnieuw te starten als deze nog actief is. De herlancering zal plaatsvinden in een aparte thread die wordt gelanceerd in TaskScheduler, waarvan de implementatie ook in het voorjaar zal plaatsvinden.

We vinden de retryAt-waarde op de volgende manier:

  1. De waarde van de heroproepteller wordt opgezocht.
  2. Op basis van de tellerwaarde wordt gezocht naar het huidige vertragingsinterval in de strategie voor opnieuw proberen. De strategie wordt in de applicatie zelf aangegeven; we hebben het JSON-formaat gekozen om deze op te slaan.
  3. Het interval in de JSON-array bevat het aantal seconden waarna de verwerking moet worden herhaald. Dit aantal seconden wordt opgeteld bij de huidige tijd om de waarde voor retryAt te vormen.
  4. Als het interval niet wordt gevonden, is de waarde van retryAt nul en wordt het bericht naar DLQ verzonden voor handmatige parsering.

Met deze aanpak hoeft u alleen nog maar het aantal herhaalde oproepen op te slaan voor elk bericht dat momenteel wordt verwerkt, bijvoorbeeld in het applicatiegeheugen. Het bijhouden van het aantal nieuwe pogingen in het geheugen is niet van cruciaal belang voor deze aanpak, aangezien een lineaire logische toepassing de verwerking als geheel niet kan verwerken. In tegenstelling tot een nieuwe poging in het voorjaar zal het herstarten van de applicatie er niet voor zorgen dat alle berichten verloren gaan om opnieuw te worden verwerkt, maar zal de strategie eenvoudigweg opnieuw worden gestart.

Deze aanpak helpt het externe systeem te ontlasten, dat mogelijk niet beschikbaar is vanwege een zeer zware belasting. Met andere woorden: naast de herverwerking hebben we de implementatie van het patroon gerealiseerd stroomonderbreker.

In ons geval is de foutdrempel slechts 1, en om de systeemuitval als gevolg van tijdelijke netwerkstoringen tot een minimum te beperken, gebruiken we een zeer gedetailleerde strategie voor opnieuw proberen met kleine latentie-intervallen. Dit is mogelijk niet geschikt voor alle groepstoepassingen, dus de relatie tussen de foutdrempel en de intervalwaarde moet worden geselecteerd op basis van de kenmerken van het systeem.

Een aparte applicatie voor het verwerken van berichten van applicaties met niet-deterministische logica

Hier is een voorbeeld van code die een bericht naar een dergelijke toepassing (Retryer) verzendt, die opnieuw naar het DESTINATION-onderwerp wordt verzonden wanneer de RETRY_AT-tijd is bereikt:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Uit het voorbeeld blijkt dat veel informatie in headers wordt verzonden. De waarde van RETRY_AT wordt op dezelfde manier gevonden als voor het mechanisme voor opnieuw proberen via de Consumentenstop. Naast DESTINATION en RETRY_AT passeren we:

  • GROUP_ID, waarmee we berichten groeperen voor handmatige analyse en vereenvoudigd zoeken.
  • ORIGINAL_PARTITION om te proberen dezelfde consument te behouden voor herverwerking. Deze parameter kan nul zijn, in welk geval de nieuwe partitie wordt verkregen met behulp van de record.key() sleutel van het originele bericht.
  • Bijgewerkte COUNTER-waarde om de strategie voor opnieuw proberen te volgen.
  • SEND_TO is een constante die aangeeft of het bericht wordt verzonden voor herverwerking bij het bereiken van RETRY_AT of in DLQ wordt geplaatst.
  • REDEN - de reden waarom de berichtverwerking werd onderbroken.

Retryer slaat berichten op voor opnieuw verzenden en handmatig parseren in PostgreSQL. Een timer start een taak die berichten met RETRY_AT vindt en deze terugstuurt naar de ORIGINAL_PARTITION partitie van het DESTINATION onderwerp met de sleutel record.key().

Eenmaal verzonden, worden berichten verwijderd uit PostgreSQL. Het handmatig parseren van berichten vindt plaats in een eenvoudige gebruikersinterface die via REST API samenwerkt met Retryer. De belangrijkste functies zijn het opnieuw verzenden of verwijderen van berichten uit DLQ, het bekijken van foutinformatie en het zoeken naar berichten, bijvoorbeeld op foutnaam.

Omdat toegangscontrole is ingeschakeld op onze clusters, is het noodzakelijk om bovendien toegang te vragen tot het onderwerp waar Retryer naar luistert, en Retryer toe te staan ​​naar het DESTINATION-onderwerp te schrijven. Dit is lastig, maar in tegenstelling tot de intervalonderwerpbenadering hebben we een volwaardige DLQ en gebruikersinterface om dit te beheren.

Er zijn gevallen waarin een binnenkomend onderwerp wordt gelezen door verschillende consumentengroepen, wier toepassingen verschillende logica implementeren. Het opnieuw verwerken van een bericht via Retryer voor een van deze toepassingen zal resulteren in een duplicaat voor de andere. Om hiertegen te beschermen, maken we een apart onderwerp voor herverwerking. De inkomende en nieuwe onderwerpen kunnen zonder enige beperking door dezelfde Consument worden gelezen.

Gebeurtenissen opnieuw verwerken ontvangen van Kafka

Deze aanpak biedt standaard geen functionaliteit voor stroomonderbrekers, maar kan wel aan de toepassing worden toegevoegd met behulp van lente-cloud-netflix of nieuw lentewolk stroomonderbreker, waarbij de plaatsen waar externe diensten worden opgeroepen in passende abstracties worden verpakt. Daarnaast wordt het mogelijk om daar een strategie voor te kiezen waterdicht schot patroon, wat ook handig kan zijn. In spring-cloud-netflix kan dit bijvoorbeeld een threadpool of een semafoor zijn.

Uitgang

Hierdoor hebben we een aparte applicatie waarmee we de berichtverwerking kunnen herhalen als een extern systeem tijdelijk niet beschikbaar is.

Een van de belangrijkste voordelen van de applicatie is dat deze kan worden gebruikt door externe systemen die op hetzelfde Kafka-cluster draaien, zonder noemenswaardige wijzigingen aan hun kant! Een dergelijke toepassing hoeft alleen maar toegang te krijgen tot het opnieuw proberen-onderwerp, een paar Kafka-headers in te vullen en een bericht naar de Retryer te sturen. Het is niet nodig om extra infrastructuur aan te leggen. En om het aantal berichten dat van de applicatie naar Retryer en terug wordt overgedragen te verminderen, hebben we applicaties met lineaire logica geïdentificeerd en deze opnieuw verwerkt via de Consumer-stop.

Bron: www.habr.com

Voeg een reactie