Ripërpunimi i ngjarjeve të marra nga Kafka

Ripërpunimi i ngjarjeve të marra nga Kafka

Hej Habr.

Kohët e fundit unë ndau përvojën e tij se cilat parametra ne si ekip përdorim më shpesh për Prodhuesin dhe Konsumatorin e Kafkës për t'iu afruar dorëzimit të garantuar. Në këtë artikull dua t'ju tregoj se si organizuam ripërpunimin e një ngjarjeje të marrë nga Kafka si rezultat i mungesës së përkohshme të sistemit të jashtëm.

Aplikacionet moderne funksionojnë në një mjedis shumë kompleks. Logjika e biznesit e mbështjellë në një pirg teknologjie moderne, që funksionon në një imazh Docker të menaxhuar nga një orkestrues si Kubernetes ose OpenShift, dhe duke komunikuar me aplikacione të tjera ose zgjidhje ndërmarrjesh përmes një zinxhiri ruterash fizikë dhe virtualë. Në një mjedis të tillë, diçka mund të prishet gjithmonë, kështu që ripërpunimi i ngjarjeve nëse një nga sistemet e jashtme nuk është i disponueshëm është një pjesë e rëndësishme e proceseve tona të biznesit.

Si ishte përpara Kafkës

Më parë në projekt ne përdorëm IBM MQ për dërgimin e mesazheve asinkrone. Nëse ka ndodhur ndonjë gabim gjatë funksionimit të shërbimit, mesazhi i marrë mund të vendoset në një radhë të shkronjave të vdekura (DLQ) për analizë të mëtejshme manuale. DLQ u krijua pranë radhës në hyrje, mesazhi u transferua brenda IBM MQ.

Nëse gabimi ishte i përkohshëm dhe ne mund ta përcaktonim atë (për shembull, një ResourceAccessException në një telefonatë HTTP ose një MongoTimeoutException në një kërkesë MongoDb), atëherë strategjia e riprovës do të hynte në fuqi. Pavarësisht nga logjika e degëzimit të aplikacionit, mesazhi origjinal u zhvendos ose në radhën e sistemit për dërgim të vonuar, ose në një aplikacion të veçantë që ishte bërë shumë kohë më parë për të ridërguar mesazhe. Kjo përfshin një numër të ridërgimit në kokën e mesazhit, i cili është i lidhur me intervalin e vonesës ose fundin e strategjisë së nivelit të aplikacionit. Nëse kemi arritur në fund të strategjisë, por sistemi i jashtëm është ende i padisponueshëm, atëherë mesazhi do të vendoset në DLQ për analizë manuale.

Kërko për një zgjidhje

Duke kërkuar në internet, mund të gjeni sa më poshtë zgjidhje. Me pak fjalë, propozohet të krijohet një temë për çdo interval vonese dhe të zbatohen aplikacionet e konsumatorit në anë, të cilat do të lexojnë mesazhet me vonesën e kërkuar.

Ripërpunimi i ngjarjeve të marra nga Kafka

Megjithë numrin e madh të vlerësimeve pozitive, më duket jo plotësisht i suksesshëm. Para së gjithash, sepse zhvilluesi, përveç zbatimit të kërkesave të biznesit, do të duhet të shpenzojë shumë kohë për zbatimin e mekanizmit të përshkruar.

Përveç kësaj, nëse kontrolli i aksesit është i aktivizuar në grupin Kafka, do t'ju duhet të kaloni pak kohë duke krijuar tema dhe duke ofruar aksesin e nevojshëm në to. Përveç kësaj, do t'ju duhet të zgjidhni parametrin e duhur retention.ms për secilën nga temat e riprovës, në mënyrë që mesazhet të kenë kohë për t'u dërguar dhe të mos zhduken prej tij. Zbatimi dhe kërkesa për akses do të duhet të përsëritet për çdo shërbim ekzistues ose të ri.

Le të shohim tani se çfarë mekanizmash na ofron Spring në përgjithësi dhe Spring-kafka në veçanti për ripërpunimin e mesazhit. Spring-kafka ka një varësi kalimtare nga riprovimi i pranverës, i cili ofron abstraksione për menaxhimin e politikave të ndryshme BackOff. Ky është një mjet mjaft fleksibël, por pengesa e tij e rëndësishme është ruajtja e mesazheve për ridërgim në kujtesën e aplikacionit. Kjo do të thotë që rinisja e aplikacionit për shkak të një përditësimi ose një gabimi operacional do të rezultojë në humbjen e të gjitha mesazheve në pritje të ripërpunimit. Meqenëse kjo pikë është kritike për sistemin tonë, ne nuk e morëm parasysh më tej.

Spring-kafka vetë ofron disa implementime të ContainerAwareErrorHandler, për shembull SeekToCurrentError Handler, me të cilin mund të përpunoni mesazhin më vonë pa zhvendosur offset në rast gabimi. Duke filluar me versionin e Spring-kafka 2.3, u bë e mundur vendosja e BackOffPolicy.

Kjo qasje lejon që mesazhet e ripërpunuara t'i mbijetojnë rinisjes së aplikacionit, por ende nuk ka asnjë mekanizëm DLQ. Ne zgjodhëm këtë opsion në fillim të vitit 2019, duke besuar me optimizëm se DLQ nuk do të nevojitej (ishim me fat dhe në fakt nuk na duhej pas disa muajsh funksionim të aplikacionit me një sistem të tillë ripërpunimi). Gabimet e përkohshme shkaktuan ndezjen e SeekToCurrentErrorHandler. Gabimet e mbetura u printuan në regjistër, duke rezultuar në një kompensim dhe përpunimi vazhdoi me mesazhin tjetër.

Vendimi përfundimtar

Zbatimi i bazuar në SeekToCurrentErrorHandler na nxiti të zhvillojmë mekanizmin tonë për ridërgimin e mesazheve.

Para së gjithash, ne donim të përdorim përvojën ekzistuese dhe ta zgjerojmë atë në varësi të logjikës së aplikacionit. Për një aplikim logjik linear, do të ishte optimale të ndaloni leximin e mesazheve të reja për një periudhë të shkurtër kohe të specifikuar nga strategjia e riprovës. Për aplikacionet e tjera, doja të kisha një pikë të vetme që do të zbatonte strategjinë e riprovës. Për më tepër, kjo pikë e vetme duhet të ketë funksionalitet DLQ për të dyja qasjet.

Vetë strategjia e riprovës duhet të ruhet në aplikacion, i cili është përgjegjës për marrjen e intervalit të ardhshëm kur ndodh një gabim i përkohshëm.

Ndalimi i konsumatorit për një aplikim të logjikës lineare

Kur punoni me Spring-kafka, kodi për të ndaluar Konsumatorin mund të duket diçka si kjo:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Në shembull, retryAt është koha për të rifilluar MessageListenerContainer nëse është ende në punë. Rinisja do të ndodhë në një fill të veçantë të lançuar në TaskScheduler, zbatimi i të cilit sigurohet gjithashtu deri në pranverë.

Ne e gjejmë vlerën retryAt në mënyrën e mëposhtme:

  1. Vlera e numëruesit të ri-thirrjes është kërkuar.
  2. Bazuar në vlerën e numëruesit, kërkohet intervali aktual i vonesës në strategjinë e riprovës. Strategjia është deklaruar në vetë aplikacionin; ne kemi zgjedhur formatin JSON për ta ruajtur atë.
  3. Intervali i gjetur në grupin JSON përmban numrin e sekondave pas të cilave përpunimi do të duhet të përsëritet. Ky numër sekondash i shtohet kohës aktuale për të formuar vlerën për retryAt.
  4. Nëse intervali nuk gjendet, atëherë vlera e retryAt është null dhe mesazhi do të dërgohet në DLQ për analizë manuale.

Me këtë qasje, gjithçka që mbetet është të ruhet numri i thirrjeve të përsëritura për çdo mesazh që aktualisht është duke u përpunuar, për shembull në kujtesën e aplikacionit. Mbajtja e numërimit të riprovave në kujtesë nuk është kritike për këtë qasje, pasi një aplikacion logjik linear nuk mund të trajtojë përpunimin në tërësi. Ndryshe nga riprovimi i pranverës, rinisja e aplikacionit nuk do të bëjë që të gjitha mesazhet të humbasin për t'u ripërpunuar, por thjesht do të rifillojë strategjinë.

Kjo qasje ndihmon në heqjen e ngarkesës nga sistemi i jashtëm, i cili mund të jetë i padisponueshëm për shkak të një ngarkese shumë të rëndë. Me fjalë të tjera, përveç ripërpunimit, ne arritëm zbatimin e modelit ndërprerës.

Në rastin tonë, pragu i gabimit është vetëm 1, dhe për të minimizuar kohën e ndërprerjes së sistemit për shkak të ndërprerjeve të përkohshme të rrjetit, ne përdorim një strategji shumë të grimcuar të riprovës me intervale të vogla vonese. Kjo mund të mos jetë e përshtatshme për të gjitha aplikacionet e grupit, kështu që marrëdhënia midis pragut të gabimit dhe vlerës së intervalit duhet të zgjidhet bazuar në karakteristikat e sistemit.

Një aplikacion i veçantë për përpunimin e mesazheve nga aplikacionet me logjikë jo-përcaktuese

Këtu është një shembull i kodit që i dërgon një mesazh një aplikacioni të tillë (Riprovues), i cili do të ridërgohet në temën DESTINATION kur të arrihet ora RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Shembulli tregon se shumë informacione transmetohen në kokë. Vlera e RETRY_AT gjendet në të njëjtën mënyrë si për mekanizmin e riprovës përmes ndalimit të konsumatorit. Përveç DESTINATION dhe RETRY_AT kalojmë:

  • GROUP_ID, me anë të së cilës ne grupojmë mesazhet për analizë manuale dhe kërkim të thjeshtuar.
  • ORIGINAL_PARTITION të përpiqet të mbajë të njëjtin Konsumator për ripërpunim. Ky parametër mund të jetë null, në të cilin rast ndarja e re do të merret duke përdorur tastin record.key() të mesazhit origjinal.
  • U përditësua vlera COUNTER për të ndjekur strategjinë e riprovës.
  • SEND_TO është një konstante që tregon nëse mesazhi dërgohet për ripërpunim pasi të arrijë në RETRY_AT ose vendoset në DLQ.
  • ARSYEJA - arsyeja pse u ndërpre përpunimi i mesazhit.

Retrier ruan mesazhe për ridërgim dhe analizim manual në PostgreSQL. Një kohëmatës fillon një detyrë që gjen mesazhe me RETRY_AT dhe i dërgon ato në ndarjen ORIGINAL_PARTITION të temës DESTINATION me çelësin record.key().

Pasi të dërgohen, mesazhet fshihen nga PostgreSQL. Analizimi manual i mesazheve ndodh në një ndërfaqe të thjeshtë që ndërvepron me Retryer nëpërmjet REST API. Karakteristikat e tij kryesore janë ridërgimi ose fshirja e mesazheve nga DLQ, shikimi i informacionit të gabimit dhe kërkimi i mesazheve, për shembull me emrin e gabimit.

Meqenëse kontrolli i aksesit është i aktivizuar në grupimet tona, është e nevojshme të kërkohet gjithashtu akses në temën që Riprovuesi po dëgjon dhe të lejohet Retrier të shkruajë në temën DESTINATION. Kjo është e papërshtatshme, por, ndryshe nga qasja e temës së intervalit, ne kemi një DLQ dhe UI të plotë për ta menaxhuar atë.

Ka raste kur një temë hyrëse lexohet nga disa grupe të ndryshme të konsumatorëve, aplikacionet e të cilëve zbatojnë logjikë të ndryshme. Ripërpunimi i një mesazhi përmes Riprovuesit për një nga këto aplikacione do të rezultojë në një dublikatë nga ana tjetër. Për t'u mbrojtur nga kjo, ne krijojmë një temë të veçantë për ripërpunim. Temat hyrëse dhe të riprovuara mund të lexohen nga i njëjti Konsumator pa asnjë kufizim.

Ripërpunimi i ngjarjeve të marra nga Kafka

Si parazgjedhje kjo qasje nuk ofron funksionalitet të ndërprerësit, megjithatë mund të shtohet në aplikacion duke përdorur pranverë-cloud-netflix ose e re ndërprerësi i resë së pranverës, duke mbështjellë vendet ku shërbimet e jashtme thirren në abstraksione të përshtatshme. Përveç kësaj, bëhet e mundur të zgjidhni një strategji për pjesa kryesore model, i cili gjithashtu mund të jetë i dobishëm. Për shembull, në spring-cloud-netflix kjo mund të jetë një grup thread ose një semafor.

Prodhim

Si rezultat, ne kemi një aplikacion të veçantë që na lejon të përsërisim përpunimin e mesazheve nëse ndonjë sistem i jashtëm është përkohësisht i padisponueshëm.

Një nga avantazhet kryesore të aplikacionit është se mund të përdoret nga sisteme të jashtme që funksionojnë në të njëjtin grup Kafka, pa modifikime të rëndësishme nga ana e tyre! Një aplikacion i tillë do të duhet vetëm të aksesojë temën e riprovimit, të plotësojë disa tituj të Kafkës dhe t'i dërgojë një mesazh Riprovuesit. Nuk ka nevojë të ngrihet ndonjë infrastrukturë shtesë. Dhe për të reduktuar numrin e mesazheve të transferuara nga aplikacioni te Retrier dhe mbrapa, ne identifikuam aplikacione me logjikë lineare dhe i ripërpunuam ato përmes ndalimit të konsumatorit.

Burimi: www.habr.com

Bleni një host të besueshëm për faqet me mbrojtje DDoS, serverë VPS VDS 🔥 Bleni hosting të besueshëm të faqeve të internetit me mbrojtje DDoS, servera VPS VDS | ProHoster