El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real" Hola, residents de Khabro! Aquest llibre és adequat per a qualsevol desenvolupador que vulgui entendre el processament de fils. Entendre la programació distribuïda us ajudarà a entendre millor Kafka i Kafka Streams. Estaria bé conèixer el marc Kafka en si, però això no és necessari: us diré tot el que necessiteu. Tant els desenvolupadors Kafka amb experiència com els novells aprendran a crear aplicacions de processament de flux interessants mitjançant la biblioteca Kafka Streams d'aquest llibre. Els desenvolupadors de Java intermedis i avançats que ja estiguin familiaritzats amb conceptes com la serialització aprendran a aplicar les seves habilitats per crear aplicacions Kafka Streams. El codi font del llibre està escrit en Java 8 i fa un ús important de la sintaxi d'expressió lambda de Java 8, de manera que saber treballar amb funcions lambda (fins i tot en un altre llenguatge de programació) serà molt útil.

Fragment. 5.3. Operacions d'agregació i finestres

En aquesta secció, passarem a explorar les parts més prometedores de Kafka Streams. Fins ara hem cobert els aspectes següents de Kafka Streams:

  • crear una topologia de processament;
  • ús de l'estat en aplicacions de streaming;
  • realitzar connexions de flux de dades;
  • diferències entre els fluxos d'esdeveniments (KStream) i els fluxos d'actualització (KTable).

En els exemples següents reunirem tots aquests elements. També aprendràs sobre les finestres, una altra gran característica de les aplicacions de streaming. El nostre primer exemple serà una simple agregació.

5.3.1. Agregació de vendes d'estocs per sectors industrials

L'agregació i l'agrupació són eines vitals quan es treballa amb dades en temps real. L'examen dels registres individuals a mesura que es reben sovint és insuficient. Per extreure informació addicional de les dades, cal agrupar-les i combinar-les.

En aquest exemple, us posareu la disfressa d'un comerciant diari que necessita fer un seguiment del volum de vendes d'existències d'empreses de diverses indústries. Concretament, esteu interessats en les cinc empreses amb les vendes de quotes més grans de cada indústria.

Aquesta agregació requerirà els següents passos per traduir les dades a la forma desitjada (parlant en termes generals).

  1. Creeu una font basada en temes que publiqui informació de negociació d'accions en brut. Haurem de mapar un objecte del tipus StockTransaction amb un objecte del tipus ShareVolume. La qüestió és que l'objecte StockTransaction conté metadades de vendes, però només necessitem dades sobre el nombre d'accions que es venen.
  2. Agrupa les dades del volum per símbol d'accions. Un cop agrupades per símbol, podeu replegar aquestes dades en subtotals de volums de vendes d'estocs. Val la pena assenyalar que el mètode KStream.groupBy retorna una instància del tipus KGroupedStream. I podeu obtenir una instància de KTable cridant encara més el mètode KGroupedStream.reduce.

Què és la interfície KGroupedStream

Els mètodes KStream.groupBy i KStream.groupByKey retornen una instància de KGroupedStream. KGroupedStream és una representació intermèdia d'un flux d'esdeveniments després de l'agrupació per claus. No està pensat en absolut per treballar directament amb ell. En canvi, KGroupedStream s'utilitza per a operacions d'agregació, el resultat de les quals és sempre una KTable. I com que el resultat de les operacions d'agregació és una KTable i utilitzen un magatzem d'estat, és possible que no totes les actualitzacions com a resultat s'enviïn més avall.

El mètode KTable.groupBy retorna una KGroupedTable similar: una representació intermèdia del flux d'actualitzacions, reagrupada per clau.

Fem una petita pausa i mirem la Fig. 5.9, que mostra el que hem aconseguit. Aquesta topologia ja us hauria de ser molt familiar.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Vegem ara el codi d'aquesta topologia (es pot trobar al fitxer src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Llistat 5.2).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
El codi donat es distingeix per la seva brevetat i el gran volum d'accions realitzades en diverses línies. És possible que noteu alguna cosa nova al primer paràmetre del mètode builder.stream: un valor del tipus d'enumeració AutoOffsetReset.EARLIEST (també hi ha un LATEST), establert mitjançant el mètode Consumed.withOffsetResetPolicy. Aquest tipus d'enumeració es pot utilitzar per especificar una estratègia de restabliment de compensació per a cada KStream o KTable i té prioritat sobre l'opció de restabliment de compensació de la configuració.

GroupByKey i GroupBy

La interfície KStream té dos mètodes per agrupar registres: GroupByKey i GroupBy. Tots dos retornen una KGroupedTable, així que potser us preguntareu quina diferència hi ha entre ells i quan utilitzar-ne quina?

El mètode GroupByKey s'utilitza quan les claus del KStream ja no estan buides. I el més important, la bandera "requereix tornar a particionar" mai es va establir.

El mètode GroupBy suposa que heu canviat les claus d'agrupació, de manera que el senyalador de repartició s'estableix en true. La realització d'unions, agregacions, etc. després del mètode GroupBy donarà lloc a una nova partició automàtica.
Resum: sempre que sigui possible, hauríeu d'utilitzar GroupByKey en lloc de GroupBy.

Està clar què fan els mètodes mapValues ​​i groupBy, així que fem una ullada al mètode sum() (que es troba a src/main/java/bbejeck/model/ShareVolume.java) (Llistat 5.3).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
El mètode ShareVolume.sum retorna el total acumulat del volum de vendes d'estocs i el resultat de tota la cadena de càlculs és un objecte KTable . Ara enteneu el paper que juga KTable. Quan arriben els objectes ShareVolume, l'objecte KTable corresponent emmagatzema la darrera actualització actual. És important recordar que totes les actualitzacions es reflecteixen a l'anterior shareVolumeKTable, però no totes s'envien més.

A continuació, utilitzem aquesta KTable per agregar (per nombre d'accions negociades) per arribar a les cinc empreses amb els volums més alts d'accions negociades en cada indústria. Les nostres accions en aquest cas seran similars a les de la primera agregació.

  1. Realitzeu una altra operació groupBy per agrupar objectes ShareVolume individuals per indústria.
  2. Comenceu a resumir objectes ShareVolume. Aquesta vegada, l'objecte d'agregació és una cua de prioritat de mida fixa. En aquesta cua de mida fixa, només es conserven les cinc empreses amb més quantitats d'accions venudes.
  3. Assigna les cues del paràgraf anterior a un valor de cadena i retorna les cinc accions més negociades per nombre per sector.
  4. Escriu els resultats en forma de cadena al tema.

A la Fig. La figura 5.10 mostra el gràfic de la topologia del flux de dades. Com podeu veure, la segona ronda de processament és bastant senzilla.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Ara que tenim una comprensió clara de l'estructura d'aquesta segona ronda de processament, podem recórrer al seu codi font (el trobareu al fitxer src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Llistat 5.4) .

Aquest inicialitzador conté una variable fixedQueue. Aquest és un objecte personalitzat que és un adaptador per a java.util.TreeSet que s'utilitza per fer un seguiment dels N resultats principals en ordre descendent de les accions negociades.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Ja heu vist les trucades groupBy i mapValues, així que no entrarem en aquestes (estem cridant al mètode KTable.toStream perquè el mètode KTable.print està obsolet). Però encara no heu vist la versió KTable d'aggregate(), així que passarem una mica de temps discutint-ho.

Com recordeu, el que fa diferent a KTable és que els registres amb les mateixes claus es consideren actualitzacions. KTable substitueix l'entrada antiga per una de nova. L'agregació es produeix de manera similar: s'agreguen els últims registres amb la mateixa clau. Quan arriba un registre, s'afegeix a la instància de classe FixedSizePriorityQueue mitjançant un sumador (segon paràmetre a la trucada del mètode agregat), però si ja existeix un altre registre amb la mateixa clau, llavors el registre antic s'elimina mitjançant un subtractor (tercer paràmetre de la trucada al mètode agregat).

Tot això vol dir que el nostre agregador, FixedSizePriorityQueue, no agrega tots els valors amb una sola clau, sinó que emmagatzema una suma mòbil de les quantitats dels N tipus d'accions més negociats. Cada entrada entrant conté el nombre total d'accions venudes fins ara. KTable us donarà informació sobre quines accions de les empreses es cotitzen actualment, sense requerir l'agregació continuada de cada actualització.

Hem après a fer dues coses importants:

  • agrupar els valors a KTable mitjançant una clau comuna;
  • realitzar operacions útils com ara acumulació i agregació d'aquests valors agrupats.

Saber com realitzar aquestes operacions és important per entendre el significat de les dades que es mouen a través d'una aplicació Kafka Streams i per entendre quina informació transporten.

També hem reunit alguns dels conceptes clau comentats anteriorment en aquest llibre. Al capítol 4, vam parlar de com és important l'estat local tolerant a errors per a una aplicació de streaming. El primer exemple d'aquest capítol va demostrar per què l'estat local és tan important: us permet fer un seguiment de la informació que ja heu vist. L'accés local evita els retards de la xarxa, fent que l'aplicació sigui més eficient i resistent als errors.

Quan feu qualsevol operació d'agregació o acumulació, heu d'especificar el nom de la botiga d'estat. Les operacions de acumulació i agregació retornen una instància de KTable i KTable utilitza l'emmagatzematge d'estat per substituir els resultats antics per de nous. Com heu vist, no totes les actualitzacions s'envien pel canal, i això és important perquè les operacions d'agregació estan dissenyades per produir informació resumida. Si no apliqueu l'estat local, KTable reenviarà tots els resultats d'agregació i acumulació.

A continuació, analitzarem la realització d'operacions com ara l'agregació en un període de temps específic: les anomenades operacions de finestres.

5.3.2. Operacions de la finestra

A la secció anterior, vam introduir la convolució lliscant i l'agregació. L'aplicació va realitzar una acumulació contínua de vendes d'accions seguida de l'agregació de les cinc accions més negociades a la borsa.

De vegades és necessària aquesta agregació i acumulació contínua de resultats. I de vegades només cal fer operacions durant un període de temps determinat. Per exemple, calculeu quantes transaccions de canvi s'han fet amb accions d'una empresa determinada en els darrers 10 minuts. O quants usuaris han fet clic en un nou bàner publicitari durant els darrers 15 minuts. Una aplicació pot realitzar aquestes operacions diverses vegades, però amb resultats que s'apliquen només a períodes de temps especificats (finestres de temps).

Recompte de transaccions de canvi per part del comprador

En el següent exemple, farem un seguiment de les transaccions d'accions entre diversos comerciants, ja siguin grans organitzacions o financers individuals intel·ligents.

Hi ha dos possibles motius per a aquest seguiment. Un d'ells és la necessitat de saber què estan comprant/venent els líders del mercat. Si aquests grans jugadors i inversors sofisticats veuen oportunitats, té sentit seguir la seva estratègia. La segona raó és el desig de detectar possibles signes de comerç il·legal d'informació privilegiada. Per fer-ho, caldrà analitzar la correlació de grans pics de vendes amb comunicats de premsa importants.

Aquest seguiment consta dels passos següents:

  • crear un flux per llegir a partir del tema de les transaccions d'accions;
  • agrupar els registres entrants per identificador de comprador i símbol de valors. Cridar el mètode groupBy retorna una instància de la classe KGroupedStream;
  • El mètode KGroupedStream.windowedBy retorna un flux de dades limitat a una finestra de temps, que permet l'agregació per finestres. Depenent del tipus de finestra, es retorna un TimeWindowedKStream o un SessionWindowedKStream;
  • recompte de transaccions per a l'operació d'agregació. El flux de dades amb finestra determina si un registre concret es té en compte en aquest recompte;
  • escriure resultats a un tema o enviar-los a la consola durant el desenvolupament.

La topologia d'aquesta aplicació és senzilla, però una imatge clara d'ella seria útil. Fem una ullada a la Fig. 5.11.

A continuació, veurem la funcionalitat de les operacions de la finestra i el codi corresponent.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"

Tipus de finestres

Hi ha tres tipus de finestres a Kafka Streams:

  • de sessió;
  • “tombant” (tombant);
  • lliscant/saltant.

Quina triar depèn de les necessitats del vostre negoci. Les finestres de caiguda i salt estan limitades en el temps, mentre que les finestres de sessió estan limitades per l'activitat de l'usuari; la durada de les sessions es determina únicament per l'activitat de l'usuari. El més important a recordar és que tots els tipus de finestres es basen en els segells de data/hora de les entrades, no en l'hora del sistema.

A continuació, implementem la nostra topologia amb cadascun dels tipus de finestres. El codi complet es donarà només al primer exemple per a altres tipus de finestres, no canviarà res excepte el tipus d'operació de la finestra.

Finestres de sessió

Les finestres de sessió són molt diferents de tots els altres tipus de finestres. Estan limitats no tant pel temps com per l'activitat de l'usuari (o l'activitat de l'entitat de la qual voleu fer un seguiment). Les finestres de sessió estan delimitades per períodes d'inactivitat.

La figura 5.12 il·lustra el concepte de finestres de sessió. La sessió més petita es fusionarà amb la sessió a la seva esquerra. I la sessió de la dreta serà separada perquè segueix un llarg període d'inactivitat. Les finestres de sessió es basen en l'activitat de l'usuari, però utilitzen segells de data/hora de les entrades per determinar a quina sessió pertany l'entrada.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"

Ús de finestres de sessió per fer un seguiment de les transaccions d'accions

Utilitzem les finestres de sessió per capturar informació sobre transaccions d'intercanvi. La implementació de les finestres de sessió es mostra al Llistat 5.5 (que es pot trobar a src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Ja heu vist la majoria de les operacions d'aquesta topologia, de manera que no cal que les torneu a mirar aquí. Però també hi ha diversos elements nous, que ara parlarem.

Qualsevol operació groupBy normalment realitza algun tipus d'operació d'agregació (agregació, acumulació o recompte). Podeu realitzar una agregació acumulada amb un total acumulat o una agregació de finestres, que té en compte els registres dins d'una finestra de temps especificada.

El codi del llistat 5.5 compta el nombre de transaccions dins de les finestres de sessió. A la Fig. 5.13 aquestes accions s'analitzen pas a pas.

En cridar a windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) creem una finestra de sessió amb un interval d'inactivitat de 20 segons i un interval de persistència de 15 minuts. Un interval d'inactivitat de 20 segons significa que l'aplicació inclourà qualsevol entrada que arribi dins dels 20 segons del final o l'inici de la sessió actual a la sessió actual (activa).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
A continuació, especifiquem quina operació d'agregació s'ha de realitzar a la finestra de sessió; en aquest cas, comptar. Si una entrada entrant es troba fora de la finestra d'inactivitat (a qualsevol costat del segell de data/hora), l'aplicació crea una sessió nova. L'interval de retenció significa mantenir una sessió durant un cert període de temps i permet dades tardanes que s'estenen més enllà del període d'inactivitat de la sessió, però que encara es poden adjuntar. A més, l'inici i el final de la nova sessió resultant de la fusió corresponen al segell de data i hora més antic i més recent.

Vegem algunes entrades del mètode de recompte per veure com funcionen les sessions (taula 5.1).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Quan arriben els registres, busquem les sessions existents amb la mateixa clau, una hora de finalització inferior a la marca de data/hora actual - interval d'inactivitat i una hora d'inici superior a la marca de data/hora actual + interval d'inactivitat. Tenint això en compte, quatre entrades de la taula. 5.1 es fusionen en una única sessió de la manera següent.

1. El registre 1 arriba primer, de manera que l'hora d'inici és igual a l'hora de finalització i és 00:00:00.

2. A continuació, arriba l'entrada 2, i busquem sessions que acabin no abans de les 23:59:55 i comencen com a molt tard a les 00:00:35. Trobem el registre 1 i combinem les sessions 1 i 2. Prenem l'hora d'inici de la sessió 1 (abans) i l'hora de finalització de la sessió 2 (més tard), de manera que la nostra nova sessió comenci a les 00:00:00 i acabi a les 00:00: 15:XNUMX.

3. Arriba el registre 3, busquem sessions entre les 00:00:30 i les 00:01:10 i no en trobem cap. Afegiu una segona sessió per a la clau 123-345-654,FFBE, començant i acabant a les 00:00:50.

4. Arriba el registre 4 i busquem sessions entre les 23:59:45 i les 00:00:25. Aquesta vegada es troben les dues sessions 1 i 2. Les tres sessions es combinen en una, amb una hora d'inici de 00:00:00 i una hora de finalització de 00:00:15.

Del que es descriu en aquesta secció, val la pena recordar els següents matisos importants:

  • les sessions no són finestres de mida fixa. La durada d'una sessió ve determinada per l'activitat en un període de temps determinat;
  • Els segells de data i hora de les dades determinen si l'esdeveniment es troba dins d'una sessió existent o durant un període d'inactivitat.

A continuació, parlarem del següent tipus de finestra: finestres "enrotllables".

Finestres "caigudes".

Les finestres caigudes capturen esdeveniments que cauen en un període de temps determinat. Imagineu que necessiteu capturar totes les transaccions d'accions d'una determinada empresa cada 20 segons, de manera que recolliu tots els esdeveniments durant aquest període de temps. Al final de l'interval de 20 segons, la finestra gira i es mou a un nou interval d'observació de 20 segons. La figura 5.14 il·lustra aquesta situació.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Com podeu veure, tots els esdeveniments rebuts en els darrers 20 segons s'inclouen a la finestra. Al final d'aquest període de temps, es crea una nova finestra.

El Llistat 5.6 mostra el codi que demostra l'ús de finestres de caiguda per capturar transaccions d'accions cada 20 segons (es troba a src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Amb aquest petit canvi a la trucada al mètode TimeWindows.of, podeu utilitzar una finestra de caiguda. Aquest exemple no crida al mètode until(), de manera que s'utilitzarà l'interval de retenció predeterminat de 24 hores.

Finalment, és hora de passar a l'última de les opcions de la finestra: finestres de "salt".

Finestres corredisses ("saltant")

Les finestres lliscants/saltants són similars a les finestres que cauen, però amb una lleugera diferència. Les finestres lliscants no esperen fins al final de l'interval de temps abans de crear una nova finestra per processar els esdeveniments recents. Comencen nous càlculs després d'un interval d'espera inferior a la durada de la finestra.

Per il·lustrar les diferències entre les finestres que s'enfonsen i les que salten, tornem a l'exemple de comptar transaccions de borsa. El nostre objectiu encara és comptar el nombre de transaccions, però no volem esperar tot el temps abans d'actualitzar el comptador. En canvi, actualitzarem el comptador a intervals més curts. Per exemple, encara comptarem el nombre de transaccions cada 20 segons, però actualitzarem el comptador cada 5 segons, tal com es mostra a la Fig. 5.15. En aquest cas, acabem amb tres finestres de resultats amb dades superposades.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
El Llistat 5.7 mostra el codi per definir finestres lliscants (que es troba a src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Una finestra giratòria es pot convertir en una finestra de salt afegint una crida al mètode advanceBy(). A l'exemple mostrat, l'interval d'estalvi és de 15 minuts.

En aquesta secció heu vist com limitar els resultats de l'agregació a les finestres de temps. En particular, vull que recordeu les tres coses següents d'aquesta secció:

  • la mida de les finestres de sessió està limitada no pel període de temps, sinó per l'activitat de l'usuari;
  • les finestres "enrotllables" ofereixen una visió general dels esdeveniments en un període de temps determinat;
  • La durada de les finestres de salt és fixa, però s'actualitzen amb freqüència i poden contenir entrades superposades a totes les finestres.

A continuació, aprendrem a convertir un KTable de nou en un KStream per a una connexió.

5.3.3. Connexió d'objectes KStream i KTable

Al capítol 4, vam parlar de la connexió de dos objectes KStream. Ara hem d'aprendre a connectar KTable i KStream. Això pot ser necessari per la següent raó senzilla. KStream és un flux de registres i KTable és un flux d'actualitzacions de registres, però de vegades és possible que vulgueu afegir context addicional al flux de registres mitjançant les actualitzacions del KTable.

Agafem dades sobre el nombre de transaccions de la borsa i les combinem amb les notícies de la borsa de les indústries rellevants. Això és el que heu de fer per aconseguir-ho tenint en compte el codi que ja teniu.

  1. Convertiu un objecte KTable amb dades sobre el nombre de transaccions d'accions en un KStream, seguit de substitució de la clau per la clau que indica el sector industrial corresponent a aquest símbol d'accions.
  2. Creeu un objecte KTable que llegeixi dades d'un tema amb notícies de la borsa. Aquesta nova KTable es classificarà per sectors industrials.
  3. Connecteu les actualitzacions de notícies amb informació sobre el nombre de transaccions de borsa per sectors industrials.

Ara vegem com implementar aquest pla d'acció.

Converteix KTable a KStream

Per convertir KTable a KStream, heu de fer el següent.

  1. Truqueu al mètode KTable.toStream().
  2. En cridar el mètode KStream.map, substituïu la clau pel nom de la indústria i, a continuació, recupereu l'objecte TransactionSummary de la instància de la finestra.

Encadenarem aquestes operacions de la següent manera (el codi es pot trobar al fitxer src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Llistat 5.8).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Com que estem realitzant una operació KStream.map, la instància de KStream retornada es torna a particionar automàticament quan s'utilitza en una connexió.

Hem completat el procés de conversió, a continuació hem de crear un objecte KTable per llegir notícies d'accions.

Creació de KTable per a notícies d'accions

Afortunadament, crear un objecte KTable només necessita una línia de codi (el codi es pot trobar a src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Llistat 5.9).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Val la pena assenyalar que no cal especificar cap objecte Serde, ja que la cadena Serdes s'utilitza a la configuració. A més, utilitzant l'enumeració MÉS PRIMERA, la taula s'omple de registres al principi.

Ara podem passar al pas final: connexió.

Connectar actualitzacions de notícies amb dades de recompte de transaccions

Crear una connexió no és difícil. Utilitzarem una unió a l'esquerra en cas que no hi hagi notícies d'estoc per al sector rellevant (el codi necessari es pot trobar al fitxer src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Llistat 5.10).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Aquest operador leftJoin és bastant senzill. A diferència de les unions del Capítol 4, el mètode JoinWindow no s'utilitza perquè quan es realitza una unió KStream-KTable, només hi ha una entrada a la KTable per a cada clau. Aquesta connexió no està limitada en el temps: el registre es troba a la KTable o està absent. La conclusió principal: amb objectes KTable podeu enriquir KStream amb dades de referència actualitzades amb menys freqüència.

Ara veurem una manera més eficient d'enriquir els esdeveniments de KStream.

5.3.4. Objectes GlobalKTable

Com podeu veure, cal enriquir els fluxos d'esdeveniments o afegir-hi context. Al capítol 4 heu vist les connexions entre dos objectes KStream, i a la secció anterior heu vist la connexió entre un KStream i una KTable. En tots aquests casos, és necessari tornar a particionar el flux de dades quan s'assignen les claus a un nou tipus o valor. De vegades, el reparticionament es fa de manera explícita i, de vegades, Kafka Streams ho fa automàticament. El reparticionament és necessari perquè les claus han canviat i els registres han d'acabar en seccions noves, en cas contrari la connexió serà impossible (això es va comentar al capítol 4, a l'apartat "Reparticionament de dades" a la subsecció 4.2.4).

Reparticionar té un cost

La repartició requereix costos: costos de recursos addicionals per crear temes intermedis, emmagatzemar dades duplicades en un altre tema; també significa un augment de la latència a causa de l'escriptura i la lectura d'aquest tema. A més, si necessiteu unir-vos a més d'un aspecte o dimensió, heu d'encadenar les unions, mapejar els registres amb claus noves i executar de nou el procés de repartició.

Connexió a conjunts de dades més petits

En alguns casos, el volum de dades de referència a connectar és relativament petit, de manera que les còpies completes es poden adaptar fàcilment localment a cada node. Per a situacions com aquesta, Kafka Streams proporciona la classe GlobalKTable.

Les instàncies de GlobalKTable són úniques perquè l'aplicació replica totes les dades a cadascun dels nodes. I com que totes les dades estan presents a cada node, no cal particionar el flux d'esdeveniments mitjançant la clau de dades de referència perquè estigui disponible per a totes les particions. També podeu fer unions sense clau mitjançant objectes GlobalKTable. Tornem a un dels exemples anteriors per demostrar aquesta característica.

Connectant objectes KStream amb objectes GlobalKTable

A la subsecció 5.3.2, vam realitzar l'agregació de finestres de transaccions de canvi per part dels compradors. Els resultats d'aquesta agregació semblaven a això:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Tot i que aquests resultats van complir el propòsit, hauria estat més útil que també s'hagués mostrat el nom del client i el nom complet de l'empresa. Per afegir el nom del client i el nom de l'empresa, podeu fer unions normals, però haureu de fer dues assignacions de claus i tornar a particionar. Amb GlobalKTable podeu evitar el cost d'aquestes operacions.

Per fer-ho, utilitzarem l'objecte countStream del Llistat 5.11 (el codi corresponent es pot trobar a src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) i el connectarem a dos objectes GlobalKTable.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Ja ho hem parlat abans, així que no ho repetiré. Però observo que el codi de la funció toStream().map s'abstraeix en un objecte de funció en lloc d'una expressió lambda en línia per tal de facilitar la lectura.

El següent pas és declarar dues instàncies de GlobalKTable (el codi mostrat es pot trobar al fitxer src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Llistat 5.12).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"

Tingueu en compte que els noms dels temes es descriuen mitjançant tipus enumerats.

Ara que ja tenim tots els components preparats, només queda escriure el codi de la connexió (que es pot trobar al fitxer src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Llistat 5.13).

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Tot i que hi ha dues unions en aquest codi, s'encadenen perquè cap dels seus resultats s'utilitza per separat. Els resultats es mostren al final de tota l'operació.

Quan executeu l'operació d'unió anterior, obtindreu resultats com aquest:

{customer='Barney, Smith' company="Exxon", transactions= 17}

L'essència no ha canviat, però aquests resultats semblen més clars.

Si feu el compte enrere fins al capítol 4, ja heu vist diversos tipus de connexions en acció. S'enumeren a la taula. 5.2. Aquesta taula reflecteix les capacitats de connectivitat a partir de la versió 1.0.0 de Kafka Streams; Alguna cosa pot canviar en futures versions.

El llibre “Kafka Streams in Action. Aplicacions i microserveis per al treball en temps real"
Per acabar, resumim els conceptes bàsics: podeu connectar fluxos d'esdeveniments (KStream) i actualitzar fluxos (KTable) mitjançant l'estat local. Alternativament, si la mida de les dades de referència no és massa gran, podeu utilitzar l'objecte GlobalKTable. GlobalKTables replica totes les particions a cada node de l'aplicació Kafka Streams, assegurant que totes les dades estiguin disponibles independentment de quina partició correspongui la clau.

A continuació veurem la funció Kafka Streams, gràcies a la qual podem observar canvis d'estat sense consumir dades d'un tema de Kafka.

5.3.5. Estat consultable

Ja hem realitzat diverses operacions que impliquen l'estat i sempre escrivim els resultats a la consola (per a finalitats de desenvolupament) o escriure'ls en un tema (per a finalitats de producció). Quan escriu resultats per a un tema, has d'utilitzar un consumidor de Kafka per veure'ls.

La lectura de dades d'aquests temes es pot considerar un tipus de vistes materialitzades. Per als nostres propòsits, podem utilitzar la definició d'una vista materialitzada de la Viquipèdia: “...un objecte de base de dades física que conté els resultats d'una consulta. Per exemple, podria ser una còpia local de dades remotes, o un subconjunt de files i/o columnes d'una taula o resultats d'unió, o una taula resum obtinguda mitjançant l'agregació” (https://en.wikipedia.org/wiki). /vista_materialitzada).

Kafka Streams també us permet executar consultes interactives a les botigues estatals, la qual cosa us permet llegir directament aquestes vistes materialitzades. És important tenir en compte que la consulta a la botiga d'estat és una operació de només lectura. Això garanteix que no us haureu de preocupar per fer que l'estat sigui inconsistent mentre la vostra aplicació processa dades.

La capacitat de consultar directament les botigues d'estat és important. Això vol dir que podeu crear aplicacions de tauler sense haver d'obtenir primer dades del consumidor de Kafka. També augmenta l'eficiència de l'aplicació, ja que no cal tornar a escriure dades:

  • gràcies a la localitat de les dades, s'hi pot accedir ràpidament;
  • s'elimina la duplicació de dades, ja que no s'escriu a l'emmagatzematge extern.

El més important que vull que recordeu és que podeu consultar directament l'estat des de la vostra aplicació. Les oportunitats que això us ofereix no es poden exagerar. En lloc de consumir dades de Kafka i emmagatzemar registres en una base de dades per a l'aplicació, podeu consultar els magatzems d'estat amb el mateix resultat. Les consultes directes a les botigues estatals signifiquen menys codi (sense consumidor) i menys programari (no cal una taula de base de dades per emmagatzemar els resultats).

Hem tractat una mica de terreny en aquest capítol, així que deixarem de moment la nostra discussió sobre les consultes interactives a les botigues estatals. Però no us preocupeu: al capítol 9, crearem una aplicació de tauler senzilla amb consultes interactives. Utilitzarà alguns dels exemples d'aquest i dels capítols anteriors per demostrar les consultes interactives i com podeu afegir-les a les aplicacions de Kafka Streams.

Resum

  • Els objectes KStream representen fluxos d'esdeveniments, comparables a les insercions en una base de dades. Els objectes KTable representen fluxos d'actualització, més aviat com actualitzacions d'una base de dades. La mida de l'objecte KTable no augmenta, els registres antics se substitueixen per de nous.
  • Els objectes KTable són necessaris per a les operacions d'agregació.
  • Mitjançant operacions de finestres, podeu dividir les dades agregades en grups de temps.
  • Gràcies als objectes GlobalKTable, podeu accedir a les dades de referència en qualsevol lloc de l'aplicació, independentment de la partició.
  • Són possibles connexions entre els objectes KStream, KTable i GlobalKTable.

Fins ara, ens hem centrat a crear aplicacions Kafka Streams utilitzant el KStream DSL d'alt nivell. Tot i que l'enfocament d'alt nivell us permet crear programes nets i concisos, utilitzar-lo representa una compensació. Treballar amb DSL KStream significa augmentar la concisió del vostre codi reduint el grau de control. Al capítol següent, veurem l'API del node de controlador de baix nivell i provarem altres intercanvis. Els programes seran més llargs del que eren abans, però podrem crear gairebé qualsevol node gestor que puguem necessitar.

→ Podeu trobar més detalls sobre el llibre a lloc web de l'editor

→ Per a Habrozhiteli 25% de descompte amb cupó - Kafka Streams

→ Un cop pagat la versió en paper del llibre, s'enviarà un llibre electrònic per correu electrònic.

Font: www.habr.com

Afegeix comentari