Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real” Bună, locuitorii Khabro! Această carte este potrivită pentru orice dezvoltator care dorește să înțeleagă procesarea firelor. Înțelegerea programării distribuite vă va ajuta să înțelegeți mai bine Kafka și Kafka Streams. Ar fi bine să cunoașteți cadrul Kafka în sine, dar acest lucru nu este necesar: vă voi spune tot ce aveți nevoie. Dezvoltatorii Kafka cu experiență și începătorii deopotrivă vor învăța cum să creeze aplicații interesante de procesare a fluxurilor folosind biblioteca Kafka Streams din această carte. Dezvoltatorii Java intermediari și avansați deja familiarizați cu concepte precum serializarea vor învăța să-și aplice abilitățile pentru a crea aplicații Kafka Streams. Codul sursă al cărții este scris în Java 8 și folosește în mod semnificativ sintaxa expresiei lambda din Java 8, așa că știi cum să lucrezi cu funcțiile lambda (chiar și într-un alt limbaj de programare) va fi util.

Extras. 5.3. Operațiuni de agregare și ferestre

În această secțiune, vom trece la explorarea celor mai promițătoare părți ale Kafka Streams. Până acum am acoperit următoarele aspecte ale Kafka Streams:

  • crearea unei topologii de procesare;
  • utilizarea stării în aplicațiile de streaming;
  • realizarea de conexiuni de flux de date;
  • diferențele dintre fluxurile de evenimente (KStream) și fluxurile de actualizare (KTable).

În următoarele exemple vom reuni toate aceste elemente. Veți învăța și despre windowing, o altă caracteristică excelentă a aplicațiilor de streaming. Primul nostru exemplu va fi o simplă agregare.

5.3.1. Agregarea vânzărilor de stoc pe sectorul industrial

Agregarea și gruparea sunt instrumente vitale atunci când lucrați cu date în flux. Examinarea înregistrărilor individuale pe măsură ce sunt primite este adesea insuficientă. Pentru a extrage informații suplimentare din date, este necesar să le grupați și să le combinați.

În acest exemplu, veți îmbrăca costumul unui comerciant zilnic care trebuie să urmărească volumul vânzărilor acțiunilor companiilor din mai multe industrii. Mai exact, sunteți interesat de cele cinci companii cu cele mai mari vânzări de acțiuni din fiecare industrie.

O astfel de agregare va necesita următorii câțiva pași pentru a traduce datele în forma dorită (vorbind în termeni generali).

  1. Creați o sursă bazată pe subiecte care publică informații brute de tranzacționare cu acțiuni. Va trebui să mapam un obiect de tip StockTransaction la un obiect de tip ShareVolume. Ideea este că obiectul StockTransaction conține metadate de vânzări, dar avem nevoie doar de date despre numărul de acțiuni vândute.
  2. Grupați datele de volum după simbolul bursier. Odată grupate după simbol, puteți restrânge aceste date în subtotaluri ale volumelor vânzărilor de stoc. Este de remarcat faptul că metoda KStream.groupBy returnează o instanță de tip KGroupedStream. Și puteți obține o instanță KTable apelând în continuare metoda KGroupedStream.reduce.

Ce este interfața KGroupedStream

Metodele KStream.groupBy și KStream.groupByKey returnează o instanță a KGroupedStream. KGroupedStream este o reprezentare intermediară a unui flux de evenimente după gruparea după chei. Nu este deloc destinat lucrului direct cu acesta. În schimb, KGroupedStream este folosit pentru operațiuni de agregare, al căror rezultat este întotdeauna un KTable. Și deoarece rezultatul operațiunilor de agregare este un KTable și folosesc un magazin de stat, este posibil ca nu toate actualizările ca rezultat să fie trimise mai departe în conductă.

Metoda KTable.groupBy returnează un KGroupedTable similar - o reprezentare intermediară a fluxului de actualizări, regrupată după cheie.

Să luăm o scurtă pauză și să ne uităm la Fig. 5.9, care arată ceea ce am realizat. Această topologie ar trebui să vă fie deja foarte familiară.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Să ne uităm acum la codul pentru această topologie (poate fi găsit în fișierul src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listarea 5.2).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Codul dat se distinge prin concizia sa și prin volumul mare de acțiuni efectuate în mai multe rânduri. Este posibil să observați ceva nou în primul parametru al metodei builder.stream: o valoare de tipul de enumerare AutoOffsetReset.EARLIEST (există și LATEST), setată folosind metoda Consumed.withOffsetResetPolicy. Acest tip de enumerare poate fi folosit pentru a specifica o strategie de resetare a decalajului pentru fiecare KStream sau KTable și are prioritate față de opțiunea de resetare a decalajului din configurație.

GroupByKey și GroupBy

Interfața KStream are două metode de grupare a înregistrărilor: GroupByKey și GroupBy. Ambele returnează un KGroupedTable, așa că s-ar putea să vă întrebați care este diferența dintre ele și când să utilizați pe care?

Metoda GroupByKey este utilizată atunci când cheile din KStream sunt deja negoale. Și, cel mai important, indicatorul „necesită re-partiționare” nu a fost niciodată setat.

Metoda GroupBy presupune că ați schimbat cheile de grupare, deci indicatorul de repartiție este setat la adevărat. Efectuarea îmbinărilor, agregărilor etc. după metoda GroupBy va avea ca rezultat re-partiționarea automată.
Rezumat: ori de câte ori este posibil, ar trebui să utilizați GroupByKey în loc de GroupBy.

Este clar ce fac metodele mapValues ​​și groupBy, așa că haideți să aruncăm o privire la metoda sum() (găsită în src/main/java/bbejeck/model/ShareVolume.java) (Listarea 5.3).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Metoda ShareVolume.sum returnează totalul curent al volumului vânzărilor de stoc, iar rezultatul întregului lanț de calcule este un obiect KTable . Acum înțelegeți rolul pe care îl joacă KTable. Când sosesc obiectele ShareVolume, obiectul KTable corespunzător stochează cea mai recentă actualizare curentă. Este important să rețineți că toate actualizările sunt reflectate în shareVolumeKTable anterior, dar nu toate sunt trimise mai departe.

Apoi folosim acest KTable pentru a agrega (în funcție de numărul de acțiuni tranzacționate) pentru a ajunge la cele cinci companii cu cele mai mari volume de acțiuni tranzacționate în fiecare industrie. Acțiunile noastre în acest caz vor fi similare cu cele pentru prima agregare.

  1. Efectuați o altă operațiune groupBy pentru a grupa obiecte individuale ShareVolume în funcție de industrie.
  2. Începeți să rezumați obiectele ShareVolume. De data aceasta, obiectul de agregare este o coadă cu prioritate de dimensiune fixă. În această coadă de dimensiuni fixe, sunt reținute doar cele cinci companii cu cele mai mari cantități de acțiuni vândute.
  3. Mapați cozile din paragraful anterior la o valoare șir și returnați primele cinci acțiuni cele mai tranzacționate după număr, în funcție de industrie.
  4. Scrieți rezultatele sub formă de șir la subiect.

În fig. Figura 5.10 prezintă graficul topologiei fluxului de date. După cum puteți vedea, a doua rundă de procesare este destul de simplă.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Acum că avem o înțelegere clară a structurii acestei a doua runde de procesare, ne putem întoarce la codul sursă (il veți găsi în fișierul src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listarea 5.4) .

Acest inițializator conține o variabilă fixedQueue. Acesta este un obiect personalizat care este un adaptor pentru java.util.TreeSet care este utilizat pentru a urmări primele N rezultate în ordinea descrescătoare a acțiunilor tranzacționate.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Ați văzut deja apelurile groupBy și mapValues, așa că nu vom intra în acestea (apelăm metoda KTable.toStream deoarece metoda KTable.print este depreciată). Dar nu ați văzut încă versiunea KTable a aggregate(), așa că vom petrece puțin timp discutând despre asta.

După cum vă amintiți, ceea ce face KTable diferit este faptul că înregistrările cu aceleași chei sunt considerate actualizări. KTable înlocuiește intrarea veche cu una nouă. Agregarea are loc într-un mod similar: cele mai recente înregistrări cu aceeași cheie sunt agregate. Când sosește o înregistrare, aceasta este adăugată la instanța clasei FixedSizePriorityQueue folosind un agregator (al doilea parametru din apelul metodei agregate), dar dacă o altă înregistrare există deja cu aceeași cheie, atunci înregistrarea veche este eliminată folosind un scădere (al treilea parametru din apelul metodei agregate).

Toate acestea înseamnă că agregatorul nostru, FixedSizePriorityQueue, nu adună toate valorile cu o singură cheie, ci stochează o sumă mobilă a cantităților celor N cele mai tranzacționate tipuri de acțiuni. Fiecare intrare primită conține numărul total de acțiuni vândute până acum. KTable vă va oferi informații despre acțiunile companiilor care sunt cele mai tranzacționate în prezent, fără a necesita agregarea continuă a fiecărei actualizări.

Am învățat să facem două lucruri importante:

  • grupați valorile în KTable printr-o cheie comună;
  • efectuați operațiuni utile, cum ar fi acumularea și agregarea pe aceste valori grupate.

Este important să știi cum să efectuezi aceste operațiuni pentru a înțelege semnificația datelor care se deplasează printr-o aplicație Kafka Streams și pentru a înțelege ce informații transportă.

De asemenea, am reunit câteva dintre conceptele cheie discutate mai devreme în această carte. În capitolul 4, am discutat cât de importantă este starea locală tolerantă la erori pentru o aplicație de streaming. Primul exemplu din acest capitol a demonstrat de ce statul local este atât de important – vă permite să urmăriți informațiile pe care le-ați văzut deja. Accesul local evită întârzierile în rețea, făcând aplicația mai performantă și mai rezistentă la erori.

Când efectuați orice operație de acumulare sau agregare, trebuie să specificați numele magazinului de stat. Operațiile de acumulare și agregare returnează o instanță KTable, iar KTable folosește stocarea stării pentru a înlocui rezultatele vechi cu altele noi. După cum ați văzut, nu toate actualizările sunt trimise în continuare, iar acest lucru este important deoarece operațiunile de agregare sunt concepute pentru a produce informații rezumative. Dacă nu aplicați statul local, KTable va transmite toate rezultatele de agregare și cumulare.

În continuare, ne vom uita la efectuarea de operațiuni precum agregarea într-o anumită perioadă de timp - așa-numitele operațiuni de ferestre.

5.3.2. Operații cu ferestre

În secțiunea anterioară, am introdus convoluția și agregarea glisante. Aplicația a efectuat o acumulare continuă a vânzărilor de acțiuni, urmată de agregarea celor cinci acțiuni cele mai tranzacționate la bursă.

Uneori este necesară o astfel de agregare și acumulare continuă a rezultatelor. Și uneori trebuie să efectuați operațiuni doar pe o anumită perioadă de timp. De exemplu, calculați câte tranzacții de schimb au fost efectuate cu acțiuni ale unei anumite companii în ultimele 10 minute. Sau câți utilizatori au dat clic pe un nou banner publicitar în ultimele 15 minute. O aplicație poate efectua astfel de operațiuni de mai multe ori, dar cu rezultate care se aplică doar unor perioade de timp specificate (ferestre de timp).

Numărarea tranzacțiilor de schimb valutar de către cumpărător

În exemplul următor, vom urmări tranzacțiile cu acțiuni la mai mulți comercianți, fie organizații mari, fie finanțatori individuali inteligenți.

Există două motive posibile pentru această urmărire. Una dintre ele este nevoia de a cunoaște ce cumpără/vând liderii de piață. Dacă acești jucători mari și investitori sofisticați văd oportunități, este logic să-și urmeze strategia. Al doilea motiv este dorința de a identifica orice semne posibile de tranzacționare ilegală a informațiilor privilegiate. Pentru a face acest lucru, va trebui să analizați corelația vârfurilor mari de vânzări cu comunicatele de presă importante.

O astfel de urmărire constă din următorii pași:

  • crearea unui flux de citire din subiectul tranzacții bursiere;
  • gruparea înregistrărilor primite după ID-ul cumpărătorului și simbolul stocului. Apelarea metodei groupBy returnează o instanță a clasei KGroupedStream;
  • Metoda KGroupedStream.windowedBy returnează un flux de date limitat la o fereastră de timp, care permite agregarea cu ferestre. În funcție de tipul ferestrei, este returnat fie un TimeWindowedKStream, fie un SessionWindowedKStream;
  • numărul de tranzacții pentru operațiunea de agregare. Fluxul de date cu ferestre determină dacă o anumită înregistrare este luată în considerare în acest număr;
  • scrierea rezultatelor într-un subiect sau trimiterea lor către consolă în timpul dezvoltării.

Topologia acestei aplicații este simplă, dar o imagine clară a acesteia ar fi utilă. Să aruncăm o privire la Fig. 5.11.

În continuare, ne vom uita la funcționalitatea operațiunilor cu ferestre și la codul corespunzător.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”

Tipuri de ferestre

Există trei tipuri de ferestre în Kafka Streams:

  • sesional;
  • „turbling” (turbling);
  • alunecând/sărind.

Pe care să alegi depinde de cerințele afacerii tale. Ferestrele de turnare și sărituri sunt limitate în timp, în timp ce ferestrele de sesiune sunt limitate de activitatea utilizatorului - durata sesiunii (sesiunilor) este determinată numai de cât de activ este utilizatorul. Principalul lucru de reținut este că toate tipurile de ferestre se bazează pe ștampila date/ora ale intrărilor, nu pe ora sistemului.

În continuare, implementăm topologia noastră cu fiecare dintre tipurile de ferestre. Codul complet va fi dat doar în primul exemplu pentru alte tipuri de ferestre nu se va schimba nimic în afară de tipul de operare a ferestrei.

Ferestre de sesiune

Ferestrele de sesiune sunt foarte diferite de toate celelalte tipuri de ferestre. Ele sunt limitate nu atât de timp, cât de activitatea utilizatorului (sau de activitatea entității pe care ați dori să o urmăriți). Ferestrele de sesiune sunt delimitate de perioade de inactivitate.

Figura 5.12 ilustrează conceptul de ferestre de sesiune. Sesiunea mai mică se va îmbina cu sesiunea din stânga ei. Iar sesiunea din dreapta va fi separată pentru că urmează o lungă perioadă de inactivitate. Ferestrele de sesiune se bazează pe activitatea utilizatorului, dar folosesc ștampile de dată/ora din intrări pentru a determina cărei sesiuni îi aparține intrarea.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”

Utilizarea ferestrelor de sesiune pentru a urmări tranzacțiile cu acțiuni

Să folosim ferestrele de sesiune pentru a captura informații despre tranzacțiile de schimb. Implementarea ferestrelor de sesiune este prezentată în Lista 5.5 (care poate fi găsită în src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Ați văzut deja majoritatea operațiunilor din această topologie, așa că nu este nevoie să le priviți din nou aici. Dar există și câteva elemente noi aici, pe care le vom discuta acum.

Orice operație groupBy efectuează de obicei un fel de operație de agregare (agregare, acumulare sau numărare). Puteți efectua fie o agregare cumulată cu un total cumulat, fie o agregare în fereastră, care ia în considerare înregistrările dintr-o fereastră de timp specificată.

Codul din Lista 5.5 contorizează numărul de tranzacții din ferestrele de sesiune. În fig. 5.13 aceste acțiuni sunt analizate pas cu pas.

Apelând windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) creăm o fereastră de sesiune cu un interval de inactivitate de 20 de secunde și un interval de persistență de 15 minute. Un interval inactiv de 20 de secunde înseamnă că aplicația va include orice intrare care ajunge în 20 de secunde de la sfârșitul sau începutul sesiunii curente în sesiunea curentă (activă).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Apoi, specificăm ce operație de agregare trebuie efectuată în fereastra de sesiune - în acest caz, numărați. Dacă o intrare primită se încadrează în afara ferestrei de inactivitate (orice parte a ștampilei datei/ora), aplicația creează o nouă sesiune. Intervalul de păstrare înseamnă menținerea unei sesiuni pentru o anumită perioadă de timp și permite date cu întârziere care se extind dincolo de perioada de inactivitate a sesiunii, dar care pot fi încă atașate. În plus, începutul și sfârșitul noii sesiuni rezultate din îmbinare corespund celei mai vechi și cele mai recente ștampile de dată/ora.

Să ne uităm la câteva intrări din metoda numărării pentru a vedea cum funcționează sesiunile (Tabelul 5.1).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Când sosesc înregistrările, căutăm sesiuni existente cu aceeași cheie, o oră de încheiere mai mică decât ștampila curentă de dată/oră - interval de inactivitate și o oră de început mai mare decât ștampila curentă de dată/oră + interval de inactivitate. Ținând cont de acest lucru, patru intrări din tabel. 5.1 sunt îmbinate într-o singură sesiune, după cum urmează.

1. Înregistrarea 1 ajunge prima, deci ora de început este egală cu ora de încheiere și este 00:00:00.

2. În continuare, sosește intrarea 2 și căutăm sesiunile care se încheie nu mai devreme de 23:59:55 și încep nu mai târziu de 00:00:35. Găsim înregistrarea 1 și combinăm sesiunile 1 și 2. Luăm ora de începere a sesiunii 1 (mai devreme) și ora de încheiere a sesiunii 2 (mai târziu), astfel încât noua noastră sesiune începe la 00:00:00 și se încheie la 00: 00:15.

3. Sosește recordul 3, căutăm sesiuni între 00:00:30 și 00:01:10 și nu găsim niciuna. Adăugați o a doua sesiune pentru cheia 123-345-654,FFBE, care începe și se termină la 00:00:50.

4. Sosește recordul 4 și căutăm sesiuni între 23:59:45 și 00:00:25. De această dată se găsesc ambele sesiuni 1 și 2. Toate cele trei sesiuni sunt combinate într-una singură, cu o oră de început de 00:00:00 și o oră de sfârșit de 00:00:15.

Din ceea ce este descris în această secțiune, merită să ne amintim următoarele nuanțe importante:

  • sesiunile nu sunt ferestre de dimensiuni fixe. Durata unei sesiuni este determinată de activitate într-o anumită perioadă de timp;
  • Marcajele date/ora din date determină dacă evenimentul se încadrează într-o sesiune existentă sau într-o perioadă inactivă.

În continuare, vom discuta despre următorul tip de fereastră - ferestre „turbling”.

Ferestre „tumbling”.

Ferestrele care se prăbușesc captează evenimente care se încadrează într-o anumită perioadă de timp. Imaginați-vă că trebuie să captați toate tranzacțiile cu acțiuni ale unei anumite companii la fiecare 20 de secunde, astfel încât să colectați toate evenimentele din acea perioadă de timp. La sfârșitul intervalului de 20 de secunde, fereastra se rotește și trece la un nou interval de observare de 20 de secunde. Figura 5.14 ilustrează această situație.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
După cum puteți vedea, toate evenimentele primite în ultimele 20 de secunde sunt incluse în fereastră. La sfârșitul acestei perioade de timp, este creată o nouă fereastră.

Lista 5.6 arată codul care demonstrează utilizarea ferestrelor rotative pentru a captura tranzacții cu acțiuni la fiecare 20 de secunde (găsit în src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Cu această mică modificare a apelului de metodă TimeWindows.of, puteți utiliza o fereastră de rulare. Acest exemplu nu apelează metoda until(), deci se va folosi intervalul implicit de reținere de 24 de ore.

În cele din urmă, este timpul să trecem la ultima dintre opțiunile ferestrei - ferestre „sărite”.

Ferestre glisante („săritoare”)

Ferestrele glisante/săritoare sunt similare cu ferestrele care se rotesc, dar cu o ușoară diferență. Ferestrele glisante nu așteaptă până la sfârșitul intervalului de timp înainte de a crea o nouă fereastră pentru a procesa evenimentele recente. Ei încep calcule noi după un interval de așteptare mai mic decât durata ferestrei.

Pentru a ilustra diferențele dintre ferestrele care se prăbușesc și cele care sări, să revenim la exemplul numărării tranzacțiilor bursiere. Scopul nostru este încă să numărăm numărul de tranzacții, dar nu vrem să așteptăm toată perioada de timp înainte de a actualiza contorul. În schimb, vom actualiza contorul la intervale mai scurte. De exemplu, vom număra în continuare numărul de tranzacții la fiecare 20 de secunde, dar vom actualiza contorul la fiecare 5 secunde, așa cum se arată în Fig. 5.15. În acest caz, ajungem la trei ferestre de rezultate cu date suprapuse.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Lista 5.7 arată codul pentru definirea ferestrelor glisante (găsit în src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
O fereastră de turnare poate fi convertită într-o fereastră de salt adăugând un apel la metoda advanceBy(). În exemplul prezentat, intervalul de salvare este de 15 minute.

Ați văzut în această secțiune cum să limitați rezultatele de agregare la ferestre de timp. În special, vreau să vă amintiți următoarele trei lucruri din această secțiune:

  • dimensiunea ferestrelor de sesiune este limitată nu de perioada de timp, ci de activitatea utilizatorului;
  • ferestrele „turbling” oferă o imagine de ansamblu asupra evenimentelor dintr-o anumită perioadă de timp;
  • Durata ferestrelor de salt este fixă, dar acestea sunt actualizate frecvent și pot conține intrări care se suprapun în toate ferestrele.

În continuare, vom învăța cum să convertim un KTable înapoi într-un KStream pentru o conexiune.

5.3.3. Conectarea obiectelor KStream și KTable

În capitolul 4, am discutat conectarea a două obiecte KStream. Acum trebuie să învățăm cum să conectăm KTable și KStream. Acest lucru poate fi necesar din următorul motiv simplu. KStream este un flux de înregistrări, iar KTable este un flux de actualizări de înregistrări, dar uneori poate doriți să adăugați context suplimentar fluxului de înregistrări folosind actualizări din KTable.

Să luăm date despre numărul de tranzacții bursiere și să le combinăm cu știrile bursiere pentru industriile relevante. Iată ce trebuie să faceți pentru a realiza acest lucru, având în vedere codul pe care îl aveți deja.

  1. Convertiți un obiect KTable cu date despre numărul de tranzacții cu acțiuni într-un KStream, urmat de înlocuirea cheii cu cheia care indică sectorul industrial corespunzător acestui simbol bursier.
  2. Creați un obiect KTable care citește date dintr-un subiect cu știri de la bursă. Acest nou KTable va fi clasificat în funcție de sectorul industrial.
  3. Conectați actualizările de știri cu informații despre numărul de tranzacții bursiere pe sectorul industrial.

Acum să vedem cum să implementăm acest plan de acțiune.

Convertiți KTable în KStream

Pentru a converti KTable în KStream, trebuie să faceți următoarele.

  1. Apelați metoda KTable.toStream().
  2. Apelând metoda KStream.map, înlocuiți cheia cu numele industriei și apoi preluați obiectul TransactionSummary din instanța Windowed.

Vom înlănțui aceste operații după cum urmează (codul poate fi găsit în fișierul src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listarea 5.8).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Deoarece efectuăm o operație KStream.map, instanța KStream returnată este re-partiționată automat atunci când este utilizată într-o conexiune.

Am finalizat procesul de conversie, apoi trebuie să creăm un obiect KTable pentru citirea știrilor bursiere.

Crearea KTable pentru știrile bursiere

Din fericire, crearea unui obiect KTable necesită o singură linie de cod (codul poate fi găsit în src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Este demn de remarcat faptul că nu este necesar să fie specificate niciun obiect Serde, deoarece în setări sunt folosite șirurile Serde. De asemenea, prin utilizarea celei mai devreme enumerari, tabelul este umplut cu înregistrări chiar de la început.

Acum putem trece la pasul final - conexiunea.

Conectarea actualizărilor de știri cu datele privind numărul de tranzacții

Crearea unei conexiuni nu este dificilă. Vom folosi o alăturare stângă în cazul în care nu există știri de stoc pentru industria relevantă (codul necesar poate fi găsit în fișierul src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Acest operator leftJoin este destul de simplu. Spre deosebire de îmbinările din Capitolul 4, metoda JoinWindow nu este utilizată deoarece atunci când se efectuează o îmbinare KStream-KTable, există o singură intrare în KTable pentru fiecare cheie. O astfel de conexiune nu este limitată în timp: înregistrarea este fie în KTtable, fie absentă. Concluzia principală: folosind obiecte KTable puteți îmbogăți KStream cu date de referință actualizate mai rar.

Acum vom analiza o modalitate mai eficientă de a îmbogăți evenimentele din KStream.

5.3.4. Obiecte GlobalKTable

După cum puteți vedea, este nevoie să îmbogățiți fluxurile de evenimente sau să le adăugați context. În capitolul 4 ați văzut conexiunile dintre două obiecte KStream, iar în secțiunea anterioară ați văzut conexiunea dintre un KStream și un KTable. În toate aceste cazuri, este necesară repartiționarea fluxului de date atunci când mapați cheile la un nou tip sau valoare. Uneori repartiționarea se face în mod explicit, iar uneori Kafka Streams o face automat. Re-partiționarea este necesară deoarece cheile s-au schimbat și înregistrările trebuie să ajungă în secțiuni noi, altfel conexiunea va fi imposibilă (a fost discutat în Capitolul 4, în secțiunea „Re-partiționarea datelor” din subsecțiunea 4.2.4).

Repartiționarea are un cost

Re-partiționarea necesită costuri - costuri suplimentare de resurse pentru crearea subiectelor intermediare, stocarea datelor duplicate într-un alt subiect; înseamnă și o latență crescută datorită scrierii și citirii din acest subiect. În plus, dacă trebuie să vă alăturați mai mult de un aspect sau dimensiune, trebuie să legați conexiunile, să mapați înregistrările cu chei noi și să rulați din nou procesul de re-partiționare.

Conectarea la seturi de date mai mici

În unele cazuri, volumul de date de referință care trebuie conectat este relativ mic, astfel încât copii complete ale acestora se pot potrivi cu ușurință local pe fiecare nod. Pentru situații ca aceasta, Kafka Streams oferă clasa GlobalKTable.

Instanțele GlobalKTable sunt unice deoarece aplicația reproduce toate datele la fiecare dintre noduri. Și, deoarece toate datele sunt prezente pe fiecare nod, nu este nevoie să partiționați fluxul de evenimente prin cheia de date de referință, astfel încât să fie disponibil pentru toate partițiile. De asemenea, puteți face alăturari fără cheie folosind obiecte GlobalKTable. Să ne întoarcem la unul dintre exemplele anterioare pentru a demonstra această caracteristică.

Conectarea obiectelor KStream la obiectele GlobalKTable

În subsecțiunea 5.3.2, am efectuat agregarea în fereastră a tranzacțiilor de schimb de către cumpărători. Rezultatele acestei agregari arata cam asa:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Deși aceste rezultate au servit scopului, ar fi fost mai util dacă s-ar fi afișat și numele clientului și numele complet al companiei. Pentru a adăuga numele clientului și numele companiei, puteți face alăturari normale, dar va trebui să faceți două mapări ale cheilor și re-partiționare. Cu GlobalKTable puteți evita costurile unor astfel de operațiuni.

Pentru a face acest lucru, vom folosi obiectul countStream din Listarea 5.11 (codul corespunzător poate fi găsit în src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) și îl vom conecta la două obiecte GlobalKTable.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Am mai discutat despre asta, așa că nu o voi repeta. Dar observ că codul din funcția toStream().map este abstractizat într-un obiect funcție în loc de o expresie lambda inline, de dragul lizibilității.

Următorul pas este declararea a două instanțe ale GlobalKTable (codul afișat poate fi găsit în fișierul src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listarea 5.12).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”

Vă rugăm să rețineți că numele subiectelor sunt descrise folosind tipuri enumerate.

Acum că avem toate componentele pregătite, tot ce rămâne este să scriem codul pentru conexiune (care poate fi găsit în fișierul src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Deși există două îmbinări în acest cod, acestea sunt înlănțuite deoarece niciunul dintre rezultatele lor nu este utilizat separat. Rezultatele sunt afișate la sfârșitul întregii operațiuni.

Când executați operația de alăturare de mai sus, veți obține rezultate ca acestea:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Esența nu s-a schimbat, dar aceste rezultate par mai clare.

Dacă numărați invers până la capitolul 4, ați văzut deja câteva tipuri de conexiuni în acțiune. Sunt enumerate în tabel. 5.2. Acest tabel reflectă capacitățile de conectivitate din versiunea 1.0.0 a Kafka Streams; Se poate schimba ceva în versiunile viitoare.

Cartea „Kafka Streams in Action. Aplicații și microservicii pentru lucru în timp real”
Pentru a încheia lucrurile, să recapitulăm elementele de bază: puteți conecta fluxuri de evenimente (KStream) și actualizați fluxuri (KTable) folosind starea locală. Alternativ, dacă dimensiunea datelor de referință nu este prea mare, puteți utiliza obiectul GlobalKTable. GlobalKTables replică toate partițiile în fiecare nod al aplicației Kafka Streams, asigurându-se că toate datele sunt disponibile indiferent de partiția căreia îi corespunde cheia.

În continuare vom vedea caracteristica Kafka Streams, datorită căreia putem observa schimbările de stare fără a consuma date dintr-un subiect Kafka.

5.3.5. Stare interogabilă

Am efectuat deja mai multe operațiuni care implică stare și scoatem întotdeauna rezultatele în consolă (în scopuri de dezvoltare) sau le scriem într-un subiect (în scopuri de producție). Când scrieți rezultate pentru un subiect, trebuie să utilizați un consumator Kafka pentru a le vizualiza.

Citirea datelor din aceste subiecte poate fi considerată un tip de vederi materializate. În scopurile noastre, putem folosi definiția unei vederi materializate din Wikipedia: „...un obiect fizic de bază de date care conține rezultatele unei interogări. De exemplu, ar putea fi o copie locală a datelor de la distanță sau un subset de rânduri și/sau coloane ale unui tabel sau rezultate de unire sau un tabel rezumat obținut prin agregare” (https://en.wikipedia.org/wiki /Vizualizare_materializată).

Kafka Streams vă permite, de asemenea, să rulați interogări interactive în magazinele de stat, permițându-vă să citiți direct aceste vizualizări materializate. Este important de reținut că interogarea către magazinul de stat este o operațiune numai în citire. Acest lucru vă asigură că nu trebuie să vă faceți griji că starea este inconsistentă în timp ce aplicația dvs. procesează date.

Abilitatea de a interoga direct magazinele de stare este importantă. Aceasta înseamnă că puteți crea aplicații de tablou de bord fără a fi nevoie să preluați mai întâi date de la consumatorul Kafka. De asemenea, crește eficiența aplicației, datorită faptului că nu este nevoie să scrieți din nou date:

  • datorită localității datelor, acestea pot fi accesate rapid;
  • duplicarea datelor este eliminată, deoarece acestea nu sunt scrise pe stocarea externă.

Principalul lucru pe care vreau să-l amintiți este că puteți interoga direct starea din cadrul aplicației dvs. Oportunitățile pe care vi le oferă acest lucru nu pot fi exagerate. În loc să consumați date de la Kafka și să stocați înregistrări într-o bază de date pentru aplicație, puteți interoga stocurile de stare cu același rezultat. Interogările directe către magazinele de stat înseamnă mai puțin cod (fără consumator) și mai puțin software (nu este nevoie de un tabel de bază de date pentru a stoca rezultatele).

Am acoperit destul de mult teren în acest capitol, așa că vom lăsa deocamdată discuția despre interogările interactive împotriva magazinelor de stat. Dar nu vă faceți griji: în capitolul 9, vom crea o aplicație de tablou de bord simplă cu interogări interactive. Va folosi câteva dintre exemplele din acest capitol și din capitolele anterioare pentru a demonstra interogările interactive și modul în care le puteți adăuga la aplicațiile Kafka Streams.

Rezumat

  • Obiectele KStream reprezintă fluxuri de evenimente, comparabile cu inserările într-o bază de date. Obiectele KTable reprezintă fluxuri de actualizare, mai mult ca actualizările unei baze de date. Dimensiunea obiectului KTable nu crește, înregistrările vechi sunt înlocuite cu altele noi.
  • Obiectele KTable sunt necesare pentru operațiunile de agregare.
  • Folosind operațiunile de ferestre, puteți împărți datele agregate în intervale de timp.
  • Datorită obiectelor GlobalKTable, puteți accesa datele de referință oriunde în aplicație, indiferent de partiționare.
  • Sunt posibile conexiuni între obiectele KStream, KTable și GlobalKTable.

Până acum, ne-am concentrat pe construirea de aplicații Kafka Streams folosind KStream DSL de nivel înalt. Deși abordarea la nivel înalt vă permite să creați programe ordonate și concise, utilizarea acesteia reprezintă un compromis. Lucrul cu DSL KStream înseamnă creșterea conciziei codului dvs. prin reducerea gradului de control. În capitolul următor, ne vom uita la API-ul nodului de gestionare de nivel scăzut și vom încerca alte compromisuri. Programele vor fi mai lungi decât erau înainte, dar vom putea crea aproape orice nod de gestionare de care am putea avea nevoie.

→ Mai multe detalii despre carte găsiți la site-ul editorului

→ Pentru Habrozhiteli 25% reducere folosind cupon - Kafka Streams

→ La plata versiunii pe hârtie a cărții, se va trimite o carte electronică prin e-mail.

Sursa: www.habr.com

Adauga un comentariu