Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 3. Kafka

Continuarea traducerii unei cărți mici:
Înțelegerea brokerilor de mesaje
autor: Jakub Korab, editor: O'Reilly Media, Inc., data publicării: iunie 2017, ISBN: 9781492049296.

Partea anterioară tradusă: Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 1 Introducere

CAPITOLUL 3

Kafka

Kafka a fost dezvoltat de LinkedIn pentru a ocoli unele dintre limitările brokerilor tradiționali de mesaje și pentru a evita nevoia de a configura mai mulți brokeri de mesaje pentru diferite interacțiuni punct-la-punct, care este descris în această carte la „Scalarea și extinderea” la pagina 28. . Cazuri de utilizare LinkedIn s-a bazat în mare măsură pe asimilarea unidirecțională a unor cantități foarte mari de date, cum ar fi clicurile pe pagină și jurnalele de acces, permițând totuși ca aceste date să fie utilizate de mai multe sisteme fără a afecta productivitatea producătorilor sau a altor consumatori. De fapt, motivul pentru care există Kafka este pentru a obține tipul de arhitectură de mesagerie pe care o descrie Universal Data Pipeline.

Având în vedere acest scop final, au apărut în mod natural și alte cerințe. Kafka ar trebui să:

  • Fii extrem de rapid
  • Oferiți mai multă lățime de bandă atunci când lucrați cu mesaje
  • Suporta modele Editor-Abonat și Point-to-Point
  • Nu încetini cu adăugarea de consumatori. De exemplu, performanța atât a cozii, cât și a subiectului în ActiveMQ se degradează pe măsură ce crește numărul de consumatori de pe destinație.
  • Să fie scalabil pe orizontală; dacă un broker care persistă mesajele poate face acest lucru doar la viteza maximă a discului, atunci este logic să depășești o singură instanță de broker pentru a crește performanța
  • Limitați accesul la stocarea și recuperarea mesajelor

Pentru a realiza toate acestea, Kafka a adoptat o arhitectură care a redefinit rolurile și responsabilitățile clienților și brokerilor de mesagerie. Modelul JMS este foarte orientat către broker, unde brokerul este responsabil pentru distribuirea mesajelor, iar clienții trebuie să-și facă griji doar cu trimiterea și primirea mesajelor. Kafka, pe de altă parte, este centrat pe client, clientul asumând multe dintre caracteristicile unui broker tradițional, cum ar fi distribuirea corectă a mesajelor relevante către consumatori, în schimbul unui broker extrem de rapid și scalabil. Pentru persoanele care au lucrat cu sisteme tradiționale de mesagerie, lucrul cu Kafka necesită o schimbare fundamentală de gândire.
Această direcție de inginerie a condus la crearea unei infrastructuri de mesagerie capabilă să crească debitul cu multe ordine de mărime în comparație cu un broker convențional. După cum vom vedea, această abordare vine cu compromisuri, ceea ce înseamnă că Kafka nu este potrivit pentru anumite tipuri de sarcini de lucru și software instalat.

Model de destinație unificată

Pentru a îndeplini cerințele descrise mai sus, Kafka a combinat mesajele de publicare-abonare și mesaje punct la punct sub un singur tip de destinație - subiect. Acest lucru este confuz pentru persoanele care au lucrat cu sisteme de mesagerie, unde cuvântul „subiect” se referă la un mecanism de difuzare din care (din subiect) lectura nu este durabilă. Subiectele Kafka ar trebui considerate un tip de destinație hibrid, așa cum este definit în introducerea acestei cărți.

Pentru restul acestui capitol, cu excepția cazului în care precizăm în mod explicit altfel, termenul „subiect” se va referi la un subiect Kafka.

Pentru a înțelege pe deplin cum se comportă subiectele și ce garanții oferă, trebuie să ne uităm mai întâi la modul în care sunt implementate în Kafka.
Fiecare subiect din Kafka are propriul său jurnal.
Producătorii care trimit mesaje către Kafka scriu în acest jurnal, iar consumatorii citesc din jurnal folosind indicatori care avansează constant. Periodic, Kafka șterge cele mai vechi părți ale jurnalului, indiferent dacă mesajele din acele părți au fost citite sau nu. O parte centrală a designului Kafka este că brokerului nu îi pasă dacă mesajele sunt citite sau nu - aceasta este responsabilitatea clientului.

Termenii „jurnal” și „pointer” nu apar în documentație Kafka. Acești termeni bine-cunoscuți sunt folosiți aici pentru a ajuta înțelegerea.

Acest model este complet diferit de ActiveMQ, unde mesajele din toate cozile sunt stocate în același jurnal, iar brokerul marchează mesajele ca fiind șterse după ce au fost citite.
Să săpăm acum puțin mai adânc și să ne uităm la jurnalul subiectului mai detaliat.
Jurnalul Kafka este format din mai multe partiții (Figura 3-1). Kafka garantează o ordonare strictă în fiecare partiție. Aceasta înseamnă că mesajele scrise pe partiție într-o anumită ordine vor fi citite în aceeași ordine. Fiecare partiție este implementată ca un fișier jurnal rulant care conține subset (subset) al tuturor mesajelor trimise subiectului de către producătorii săi. Subiectul creat conține, implicit, o partiție. Ideea de partiții este ideea centrală a lui Kafka pentru scalarea orizontală.

Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 3. Kafka
Figura 3-1. Despărțitori Kafka

Când un producător trimite un mesaj către un subiect Kafka, acesta decide la ce partiție să trimită mesajul. Vom analiza acest lucru mai în detaliu mai târziu.

Citirea mesajelor

Clientul care dorește să citească mesajele administrează un pointer numit numit grup de consumatori, care indică decalaj mesajele din partiție. Un offset este o poziție incrementală care începe la 0 la începutul unei partiții. Acest grup de consumatori, la care se face referire în API prin codul group_id definit de utilizator, îi corespunde un singur consumator sau sistem logic.

Majoritatea sistemelor de mesagerie citesc date de la destinație folosind mai multe instanțe și fire pentru a procesa mesajele în paralel. Astfel, vor exista de obicei multe instanțe de consumatori care împart același grup de consumatori.

Problema lecturii poate fi reprezentată astfel:

  • Subiectul are mai multe partiții
  • Mai multe grupuri de consumatori pot folosi un subiect în același timp
  • Un grup de consumatori poate avea mai multe instanțe separate

Aceasta este o problemă non-trivială de la mulți la mulți. Pentru a înțelege modul în care Kafka gestionează relațiile dintre grupurile de consumatori, instanțe de consumatori și partiții, să ne uităm la o serie de scenarii de citire din ce în ce mai complexe.

Consumatorii și grupurile de consumatori

Să luăm ca punct de plecare un subiect cu o singură partiție (Figura 3-2).

Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 3. Kafka
Figura 3-2. Consumatorul citește din partiție

Când o instanță de consum se conectează cu propriul grup_id la acest subiect, i se atribuie o partiție de citire și un offset în acea partiție. Poziția acestui offset este configurată în client ca un pointer către cea mai recentă poziție (cel mai nou mesaj) sau cea mai veche poziție (cel mai vechi mesaj). Consumatorul solicită (sonda) mesaje din subiect, ceea ce face ca acestea să fie citite secvenţial din jurnal.
Poziția de compensare este în mod regulat trimisă înapoi către Kafka și stocată ca mesaje într-un subiect intern _compensații_consumator. Mesajele citite încă nu sunt șterse, spre deosebire de un broker obișnuit, iar clientul poate derula offset-ul pentru a reprocesa mesajele deja vizualizate.

Când un al doilea consumator logic se conectează folosind un grup_id diferit, gestionează un al doilea pointer care este independent de primul (Figura 3-3). Astfel, un subiect Kafka acționează ca o coadă în care există un singur consumator și ca un subiect normal de publicare-abonare (pub-sub) la care se abonează mai mulți consumatori, cu avantajul suplimentar că toate mesajele sunt stocate și pot fi procesate de mai multe ori.

Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 3. Kafka
Figura 3-3. Doi consumatori din grupuri diferite de consumatori citesc din aceeași partiție

Consumatorii dintr-un grup de consumatori

Când o instanță de consum citește date dintr-o partiție, are controlul deplin asupra pointerului și procesează mesajele așa cum este descris în secțiunea anterioară.
Dacă mai multe instanțe de consumatori au fost conectate cu același group_id la un subiect cu o partiție, atunci instanței care s-a conectat ultima va primi controlul asupra indicatorului și din acel moment va primi toate mesajele (Figura 3-4).

Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 3. Kafka
Figura 3-4. Doi consumatori din același grup de consumatori citesc din aceeași partiție

Acest mod de procesare, în care numărul de instanțe de consum depășește numărul de partiții, poate fi gândit ca un fel de consumator exclusiv. Acest lucru poate fi util dacă aveți nevoie de o grupare „activ-pasiv” (sau „fierbinte”) a instanțelor dvs. de consum, deși rularea mai multor consumatori în paralel („activ-activ” sau „fierbinte”) este mult mai tipică decât consumatori.În standby.

Acest comportament de distribuție a mesajelor descris mai sus poate fi surprinzător în comparație cu modul în care se comportă o coadă JMS normală. În acest model, mesajele trimise la coadă vor fi distribuite uniform între cei doi consumatori.

Cel mai adesea, atunci când creăm mai multe instanțe de consumatori, facem asta fie pentru a procesa mesaje în paralel, fie pentru a crește viteza de citire, fie pentru a crește stabilitatea procesului de citire. Deoarece o singură instanță de consum poate citi date dintr-o partiție la un moment dat, cum se realizează acest lucru în Kafka?

O modalitate de a face acest lucru este să utilizați o singură instanță de consum pentru a citi toate mesajele și a le transmite grupului de fire. În timp ce această abordare crește debitul de procesare, crește complexitatea logicii consumatorului și nu face nimic pentru a crește robustețea sistemului de citire. Dacă o copie a consumatorului scade din cauza unei căderi de curent sau a unui eveniment similar, atunci scăderea se oprește.

Modalitatea canonică de a rezolva această problemă în Kafka este utilizarea bОmai multe partiții.

Compartimentare

Partițiile sunt mecanismul principal pentru paralelizarea citirii și scalarea unui subiect dincolo de lățimea de bandă a unei singure instanțe de broker. Pentru a înțelege mai bine acest lucru, să luăm în considerare o situație în care există un subiect cu două partiții și un consumator se abonează la acest subiect (Figura 3-5).

Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 3. Kafka
Figura 3-5. Un consumator citește din mai multe partiții

În acest scenariu, consumatorului i se acordă controlul asupra pointerilor corespunzători grupului_id-ului său din ambele partiții și începe să citească mesajele din ambele partiții.
Când un consumator suplimentar pentru același group_id este adăugat la acest subiect, Kafka realocă una dintre partiții de la primul consumator la al doilea. După aceea, fiecare instanță a consumatorului va citi dintr-o partiție a subiectului (Figura 3-6).

Pentru a vă asigura că mesajele sunt procesate în paralel în 20 de fire, aveți nevoie de cel puțin 20 de partiții. Dacă există mai puține partiții, veți rămâne cu consumatori care nu au ce să lucreze, așa cum a fost descris mai devreme în discuția despre consumatorii exclusivi.

Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 3. Kafka
Figura 3-6. Doi consumatori din același grup de consumatori citesc din partiții diferite

Această schemă reduce foarte mult complexitatea brokerului Kafka în comparație cu distribuția de mesaje necesară pentru a menține coada JMS. Aici nu trebuie să vă faceți griji cu privire la următoarele puncte:

  • Care consumator ar trebui să primească următorul mesaj, pe baza alocării round-robin, a capacității curente a bufferelor de preluare preliminară sau a mesajelor anterioare (ca și în cazul grupurilor de mesaje JMS).
  • Ce mesaje sunt trimise la care consumatori și dacă ar trebui să fie re-livrate în caz de eșec.

Tot ce trebuie să facă brokerul Kafka este să transmită mesaje secvenţial consumatorului atunci când acesta din urmă le solicită.

Cu toate acestea, cerințele pentru paralelizarea corecturii și retrimiterea mesajelor eșuate nu dispar - responsabilitatea pentru acestea trece pur și simplu de la broker la client. Aceasta înseamnă că acestea trebuie luate în considerare în codul dvs.

Trimiterea mesajelor

Este responsabilitatea producătorului acelui mesaj să decidă la ce partiție să trimită mesajul. Pentru a înțelege mecanismul prin care se face acest lucru, trebuie mai întâi să luăm în considerare ce anume trimitem de fapt.

În timp ce în JMS folosim o structură de mesaj cu metadate (anteturi și proprietăți) și un corp care conține sarcina utilă (sarcina utilă), în Kafka mesajul este perechea „cheie-valoare”. Sarcina utilă a mesajului este trimisă ca valoare. Cheia, pe de altă parte, este folosită în principal pentru partiționare și trebuie să conțină cheie specifică logicii de afaceripentru a pune mesajele conexe în aceeași partiție.

În capitolul 2, am discutat scenariul de pariuri online în care evenimentele conexe trebuie procesate în ordine de către un singur consumator:

  1. Contul de utilizator este configurat.
  2. Banii sunt creditați în cont.
  3. Se face un pariu care retrage bani din cont.

Dacă fiecare eveniment este un mesaj postat la un subiect, atunci cheia naturală ar fi ID-ul contului.
Când un mesaj este trimis folosind API-ul Kafka Producer, acesta este transmis unei funcții de partiție care, având în vedere mesajul și starea curentă a clusterului Kafka, returnează ID-ul partiției la care ar trebui trimis mesajul. Această caracteristică este implementată în Java prin interfața Partitioner.

Această interfață arată astfel:

interface Partitioner {
    int partition(String topic,
        Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
}

Implementarea Partitioner folosește algoritmul implicit de hashing de uz general peste cheie pentru a determina partiția, sau round-robin dacă nu este specificată nicio cheie. Această valoare implicită funcționează bine în majoritatea cazurilor. Cu toate acestea, în viitor vei dori să-l scrii pe al tău.

Scrierea propriei strategii de partiţionare

Să ne uităm la un exemplu în care doriți să trimiteți metadate împreună cu încărcarea utilă a mesajului. Sarcina utilă din exemplul nostru este o instrucțiune pentru a face o depunere în contul de joc. O instrucțiune este ceva despre care am dori să fim garantați că nu va fi modificată la transmisie și dorim să fim siguri că numai un sistem de încredere din amonte poate iniția acea instrucțiune. În acest caz, sistemele de expediere și de primire convin cu privire la utilizarea unei semnături pentru autentificarea mesajului.
În JMS normal, definim pur și simplu o proprietate „semnătură de mesaj” și o adăugăm la mesaj. Cu toate acestea, Kafka nu ne oferă un mecanism de transmitere a metadatelor, ci doar o cheie și o valoare.

Deoarece valoarea este o sarcină utilă de transfer bancar a cărei integritate dorim să o păstrăm, nu avem de ales decât să definim structura de date de utilizat în cheie. Presupunând că avem nevoie de un ID de cont pentru partiționare, deoarece toate mesajele legate de un cont trebuie procesate în ordine, vom veni cu următoarea structură JSON:

{
  "signature": "541661622185851c248b41bf0cea7ad0",
  "accountId": "10007865234"
}

Deoarece valoarea semnăturii va varia în funcție de sarcina utilă, strategia de hashing implicită a interfeței Partitioner nu va grupa în mod fiabil mesajele asociate. Prin urmare, va trebui să scriem propria noastră strategie care va analiza această cheie și va partiționa valoarea accountId.

Kafka include sume de verificare pentru a detecta corupția mesajelor din magazin și are un set complet de caracteristici de securitate. Chiar și așa, uneori apar cerințe specifice industriei, precum cea de mai sus.

Strategia de partiționare a utilizatorului trebuie să se asigure că toate mesajele asociate ajung în aceeași partiție. Deși acest lucru pare simplu, cerința poate fi complicată de importanța ordonării postărilor asociate și de cât de fix este numărul de partiții dintr-un subiect.

Numărul de partiții dintr-un subiect se poate schimba în timp, deoarece acestea pot fi adăugate dacă traficul depășește așteptările inițiale. Astfel, cheile de mesaje pot fi asociate cu partiția la care au fost trimise inițial, ceea ce implică o parte de stare care urmează să fie partajată între instanțele producătorului.

Un alt factor de luat în considerare este distribuția uniformă a mesajelor între partiții. De obicei, cheile nu sunt distribuite uniform între mesaje, iar funcțiile hash nu garantează o distribuție corectă a mesajelor pentru un set mic de chei.
Este important să rețineți că, indiferent dacă alegeți să împărțiți mesajele, separatorul în sine poate fi necesar să fie reutilizat.

Luați în considerare cerința de a replica datele între grupurile Kafka din diferite locații geografice. În acest scop, Kafka vine cu un instrument de linie de comandă numit MirrorMaker, care este folosit pentru a citi mesajele dintr-un cluster și a le transfera în altul.

MirrorMaker trebuie să înțeleagă cheile subiectului replicat pentru a menține ordinea relativă între mesaje la replicarea între clustere, deoarece numărul de partiții pentru acel subiect poate să nu fie același în două clustere.

Strategiile de partiționare personalizate sunt relativ rare, deoarece hashingul implicit sau round robin funcționează bine în majoritatea scenariilor. Cu toate acestea, dacă aveți nevoie de garanții puternice de comandă sau aveți nevoie să extrageți metadate din încărcături utile, atunci partiționarea este ceva la care ar trebui să vă uitați mai atent.

Beneficiile de scalabilitate și performanță ale Kafka provin din transferul unora dintre responsabilitățile brokerului tradițional către client. În acest caz, se ia decizia de a distribui mesaje potențial legate între mai mulți consumatori care lucrează în paralel.

Brokerii JMS trebuie, de asemenea, să se ocupe de astfel de cerințe. Interesant este că mecanismul de trimitere a mesajelor asociate către același consumator, implementat prin JMS Message Groups (o variație a strategiei de echilibrare a încărcăturii sticky (SLB)), necesită, de asemenea, expeditorului să marcheze mesajele ca fiind legate. În cazul JMS, brokerul este responsabil pentru trimiterea acestui grup de mesaje asociate unui consumator din mulți și pentru transferul dreptului de proprietate asupra grupului în cazul în care consumatorul cade.

Acorduri cu producătorii

Partiționarea nu este singurul lucru de luat în considerare atunci când trimiteți mesaje. Să aruncăm o privire la metodele send() ale clasei Producer din API-ul Java:

Future < RecordMetadata > send(ProducerRecord < K, V > record);
Future < RecordMetadata > send(ProducerRecord < K, V > record, Callback callback);

Trebuie remarcat imediat că ambele metode returnează viitor, ceea ce indică faptul că operația de trimitere nu este efectuată imediat. Rezultatul este că un mesaj (ProducerRecord) este scris în buffer-ul de trimitere pentru fiecare partiție activă și trimis către broker ca fir de execuție în biblioteca clientului Kafka. În timp ce acest lucru face lucrurile incredibil de rapid, înseamnă că o aplicație fără experiență poate pierde mesaje dacă procesul ei este oprit.

Ca întotdeauna, există o modalitate de a face operațiunea de trimitere mai fiabilă cu prețul performanței. Mărimea acestui buffer poate fi setată la 0, iar firul de aplicație de trimitere va fi forțat să aștepte până când transferul mesajului către broker este finalizat, după cum urmează:

RecordMetadata metadata = producer.send(record).get();

Mai multe despre citirea mesajelor

Citirea mesajelor are complexități suplimentare despre care trebuie să se speculeze. Spre deosebire de API-ul JMS, care poate rula un ascultător de mesaje ca răspuns la un mesaj, Consumator Kafka face doar sondaje. Să aruncăm o privire mai atentă asupra metodei sondaj()folosit in acest scop:

ConsumerRecords < K, V > poll(long timeout);

Valoarea returnată a metodei este o structură de container care conține mai multe obiecte fișa consumatorului din potențial mai multe partiții. fișa consumatorului este el însuși un obiect deținător pentru o pereche cheie-valoare cu metadate asociate, cum ar fi partiția din care este derivată.

După cum sa discutat în Capitolul 2, trebuie să ținem cont de ce se întâmplă cu mesajele după ce acestea au fost procesate cu succes sau fără succes, de exemplu, dacă clientul nu poate procesa mesajul sau dacă acesta renunță. În JMS, acest lucru a fost gestionat printr-un mod de confirmare. Brokerul fie va șterge mesajul procesat cu succes, fie va livra din nou mesajul brut sau fals (presupunând că au fost utilizate tranzacții).
Kafka funcționează foarte diferit. Mesajele nu sunt șterse în broker după corectare, iar ceea ce se întâmplă în caz de eșec este responsabilitatea codului de corectare în sine.

După cum am spus, grupul de consumatori este asociat cu offset-ul din jurnal. Poziția jurnalului asociată cu acest offset corespunde următorului mesaj care urmează să fie emis ca răspuns la sondaj(). Momentul în care acest offset crește este decisiv pentru citire.

Revenind la modelul de citire discutat mai devreme, procesarea mesajelor constă în trei etape:

  1. Preluați un mesaj pentru citire.
  2. Procesați mesajul.
  3. Confirmați mesajul.

Consumatorul Kafka vine cu o opțiune de configurare enable.auto.commit. Aceasta este o setare implicită folosită frecvent, așa cum este obișnuit cu setările care conțin cuvântul „auto”.

Înainte de Kafka 0.10, un client care folosea această opțiune trimitea offset-ul ultimului mesaj citit la următorul apel sondaj() după prelucrare. Aceasta însemna că orice mesaje care au fost deja preluate puteau fi reprocesate dacă clientul le-a procesat deja, dar a fost distrus în mod neașteptat înainte de a apela sondaj(). Deoarece brokerul nu păstrează nicio stare despre câte ori a fost citit un mesaj, următorul consumator care preia acel mesaj nu va ști că s-a întâmplat nimic rău. Acest comportament a fost pseudo-tranzacțional. Compensarea a fost comisă numai dacă mesajul a fost procesat cu succes, dar dacă clientul a renunțat, brokerul ar trimite din nou același mesaj către alt client. Acest comportament a fost în concordanță cu garanția de livrare a mesajelor "cel puțin o dată“.

În Kafka 0.10, codul clientului a fost modificat astfel încât commit-ul să fie declanșat periodic de biblioteca client, așa cum a fost configurat auto.commit.interval.ms. Acest comportament este undeva între modurile JMS AUTO_ACKNOWLEDGE și DUPS_OK_ACKNOWLEDGE. Când utilizați autocommit, mesajele ar putea fi comise indiferent dacă au fost efectiv procesate - acest lucru s-ar putea întâmpla în cazul unui consumator lent. Dacă un consumator a avortat, mesajele ar fi preluate de către următorul consumator, începând de la poziția angajată, ceea ce ar putea duce la un mesaj pierdut. În acest caz, Kafka nu a pierdut mesajele, codul de citire pur și simplu nu le-a procesat.

Acest mod are aceeași promisiune ca și în versiunea 0.9: mesajele pot fi procesate, dar dacă eșuează, compensarea poate să nu fie comisă, ceea ce poate duce la dublarea livrării. Cu cât primiți mai multe mesaje când executați sondaj(), cu atât mai mult această problemă.

După cum sa discutat în „Citirea mesajelor dintr-o coadă” la pagina 21, nu există o livrare unică a unui mesaj într-un sistem de mesagerie atunci când sunt luate în considerare modurile de eșec.

În Kafka, există două moduri de a comite (comita) un offset (offset): automat și manual. În ambele cazuri, mesajele pot fi procesate de mai multe ori dacă mesajul a fost procesat, dar a eșuat înainte de comitere. De asemenea, puteți alege să nu procesați deloc mesajul dacă commit-ul a avut loc în fundal și codul dvs. a fost finalizat înainte de a putea fi procesat (poate în Kafka 0.9 și versiuni anterioare).

Puteți controla procesul manual de comitere a compensației în API-ul pentru consumatori Kafka prin setarea parametrului enable.auto.commit pentru a apela fals și explicit una dintre următoarele metode:

void commitSync();
void commitAsync();

Dacă doriți să procesați mesajul „cel puțin o dată”, trebuie să efectuați manual compensarea cu commitSync()prin executarea acestei comenzi imediat după procesarea mesajelor.

Aceste metode nu permit ca mesajele să fie confirmate înainte de a fi procesate, dar nu fac nimic pentru a elimina potențialele întârzieri de procesare, dând în același timp aspectul că sunt tranzacționale. Nu există tranzacții în Kafka. Clientul nu are capacitatea de a face următoarele:

  • Derulează automat înapoi un mesaj fals. Consumatorii înșiși trebuie să gestioneze excepțiile care decurg din sarcinile utile problematice și întreruperile backend-ului, deoarece nu se pot baza pe broker pentru a livra din nou mesajele.
  • Trimiteți mesaje către mai multe subiecte într-o singură operațiune atomică. După cum vom vedea în curând, controlul asupra diferitelor subiecte și partiții poate locui pe diferite mașini din clusterul Kafka care nu coordonează tranzacțiile atunci când sunt trimise. La momentul scrierii acestui articol, s-au făcut unele lucrări pentru a face acest lucru posibil cu KIP-98.
  • Asociați citirea unui mesaj dintr-un subiect cu trimiterea unui alt mesaj către alt subiect. Din nou, arhitectura lui Kafka depinde de multe mașini independente care rulează ca un singur autobuz și nu se încearcă ascunde acest lucru. De exemplu, nu există componente API care să vă permită conectarea consumator и producător într-o tranzacție. În JMS, acest lucru este furnizat de obiect Sesiunedin care sunt create Producători de mesaje и MesajConsumatori.

Dacă nu ne putem baza pe tranzacții, cum putem oferi o semantică mai apropiată de cea oferită de sistemele tradiționale de mesagerie?

Dacă există posibilitatea ca compensarea consumatorului să crească înainte ca mesajul să fie procesat, cum ar fi în timpul unui accident de consumator, atunci consumatorul nu are de unde să știe dacă grupul său de consumatori a ratat mesajul atunci când i se atribuie o partiție. Deci, o strategie este de a derula offset-ul la poziția anterioară. API-ul pentru consumatori Kafka oferă următoarele metode pentru aceasta:

void seek(TopicPartition partition, long offset);
void seekToBeginning(Collection < TopicPartition > partitions);

metodă căuta() poate fi folosit cu metoda
offsetsForTimes(Hartă timestampsToSearch) a derula înapoi într-o stare la un anumit moment din trecut.

Implicit, utilizarea acestei abordări înseamnă că este foarte probabil ca unele mesaje care au fost procesate anterior să fie citite și procesate din nou. Pentru a evita acest lucru, putem folosi citirea idempotent, așa cum este descris în Capitolul 4, pentru a ține evidența mesajelor vizualizate anterior și pentru a elimina duplicatele.

Alternativ, codul dvs. de consumator poate fi păstrat simplu, atâta timp cât pierderea sau duplicarea mesajului este acceptabilă. Când luăm în considerare cazurile de utilizare pentru care Kafka este folosit în mod obișnuit, cum ar fi gestionarea evenimentelor din jurnal, valorile, urmărirea clicurilor etc., înțelegem că pierderea mesajelor individuale este puțin probabil să aibă un impact semnificativ asupra aplicațiilor din jur. În astfel de cazuri, valorile implicite sunt perfect acceptabile. Pe de altă parte, dacă aplicația dvs. trebuie să trimită plăți, trebuie să aveți grijă cu atenție de fiecare mesaj individual. Totul se reduce la context.

Observațiile personale arată că pe măsură ce intensitatea mesajelor crește, valoarea fiecărui mesaj individual scade. Mesajele mari tind să fie valoroase atunci când sunt vizualizate într-o formă agregată.

Valabilitate ridicată

Abordarea lui Kafka față de disponibilitatea ridicată este foarte diferită de abordarea ActiveMQ. Kafka este proiectat în jurul clusterelor de scalare în care toate instanțele de broker primesc și distribuie mesaje în același timp.

Un cluster Kafka este format din mai multe instanțe de broker care rulează pe servere diferite. Kafka a fost proiectat pentru a rula pe hardware autonom obișnuit, unde fiecare nod are propriul său spațiu de stocare dedicat. Utilizarea spațiului de stocare atașat la rețea (SAN) nu este recomandată deoarece mai multe noduri de calcul pot concura pentru timp.Ыe intervalele de stocare și creează conflicte.

Kafka este mereu pe sistem. Mulți utilizatori mari Kafka nu își închid niciodată clusterele, iar software-ul se actualizează întotdeauna cu o repornire secvențială. Acest lucru se realizează prin garantarea compatibilității cu versiunea anterioară pentru mesaje și interacțiuni între brokeri.

Brokerii conectați la un cluster de servere Ingrijitor zoo, care acționează ca un registru de date de configurare și este folosit pentru a coordona rolurile fiecărui broker. ZooKeeper în sine este un sistem distribuit care oferă disponibilitate ridicată prin replicarea informațiilor prin stabilire cvorum.

În cazul de bază, un subiect este creat într-un cluster Kafka cu următoarele proprietăți:

  • Numărul de partiții. După cum sa discutat mai devreme, valoarea exactă folosită aici depinde de nivelul dorit de citire paralelă.
  • Factorul de replicare (factor) determină câte instanțe de broker din cluster ar trebui să conțină jurnalele pentru această partiție.

Folosind ZooKeepers pentru coordonare, Kafka încearcă să distribuie echitabil noi partiții între brokerii din cluster. Acest lucru este realizat de o singură instanță care acționează ca un Controller.

În timpul rulării pentru fiecare partiție de subiect controlor atribuie roluri unui broker lider (lider, maestru, prezentator) și urmași (adepți, sclavi, subordonați). Brokerul, acționând ca lider pentru această partiție, este responsabil pentru primirea tuturor mesajelor trimise de producători și distribuirea mesajelor către consumatori. Când mesajele sunt trimise către o partiție de subiecte, ele sunt replicate la toate nodurile broker care acționează ca adepți pentru acea partiție. Fiecare nod care conține jurnalele pentru o partiție este apelat replica. Un broker poate acționa ca lider pentru unele partiții și ca adept pentru altele.

Este apelat un adept care conține toate mesajele deținute de lider replica sincronizata (o replică care se află într-o stare sincronizată, replică în sincronizare). Dacă un broker care acționează ca lider pentru o partiție scade, orice broker care este actualizat sau sincronizat pentru acea partiție poate prelua rolul de lider. Este un design incredibil de durabil.

O parte a configurației producătorului este parametrul acks, care determină câte replici trebuie să confirme (să confirme) primirea unui mesaj înainte ca firul de execuție al aplicației să continue trimiterea: 0, 1 sau toate. Dacă este setat la toate, apoi atunci când un mesaj este primit, liderul va trimite o confirmare înapoi producătorului de îndată ce primește confirmări (recunoștințe) ale înregistrării de la mai multe indicii (inclusiv el însuși) definite de setarea subiectului min.insync.replicas (implicit 1). Dacă mesajul nu poate fi replicat cu succes, atunci producătorul va lansa o excepție a aplicației (NotEnoughReplicas sau NotEnoughReplicasAfterAppend).

O configurație tipică creează un subiect cu un factor de replicare de 3 (1 lider, 2 urmăritori per partiție) și parametrul min.insync.replicas este setat la 2. În acest caz, cluster-ul va permite unuia dintre brokerii care gestionează partiția de subiecte să coboare fără a afecta aplicațiile client.

Acest lucru ne readuce la compromisul deja familiar între performanță și fiabilitate. Replicarea are loc în detrimentul unui timp suplimentar de așteptare pentru confirmări (mulțumiri) de la adepți. Deși, deoarece rulează în paralel, replicarea la cel puțin trei noduri are aceeași performanță ca două (ignorând creșterea utilizării lățimii de bandă a rețelei).

Prin utilizarea acestei scheme de replicare, Kafka evită în mod inteligent necesitatea de a scrie fizic fiecare mesaj pe disc cu operația sincronizare(). Fiecare mesaj trimis de producător va fi scris în jurnalul de partiție, dar așa cum sa discutat în Capitolul 2, scrierea într-un fișier se face inițial în memoria tampon a sistemului de operare. Dacă acest mesaj este replicat la o altă instanță Kafka și se află în memoria sa, pierderea liderului nu înseamnă că mesajul în sine a fost pierdut - poate fi preluat de o replică sincronizată.
Refuzul efectuării operației sincronizare() înseamnă că Kafka poate primi mesaje la fel de repede pe cât le poate scrie în memorie. Dimpotrivă, cu cât puteți evita mai mult timp înlăturarea memoriei pe disc, cu atât mai bine. Din acest motiv, nu este neobișnuit ca brokerilor Kafka să li se aloce 64 GB sau mai mult de memorie. Această utilizare a memoriei înseamnă că o singură instanță Kafka poate rula cu ușurință la viteze de multe mii de ori mai rapide decât un broker de mesaje tradițional.

Kafka poate fi configurat și pentru a aplica operația sincronizare() la pachete de mesaje. Deoarece totul în Kafka este orientat spre pachete, de fapt funcționează destul de bine pentru multe cazuri de utilizare și este un instrument util pentru utilizatorii care necesită garanții foarte puternice. O mare parte din performanța pură a lui Kafka provine din mesajele care sunt trimise brokerului sub formă de pachete și că aceste mesaje sunt citite de la broker în blocuri secvențiale folosind zero-copie operații (operații în timpul cărora sarcina de copiere a datelor dintr-o zonă de memorie în alta nu este efectuată). Acesta din urmă reprezintă un câștig mare de performanță și resurse și este posibil numai prin utilizarea unei structuri de date de jurnal de bază care definește schema de partiție.

Performanțe mult mai bune sunt posibile într-un cluster Kafka decât cu un singur broker Kafka, deoarece partițiile de subiecte se pot extinde pe mai multe mașini separate.

Rezultatele

În acest capitol, am analizat modul în care arhitectura Kafka reimaginează relația dintre clienți și brokeri pentru a oferi o conductă de mesagerie incredibil de robustă, cu un debit de multe ori mai mare decât cel al unui broker de mesaje convențional. Am discutat despre funcționalitatea pe care o folosește pentru a realiza acest lucru și am analizat pe scurt arhitectura aplicațiilor care oferă această funcționalitate. În capitolul următor, ne vom uita la problemele comune pe care aplicațiile bazate pe mesagerie trebuie să le rezolve și să discutăm despre strategii pentru a le rezolva. Vom încheia capitolul subliniind cum să vorbim despre tehnologiile de mesagerie în general, astfel încât să puteți evalua adecvarea acestora pentru cazurile dvs. de utilizare.

Partea anterioară tradusă: Înțelegerea brokerilor de mesaje. Învățarea mecanismelor de mesagerie cu ActiveMQ și Kafka. Capitolul 1

Traducerea facuta: tele.gg/middle_java

Pentru a fi continuat ...

Numai utilizatorii înregistrați pot participa la sondaj. Loghează-te, Vă rog.

Este folosit Kafka în organizația dvs.?

  • Da

  • Nu

  • Folosit anterior, acum nu

  • Intenționăm să folosim

Au votat 38 utilizatori. 8 utilizatori s-au abținut.

Sursa: www.habr.com

Adauga un comentariu