No Kafkas saņemto notikumu atkārtota apstrāde

No Kafkas saņemto notikumu atkārtota apstrāde

Čau, Habr.

Nesen es dalījās savā pieredzē par to, kādus parametrus mēs kā komanda visbiežāk izmantojam, lai Kafka ražotājs un patērētājs tuvotos garantētai piegādei. Šajā rakstā vēlos pastāstīt, kā mēs organizējām no Kafkas saņemtā notikuma atkārtotu apstrādi ārējās sistēmas īslaicīgas nepieejamības rezultātā.

MÅ«sdienu lietojumprogrammas darbojas ļoti sarežģītā vidē. Biznesa loÄ£ika, kas ietÄ«ta modernā tehnoloÄ£iju kaudzē, darbojas Docker attēlā, ko pārvalda orÄ·estrētājs, piemēram, Kubernetes vai OpenShift, un sazinās ar citām lietojumprogrammām vai uzņēmuma risinājumiem, izmantojot fizisko un virtuālo marÅ”rutētāju ķēdi. Šādā vidē vienmēr kaut kas var salÅ«zt, tāpēc notikumu atkārtota apstrāde, ja kāda no ārējām sistēmām nav pieejama, ir svarÄ«ga mÅ«su biznesa procesu sastāvdaļa.

Kā tas bija pirms Kafkas

Projekta sākumā mēs izmantojām IBM MQ asinhronai ziņojumu piegādei. Ja pakalpojuma darbÄ«bas laikā radās kļūda, saņemto ziņojumu var ievietot miruÅ”o burtu rindā (DLQ) turpmākai manuālai parsÄ“Å”anai. DLQ tika izveidots blakus ienākoÅ”ajai rindai, ziņojums tika pārsÅ«tÄ«ts IBM MQ iekÅ”pusē.

Ja kļūda bija Ä«slaicÄ«ga un mēs to varētu noteikt (piemēram, ResourceAccessException HTTP zvanam vai MongoTimeoutException MongoDb pieprasÄ«jumam), stātos spēkā atkārtota mēģinājuma stratēģija. NeatkarÄ«gi no lietojumprogrammas sazaroÅ”anas loÄ£ikas, sākotnējais ziņojums tika pārvietots vai nu uz sistēmas rindu aizkavētai sÅ«tÄ«Å”anai, vai uz atseviŔķu lietojumprogrammu, kas tika izveidota jau sen, lai atkārtoti nosÅ«tÄ«tu ziņojumus. Tas ietver atkārtotas nosÅ«tÄ«Å”anas numuru ziņojuma galvenē, kas ir saistÄ«ts ar aizkaves intervālu vai lietojumprogrammas lÄ«meņa stratēģijas beigām. Ja esam sasnieguÅ”i stratēģijas beigas, bet ārējā sistēma joprojām nav pieejama, ziņojums tiks ievietots DLQ manuālai parsÄ“Å”anai.

Risinājumu meklÄ“Å”ana

MeklÄ“Å”ana internetā, varat atrast tālāk norādÄ«to lēmums. ÄŖsāk sakot, tiek piedāvāts izveidot tēmu katram aizkaves intervālam un malā ieviest Consumer lietojumprogrammas, kas nolasÄ«s ziņojumus ar nepiecieÅ”amo aizkavi.

No Kafkas saņemto notikumu atkārtota apstrāde

Neskatoties uz lielo pozitÄ«vo atsauksmju skaitu, man Ŕķiet, ka tas nav pilnÄ«bā veiksmÄ«gs. Pirmkārt, tāpēc, ka izstrādātājam papildus biznesa prasÄ«bu ievieÅ”anai bÅ«s jāpavada daudz laika, lai ieviestu aprakstÄ«to mehānismu.

Turklāt, ja Kafka klasterÄ« ir iespējota piekļuves kontrole, jums bÅ«s jāpavada zināms laiks, veidojot tēmas un nodroÅ”inot tām nepiecieÅ”amo piekļuvi. Papildus tam jums bÅ«s jāatlasa pareizais retention.ms parametrs katrai atkārtotā mēģinājuma tēmai, lai ziņojumiem bÅ«tu laiks atkārtoti nosÅ«tÄ«t un tie nepazustu. IevieÅ”ana un piekļuves pieprasÄ«jums bÅ«s jāatkārto katram esoÅ”ajam vai jaunajam pakalpojumam.

Tagad paskatÄ«simies, kādus mehānismus atspere kopumā un jo Ä«paÅ”i atspere-kafka mums nodroÅ”ina ziņojumu pārstrādei. Spring-kafka ir pārejoÅ”a atkarÄ«ba no atsperes atkārtoÅ”anas, kas nodroÅ”ina abstrakcijas dažādu BackOffPolicies pārvaldÄ«bai. Å is ir diezgan elastÄ«gs rÄ«ks, taču tā bÅ«tisks trÅ«kums ir ziņojumu glabāŔana atkārtotai nosÅ«tÄ«Å”anai lietojumprogrammas atmiņā. Tas nozÄ«mē, ka, restartējot lietojumprogrammu atjaunināŔanas vai darbÄ«bas kļūdas dēļ, tiks zaudēti visi ziņojumi, kas gaida atkārtotu apstrādi. Tā kā Å”is punkts ir ļoti svarÄ«gs mÅ«su sistēmai, mēs to sÄ«kāk neapskatÄ«jām.

Spring-kafka pati nodroÅ”ina, piemēram, vairākas ContainerAwareErrorHandler implementācijas SeekToCurrentErrorHandler, ar kuru varat vēlāk apstrādāt ziņojumu, kļūdas gadÄ«jumā nepārslēdzot nobÄ«di. Sākot ar Spring-kafka 2.3 versiju, kļuva iespējams iestatÄ«t BackOffPolicy.

Å Ä« pieeja ļauj atkārtoti apstrādātajiem ziņojumiem izturēt lietojumprogrammu restartÄ“Å”anu, taču joprojām nav DLQ mehānisma. Å o opciju izvēlējāmies 2019. gada sākumā, optimistiski uzskatot, ka DLQ nebÅ«s vajadzÄ«gs (mums paveicās un pēc vairāku mēneÅ”u ilgas aplikācijas darbÄ«bas ar Ŕādu pārstrādes sistēmu tas faktiski vairs nebija vajadzÄ«gs). Pagaidu kļūdu dēļ tika aktivizēts SeekToCurrentErrorHandler. AtlikuŔās kļūdas tika izdrukātas žurnālā, kā rezultātā tika veikta nobÄ«de, un apstrāde tika turpināta ar nākamo ziņojumu.

Gala lēmums

IevieÅ”ana, kuras pamatā ir SeekToCurrentErrorHandler, pamudināja mÅ«s izstrādāt savu mehānismu ziņojumu atkārtotai sÅ«tÄ«Å”anai.

Pirmkārt, vēlējāmies izmantot esoÅ”o pieredzi un paplaÅ”ināt to atkarÄ«bā no pielietojuma loÄ£ikas. Lineārās loÄ£ikas lietojumprogrammai bÅ«tu optimāli pārtraukt jaunu ziņojumu lasÄ«Å”anu uz Ä«su laika periodu, ko nosaka atkārtoÅ”anas stratēģija. Citām lietojumprogrammām es gribēju, lai bÅ«tu viens punkts, kas Ä«stenotu atkārtoÅ”anas stratēģiju. Turklāt Å”im vienam punktam ir jābÅ«t DLQ funkcionalitātei abām pieejām.

Pati atkārtotā mēģinājuma stratēģija ir jāsaglabā lietojumprogrammā, kas ir atbildÄ«ga par nākamā intervāla izgÅ«Å”anu, ja rodas Ä«slaicÄ«ga kļūda.

Lineārās loÄ£ikas lietojumprogrammas patērētāja apturÄ“Å”ana

Strādājot ar spring-kafka, patērētāja apturÄ“Å”anas kods var izskatÄ«ties apmēram Ŕādi:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Piemērā retryAt ir laiks, lai restartētu MessageListenerContainer, ja tas joprojām darbojas. Atkārtota palaiÅ”ana notiks atseviŔķā pavedienā, kas palaists TaskScheduler, kura ievieÅ”anu nodroÅ”ina arÄ« pavasaris.

Mēs atrodam retryAt vērtÄ«bu Ŕādā veidā:

  1. Tiek meklēta atkārtotu zvanu skaitītāja vērtība.
  2. Pamatojoties uz skaitÄ«tāja vērtÄ«bu, tiek meklēts paÅ”reizējais aizkaves intervāls atkārtotā mēģinājuma stratēģijā. Stratēģija ir deklarēta paŔā lietojumprogrammā; mēs izvēlējāmies JSON formātu, lai to saglabātu.
  3. JSON masÄ«vā atrastais intervāls satur sekunžu skaitu, pēc kura apstrāde bÅ«s jāatkārto. Å is sekunžu skaits tiek pievienots paÅ”reizējam laikam, lai izveidotu vērtÄ«bu retryAt.
  4. Ja intervāls netiek atrasts, retryAt vērtÄ«ba ir nulle un ziņojums tiks nosÅ«tÄ«ts uz DLQ manuālai parsÄ“Å”anai.

Izmantojot Å”o pieeju, atliek tikai saglabāt atkārtoto zvanu skaitu katram ziņojumam, kas paÅ”laik tiek apstrādāts, piemēram, lietojumprogrammas atmiņā. Atkārtoto mēģinājumu skaita saglabāŔana atmiņā Å”ai pieejai nav bÅ«tiska, jo lineārās loÄ£ikas lietojumprogramma nevar apstrādāt apstrādi kopumā. AtŔķirÄ«bā no pavasara atkārtotas mēģinājuma, lietojumprogrammas restartÄ“Å”ana neizraisÄ«s visu pazaudēto ziņojumu atkārtotu apstrādi, bet vienkārÅ”i restartēs stratēģiju.

Å Ä« pieeja palÄ«dz noņemt slodzi no ārējās sistēmas, kas var nebÅ«t pieejama ļoti lielas slodzes dēļ. Citiem vārdiem sakot, papildus atkārtotai apstrādei mēs panācām modeļa ievieÅ”anu ķēdes pārtraucējs.

MÅ«su gadÄ«jumā kļūdu slieksnis ir tikai 1, un, lai samazinātu sistēmas dÄ«kstāvi Ä«slaicÄ«gu tÄ«kla pārtraukumu dēļ, mēs izmantojam ļoti detalizētu atkārtoÅ”anas stratēģiju ar maziem latentuma intervāliem. Tas var nebÅ«t piemērots visām grupu lietojumprogrammām, tāpēc attiecÄ«bas starp kļūdu slieksni un intervāla vērtÄ«bu ir jāizvēlas, pamatojoties uz sistēmas Ä«paŔībām.

AtseviŔķa lietojumprogramma ziņojumu apstrādei no lietojumprogrammām ar nedeterministisku loÄ£iku

Å eit ir koda piemērs, kas nosÅ«ta ziņojumu Ŕādai lietojumprogrammai (atkārtotajam mēģinājumam), kas tiks atkārtoti nosÅ«tÄ«ts uz tēmu DESTINATION, kad bÅ«s sasniegts RETRY_AT laiks:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Piemērā redzams, ka liela daļa informācijas tiek pārsÅ«tÄ«ta galvenēs. RETRY_AT vērtÄ«ba tiek atrasta tādā paŔā veidā kā atkārtota mēģinājuma mehānismam, izmantojot patērētāju pieturu. Papildus DESTINATION un RETRY_AT mēs izbraucam:

  • GROUP_ID, ar kuru mēs grupējam ziņojumus manuālai analÄ«zei un vienkārÅ”otai meklÄ“Å”anai.
  • ORIGINAL_PARTITION, lai mēģinātu saglabāt to paÅ”u patērētāju atkārtotai apstrādei. Å is parametrs var bÅ«t nulle, un tādā gadÄ«jumā jaunais nodalÄ«jums tiks iegÅ«ts, izmantojot sākotnējā ziņojuma atslēgu record.key().
  • Atjaunināta COUNTER vērtÄ«ba, lai ievērotu atkārtotā mēģinājuma stratēģiju.
  • SEND_TO ir konstante, kas norāda, vai ziņojums tiek nosÅ«tÄ«ts atkārtotai apstrādei, sasniedzot RETRY_AT, vai ievietots DLQ.
  • REASON ā€” iemesls, kāpēc ziņojumu apstrāde tika pārtraukta.

Atkārtoti mēģināt saglabāt ziņojumus atkārtotai nosÅ«tÄ«Å”anai un manuālai parsÄ“Å”anai programmā PostgreSQL. Taimeris sāk uzdevumu, kas atrod ziņojumus ar RETRY_AT un nosÅ«ta tos atpakaļ uz tēmas MĒRĶA nodalÄ«jumu ORIGINAL_PARTITION ar atslēgu record.key().

Pēc nosÅ«tÄ«Å”anas ziņojumi tiek dzēsti no PostgreSQL. Ziņojumu manuāla parsÄ“Å”ana notiek vienkārŔā lietotāja saskarnē, kas mijiedarbojas ar atkārtotāju, izmantojot REST API. Tās galvenās funkcijas ir ziņojumu atkārtota nosÅ«tÄ«Å”ana vai dzÄ“Å”ana no DLQ, kļūdu informācijas skatÄ«Å”ana un ziņojumu meklÄ“Å”ana, piemēram, pēc kļūdas nosaukuma.

Tā kā mÅ«su klasteros ir iespējota piekļuves kontrole, ir papildus jāpieprasa piekļuve tēmai, ko klausās Retryer, un jāļauj atkārtotajam mēģinājumam rakstÄ«t tēmai DESTINATION. Tas ir neērti, taču atŔķirÄ«bā no intervāla tēmu pieejas mums ir pilnvērtÄ«gs DLQ un lietotāja interfeiss, lai to pārvaldÄ«tu.

Ir gadÄ«jumi, kad ienākoÅ”o tēmu lasa vairākas dažādas patērētāju grupas, kuru lietotnēs tiek realizēta atŔķirÄ«ga loÄ£ika. Atkārtoti apstrādājot ziņojumu, izmantojot atkārtotu mēģinājumu vienai no Ŕīm lietojumprogrammām, tiks izveidots dublikāts otrā. Lai aizsargātos pret to, mēs izveidojam atseviŔķu tēmu atkārtotai apstrādei. IenākoŔās un atkārtotās tēmas var lasÄ«t viens un tas pats Patērētājs bez ierobežojumiem.

No Kafkas saņemto notikumu atkārtota apstrāde

Pēc noklusējuma Ŕī pieeja nenodroÅ”ina ķēdes pārtraucēja funkcionalitāti, taču to var pievienot lietojumprogrammai, izmantojot pavasaris-mākonis-netflix vai jauns pavasara mākoņu ķēdes pārtraucējs, iekļaujot atbilstoŔās abstrakcijās vietas, kur tiek izsaukti ārējie pakalpojumi. Turklāt kļūst iespējams izvēlēties stratēģiju starpsienu modelis, kas arÄ« var bÅ«t noderÄ«gs. Piemēram, pavasarÄ«-cloud-netflix tas varētu bÅ«t pavedienu baseins vai semafors.

secinājums

Rezultātā mums ir atseviŔķa lietojumprogramma, kas ļauj atkārtot ziņojumu apstrādi, ja kāda ārēja sistēma Ä«slaicÄ«gi nav pieejama.

Viena no galvenajām lietojumprogrammas priekÅ”rocÄ«bām ir tā, ka to var izmantot ārējās sistēmas, kas darbojas tajā paŔā Kafka klasterÄ«, bez bÅ«tiskām izmaiņām to pusē! Šādai lietojumprogrammai vajadzēs tikai piekļūt atkārtotā mēģinājuma tēmai, aizpildÄ«t dažas Kafka galvenes un nosÅ«tÄ«t ziņojumu atkārtoti mēģinātājam. Nav nepiecieÅ”ams celt papildu infrastruktÅ«ru. Un, lai samazinātu ziņojumu skaitu, kas tiek pārsÅ«tÄ«ti no lietojumprogrammas uz Retryer un atpakaļ, mēs identificējām lietojumprogrammas ar lineāru loÄ£iku un atkārtoti apstrādājām tās, izmantojot patērētāju pieturu.

Avots: www.habr.com

Pievieno komentāru