Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale" Ciao, residenti di Khabro! Questo libro è adatto a qualsiasi sviluppatore che desideri comprendere l'elaborazione dei thread. Comprendere la programmazione distribuita ti aiuterà a comprendere meglio Kafka e Kafka Streams. Sarebbe bello conoscere il framework Kafka stesso, ma questo non è necessario: ti dirò tutto ciò di cui hai bisogno. In questo libro gli sviluppatori Kafka esperti e i principianti impareranno come creare interessanti applicazioni di elaborazione dei flussi utilizzando la libreria Kafka Streams. Gli sviluppatori Java di livello intermedio e avanzato che hanno già familiarità con concetti come la serializzazione impareranno ad applicare le proprie competenze per creare applicazioni Kafka Streams. Il codice sorgente del libro è scritto in Java 8 e fa un uso significativo della sintassi dell'espressione lambda di Java 8, quindi sapere come lavorare con le funzioni lambda (anche in un altro linguaggio di programmazione) tornerà utile.

Estratto. 5.3. Operazioni di aggregazione e windowing

In questa sezione passeremo all'esplorazione delle parti più promettenti di Kafka Streams. Finora abbiamo trattato i seguenti aspetti di Kafka Streams:

  • creazione di una topologia di elaborazione;
  • utilizzo dello stato nelle applicazioni di streaming;
  • eseguire connessioni di flussi di dati;
  • differenze tra flussi di eventi (KStream) e flussi di aggiornamento (KTable).

Negli esempi seguenti riuniremo tutti questi elementi. Imparerai anche le finestre, un'altra grande funzionalità delle applicazioni di streaming. Il nostro primo esempio sarà una semplice aggregazione.

5.3.1. Aggregazione delle vendite di azioni per settore industriale

L'aggregazione e il raggruppamento sono strumenti fondamentali quando si lavora con i dati in streaming. L'esame dei singoli documenti così come vengono ricevuti è spesso insufficiente. Per estrarre informazioni aggiuntive dai dati, è necessario raggrupparli e combinarli.

In questo esempio, indosserai il costume di un commerciante giornaliero che ha bisogno di monitorare il volume delle vendite di azioni di aziende in diversi settori. Nello specifico, sei interessato alle cinque società con le maggiori vendite di azioni in ciascun settore.

Tale aggregazione richiederà i seguenti passaggi per tradurre i dati nella forma desiderata (parlando in termini generali).

  1. Creare una fonte basata su argomenti che pubblichi informazioni grezze sul trading azionario. Dovremo mappare un oggetto di tipo StockTransaction su un oggetto di tipo ShareVolume. Il punto è che l'oggetto StockTransaction contiene metadati di vendita, ma abbiamo solo bisogno di dati sul numero di azioni vendute.
  2. Dati del volume di condivisione del gruppo in base al simbolo azionario. Una volta raggruppati per simbolo, è possibile ridurre questi dati ai totali parziali dei volumi di vendita delle azioni. Vale la pena notare che il metodo KStream.groupBy restituisce un'istanza di tipo KGroupedStream. E puoi ottenere un'istanza di KTable chiamando ulteriormente il metodo KGroupedStream.reduce.

Cos'è l'interfaccia di KGroupedStream

I metodi KStream.groupBy e KStream.groupByKey restituiscono un'istanza di KGroupedStream. KGroupedStream è una rappresentazione intermedia di un flusso di eventi dopo il raggruppamento per chiavi. Non è affatto inteso per il lavoro diretto con esso. KGroupedStream viene invece utilizzato per operazioni di aggregazione, che danno sempre come risultato una KTable. E poiché il risultato delle operazioni di aggregazione è una KTable e utilizzano un archivio di stati, è possibile che non tutti gli aggiornamenti risultanti vengano inviati più avanti nella pipeline.

Il metodo KTable.groupBy restituisce un KGroupedTable simile, una rappresentazione intermedia del flusso di aggiornamenti, raggruppati per chiave.

Facciamo una breve pausa e guardiamo la Fig. 5.9, che mostra ciò che abbiamo realizzato. Questa topologia dovrebbe già esserti molto familiare.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Diamo ora un'occhiata al codice per questa topologia (può essere trovato nel file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listato 5.2).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Il codice indicato si distingue per la sua brevità e l'ampio volume di azioni eseguite su più righe. Potresti notare qualcosa di nuovo nel primo parametro del metodo builder.stream: un valore del tipo enum AutoOffsetReset.EARLIEST (esiste anche un LATEST), impostato utilizzando il metodo Consumed.withOffsetResetPolicy. Questo tipo di enumerazione può essere utilizzato per specificare una strategia di reimpostazione dell'offset per ogni KStream o KTable e ha la precedenza sull'opzione di reimpostazione dell'offset dalla configurazione.

GroupByKey e GroupBy

L'interfaccia KStream ha due metodi per raggruppare i record: GroupByKey e GroupBy. Entrambi restituiscono una KGroupedTable, quindi ti starai chiedendo qual è la differenza tra loro e quando usare quale?

Il metodo GroupByKey viene utilizzato quando le chiavi in ​​KStream sono già non vuote. E, cosa più importante, il flag "richiede ripartizionamento" non è mai stato impostato.

Il metodo GroupBy presuppone che tu abbia modificato le chiavi di raggruppamento, quindi il flag di ripartizione è impostato su true. L'esecuzione di join, aggregazioni, ecc. dopo il metodo GroupBy comporterà il ripartizionamento automatico.
Riepilogo: quando possibile, dovresti utilizzare GroupByKey anziché GroupBy.

È chiaro cosa fanno i metodi mapValues ​​e groupBy, quindi diamo un'occhiata al metodo sum() (che si trova in src/main/java/bbejeck/model/ShareVolume.java) (Listato 5.3).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Il metodo ShareVolume.sum restituisce il totale parziale del volume delle vendite di azioni e il risultato dell'intera catena di calcoli è un oggetto KTable . Ora capisci il ruolo svolto da KTable. Quando arrivano gli oggetti ShareVolume, l'oggetto KTable corrispondente memorizza l'ultimo aggiornamento corrente. È importante ricordare che tutti gli aggiornamenti si riflettono nella precedente shareVolumeKTable, ma non tutti vengono inviati ulteriormente.

Utilizziamo quindi questa KTable per aggregare (per numero di azioni scambiate) per arrivare alle cinque società con i maggiori volumi di azioni scambiate in ciascun settore. Le nostre azioni in questo caso saranno simili a quelle della prima aggregazione.

  1. Eseguire un'altra operazione groupBy per raggruppare singoli oggetti ShareVolume per settore.
  2. Inizia a riepilogare gli oggetti ShareVolume. Questa volta l'oggetto di aggregazione è una coda con priorità a dimensione fissa. In questa coda di dimensioni fisse vengono mantenute solo le cinque società con il maggior numero di azioni vendute.
  3. Mappare le code del paragrafo precedente su un valore stringa e restituire i primi cinque titoli più scambiati in base al numero e al settore.
  4. Scrivere i risultati sotto forma di stringa nell'argomento.

Nella fig. La Figura 5.10 mostra il grafico della topologia del flusso di dati. Come puoi vedere, il secondo ciclo di elaborazione è abbastanza semplice.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Ora che abbiamo capito bene la struttura di questo secondo ciclo di elaborazione, possiamo passare al suo codice sorgente (lo troverete nel file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listato 5.4) .

Questo inizializzatore contiene una variabile FixedQueue. Si tratta di un oggetto personalizzato che è un adattatore per java.util.TreeSet utilizzato per tenere traccia dei primi N risultati in ordine decrescente di azioni scambiate.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Hai già visto le chiamate groupBy e mapValues, quindi non le approfondiremo (chiameremo il metodo KTable.toStream perché il metodo KTable.print è deprecato). Ma non hai ancora visto la versione KTable di aggregate(), quindi ne discuteremo un po'.

Come ricorderete, ciò che rende KTable diverso è che i record con le stesse chiavi sono considerati aggiornamenti. KTable sostituisce la vecchia voce con una nuova. L'aggregazione avviene in modo simile: vengono aggregati gli ultimi record con la stessa chiave. Quando arriva un record, viene aggiunto all'istanza della classe FixedSizePriorityQueue utilizzando un sommatore (secondo parametro nella chiamata al metodo aggregato), ma se esiste già un altro record con la stessa chiave, il vecchio record viene rimosso utilizzando un sottrattore (terzo parametro in la chiamata al metodo aggregato).

Tutto ciò significa che il nostro aggregatore, FixedSizePriorityQueue, non aggrega tutti i valori con un'unica chiave, ma memorizza una somma mobile delle quantità degli N tipi di azioni più scambiate. Ogni voce in entrata contiene il numero totale di azioni vendute finora. KTable ti fornirà informazioni su quali azioni delle società sono attualmente le più scambiate, senza richiedere l'aggregazione continua di ogni aggiornamento.

Abbiamo imparato a fare due cose importanti:

  • raggruppare valori in KTable con una chiave comune;
  • eseguire operazioni utili come il rollup e l'aggregazione su questi valori raggruppati.

Sapere come eseguire queste operazioni è importante per comprendere il significato dei dati che si spostano attraverso un'applicazione Kafka Streams e capire quali informazioni trasportano.

Abbiamo anche riunito alcuni dei concetti chiave discussi in precedenza in questo libro. Nel capitolo 4 abbiamo discusso di quanto sia importante lo stato locale con tolleranza agli errori per un'applicazione di streaming. Il primo esempio in questo capitolo ha dimostrato perché lo stato locale è così importante: consente di tenere traccia delle informazioni già visualizzate. L'accesso locale evita ritardi di rete, rendendo l'applicazione più performante e resistente agli errori.

Quando si eseguono operazioni di rollup o aggregazione, è necessario specificare il nome dell'archivio stati. Le operazioni di rollup e aggregazione restituiscono un'istanza di KTable e KTable utilizza la memorizzazione dello stato per sostituire i vecchi risultati con quelli nuovi. Come si è visto, non tutti gli aggiornamenti vengono inviati in pipeline e questo è importante perché le operazioni di aggregazione sono progettate per produrre informazioni di riepilogo. Se non applichi lo stato locale, KTable inoltrerà tutti i risultati di aggregazione e rollup.

Successivamente, esamineremo l'esecuzione di operazioni come l'aggregazione entro un periodo di tempo specifico, le cosiddette operazioni di finestra.

5.3.2. Operazioni sulla finestra

Nella sezione precedente abbiamo introdotto la convoluzione scorrevole e l'aggregazione. L'applicazione ha eseguito un roll-up continuo del volume delle vendite di azioni, seguito dall'aggregazione delle cinque azioni più scambiate in borsa.

A volte è necessaria una tale aggregazione e accumulo continuo di risultati. E a volte è necessario eseguire operazioni solo in un determinato periodo di tempo. Ad esempio, calcola quante operazioni di scambio sono state effettuate con le azioni di una determinata società negli ultimi 10 minuti. Oppure quanti utenti hanno cliccato su un nuovo banner pubblicitario negli ultimi 15 minuti. Un'applicazione può eseguire tali operazioni più volte, ma con risultati che si applicano solo a periodi di tempo specificati (finestre temporali).

Conteggio delle transazioni di cambio per acquirente

Nel prossimo esempio, terremo traccia delle transazioni azionarie di più trader: grandi organizzazioni o singoli finanziatori intelligenti.

Ci sono due possibili ragioni per questo monitoraggio. Uno di questi è la necessità di sapere cosa acquistano/vendono i leader di mercato. Se questi grandi attori e investitori sofisticati vedono opportunità, ha senso seguire la loro strategia. Il secondo motivo è la volontà di individuare eventuali segnali di insider trading illegale. Per fare ciò, dovrai analizzare la correlazione tra grandi picchi di vendita e importanti comunicati stampa.

Tale tracciamento consiste nei seguenti passaggi:

  • creazione di un flusso per la lettura dell'argomento relativo alle transazioni azionarie;
  • raggruppando i record in entrata per ID acquirente e simbolo di borsa. La chiamata al metodo groupBy restituisce un'istanza della classe KGroupedStream;
  • Il metodo KGroupedStream.windowedBy restituisce un flusso di dati limitato a una finestra temporale, che consente l'aggregazione in finestre. A seconda del tipo di finestra, viene restituito TimeWindowedKStream o SessionWindowedKStream;
  • conteggio delle transazioni per l'operazione di aggregazione. Il flusso di dati a finestra determina se un particolare record viene preso in considerazione in questo conteggio;
  • scrivere risultati in un argomento o inviarli alla console durante lo sviluppo.

La topologia di questa applicazione è semplice, ma sarebbe utile averne un quadro chiaro. Diamo un'occhiata alla Fig. 5.11.

Successivamente, esamineremo la funzionalità delle operazioni della finestra e il codice corrispondente.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"

Tipi di finestre

Esistono tre tipi di finestre in Kafka Streams:

  • sessione;
  • “cadere”;
  • scivolare/saltare.

Quale scegliere dipende dalle esigenze della tua azienda. Le finestre di rotazione e salto sono limitate nel tempo, mentre le finestre di sessione sono limitate dall'attività dell'utente: la durata delle sessioni è determinata esclusivamente da quanto è attivo l'utente. La cosa principale da ricordare è che tutti i tipi di finestra si basano sulla data/ora delle voci, non sull'ora del sistema.

Successivamente, implementiamo la nostra topologia con ciascuno dei tipi di finestra. Il codice completo verrà riportato solo nel primo esempio; per gli altri tipi di finestre non cambierà nulla se non il tipo di funzionamento della finestra.

Finestre di sessione

Le finestre di sessione sono molto diverse da tutti gli altri tipi di finestre. Sono limitati non tanto dal tempo quanto dall'attività dell'utente (o dall'attività dell'entità che desideri monitorare). Le finestre delle sessioni sono delimitate da periodi di inattività.

La Figura 5.12 illustra il concetto di finestre di sessione. La sessione più piccola si unirà alla sessione alla sua sinistra. E la sessione a destra sarà separata perché segue un lungo periodo di inattività. Le finestre di sessione si basano sull'attività dell'utente, ma utilizzano i timbri di data/ora delle voci per determinare a quale sessione appartiene la voce.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"

Utilizzo delle finestre di sessione per tenere traccia delle transazioni azionarie

Usiamo le finestre di sessione per acquisire informazioni sulle transazioni di scambio. L'implementazione delle finestre di sessione è mostrata nel Listato 5.5 (che può essere trovato in src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
La maggior parte delle operazioni in questa topologia è già stata vista, quindi non è necessario rivederle qui. Ma ci sono anche diverse novità, di cui ora parleremo.

Qualsiasi operazione groupBy esegue in genere un tipo di operazione di aggregazione (aggregazione, rollup o conteggio). È possibile eseguire l'aggregazione cumulativa con un totale parziale o l'aggregazione a finestra, che prende in considerazione i record entro un intervallo di tempo specificato.

Il codice nel Listato 5.5 conta il numero di transazioni all'interno delle finestre di sessione. Nella fig. 5.13 queste azioni vengono analizzate passo dopo passo.

Chiamando windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) creiamo una finestra di sessione con un intervallo di inattività di 20 secondi e un intervallo di persistenza di 15 minuti. Un intervallo di inattività di 20 secondi significa che l'applicazione includerà qualsiasi voce che arriva entro 20 secondi dalla fine o dall'inizio della sessione corrente nella sessione corrente (attiva).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Successivamente specifichiamo quale operazione di aggregazione deve essere eseguita nella finestra della sessione: in questo caso, conteggio. Se una voce in entrata non rientra nella finestra di inattività (entrambi i lati dell'indicazione di data/ora), l'applicazione crea una nuova sessione. Intervallo di conservazione significa mantenere una sessione per un certo periodo di tempo e consente dati tardivi che si estendono oltre il periodo di inattività della sessione ma che possono comunque essere allegati. Inoltre, l'inizio e la fine della nuova sessione risultante dall'unione corrispondono alla prima e all'ultima data/ora.

Diamo un'occhiata ad alcune voci del metodo count per vedere come funzionano le sessioni (Tabella 5.1).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Quando arrivano i record, cerchiamo le sessioni esistenti con la stessa chiave, un'ora di fine inferiore all'indicatore di data/ora corrente - intervallo di inattività e un'ora di inizio maggiore dell'indicatore di data/ora corrente + intervallo di inattività. Tenendo conto di ciò, quattro voci dalla tabella. 5.1 sono accorpati in un'unica sessione come segue.

1. Il record 1 arriva per primo, quindi l'ora di inizio è uguale all'ora di fine ed è 00:00:00.

2. Successivamente arriva la voce 2 e cerchiamo le sessioni che terminano non prima delle 23:59:55 e iniziano non più tardi delle 00:00:35. Troviamo il record 1 e combiniamo le sessioni 1 e 2. Prendiamo l'ora di inizio della sessione 1 (prima) e l'ora di fine della sessione 2 (dopo), in modo che la nostra nuova sessione inizi alle 00:00:00 e termini alle 00: 00:15.

3. Arriva il record 3, cerchiamo sessioni comprese tra 00:00:30 e 00:01:10 e non ne troviamo nessuna. Aggiungi una seconda sessione per la chiave 123-345-654,FFBE, che inizia e termina alle 00:00:50.

4. Arriva il record 4 e cerchiamo sessioni tra le 23:59:45 e le 00:00:25. Questa volta vengono trovate entrambe le sessioni 1 e 2. Tutte e tre le sessioni vengono combinate in una, con un'ora di inizio di 00:00:00 e un'ora di fine di 00:00:15.

Da quanto descritto in questa sezione, vale la pena ricordare le seguenti importanti sfumature:

  • le sessioni non sono finestre di dimensione fissa. La durata di una sessione è determinata dall'attività svolta in un determinato periodo di tempo;
  • I contrassegni di data/ora nei dati determinano se l'evento rientra in una sessione esistente o durante un periodo di inattività.

Successivamente discuteremo del prossimo tipo di finestra: le finestre "a ribalta".

Finestre "a caduta".

Le finestre a caduta catturano eventi che rientrano in un certo periodo di tempo. Immagina di dover acquisire tutte le transazioni azionarie di una determinata azienda ogni 20 secondi, in modo da raccogliere tutti gli eventi durante quel periodo di tempo. Al termine dell'intervallo di 20 secondi, la finestra si ribalta e si sposta su un nuovo intervallo di osservazione di 20 secondi. La Figura 5.14 illustra questa situazione.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Come puoi vedere, tutti gli eventi ricevuti negli ultimi 20 secondi sono inclusi nella finestra. Al termine di questo periodo di tempo, viene creata una nuova finestra.

Il Listato 5.6 mostra il codice che dimostra l'uso di finestre a cascata per acquisire transazioni azionarie ogni 20 secondi (disponibile in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Con questa piccola modifica alla chiamata al metodo TimeWindows.of, puoi utilizzare una finestra a cascata. Questo esempio non chiama il metodo Until(), pertanto verrà utilizzato l'intervallo di conservazione predefinito di 24 ore.

Infine, è il momento di passare all'ultima delle opzioni della finestra: le finestre "saltanti".

Finestre scorrevoli ("saltanti").

Le finestre scorrevoli/saltellanti sono simili alle finestre a ribalta, ma con una leggera differenza. Le finestre scorrevoli non attendono fino alla fine dell'intervallo di tempo prima di creare una nuova finestra per elaborare gli eventi recenti. Iniziano nuovi calcoli dopo un intervallo di attesa inferiore alla durata della finestra.

Per illustrare le differenze tra finestre cadenti e finestre saltellanti, torniamo all'esempio del conteggio delle transazioni di borsa. Il nostro obiettivo è ancora contare il numero di transazioni, ma non vogliamo aspettare tutto il tempo prima di aggiornare il contatore. Aggiorneremo invece il contatore a intervalli più brevi. Ad esempio, conteremo comunque il numero di transazioni ogni 20 secondi, ma aggiorneremo il contatore ogni 5 secondi, come mostrato in Fig. 5.15. In questo caso, ci ritroveremo con tre finestre dei risultati con dati sovrapposti.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Il Listato 5.7 mostra il codice per definire le finestre scorrevoli (disponibile in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Una finestra a cascata può essere convertita in una finestra saltellante aggiungendo una chiamata al metodo advanceBy(). Nell'esempio mostrato l'intervallo di salvataggio è di 15 minuti.

In questa sezione hai visto come limitare i risultati dell'aggregazione a finestre temporali. In particolare, voglio che ricordiate le seguenti tre cose di questa sezione:

  • la dimensione delle finestre di sessione è limitata non dal periodo di tempo, ma dall'attività dell'utente;
  • le finestre “tumbling” forniscono una panoramica degli eventi in un dato periodo di tempo;
  • La durata delle finestre di salto è fissa, ma vengono aggiornate frequentemente e possono contenere voci sovrapposte in tutte le finestre.

Successivamente, impareremo come riconvertire un KTable in un KStream per una connessione.

5.3.3. Collegamento di oggetti KStream e KTable

Nel capitolo 4 abbiamo discusso della connessione di due oggetti KStream. Ora dobbiamo imparare come connettere KTable e KStream. Ciò potrebbe essere necessario per il seguente semplice motivo. KStream è un flusso di record e KTable è un flusso di aggiornamenti di record, ma a volte potresti voler aggiungere ulteriore contesto al flusso di record utilizzando gli aggiornamenti di KTable.

Prendiamo i dati sul numero delle transazioni di borsa e combiniamoli con le notizie di borsa per i settori interessati. Ecco cosa devi fare per raggiungere questo obiettivo dato il codice che hai già.

  1. Converti un oggetto KTable con i dati sul numero di transazioni azionarie in un KStream, quindi sostituisci la chiave con la chiave che indica il settore industriale corrispondente a questo simbolo azionario.
  2. Crea un oggetto KTable che legga i dati da un argomento con notizie di borsa. Questo nuovo KTable sarà classificato per settore industriale.
  3. Collega gli aggiornamenti delle notizie con le informazioni sul numero di transazioni di borsa per settore industriale.

Vediamo ora come attuare questo piano d'azione.

Converti KTable in KStream

Per convertire KTable in KStream devi fare quanto segue.

  1. Chiama il metodo KTable.toStream().
  2. Chiamando il metodo KStream.map, sostituisci la chiave con il nome del settore, quindi recupera l'oggetto TransactionSummary dall'istanza Windowed.

Concatenamo queste operazioni come segue (il codice può essere trovato nel file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listato 5.8).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Poiché stiamo eseguendo un'operazione KStream.map, l'istanza KStream restituita viene ripartizionata automaticamente quando viene utilizzata in una connessione.

Abbiamo completato il processo di conversione, quindi dobbiamo creare un oggetto KTable per leggere le notizie di borsa.

Creazione di KTable per le notizie di borsa

Fortunatamente, la creazione di un oggetto KTable richiede solo una riga di codice (il codice può essere trovato in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listato 5.9).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Vale la pena notare che non è necessario specificare alcun oggetto Serde, poiché nelle impostazioni vengono utilizzate le string Serdes. Inoltre, utilizzando l'enumerazione EARLIEST, la tabella viene riempita di record proprio all'inizio.

Ora possiamo passare al passaggio finale: la connessione.

Collegamento degli aggiornamenti delle notizie con i dati sul conteggio delle transazioni

Creare una connessione non è difficile. Utilizzeremo un left join nel caso in cui non ci siano novità azionarie per il settore in questione (il codice necessario può essere trovato nel file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listato 5.10).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Questo operatore leftJoin è abbastanza semplice. A differenza delle unioni del Capitolo 4, il metodo JoinWindow non viene utilizzato perché quando si esegue un'unione KStream-KTable, c'è solo una voce nella KTable per ciascuna chiave. Tale connessione non è limitata nel tempo: il record è nella KTable oppure è assente. La conclusione principale: utilizzando gli oggetti KTable puoi arricchire KStream con dati di riferimento aggiornati meno frequentemente.

Ora esamineremo un modo più efficiente per arricchire gli eventi da KStream.

5.3.4. Oggetti GlobalKTable

Come puoi vedere, è necessario arricchire i flussi di eventi o aggiungere loro contesto. Nel capitolo 4 hai visto le connessioni tra due oggetti KStream e nella sezione precedente hai visto la connessione tra un KStream e una KTable. In tutti questi casi, è necessario ripartizionare il flusso di dati quando si mappano le chiavi su un nuovo tipo o valore. A volte il ripartizionamento viene eseguito in modo esplicito e talvolta Kafka Streams lo esegue automaticamente. Il ripartizionamento è necessario perché le chiavi sono cambiate e i record devono finire in nuove sezioni, altrimenti la connessione sarà impossibile (di questo si è parlato nel capitolo 4, nella sezione “Ripartizionamento dei dati” nella sottosezione 4.2.4).

Il ripartizionamento ha un costo

Il ripartizionamento richiede dei costi: costi aggiuntivi per le risorse per la creazione di argomenti intermedi, l'archiviazione di dati duplicati in un altro argomento; significa anche una maggiore latenza dovuta alla scrittura e alla lettura di questo argomento. Inoltre, se è necessario unire più di un aspetto o dimensione, è necessario concatenare i join, mappare i record con nuove chiavi ed eseguire nuovamente il processo di ripartizionamento.

Connessione a set di dati più piccoli

In alcuni casi, il volume dei dati di riferimento da connettere è relativamente piccolo, quindi copie complete degli stessi possono facilmente adattarsi localmente su ciascun nodo. Per situazioni come questa, Kafka Streams fornisce la classe GlobalKTable.

Le istanze GlobalKTable sono uniche perché l'applicazione replica tutti i dati su ciascuno dei nodi. Inoltre, poiché tutti i dati sono presenti su ciascun nodo, non è necessario partizionare il flusso di eventi in base alla chiave dati di riferimento in modo che sia disponibile per tutte le partizioni. Puoi anche effettuare unioni senza chiave utilizzando oggetti GlobalKTable. Torniamo a uno degli esempi precedenti per dimostrare questa caratteristica.

Connettere oggetti KStream a oggetti GlobalKTable

Nella sottosezione 5.3.2, abbiamo eseguito l'aggregazione delle finestre delle transazioni di scambio da parte degli acquirenti. I risultati di questa aggregazione erano simili a questi:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Anche se questi risultati sono serviti allo scopo, sarebbe stato più utile se fossero stati visualizzati anche il nome del cliente e il nome completo dell'azienda. Per aggiungere il nome del cliente e il nome dell'azienda, puoi eseguire unioni normali, ma dovrai eseguire due mappature dei tasti e un nuovo partizionamento. Con GlobalKTable puoi evitare il costo di tali operazioni.

Per fare ciò, utilizzeremo l'oggetto countStream del Listato 5.11 (il codice corrispondente può essere trovato in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) e lo collegheremo a due oggetti GlobalKTable.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Ne abbiamo già parlato prima, quindi non lo ripeterò. Ma noto che il codice nella funzione toStream().map viene astratto in un oggetto funzione anziché in un'espressione lambda inline per motivi di leggibilità.

Il passo successivo è dichiarare due istanze di GlobalKTable (il codice mostrato può essere trovato nel file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listato 5.12).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"

Tieni presente che i nomi degli argomenti sono descritti utilizzando tipi enumerati.

Ora che abbiamo tutti i componenti pronti, non resta che scrivere il codice per la connessione (che potete trovare nel file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listato 5.13).

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Sebbene in questo codice siano presenti due join, questi sono concatenati perché nessuno dei due risultati viene utilizzato separatamente. I risultati vengono visualizzati al termine dell'intera operazione.

Quando esegui l'operazione di unione sopra, otterrai risultati come questi:

{customer='Barney, Smith' company="Exxon", transactions= 17}

L'essenza non è cambiata, ma questi risultati appaiono più chiari.

Se conti alla rovescia fino al Capitolo 4, hai già visto diversi tipi di connessioni in azione. Sono elencati nella tabella. 5.2. Questa tabella riflette le funzionalità di connettività a partire dalla versione 1.0.0 di Kafka Streams; Qualcosa potrebbe cambiare nelle versioni future.

Il libro “Kafka Streams in Action. Applicazioni e microservizi per il lavoro in tempo reale"
Per concludere, ricapitoliamo le nozioni di base: puoi connettere flussi di eventi (KStream) e flussi di aggiornamento (KTable) utilizzando lo stato locale. In alternativa, se la dimensione dei dati di riferimento non è troppo grande, puoi utilizzare l'oggetto GlobalKTable. GlobalKTables replica tutte le partizioni su ciascun nodo dell'applicazione Kafka Streams, garantendo che tutti i dati siano disponibili indipendentemente dalla partizione a cui corrisponde la chiave.

Successivamente vedremo la funzionalità Kafka Streams, grazie alla quale possiamo osservare i cambiamenti di stato senza consumare dati da un argomento Kafka.

5.3.5. Stato interrogabile

Abbiamo già eseguito diverse operazioni che coinvolgono lo stato e abbiamo sempre inviato i risultati alla console (per scopi di sviluppo) o li abbiamo scritti in un argomento (per scopi di produzione). Quando scrivi risultati in un argomento, devi utilizzare un consumatore Kafka per visualizzarli.

La lettura dei dati provenienti da questi argomenti può essere considerata un tipo di visualizzazioni materializzate. Per i nostri scopi, possiamo utilizzare la definizione di vista materializzata da Wikipedia: “...un oggetto fisico di database contenente i risultati di una query. Ad esempio potrebbe trattarsi di una copia locale di dati remoti, oppure di un sottoinsieme di righe e/o colonne di una tabella o di risultati di join, oppure di una tabella riassuntiva ottenuta tramite aggregazione” (https://en.wikipedia.org/wiki /vista_materializzata).

Kafka Streams consente inoltre di eseguire query interattive sugli archivi statali, consentendoti di leggere direttamente queste visualizzazioni materializzate. È importante notare che la query sull'archivio stati è un'operazione di sola lettura. In questo modo non dovrai preoccuparti di rendere accidentalmente lo stato incoerente mentre l'applicazione elabora i dati.

La possibilità di interrogare direttamente gli archivi statali è importante. Ciò significa che puoi creare applicazioni dashboard senza dover prima recuperare i dati dal consumer Kafka. Aumenta anche l'efficienza dell'applicazione, poiché non è necessario scrivere nuovamente i dati:

  • grazie alla localizzazione dei dati, sono rapidamente accessibili;
  • la duplicazione dei dati viene eliminata poiché non vengono scritti su un dispositivo di archiviazione esterno.

La cosa principale che voglio che ricordi è che puoi interrogare direttamente lo stato dall'interno della tua applicazione. Le opportunità che questo ti offre non possono essere sopravvalutate. Invece di utilizzare i dati di Kafka e archiviare i record in un database per l'applicazione, è possibile eseguire query sugli archivi di stato con lo stesso risultato. Le query dirette agli archivi statali significano meno codice (nessun consumatore) e meno software (non è necessaria una tabella di database per archiviare i risultati).

Abbiamo trattato parecchi argomenti in questo capitolo, quindi per ora lasceremo la discussione sulle query interattive sui negozi statali. Ma non preoccupatevi: nel Capitolo 9 creeremo una semplice applicazione dashboard con query interattive. Utilizzerà alcuni degli esempi di questo e dei capitoli precedenti per dimostrare le query interattive e come aggiungerle alle applicazioni Kafka Streams.

Riassunto

  • Gli oggetti KStream rappresentano flussi di eventi, paragonabili agli inserimenti in un database. Gli oggetti KTable rappresentano flussi di aggiornamento, più simili agli aggiornamenti di un database. La dimensione dell'oggetto KTable non aumenta, i vecchi record vengono sostituiti da nuovi.
  • Gli oggetti KTable sono richiesti per le operazioni di aggregazione.
  • Utilizzando le operazioni di finestratura, è possibile suddividere i dati aggregati in intervalli di tempo.
  • Grazie agli oggetti GlobalKTable, puoi accedere ai dati di riferimento ovunque nell'applicazione, indipendentemente dal partizionamento.
  • Sono possibili connessioni tra oggetti KStream, KTable e GlobalKTable.

Finora ci siamo concentrati sulla creazione di applicazioni Kafka Streams utilizzando KStream DSL di alto livello. Sebbene l'approccio di alto livello consenta di creare programmi chiari e concisi, il suo utilizzo rappresenta un compromesso. Lavorare con DSL KStream significa aumentare la concisione del proprio codice riducendo il grado di controllo. Nel prossimo capitolo esamineremo l'API del nodo del gestore di basso livello e proveremo altri compromessi. I programmi saranno più lunghi di prima, ma saremo in grado di creare quasi tutti i nodi handler di cui potremmo aver bisogno.

→ Maggiori dettagli sul libro possono essere trovati su sito web dell'editore

→ Per Habrozhiteli sconto del 25% utilizzando il coupon - Flussi di Kafka

→ A fronte del pagamento della versione cartacea del libro verrà inviato via e-mail il libro elettronico.

Fonte: habr.com

Aggiungi un commento