U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale " Hola, residenti di Khabro! Stu libru hè adattatu per qualsiasi sviluppatore chì vole capisce u processu di filu. Capisce a prugrammazione distribuita vi aiuterà à capisce megliu Kafka è Kafka Streams. Saria bellu di cunnosce u quadru Kafka stessu, ma questu ùn hè micca necessariu: ​​Vi dicu tuttu ciò chì avete bisognu. I sviluppatori Kafka sperimentati è i principianti imparanu cumu creà applicazioni interessanti di trasfurmazioni di flussu utilizendu a libreria Kafka Streams in stu libru. I sviluppatori Java intermedi è avanzati digià familiarizati cù cuncetti cum'è a serializazione amparanu à applicà e so cumpetenze per creà applicazioni Kafka Streams. U codice fonte di u libru hè scrittu in Java 8 è face un usu significativu di a sintassi di l'espressione lambda di Java 8, cusì sapè cumu travaglià cù e funzioni lambda (ancu in un altru linguaghju di prugrammazione) serà utile.

Estratto. 5.3. Aggregazione è operazioni di finestra

In questa sezione, andemu à scopre e parti più promettenti di Kafka Streams. Finu a ora avemu coperto i seguenti aspetti di Kafka Streams:

  • crià una topologia di trasfurmazioni;
  • usu di u statu in l'applicazioni streaming;
  • realizà cunnessione di flussu di dati;
  • differenze trà i flussi di eventi (KStream) è i flussi d'aghjurnamentu (KTable).

In l'esempii seguenti, riunitemu tutti questi elementi. Puderete ancu amparà nantu à a finestra, una altra grande funzione di l'applicazioni streaming. U nostru primu esempiu serà una aggregazione simplice.

5.3.1. Agregazione di vendite di stock per settore industriale

L'aggregazione è u raggruppamentu sò strumenti vitali quandu si travaglia cù dati in streaming. L'esaminazione di i registri individuali in quantu sò ricevuti hè spessu insufficiente. Per caccià infurmazioni supplementari da e dati, hè necessariu di raggruppà è cumminà.

In questu esempiu, vi mette nantu à u vestitu di un trader di ghjornu chì hà bisognu di seguità u voluminu di vendita di l'azzioni di l'imprese in parechje industrii. In particulare, site interessatu in e cinque cumpagnie cù a più grande vendita di parte in ogni industria.

Tali aggregazione richiederà i seguenti passi per traduce i dati in a forma desiderata (parlendu in termini generale).

  1. Crea una fonte basata nantu à u tema chì publica l'infurmazioni prima di cummercializazioni di stock. Avemu da mappe un ughjettu di tipu StockTransaction à un oggettu di tipu ShareVolume. U puntu hè chì l'ughjettu StockTransaction cuntene metadata di vendita, ma avemu solu bisognu di dati nantu à u numeru di azzioni chì sò venduti.
  2. Group ShareVolume di dati per u simbulu di scorta. Una volta raggruppati per simbulu, pudete colapsà sta dati in subtotali di volumi di vendita di stock. Hè da nutà chì u metudu KStream.groupBy torna una istanza di tipu KGroupedStream. È pudete ottene una istanza KTable chjamendu ancu u metudu KGroupedStream.reduce.

Cosa hè l'interfaccia KGroupedStream

I metudi KStream.groupBy è KStream.groupByKey restituiscenu una istanza di KGroupedStream. KGroupedStream hè una rapprisintazioni intermedia di un flussu di avvenimenti dopu a raggruppamentu per chjave. Ùn hè micca in tuttu destinatu à u travagliu direttu cun ellu. Invece, KGroupedStream hè utilizatu per operazioni di aggregazione, chì sempre risultatu in una KTable. E postu chì u risultatu di l'operazioni di aggregazione hè una KTable è usanu un magazinu statale, hè pussibule chì micca tutte l'aghjurnamenti in u risultatu sò mandati più in u pipeline.

U metudu KTable.groupBy torna un KGroupedTable simili - una rapprisintazioni intermediata di u flussu di l'aghjurnamenti, raggruppati per chjave.

Facemu una breve pausa è fighjemu a Fig. 5.9, chì mostra ciò chì avemu ottenutu. Sta topulugia deve esse digià assai familiarizata per voi.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Fighjemu avà u codice per sta topulugia (si pò truvà in u schedariu src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
U codice datu hè distinatu da a so brevità è u grande volume di l'azzioni realizati in parechje linee. Puderete nutà qualcosa di novu in u primu paràmetru di u metudu builder.stream: un valore di u tipu enum AutoOffsetReset.EARLIEST (ci hè ancu un LATEST), stabilitu cù u metu Consumed.withOffsetResetPolicy. Stu tipu d'enumerazione pò esse usatu per specificà una strategia di reset offset per ogni KStream o KTable è piglia a precedenza annantu à l'opzione di reset offset da a cunfigurazione.

GroupByKey è GroupBy

L'interfaccia di KStream hà dui metudi per raggruppà i registri: GroupByKey è GroupBy. Tramindui tornanu un KGroupedTable, cusì vi puderete dumandà quale hè a diffarenza trà elli è quandu aduprà quale?

U metudu GroupByKey hè utilizatu quandu i chjavi in ​​u KStream sò digià micca vacanti. È più importantemente, a bandiera "esige re-partitioning" ùn hè mai stata stabilita.

U metudu GroupBy assume chì avete cambiatu i chjavi di raggruppamentu, cusì a bandiera di ripartizione hè impostata à vera. A realizazione di unisce, aggregazioni, etc., dopu à u metudu GroupBy, hà da risultatu in a ripartizione automatica.
Riassuntu: Sempre chì hè pussibule, duvete aduprà GroupByKey invece di GroupBy.

Hè chjaru ciò chì facenu i metudi mapValues ​​è groupBy, dunque fighjemu un ochju à u metudu sum() (truvatu in src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
U metudu ShareVolume.sum torna u tutale tutale di u voluminu di vendita di stock, è u risultatu di tutta a catena di calculi hè un ughjettu KTable. . Avà capisce u rolu di KTable. Quandu l'uggetti ShareVolume arrivanu, l'ughjettu KTable currispundente guarda l'ultima aghjurnazione attuale. Hè impurtante di ricurdà chì tutti l'aghjurnamenti sò riflessi in u shareVolumeKTable precedente, ma micca tutti sò mandati più.

Dopu aduprate stu KTable per aggregate (per u numeru di azioni scambiate) per arrivà à e cinque cumpagnie cù i più alti volumi di azioni cummercializati in ogni industria. I nostri azzioni in questu casu seranu simili à quelli per a prima aggregazione.

  1. Eseguite un'altra operazione groupBy per raggruppà oggetti ShareVolume individuali per industria.
  2. Cumincià à riassume l'uggetti ShareVolume. Questa volta l'ughjettu di aggregazione hè una fila di priorità di dimensione fissa. In questa fila di dimensione fissa, solu e cinque cumpagnie cù i più grandi quantità di azioni vendute sò ritenute.
  3. Mape e file di fila da u paràgrafu precedente à un valore di stringa è rinviate i primi cinque stocks più cummercializati per numeru per industria.
  4. Scrivite i risultati in forma di stringa à u tema.

In Fig. A Figura 5.10 mostra u graficu di a topologia di u flussu di dati. Comu pudete vede, a seconda volta di trasfurmazioni hè abbastanza simplice.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Avà chì avemu un capiscenu chjaru di a struttura di sta seconda volta di trasfurmazioni, pudemu turnà à u so codice fonte (a truverete in u schedariu src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Questu inizializzatore cuntene una variabile FixedQueue. Questu hè un ughjettu persunalizatu chì hè un adattatore per java.util.TreeSet chì hè utilizatu per seguità i risultati superiori N in ordine decrescente di azioni scambiate.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Avete digià vistu i chjamati groupBy è mapValues, cusì ùn andemu micca in quelli (chjamemu u metudu KTable.toStream perchè u metudu KTable.print hè obsoletu). Ma ùn avete micca vistu a versione KTable di aggregate() per quessa, passeremu un pocu di tempu per discutiri questu.

Comu vi ricordate, ciò chì face KTable sfarente hè chì i registri cù e stesse chjave sò cunsiderate aghjurnamenti. KTable rimpiazza a vechja entrata cù una nova. L'aggregazione si faci in modu simili: l'ultimi records cù a listessa chjave sò aggregati. Quandu un registru ghjunghje, hè aghjuntu à l'istanza di a classe FixedSizePriorityQueue utilizendu un sommatore (secondu paràmetru in a chjama di u metudu aggregatu), ma se un altru registru esiste digià cù a listessa chjave, allora u vechju registru hè sguassatu cù un subtractor (terzu paràmetru in a chjama di u metudu aggregatu).

Tuttu chistu significa chì u nostru aggregatore, FixedSizePriorityQueue, ùn aggrega micca tutti i valori cù una chjave, ma guarda una somma in muvimentu di e quantità di i N tipi di azioni più cummercializati. Ogni entrata in entrata cuntene u numeru tutale di azioni vendute finu à avà. KTable vi darà infurmazioni nantu à quali azioni di l'imprese sò attualmente i più scambiati, senza esse bisognu di l'aggregazione continua di ogni aghjurnamentu.

Avemu amparatu à fà duie cose impurtanti:

  • valori di gruppu in KTable da una chjave cumuni;
  • eseguite operazioni utili cum'è rollup è aggregazione nantu à questi valori raggruppati.

Sapendu cumu fà queste operazioni hè impurtante per capiscenu u significatu di e dati chì si movenu attraversu una applicazione Kafka Streams è capisce quale infurmazione porta.

Avemu ancu riunitu alcuni di i cuncetti chjave discututi prima in stu libru. In u Capitulu 4, avemu discututu cumu u statu locale tolerante à i difetti hè impurtante per una applicazione streaming. U primu esempiu in stu capitulu hà dimustratu perchè u statu lucale hè cusì impurtante - vi permette di seguità ciò chì l'infurmazioni avete digià vistu. L'accessu lucale evita i ritardi di a rete, rende l'applicazione più performante è resistente à l'errore.

Quandu eseguite ogni operazione di rollup o aggregazione, deve specificà u nome di a tenda statale. L'operazioni di rollup è aggregazione restituiscenu una istanza di KTable, è u KTable usa l'almacenamiento statale per rimpiazzà i vechji risultati cù novi. Comu avete vistu, micca tutti l'aghjurnamenti sò mandati in u pipeline, è questu hè impurtante perchè l'operazioni di aggregazione sò pensate per pruduce infurmazioni riassuntu. Se ùn applicate micca u statu locale, KTable trasmetterà tutti i risultati di aggregazione è rollup.

In seguitu, guardemu a realizazione di operazioni cum'è l'agregazione in un periudu di tempu specificu - cusì chjamate operazioni di finestra.

5.3.2. Operazioni di a finestra

In a sezione precedente, avemu introduttu cunvoluzione sliding è aggregazione. L'applicazione hà realizatu un roll-up cuntinuu di u voluminu di vendita di stock, seguitu da l'agregazione di e cinque azzioni più scambiate in u scambiu.

Calchì volta un tali aggregazione cuntinuu è roll-up di i risultati hè necessariu. È qualchì volta avete bisognu di fà operazioni solu per un certu periodu di tempu. Per esempiu, calculate quante transazzioni di scambiu sò stati fatti cù azzioni di una cumpagnia particulare in l'ultimi 10 minuti. O quanti utilizatori anu clicatu nantu à un novu banner publicitariu in l'ultimi 15 minuti. Una applicazione pò eseguisce tali operazioni parechje volte, ma cù risultati chì s'applicanu solu à periodi di tempu specificati (finestre di tempu).

Cuntendu e transazzioni di scambiu da u cumpratore

In l'esempiu prossimu, seguiteremu e transazzioni di azioni in parechje cummircianti, sia grandi urganisazioni sia finanziarii individuali intelligenti.

Ci hè dui pussibuli ragiuni per questu seguimentu. Unu di elli hè a necessità di sapè ciò chì i capi di u mercatu compranu / vendenu. Se questi grandi attori è investitori sofisticati vedenu opportunità, hè sensu di seguità a so strategia. U sicondu mutivu hè a vuluntà di scopre qualsiasi signali pussibuli di insider trading illegale. Per fà questu, avete bisognu di analizà a correlazioni di grandi spikes di vendita cù comunicati di stampa impurtanti.

Un tali seguimentu si compone di i seguenti passi:

  • crià un flussu per leghje da u tema di e transazzioni di stock;
  • raggruppendu i registri entranti per ID di cumpratore è simbulu di stock. Chjamate u metudu groupBy torna una istanza di a classa KGroupedStream;
  • U metudu KGroupedStream.windowedBy torna un flussu di dati limitatu à una finestra di u tempu, chì permette l'aggregazione in finestra. Sicondu u tipu di finestra, sia un TimeWindowedKStream o un SessionWindowedKStream hè tornatu;
  • u numeru di transazzione per l'operazione di aggregazione. U flussu di dati windowed determina se un registru particulari hè cunsideratu in questu cuntu;
  • scrive risultati à un tema o trasmettenu à a cunsola durante u sviluppu.

A topologia di sta applicazione hè simplice, ma una stampa chjara di questu seria utile. Fighjemu a Fig. 5.11.

Next, avemu Mulateri Di L'guardà a funziunalità di u funziunamentu di finestra è u codice currispundenti.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "

Tipi di finestra

Ci sò trè tippi di finestri in Kafka Streams:

  • sessione;
  • « tumbling » (turming) ;
  • scivolare / saltà.

Qualessu di sceglie dipende da e vostre esigenze di cummerciale. Tumbling and jumping windows sò limitati in u tempu, mentri i finestri di sessione sò limitati da l'attività di l'utilizatori - a durata di a sessione (s) hè determinata solu da quantu attivu l'utilizatore hè. A cosa principal per ricurdà hè chì tutti i tipi di finestra sò basati nantu à i stampi di data / ora di l'entrata, micca u tempu di u sistema.

In seguitu, implementemu a nostra topologia cù ognunu di i tipi di finestra. U codice cumpletu serà datu solu in u primu esempiu; per altri tipi di finestri nunda ùn cambierà, salvu u tipu di operazione di finestra.

Finestre di sessione

Finestre di sessione sò assai diffirenti da tutti l'altri tipi di finestri. Sò limitati micca tantu da u tempu cum'è da l'attività di l'utilizatore (o l'attività di l'entità chì vulete seguità). I finestri di sessione sò delimitati da periodi di inattività.

A Figura 5.12 illustra u cuncettu di Windows di sessione. A sessione più chjuca si fusiona cù a sessione à a so manca. È a sessione à a diritta serà separata perchè seguita un longu periodu di inattività. I finestri di a sessione sò basati nantu à l'attività di l'utilizatori, ma utilizate stampi di data / ora da e entrate per determinà a quale sessione appartene l'entrata.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "

Utilizà e finestre di sessione per seguità e transazzione di azioni

Utilizemu e finestri di sessione per catturà infurmazioni nantu à e transazzione di scambiu. L'implementazione di Windows di sessione hè mostrata in Lista 5.5 (chì pò esse truvata in src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Avete digià vistu a maiò parte di l'operazioni in questa topulugia, per quessa, ùn ci hè bisognu di guardà di novu quì. Ma ci sò ancu parechji elementi novi quì, chì avemu da discutiri avà.

Ogni operazione groupBy tipicamente esegue una certa operazione di aggregazione (aggregazione, rollup, o cunti). Pudete realizà sia l'agregazione cumulativa cù un totale in corsu, sia l'agregazione di finestra, chì piglia in contu i registri in una finestra di tempu specifica.

U codice in Listing 5.5 conta u numeru di transazzione in Windows di sessione. In Fig. 5.13 sti azzioni sò analizati passu à passu.

Chjamendu windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) creemu una finestra di sessione cù un intervalu di inattività di 20 seconde è un intervalu di persistenza di 15 minuti. Un intervallu inattivu di 20 seconde significa chì l'applicazione includerà qualsiasi entrata chì ghjunghje in 20 seconde da a fine o l'iniziu di a sessione attuale in a sessione attuale (attiva).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
In seguitu, specifiemu quale operazione di aggregazione deve esse realizatu in a finestra di sessione - in questu casu, conta. Se una entrata entrante cade fora di a finestra di inattività (ognunu di i lati di u timbru di data / ora), l'applicazione crea una nova sessione. L'intervallu di ritenzione significa mantene una sessione per un certu tempu è permette dati tardi chì si estende oltre u periodu di inattività di a sessione, ma pò ancu esse attaccati. Inoltre, l'iniziu è a fine di a nova sessione risultante da a fusione currispondenu à a prima è a più recente marca di data / ora.

Fighjemu uni pochi di entrate da u metudu di cunti per vede cumu funziona e sessioni (Table 5.1).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Quandu i registri ghjunghjenu, cerchemu e sessioni esistenti cù a listessa chjave, un tempu di fine menu di u timbru di data / ora attuale - intervallu di inattività, è un tempu di iniziu più grande di u timbru di data / ora attuale + intervalu di inattività. Pigliendu questu in contu, quattru entrate da a tavula. 5.1 sò uniti in una sola sessione cum'è seguita.

1. Record 1 arriva prima, cusì l'ora di iniziu hè uguale à l'ora di fine è hè 00:00:00.

2. In seguitu, l'entrata 2 ghjunghje, è circhemu e sessioni chì finiscinu micca prima di 23:59:55 è cumincianu micca più tardi à 00:00:35. Truvemu u record 1 è unisce e sessioni 1 è 2. Pigliemu l'ora di principiu di a sessione 1 (prima) è l'ora di fine di a sessione 2 (più tardi), perchè a nostra nova sessione principia à 00:00:00 è finisce à 00: 00:15.

3. Arriva u record 3, circhemu sessione trà 00:00:30 è 00:01:10 è ùn ne truvamu micca. Aghjunghjite una seconda sessione per a chjave 123-345-654,FFBE, chì principia è finisce à 00:00:50.

4. Arriva u record 4 è circhemu sessione trà 23:59:45 è 00:00:25. Sta volta si trovanu e duie sessioni 1 è 2. Tutte e trè sessioni sò cumminate in una sola, cù un tempu di iniziu di 00:00:00 è un tempu di fine di 00:00:15.

Da ciò chì hè descrittu in questa sezione, vale a pena ricurdà e seguenti sfumature impurtanti:

  • e sessioni ùn sò micca finestri di dimensione fissa. A durata di una sessione hè determinata da l'attività in un certu periodu di tempu;
  • I stampi di data / ora in i dati determinanu se l'avvenimentu cade in una sessione esistente o durante un periodu inattivu.

In seguitu avemu da discutiri u prossimu tipu di finestra - "turming" windows.

Finestre "Tumbling".

Tumbling windows catturà l'avvenimenti chì cascanu in un certu periodu di tempu. Immaginate chì avete bisognu di catturà tutte e transazzione d'azzioni di una certa cumpagnia ogni 20 seconde, perchè recullate tutti l'avvenimenti durante quellu periodu di tempu. À a fine di l'intervallu di 20 seconde, a finestra si stende è si move à un novu intervalu di osservazione di 20 seconde. A figura 5.14 illustra sta situazione.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Comu pudete vede, tutti l'avvenimenti ricevuti in l'ultimi 20 seconde sò inclusi in a finestra. À a fine di stu periodu di tempu, una nova finestra hè creata.

U listinu 5.6 mostra u codice chì mostra l'usu di tumbling windows per catturà transazzione di stock ogni 20 seconde (truvatu in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Cù stu picculu cambiamentu à u TimeWindows.of chiama mètudu, vi ponu aduprà una finestra tumbling. Questu esempiu ùn chjama micca u metudu finu à (), cusì l'intervallu di retezione predeterminatu di 24 ore serà utilizatu.

Infine, hè ora di passà à l'ultima di l'opzioni di a finestra - "hopping" windows.

Finestre scorrevule ("saltà")

I finestri scorrevuli / salti sò simili à i finestri tumbling, ma cù una ligera differenza. I finestri scorrevuli ùn aspettanu micca finu à a fine di l'intervallu di tempu prima di creà una nova finestra per processà l'avvenimenti recenti. Accumincianu novi calculi dopu un intervallu di attesa menu di a durata di a finestra.

Per illustrà e differenze trà i finestri di tumbling è jumping, andemu à vultà à l'esempiu di cuntà e transacciones di borsa. U nostru scopu hè sempre di cuntà u numeru di transazzione, ma ùn vulemu aspittà tuttu u tempu prima di aghjurnà u contatore. Invece, aghjurneremu u contatore à intervalli più brevi. Per esempiu, avemu sempre cuntà u numeru di transazzione ogni 20 seconde, ma aghjurnà u contatore ogni 5 seconde, cum'è mostra in Fig. 5.15. In questu casu, finiscemu cù trè finestri di risultati cù dati sovrapposti.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
A lista 5.7 mostra u codice per definisce e finestre scorrevule (truvate in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Una finestra tumbling pò esse cunvertita in una finestra di saltu aghjunghjendu una chjama à u metudu advanceBy(). In l'esempiu mostratu, l'intervallu di salvezza hè di 15 minuti.

Avete vistu in questa sezione cumu limità i risultati di aggregazione à i finestri di u tempu. In particulare, vogliu chì ricurdate di e seguenti trè cose da sta sezione:

  • a dimensione di e finestre di sessione hè limitata micca da u periodu di tempu, ma da l'attività di l'utilizatori;
  • i finestri "tumbling" furniscenu una visione generale di l'avvenimenti in un certu periodu di tempu;
  • A durata di i finestri di salto hè fissatu, ma sò aghjurnati spessu è ponu cuntene entrate sovrapposte in tutte e finestre.

Dopu, avemu da amparà cumu cunvertisce un KTable in un KStream per una cunnessione.

5.3.3. Cunnettendu l'uggetti KStream è KTable

In u Capitulu 4, avemu discututu di cunnette dui oggetti KStream. Avà avemu da amparà cumu cunnette KTable è KStream. Questu pò esse necessariu per i seguenti mutivi simplici. KStream hè un flussu di registri, è KTable hè un flussu di l'aghjurnamenti di record, ma qualchì volta pudete vulete aghjunghje un cuntestu supplementu à u flussu di record usendu l'aghjurnamenti da u KTable.

Pigliamu e dati nantu à u numeru di transazzioni di borsa è cumminendu cù e nutizie di borsa per l'industrii pertinenti. Eccu ciò chì duvete fà per ottene questu datu u codice chì avete digià.

  1. Cunvertisce un ughjettu KTable cù dati nantu à u numeru di transazzione di stock in un KStream, seguitatu da rimpiazzà a chjave cù a chjave chì indica u settore industriale chì currisponde à stu simbulu di stock.
  2. Crea un oggettu KTable chì leghje e dati da un tema cù nutizie di borsa. Stu novu KTable serà categurizatu per settore industriale.
  3. Cunnette l'aghjurnamenti di e nutizie cù l'infurmazioni nantu à u numeru di transazzioni di borsa per settore industriale.

Avà vedemu cumu implementà stu pianu d'azzione.

Cunvertisce KTable à KStream

Per cunvertisce KTable à KStream avete bisognu di fà i seguenti.

  1. Chjamate u metudu KTable.toStream().
  2. Chjamendu u metudu KStream.map, rimpiazzà a chjave cù u nome di l'industria, è dopu ricuperà l'ughjettu TransactionSummary da l'istanza Windowed.

Insememu queste operazioni cum'è seguita (u codice pò esse truvatu in u schedariu src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Perchè eseguimu una operazione KStream.map, l'istanza di KStream restituita hè ripartizionata automaticamente quandu hè utilizata in una cunnessione.

Avemu finitu u prucessu di cunversione, dopu avemu bisognu di creà un oggettu KTable per leghje e nutizie di stock.

Creazione di KTable per e notizie di stock

Fortunatamente, a creazione di un oggettu KTable piglia una sola linea di codice (u codice pò esse truvatu in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Hè vale a nutà chì nisun uggettu Serde ùn deve esse specificatu, postu chì i Serde di stringa sò usati in i paràmetri. Inoltre, utilizendu l'enumerazione EARLIEST, a tavula hè piena di registri à u principiu.

Avà pudemu passà à u passu finali - cunnessione.

Cunnettendu l'aghjurnamenti di e nutizie cù i dati di u numeru di transazzione

Crià una cunnessione ùn hè micca difficiule. Adupremu un join left in casu chì ùn ci hè nutizie di stock per l'industria pertinente (u codice necessariu pò esse truvatu in u schedariu src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Questu operatore leftJoin hè abbastanza simplice. A cuntrariu di l'unioni in u Capitulu 4, u metudu JoinWindow ùn hè micca utilizatu perchè quandu eseguite una unione KStream-KTable, ci hè una sola entrata in u KTable per ogni chjave. Una tale cunnessione ùn hè micca limitata in u tempu: u record hè o in u KTable o assente. A cunclusione principale: utilizendu l'uggetti KTable, pudete arricchisce KStream cù dati di riferimentu aghjurnati menu freti.

Avà guardemu un modu più efficace per arricchisce l'avvenimenti da KStream.

5.3.4. Oggetti GlobalKTable

Comu pudete vede, ci hè bisognu di arricchisce i flussi di eventi o aghjunghje cuntestu à elli. In u Capitulu 4 avete vistu a cunnessione trà dui oggetti KStream, è in a sezione precedente avete vistu a cunnessione trà un KStream è un KTable. In tutti questi casi, hè necessariu di ripartizione di u flussu di dati quandu mapping the keys à un novu tipu o valore. A volte, a ripartizione hè fatta in modu esplicitu, è à volte Kafka Streams a face automaticamente. Re-partitioning hè necessariu perchè i chjavi anu cambiatu è i registri devenu finiscinu in novi rùbbriche, altrimenti a cunnessione serà impussibile (questu hè statu discutitu in u Chapter 4, in a sezione "Re-partitioning data" in a subsection 4.2.4).

A ripartizione hà un costu

A ripartizione richiede costi - costi di risorse supplementari per creà temi intermedi, almacenà dati duplicati in un altru tema; significa ancu una latenza aumentata per via di scrittura è lettura di stu tema. Inoltre, se avete bisognu di unisce in più di un aspettu o dimensione, duvete incatena e unisce, mappe i registri cù novi chjavi, è eseguite u prucessu di ripartizione di novu.

Cunnessione cù datasets più chjuchi

In certi casi, u voluminu di dati di riferimentu per esse cunnessu hè relativamente chjuca, cusì e copie cumplete di questu ponu facilmente adattà in u locu in ogni node. Per situazioni cum'è questu, Kafka Streams furnisce a classe GlobalKTable.

L'istanze GlobalKTable sò uniche perchè l'applicazione replica tutte e dati à ognunu di i nodi. E postu chì tutti i dati sò prisenti nantu à ogni node, ùn ci hè micca bisognu di particionà u flussu di l'avvenimentu per a chjave di dati di riferimentu in modu chì hè dispunibule per tutte e partizioni. Pudete ancu fà unisce senza chjave utilizendu oggetti GlobalKTable. Riturnemu à unu di l'esempi precedenti per dimustrà sta funzione.

Cunnettendu l'uggetti KStream à l'uggetti GlobalKTable

In a subsezione 5.3.2, avemu realizatu aggregazione di finestra di transazzione di scambiu da i cumpratori. I risultati di sta aggregazione parevanu cusì cusì:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Mentre chì questi risultati servenu u scopu, saria statu più utile se u nome di u cliente è u nome di l'impresa sanu era statu ancu affissatu. Per aghjunghje u nome di u cliente è u nome di l'impresa, pudete fà unisce normale, ma avete bisognu di fà dui mappings chjave è re-partitioning. Cù GlobalKTable pudete evitari u costu di tali operazioni.

Per fà questu, avemu aduprà l'ughjettu countStream da Listing 5.11 (u codice currispundente pò esse truvatu in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) è cunnette à dui oggetti GlobalKTable.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Avemu digià discututu questu prima, cusì ùn l'aghju micca ripetutu. Ma aghju nutatu chì u codice in a funzione toStream().map hè astratta in un oggettu di funzione invece di una espressione lambda inline per a leggibilità.

U prossimu passu hè di dichjarà dui casi di GlobalKTable (u codice mostratu pò esse truvatu in u schedariu src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "

Per piacè nutate chì i nomi di temi sò descritti cù tipi enumerati.

Avà chì avemu tutti i cumpunenti pronti, tuttu ciò chì resta hè di scrive u codice per a cunnessione (chì pò esse truvatu in u schedariu src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Ancu s'ellu ci sò dui uni in stu codice, sò incatenati perchè nè di i so risultati sò usati separatamente. I risultati sò visualizati à a fine di l'operazione sana.

Quandu eseguite l'operazione di unisce sopra, uttene risultati cum'è questu:

{customer='Barney, Smith' company="Exxon", transactions= 17}

L'essenza ùn hà micca cambiatu, ma questi risultati pareanu più chjaru.

Se cuntate finu à u Capitulu 4, avete digià vistu parechji tipi di cunnessione in azzione. Sò listati in a tavula. 5.2. Questa tabella riflette e capacità di cunnessione da a versione 1.0.0 di Kafka Streams; Qualcosa pò cambià in versioni future.

U libru "Kafka Streams in Action. Applicazioni è microservizii per u travagliu in tempu reale "
Per conclude e cose, ricapitemu i punti fundamentali: pudete cunnette i flussi di eventi (KStream) è aghjurnà i flussi (KTable) cù u statu locale. In alternativa, se a dimensione di e dati di riferimentu ùn hè micca troppu grande, pudete aduprà l'ughjettu GlobalKTable. GlobalKTables riplica tutte e partizioni à ogni nodu di l'applicazione Kafka Streams, assicurendu chì tutte e dati sò dispunibuli indipendentemente da quale partizione currisponde a chjave.

Dopu vedemu a funzione Kafka Streams, grazia à quale pudemu osservà i cambiamenti statali senza cunsumà dati da un tema Kafka.

5.3.5. Statu di dumanda

Avemu digià realizatu parechje operazioni chì implicanu u statu è sempre i risultati à a cunsola (per scopi di sviluppu) o scrivite à un tema (per scopi di produzzione). Quandu scrive risultati à un tema, avete aduprà un cunsumadore Kafka per vede.

A lettura di dati da questi temi pò esse cunsideratu un tipu di vista materializatu. Per i nostri scopi, pudemu usà a definizione di una vista materializzata da Wikipedia: "... un oggettu fisicu di basa di dati chì cuntene i risultati di una dumanda. Per esempiu, puderia esse una copia lucale di dati remoti, o un sottumessu di e fila è / o colonne di una tabella o risultati di unisce, o una tabella riassuntu ottenuta per aggregazione "(https://en.wikipedia.org/wiki) /Vista_materializzata).

Kafka Streams permette ancu di eseguisce dumande interattive nantu à i magazzini statali, chì vi permettenu di leghje direttamente queste vedute materializate. Hè impurtante di nutà chì a dumanda à a tenda statale hè una operazione di sola lettura. Questu assicura chì ùn avete micca preoccupatu di fà accidentalmente u statu inconsistente mentre a vostra applicazione tratta dati.

A capacità di dumandà direttamente i magazzini statali hè impurtante. Questu significa chì pudete creà applicazioni di dashboard senza avè da prima piglià dati da u cunsumadore Kafka. Aumenta ancu l'efficienza di l'applicazione, per via di u fattu chì ùn ci hè bisognu di scrive di novu dati:

  • grazia à a località di e dati, ponu accede rapidamente;
  • a duplicazione di dati hè eliminata, postu chì ùn hè micca scrittu in u almacenamentu esternu.

A cosa principale chì vogliu chì ricurdate hè chì pudete dumandà direttamente u statu da a vostra applicazione. L'opportunità chì questu vi dà ùn ponu micca esse esagerate. Invece di cunsumà dati da Kafka è di almacenà registri in una basa di dati per l'applicazione, pudete interrogà i magazzini statali cù u listessu risultatu. E dumande dirette à i magazzini statali significanu menu codice (senza cunsumadore) è menu software (senza bisognu di una tabella di basa di dati per almacenà i risultati).

Avemu cupertu un pocu di terra in stu capitulu, cusì lasceremu a nostra discussione di e dumande interattive contr'à i magazzini statali per avà. Ma ùn vi preoccupate: in u Capitulu 9, creeremu una semplice applicazione dashboard cù dumande interattive. Aduprà alcuni di l'esempii da questu è capituli precedenti per dimustrà e dumande interattive è cumu pudete aghjunghje à l'applicazioni Kafka Streams.

Resumen

  • L'uggetti KStream rapprisentanu flussi di avvenimenti, paragunabili à inseriti in una basa di dati. L'oggetti KTable rapprisentanu flussi di aghjurnamentu, più cum'è l'aghjurnamenti à una basa di dati. A dimensione di l'ughjettu KTable ùn cresce, i vechji registri sò rimpiazzati da novi.
  • L'uggetti KTable sò necessarii per l'operazioni di aggregazione.
  • Utilizendu operazioni di finestra, pudete sparte e dati aggregati in buckets di tempu.
  • Grazie à l'uggetti GlobalKTable, pudete accede à e dati di riferimentu in ogni locu in l'applicazione, indipendentemente da a partizione.
  • Cunnessioni trà l'uggetti KStream, KTable è GlobalKTable sò pussibuli.

Finu a ora, avemu focu annantu à custruisce l'applicazioni Kafka Streams cù u KStream DSL d'altu livellu. Ancu l'approcciu d'altu livellu permette di creà prugrammi puliti è cuncisi, l'utilizanu rapprisenta un scambiu. U travagliu cù DSL KStream significa aumentà a concisione di u vostru codice riducendu u gradu di cuntrollu. In u prossimu capitulu, guardemu l'API di u nodu di gestore di livellu bassu è pruvà altre cummerci. I prugrammi seranu più longu ch'elli eranu prima, ma seremu capaci di creà quasi ogni node di handler chì pudemu avè bisognu.

→ Più dettagli nantu à u libru ponu esse truvati à situ web di l'editore

→ Per Habrozhiteli 25% di scontu cù coupon - Kafka Streams

→ À u pagamentu per a versione carta di u libru, un libru elettronicu serà mandatu per e-mail.

Source: www.habr.com

Add a comment