Ponovna obdelava dogodkov, prejetih od Kafke

Ponovna obdelava dogodkov, prejetih od Kafke

Hej Habr.

Pred kratkim sem delil svojo izkušnjo o tem, katere parametre kot ekipa najpogosteje uporabljamo za Kafka Producer in Consumer, da se približamo zajamčeni dostavi. V tem članku vam želim povedati, kako smo organizirali ponovno obdelavo dogodka, prejetega od Kafke zaradi začasne nedostopnosti zunanjega sistema.

Sodobne aplikacije delujejo v zelo kompleksnem okolju. Poslovna logika, zavita v sodoben tehnološki sklad, deluje v sliki Docker, ki jo upravlja orkestrator, kot sta Kubernetes ali OpenShift, in komunicira z drugimi aplikacijami ali rešitvami podjetja prek verige fizičnih in virtualnih usmerjevalnikov. V takšnem okolju se vedno lahko kaj pokvari, zato je ponovna obdelava dogodkov, če eden od zunanjih sistemov ni na voljo, pomemben del naših poslovnih procesov.

Kako je bilo pred Kafko

Prej v projektu smo uporabili IBM MQ za asinhrono dostavo sporočil. Če je med delovanjem storitve prišlo do kakršne koli napake, se lahko prejeto sporočilo postavi v čakalno vrsto mrtvih pisem (DLQ) za nadaljnje ročno razčlenjevanje. DLQ je bil ustvarjen poleg dohodne čakalne vrste, sporočilo je bilo preneseno znotraj IBM MQ.

Če bi bila napaka začasna in bi jo lahko ugotovili (na primer ResourceAccessException pri klicu HTTP ali MongoTimeoutException pri zahtevi MongoDb), bi strategija ponovnega poskusa začela veljati. Ne glede na logiko razvejanja aplikacije je bilo prvotno sporočilo premaknjeno bodisi v sistemsko čakalno vrsto za odloženo pošiljanje bodisi v ločeno aplikacijo, ki je bila že zdavnaj narejena za ponovno pošiljanje sporočil. To vključuje številko za ponovno pošiljanje v glavi sporočila, ki je vezana na interval zakasnitve ali konec strategije na ravni aplikacije. Če smo dosegli konec strategije, vendar zunanji sistem še vedno ni na voljo, bo sporočilo postavljeno v DLQ za ročno razčlenjevanje.

Poiščite rešitev

Iskanje po internetu, lahko najdete naslednje Odločitev. Skratka, predlagano je ustvariti temo za vsak interval zakasnitve in ob strani implementirati potrošniške aplikacije, ki bodo brale sporočila z zahtevano zakasnitvijo.

Ponovna obdelava dogodkov, prejetih od Kafke

Kljub velikemu številu pozitivnih ocen se mi zdi ne povsem uspešen. Najprej zato, ker bo moral razvijalec poleg implementacije poslovnih zahtev porabiti veliko časa za implementacijo opisanega mehanizma.

Poleg tega, če je nadzor dostopa omogočen v gruči Kafka, boste morali porabiti nekaj časa za ustvarjanje tem in zagotavljanje potrebnega dostopa do njih. Poleg tega boste morali izbrati pravilen parameter retention.ms za vsako od tem ponovnega poskusa, tako da bodo sporočila imela čas za ponovno pošiljanje in ne bodo izginila iz njih. Implementacijo in zahtevo za dostop bo treba ponoviti za vsako obstoječo ali novo storitev.

Poglejmo zdaj, katere mehanizme nam spring na splošno in še posebej spring-kafka nudita za ponovno obdelavo sporočil. Spring-kafka ima prehodno odvisnost od spring-retry, ki zagotavlja abstrakcije za upravljanje različnih BackOffPolicies. To je dokaj prilagodljivo orodje, vendar je njegova pomembna pomanjkljivost shranjevanje sporočil za ponovno pošiljanje v pomnilnik aplikacije. To pomeni, da bo ponovni zagon aplikacije zaradi posodobitve ali operativne napake povzročil izgubo vseh sporočil, ki čakajo na ponovno obdelavo. Ker je ta točka kritična za naš sistem, je nismo več obravnavali.

spring-kafka sama nudi na primer več izvedb ContainerAwareErrorHandler SeekToCurrentErrorHandler, s katerim lahko kasneje obdelate sporočilo brez premika odmika v primeru napake. Od različice spring-kafka 2.3 je postalo mogoče nastaviti BackOffPolicy.

Ta pristop omogoča, da ponovno obdelana sporočila preživijo ponovni zagon aplikacije, vendar še vedno ni mehanizma DLQ. Za to možnost smo se odločili v začetku leta 2019, optimistično prepričani, da DLQ ne bo potreben (imeli smo srečo in je po večmesečnem delovanju aplikacije s takim sistemom reprocesiranja dejansko nismo potrebovali). Začasne napake so povzročile sprožitev SeekToCurrentErrorHandler. Preostale napake so bile natisnjene v dnevniku, kar je povzročilo zamik, obdelava pa se je nadaljevala z naslednjim sporočilom.

Končna odločitev

Izvedba, ki temelji na SeekToCurrentErrorHandler, nas je spodbudila k razvoju lastnega mehanizma za ponovno pošiljanje sporočil.

Najprej smo želeli uporabiti obstoječe izkušnje in jih razširiti glede na logiko aplikacije. Za linearno logično aplikacijo bi bilo optimalno prenehati brati nova sporočila za kratek čas, ki ga določa strategija ponovnega poskusa. Za druge aplikacije sem želel imeti eno točko, ki bi uveljavila strategijo ponovnega poskusa. Poleg tega mora ta posamezna točka imeti funkcijo DLQ za oba pristopa.

Sama strategija ponovnega poskusa mora biti shranjena v aplikaciji, ki je odgovorna za pridobitev naslednjega intervala, ko pride do začasne napake.

Zaustavitev potrošnika za aplikacijo linearne logike

Pri delu s spring-kafko je lahko koda za zaustavitev potrošnika videti nekako takole:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

V primeru je retryAt čas za ponovni zagon MessageListenerContainer, če se še vedno izvaja. Ponovni zagon se bo zgodil v ločeni niti, zagnani v TaskSchedulerju, katere izvedba je prav tako zagotovljena do pomladi.

Vrednost retryAt najdemo na naslednji način:

  1. Poišče se vrednost števca ponovnega klica.
  2. Na podlagi vrednosti števca se išče trenutni interval zakasnitve v strategiji ponovnega poskusa. Strategija je deklarirana v sami aplikaciji, za shranjevanje smo izbrali format JSON.
  3. Interval, najden v matriki JSON, vsebuje število sekund, po katerih je treba obdelavo ponoviti. To število sekund se doda trenutnemu času, da se oblikuje vrednost za retryAt.
  4. Če interval ni najden, je vrednost retryAt ničelna in sporočilo bo poslano v DLQ za ročno razčlenjevanje.

S tem pristopom ostane le še shranjevanje števila ponovljenih klicev za vsako sporočilo, ki je trenutno v obdelavi, na primer v pomnilnik aplikacije. Ohranjanje števila ponovnih poskusov v pomnilniku ni ključnega pomena za ta pristop, saj aplikacija z linearno logiko ne more obdelati celotne obdelave. Za razliko od pomladnega ponovnega poskusa ponovni zagon aplikacije ne bo povzročil vnovične obdelave izgubljenih vseh sporočil, ampak bo preprosto znova zagnal strategijo.

Ta pristop pomaga razbremeniti zunanji sistem, ki morda ni na voljo zaradi zelo velike obremenitve. Z drugimi besedami, poleg ponovne obdelave smo dosegli implementacijo vzorca varovalka.

V našem primeru je prag napake samo 1 in za zmanjšanje izpadov sistema zaradi začasnih izpadov omrežja uporabljamo zelo natančno strategijo ponovnega poskusa z majhnimi zakasnitvenimi intervali. To morda ni primerno za vse skupinske aplikacije, zato je treba razmerje med pragom napake in vrednostjo intervala izbrati na podlagi značilnosti sistema.

Ločena aplikacija za obdelavo sporočil iz aplikacij z nedeterministično logiko

Tukaj je primer kode, ki pošlje sporočilo taki aplikaciji (Retryer), ki bo ponovno poslano temi DESTINATION, ko je dosežen čas RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Primer kaže, da se veliko informacij prenaša v glavah. Vrednost RETRY_AT se najde na enak način kot za mehanizem ponovnega poskusa prek zaustavitve potrošnika. Poleg DESTINATION in RETRY_AT prevozimo:

  • GROUP_ID, po katerem združujemo sporočila za ročno analizo in poenostavljeno iskanje.
  • ORIGINAL_PARTITION, da poskusite obdržati istega potrošnika za ponovno obdelavo. Ta parameter je lahko ničelna vrednost, v tem primeru bo nova particija pridobljena s ključem record.key() izvirnega sporočila.
  • Posodobljena vrednost COUNTER za sledenje strategiji ponovnega poskusa.
  • SEND_TO je konstanta, ki označuje, ali je sporočilo poslano v ponovno obdelavo, ko doseže RETRY_AT ali je postavljeno v DLQ.
  • RAZLOG - razlog, zakaj je bila obdelava sporočila prekinjena.

Retryer shrani sporočila za ponovno pošiljanje in ročno razčlenjevanje v PostgreSQL. Časovnik zažene nalogo, ki najde sporočila z RETRY_AT in jih pošlje nazaj na particijo ORIGINAL_PARTITION teme DESTINATION s ključem record.key().

Ko so sporočila poslana, so izbrisana iz PostgreSQL. Ročno razčlenjevanje sporočil poteka v preprostem uporabniškem vmesniku, ki komunicira z Retryerjem prek REST API-ja. Njegove glavne funkcije so ponovno pošiljanje ali brisanje sporočil iz DLQ, ogled informacij o napakah in iskanje sporočil, na primer po imenu napake.

Ker je na naših gručah omogočen nadzor dostopa, je potrebno dodatno zahtevati dostop do teme, ki jo posluša Retryer, in omogočiti Retryerju pisanje v temo DESTINATION. To je neprijetno, vendar imamo za razliko od pristopa intervalne teme na voljo popoln DLQ in uporabniški vmesnik za upravljanje.

Obstajajo primeri, ko prejeto temo bere več različnih skupin potrošnikov, katerih aplikacije izvajajo različno logiko. Ponovna obdelava sporočila prek Retryerja za eno od teh aplikacij bo povzročila dvojnik v drugi. Za zaščito pred tem ustvarimo ločeno temo za ponovno obdelavo. Dohodne in ponovne teme lahko bere isti potrošnik brez kakršnih koli omejitev.

Ponovna obdelava dogodkov, prejetih od Kafke

Ta pristop privzeto ne zagotavlja funkcije odklopnika, vendar ga je mogoče dodati v aplikacijo z uporabo spring-cloud-netflix ali novo odklopnik spomladanskega oblaka, ovijanje mest, kjer so priklicane zunanje storitve, v ustrezne abstrakcije. Poleg tega je mogoče izbrati strategijo za pregrada vzorec, ki je lahko tudi uporaben. Na primer, v spring-cloud-netflix je to lahko skupina niti ali semafor.

Izhod

Posledično imamo ločeno aplikacijo, ki nam omogoča ponavljanje obdelave sporočil, če je kateri koli zunanji sistem začasno nedosegljiv.

Ena izmed glavnih prednosti aplikacije je, da jo lahko uporabljajo zunanji sistemi, ki tečejo v isti gruči Kafka, brez bistvenih sprememb na njihovi strani! Takšna aplikacija bo morala le dostopati do teme o ponovnem poskusu, izpolniti nekaj glav Kafka in poslati sporočilo Retryerju. Nobena dodatna infrastruktura ni potrebna. In da bi zmanjšali število sporočil, prenesenih iz aplikacije v Retryer in nazaj, smo identificirali aplikacije z linearno logiko in jih ponovno obdelali prek Consumer stop.

Vir: www.habr.com

Dodaj komentar