Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel" Bonjour, résidents de Khabro ! Ce livre convient à tout développeur souhaitant comprendre le traitement des threads. Comprendre la programmation distribuée vous aidera à mieux comprendre Kafka et Kafka Streams. Ce serait bien de connaître le framework Kafka lui-même, mais ce n'est pas nécessaire : je vous dirai tout ce dont vous avez besoin. Les développeurs Kafka expérimentés et les novices apprendront comment créer des applications intéressantes de traitement de flux à l'aide de la bibliothèque Kafka Streams dans ce livre. Les développeurs Java intermédiaires et avancés déjà familiarisés avec des concepts tels que la sérialisation apprendront à appliquer leurs compétences pour créer des applications Kafka Streams. Le code source du livre est écrit en Java 8 et utilise de manière significative la syntaxe des expressions lambda de Java 8, donc savoir comment travailler avec les fonctions lambda (même dans un autre langage de programmation) sera utile.

Extrait. 5.3. Opérations d’agrégation et de fenêtrage

Dans cette section, nous allons explorer les parties les plus prometteuses de Kafka Streams. Jusqu'à présent, nous avons couvert les aspects suivants de Kafka Streams :

  • créer une topologie de traitement ;
  • utiliser l'état dans les applications de streaming ;
  • effectuer des connexions de flux de données ;
  • différences entre les flux d'événements (KStream) et les flux de mise à jour (KTable).

Dans les exemples suivants, nous rassemblerons tous ces éléments. Vous découvrirez également le fenêtrage, une autre fonctionnalité intéressante des applications de streaming. Notre premier exemple sera une simple agrégation.

5.3.1. Agrégation des ventes de stocks par secteur d'activité

L'agrégation et le regroupement sont des outils essentiels lorsque vous travaillez avec des données en streaming. L’examen des dossiers individuels au fur et à mesure de leur réception est souvent insuffisant. Pour extraire des informations supplémentaires des données, il est nécessaire de les regrouper et de les combiner.

Dans cet exemple, vous enfilerez le costume d'un day trader qui doit suivre le volume des ventes d'actions d'entreprises de plusieurs secteurs. Plus précisément, vous vous intéressez aux cinq entreprises dont les ventes d’actions sont les plus importantes dans chaque secteur.

Une telle agrégation nécessitera les étapes suivantes pour traduire les données sous la forme souhaitée (en termes généraux).

  1. Créez une source thématique qui publie des informations brutes sur les opérations boursières. Nous devrons mapper un objet de type StockTransaction à un objet de type ShareVolume. Le fait est que l'objet StockTransaction contient des métadonnées de ventes, mais nous n'avons besoin que de données sur le nombre d'actions vendues.
  2. Regroupez les données ShareVolume par symbole boursier. Une fois regroupées par symbole, vous pouvez réduire ces données en sous-totaux des volumes de ventes de stocks. Il convient de noter que la méthode KStream.groupBy renvoie une instance du type KGroupedStream. Et vous pouvez obtenir une instance de KTable en appelant davantage la méthode KGroupedStream.reduce.

Qu'est-ce que l'interface KGroupedStream

Les méthodes KStream.groupBy et KStream.groupByKey renvoient une instance de KGroupedStream. KGroupedStream est une représentation intermédiaire d'un flux d'événements après regroupement par clés. Il n'est pas du tout destiné à travailler directement avec lui. Au lieu de cela, KGroupedStream est utilisé pour les opérations d'agrégation, qui aboutissent toujours à une KTable. Et comme le résultat des opérations d'agrégation est une KTable et qu'elles utilisent un magasin d'état, il est possible que toutes les mises à jour qui en résultent ne soient pas envoyées plus loin dans le pipeline.

La méthode KTable.groupBy renvoie un KGroupedTable similaire - une représentation intermédiaire du flux de mises à jour, regroupé par clé.

Faisons une petite pause et regardons la figure. 5.9, qui montre ce que nous avons réalisé. Cette topologie devrait déjà vous être très familière.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Regardons maintenant le code de cette topologie (il se trouve dans le fichier src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Le code donné se distingue par sa brièveté et le grand volume d'actions réalisées sur plusieurs lignes. Vous remarquerez peut-être quelque chose de nouveau dans le premier paramètre de la méthode builder.stream : une valeur du type enum AutoOffsetReset.EARLIEST (il existe également un LATEST), définie à l'aide de la méthode Consumed.withOffsetResetPolicy. Ce type d'énumération peut être utilisé pour spécifier une stratégie de réinitialisation de décalage pour chaque KStream ou KTable et est prioritaire sur l'option de réinitialisation de décalage de la configuration.

GroupByKey et GroupBy

L'interface KStream dispose de deux méthodes pour regrouper les enregistrements : GroupByKey et GroupBy. Les deux renvoient un KGroupedTable, vous vous demandez peut-être quelle est la différence entre eux et quand utiliser lequel ?

La méthode GroupByKey est utilisée lorsque les clés du KStream ne sont déjà pas vides. Et plus important encore, l’indicateur « nécessite un re-partitionnement » n’a jamais été activé.

La méthode GroupBy suppose que vous avez modifié les clés de regroupement, l'indicateur de répartition est donc défini sur true. Effectuer des jointures, des agrégations, etc. après la méthode GroupBy entraînera un repartitionnement automatique.
Résumé : Dans la mesure du possible, vous devez utiliser GroupByKey plutôt que GroupBy.

Ce que font les méthodes mapValues ​​​​et groupBy est clair, alors jetons un coup d'œil à la méthode sum() (trouvée dans src/main/java/bbejeck/model/ShareVolume.java) (liste 5.3).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
La méthode ShareVolume.sum renvoie le total cumulé du volume des ventes d'actions, et le résultat de toute la chaîne de calculs est un objet KTable . Vous comprenez maintenant le rôle que joue KTable. Lorsque les objets ShareVolume arrivent, l'objet KTable correspondant stocke la dernière mise à jour actuelle. Il est important de se rappeler que toutes les mises à jour sont reflétées dans la shareVolumeKTable précédente, mais que toutes ne sont pas envoyées plus loin.

Nous utilisons ensuite ce KTable pour agréger (par nombre d'actions négociées) afin d'arriver aux cinq sociétés avec les volumes d'actions négociés les plus élevés dans chaque secteur. Nos actions dans ce cas seront similaires à celles de la première agrégation.

  1. Effectuez une autre opération groupBy pour regrouper des objets ShareVolume individuels par secteur.
  2. Commencez à résumer les objets ShareVolume. Cette fois, l'objet d'agrégation est une file d'attente prioritaire de taille fixe. Dans cette file d’attente de taille fixe, seules les cinq sociétés ayant vendu le plus grand nombre d’actions sont retenues.
  3. Mappez les files d'attente du paragraphe précédent sur une valeur de chaîne et renvoyez les cinq actions les plus négociées en nombre et par secteur.
  4. Écrivez les résultats sous forme de chaîne dans le sujet.

En figue. La figure 5.10 montre le graphique de topologie du flux de données. Comme vous pouvez le constater, le deuxième cycle de traitement est assez simple.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Maintenant que nous avons bien compris la structure de ce deuxième tour de traitement, nous pouvons nous tourner vers son code source (vous le trouverez dans le fichier src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Cet initialiseur contient une variable fixedQueue. Il s'agit d'un objet personnalisé qui est un adaptateur pour java.util.TreeSet utilisé pour suivre les N premiers résultats par ordre décroissant des actions négociées.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Vous avez déjà vu les appels groupBy et mapValues, nous n'entrerons donc pas dans ceux-ci (nous appelons la méthode KTable.toStream car la méthode KTable.print est obsolète). Mais vous n'avez pas encore vu la version KTable de Aggregate(), nous allons donc passer un peu de temps à en discuter.

Comme vous vous en souvenez, ce qui différencie KTable est que les enregistrements avec les mêmes clés sont considérés comme des mises à jour. KTable remplace l'ancienne entrée par une nouvelle. L'agrégation se déroule de la même manière : les derniers enregistrements avec la même clé sont agrégés. Lorsqu'un enregistrement arrive, il est ajouté à l'instance de classe FixedSizePriorityQueue à l'aide d'un additionneur (deuxième paramètre dans l'appel de la méthode d'agrégation), mais si un autre enregistrement existe déjà avec la même clé, alors l'ancien enregistrement est supprimé à l'aide d'un soustracteur (troisième paramètre dans l'appel de la méthode d'agrégation).

Tout cela signifie que notre agrégateur, FixedSizePriorityQueue, n'agrège pas toutes les valeurs avec une seule clé, mais stocke une somme mobile des quantités des N types d'actions les plus négociés. Chaque entrée entrante contient le nombre total d'actions vendues jusqu'à présent. KTable vous donnera des informations sur les actions des sociétés actuellement les plus négociées, sans nécessiter une agrégation continue de chaque mise à jour.

Nous avons appris à faire deux choses importantes :

  • regrouper les valeurs dans KTable par une clé commune ;
  • effectuer des opérations utiles telles que le cumul et l'agrégation sur ces valeurs groupées.

Savoir comment effectuer ces opérations est important pour comprendre la signification des données circulant dans une application Kafka Streams et comprendre quelles informations elles transportent.

Nous avons également rassemblé certains des concepts clés abordés plus tôt dans ce livre. Au chapitre 4, nous avons expliqué à quel point l'état local tolérant aux pannes est important pour une application de streaming. Le premier exemple de ce chapitre montre pourquoi l'état local est si important : il vous donne la possibilité de garder une trace des informations que vous avez déjà consultées. L'accès local évite les retards du réseau, rendant l'application plus performante et résistante aux erreurs.

Lors de l'exécution d'une opération de cumul ou d'agrégation, vous devez spécifier le nom du magasin d'état. Les opérations de cumul et d'agrégation renvoient une instance de KTable, et KTable utilise le stockage d'état pour remplacer les anciens résultats par de nouveaux. Comme vous l'avez vu, toutes les mises à jour ne sont pas envoyées dans le pipeline, ce qui est important car les opérations d'agrégation sont conçues pour produire des informations récapitulatives. Si vous n'appliquez pas l'état local, KTable transmettra tous les résultats d'agrégation et de cumul.

Nous examinerons ensuite la manière d'effectuer des opérations telles que l'agrégation au cours d'une période de temps spécifique - ce que l'on appelle les opérations de fenêtrage.

5.3.2. Opérations sur les fenêtres

Dans la section précédente, nous avons introduit la convolution glissante et l'agrégation. L'application effectue un cumul continu du volume des ventes d'actions, suivi d'une agrégation des cinq actions les plus négociées en bourse.

Parfois, une telle agrégation et consolidation continue des résultats est nécessaire. Et parfois, vous devez effectuer des opérations uniquement sur une période de temps donnée. Par exemple, calculez combien de transactions de change ont été effectuées avec des actions d'une société particulière au cours des 10 dernières minutes. Ou combien d’utilisateurs ont cliqué sur une nouvelle bannière publicitaire au cours des 15 dernières minutes. Une application peut effectuer de telles opérations plusieurs fois, mais avec des résultats qui s'appliquent uniquement à des périodes de temps spécifiées (fenêtres horaires).

Comptabilisation des opérations d'échange par acheteur

Dans l'exemple suivant, nous suivrons les transactions boursières de plusieurs traders, qu'il s'agisse de grandes organisations ou de financiers individuels intelligents.

Il y a deux raisons possibles à ce suivi. L’un d’eux est la nécessité de savoir ce que les leaders du marché achètent/vendent. Si ces grands acteurs et investisseurs avertis voient des opportunités, il est logique de suivre leur stratégie. La deuxième raison est la volonté de détecter d’éventuels signes de délit d’initié. Pour ce faire, vous devrez analyser la corrélation entre les pics de ventes importants et les communiqués de presse importants.

Un tel suivi comprend les étapes suivantes :

  • créer un flux de lecture à partir du sujet des transactions boursières ;
  • regrouper les enregistrements entrants par identifiant d'acheteur et symbole boursier. L'appel de la méthode groupBy renvoie une instance de la classe KGroupedStream ;
  • La méthode KGroupedStream.windowedBy renvoie un flux de données limité à une fenêtre temporelle, ce qui permet une agrégation fenêtrée. Selon le type de fenêtre, un TimeWindowedKStream ou un SessionWindowedKStream est renvoyé ;
  • nombre de transactions pour l’opération d’agrégation. Le flux de données fenêtré détermine si un enregistrement particulier est pris en compte dans ce décompte ;
  • écrire les résultats dans un sujet ou les afficher sur la console pendant le développement.

La topologie de cette application est simple, mais une image claire serait utile. Jetons un coup d'oeil à la Fig. 5.11.

Ensuite, nous examinerons la fonctionnalité des opérations sur les fenêtres et le code correspondant.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"

Types de fenêtres

Il existe trois types de fenêtres dans Kafka Streams :

  • en session ;
  • « culbuter » (culbuter) ;
  • glisser/sauter.

Le choix dépend des besoins de votre entreprise. Les fenêtres de basculement et de saut sont limitées dans le temps, tandis que les fenêtres de session sont limitées par l'activité de l'utilisateur : la durée de la ou des sessions est déterminée uniquement par le degré d'activité de l'utilisateur. La principale chose à retenir est que tous les types de fenêtres sont basés sur les horodatages des entrées, et non sur l'heure du système.

Ensuite, nous implémentons notre topologie avec chacun des types de fenêtres. Le code complet ne sera donné que dans le premier exemple ; pour les autres types de fenêtres rien ne changera sauf le type de fonctionnement de la fenêtre.

Fenêtres de session

Les fenêtres de session sont très différentes de tous les autres types de fenêtres. Ils sont limités non pas tant par le temps que par l'activité de l'utilisateur (ou l'activité de l'entité que vous souhaitez suivre). Les fenêtres de session sont délimitées par des périodes d'inactivité.

La figure 5.12 illustre le concept de fenêtres de session. La plus petite session fusionnera avec la session à sa gauche. Et la séance de droite sera séparée car elle fait suite à une longue période d'inactivité. Les fenêtres de session sont basées sur l'activité de l'utilisateur, mais utilisent les horodatages des entrées pour déterminer à quelle session appartient l'entrée.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"

Utiliser les fenêtres de session pour suivre les transactions boursières

Utilisons les fenêtres de session pour capturer des informations sur les transactions d'échange. L'implémentation des fenêtres de session est présentée dans le listing 5.5 (qui peut être trouvé dans src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Vous avez déjà vu la plupart des opérations dans cette topologie, il n'est donc pas nécessaire de les revenir ici. Mais il y a aussi ici plusieurs éléments nouveaux, dont nous allons maintenant discuter.

Toute opération groupBy effectue généralement une sorte d’opération d’agrégation (agrégation, cumul ou comptage). Vous pouvez effectuer soit une agrégation cumulative avec un total cumulé, soit une agrégation par fenêtre, qui prend en compte les enregistrements dans une fenêtre de temps spécifiée.

Le code du listing 5.5 compte le nombre de transactions dans les fenêtres de session. En figue. 5.13 ces actions sont analysées étape par étape.

En appelant windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) nous créons une fenêtre de session avec un intervalle d'inactivité de 20 secondes et un intervalle de persistance de 15 minutes. Un intervalle d'inactivité de 20 secondes signifie que l'application inclura toute entrée arrivant dans les 20 secondes suivant la fin ou le début de la session en cours dans la session (active) en cours.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Ensuite, nous spécifions quelle opération d'agrégation doit être effectuée dans la fenêtre de session - dans ce cas, comptez. Si une entrée entrante se situe en dehors de la fenêtre d'inactivité (de chaque côté de l'horodatage), l'application crée une nouvelle session. L'intervalle de conservation signifie le maintien d'une session pendant un certain temps et autorise les données tardives qui s'étendent au-delà de la période d'inactivité de la session, mais qui peuvent toujours être jointes. De plus, le début et la fin de la nouvelle session résultant de la fusion correspondent à l'horodatage le plus ancien et le plus récent.

Examinons quelques entrées de la méthode count pour voir comment fonctionnent les sessions (Tableau 5.1).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Lorsque les enregistrements arrivent, nous recherchons les sessions existantes avec la même clé, une heure de fin inférieure à la date/heure actuelle - intervalle d'inactivité et une heure de début supérieure à la date/heure actuelle + intervalle d'inactivité. En tenant compte de cela, quatre entrées du tableau. 5.1 sont fusionnés en une seule session comme suit.

1. L'enregistrement 1 arrive en premier, donc l'heure de début est égale à l'heure de fin et est 00:00:00.

2. Ensuite, l'entrée 2 arrive et nous recherchons les sessions qui se terminent au plus tôt à 23:59:55 et commencent au plus tard à 00:00:35. Nous trouvons l'enregistrement 1 et combinons les sessions 1 et 2. Nous prenons l'heure de début de la session 1 (plus tôt) et l'heure de fin de la session 2 (plus tard), pour que notre nouvelle session commence à 00:00:00 et se termine à 00 : 00h15.

3. L'enregistrement 3 arrive, nous cherchons les séances entre 00:00:30 et 00:01:10 et n'en trouvons aucune. Ajoutez une deuxième session pour la clé 123-345-654,FFBE, commençant et se terminant à 00:00:50.

4. L'enregistrement 4 arrive et nous recherchons des sessions entre 23:59:45 et 00:00:25. Cette fois, on retrouve les sessions 1 et 2. Les trois sessions sont combinées en une seule, avec une heure de début à 00:00:00 et une heure de fin à 00:00:15.

D’après ce qui est décrit dans cette section, il convient de rappeler les nuances importantes suivantes :

  • les sessions ne sont pas des fenêtres de taille fixe. La durée d'une séance est déterminée par l'activité réalisée sur une période de temps donnée ;
  • Les horodatages dans les données déterminent si l'événement s'inscrit dans une session existante ou pendant une période d'inactivité.

Nous discuterons ensuite du prochain type de fenêtre : les fenêtres « culbutantes ».

Fenêtres « culbutantes »

Les fenêtres déroulantes capturent les événements qui se déroulent sur une certaine période de temps. Imaginez que vous ayez besoin de capturer toutes les transactions boursières d'une certaine entreprise toutes les 20 secondes, de sorte que vous collectiez tous les événements au cours de cette période. À la fin de l'intervalle de 20 secondes, la fenêtre se retourne et passe à un nouvel intervalle d'observation de 20 secondes. La figure 5.14 illustre cette situation.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Comme vous pouvez le constater, tous les événements reçus au cours des 20 dernières secondes sont inclus dans la fenêtre. A l'issue de ce délai, une nouvelle fenêtre est créée.

Le listing 5.6 montre du code qui démontre l'utilisation de fenêtres bascules pour capturer les transactions boursières toutes les 20 secondes (trouvé dans src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Avec cette petite modification apportée à l’appel de méthode TimeWindows.of, vous pouvez utiliser une fenêtre bascule. Cet exemple n'appelle pas la méthode jusqu'à(), donc l'intervalle de rétention par défaut de 24 heures sera utilisé.

Enfin, il est temps de passer à la dernière des options de fenêtre : les fenêtres « sautantes ».

Fenêtres coulissantes (« sautantes »)

Les fenêtres coulissantes/sautantes sont similaires aux fenêtres basculantes, mais avec une légère différence. Les fenêtres glissantes n'attendent pas la fin de l'intervalle de temps avant de créer une nouvelle fenêtre pour traiter les événements récents. Ils démarrent de nouveaux calculs après un intervalle d'attente inférieur à la durée de la fenêtre.

Pour illustrer les différences entre les fenêtres culbutantes et sautantes, revenons à l'exemple du comptage des transactions boursières. Notre objectif est toujours de compter le nombre de transactions, mais nous ne voulons pas attendre tout le temps avant de mettre à jour le compteur. Au lieu de cela, nous mettrons à jour le compteur à des intervalles plus courts. Par exemple, nous compterons toujours le nombre de transactions toutes les 20 secondes, mais mettrons à jour le compteur toutes les 5 secondes, comme le montre la Fig. 5.15. Dans ce cas, on se retrouve avec trois fenêtres de résultats avec des données qui se chevauchent.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Le listing 5.7 montre le code pour définir les fenêtres coulissantes (trouvé dans src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Une fenêtre bascule peut être convertie en fenêtre sautante en ajoutant un appel à la méthode advanceBy(). Dans l'exemple présenté, l'intervalle de sauvegarde est de 15 minutes.

Vous avez vu dans cette section comment limiter les résultats d'agrégation à des fenêtres temporelles. En particulier, je veux que vous vous souveniez des trois choses suivantes de cette section :

  • la taille des fenêtres de session n'est pas limitée par la période de temps, mais par l'activité de l'utilisateur ;
  • des fenêtres « culbutantes » donnent un aperçu des événements sur une période de temps donnée ;
  • La durée des fenêtres sautées est fixe, mais elles sont mises à jour fréquemment et peuvent contenir des entrées qui se chevauchent dans toutes les fenêtres.

Ensuite, nous apprendrons comment reconvertir une KTable en KStream pour une connexion.

5.3.3. Connexion des objets KStream et KTable

Au chapitre 4, nous avons discuté de la connexion de deux objets KStream. Nous devons maintenant apprendre à connecter KTable et KStream. Cela peut être nécessaire pour la simple raison suivante. KStream est un flux d'enregistrements et KTable est un flux de mises à jour d'enregistrements, mais vous souhaiterez parfois ajouter un contexte supplémentaire au flux d'enregistrements à l'aide des mises à jour de KTable.

Prenons les données sur le nombre de transactions boursières et combinons-les avec l'actualité boursière des secteurs concernés. Voici ce que vous devez faire pour y parvenir étant donné le code que vous possédez déjà.

  1. Convertissez un objet KTable avec des données sur le nombre de transactions boursières en un KStream, puis remplacez la clé par la clé indiquant le secteur industriel correspondant à ce symbole boursier.
  2. Créez un objet KTable qui lit les données d'un sujet avec l'actualité boursière. Ce nouveau KTable sera classé par secteur industriel.
  3. Connectez les mises à jour avec des informations sur le nombre de transactions boursières par secteur industriel.

Voyons maintenant comment mettre en œuvre ce plan d'action.

Convertir KTable en KStream

Pour convertir KTable en KStream, vous devez procéder comme suit.

  1. Appelez la méthode KTable.toStream().
  2. En appelant la méthode KStream.map, remplacez la clé par le nom du secteur, puis récupérez l'objet TransactionSummary à partir de l'instance Windowed.

Nous allons enchaîner ces opérations comme suit (le code se trouve dans le fichier src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Étant donné que nous effectuons une opération KStream.map, l'instance KStream renvoyée est automatiquement repartitionnée lorsqu'elle est utilisée dans une connexion.

Nous avons terminé le processus de conversion, nous devons ensuite créer un objet KTable pour lire les actualités boursières.

Création de KTable pour l'actualité boursière

Heureusement, la création d'un objet KTable ne nécessite qu'une seule ligne de code (le code peut être trouvé dans src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (extrait 5.9).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Il convient de noter qu'aucun objet Serde ne doit être spécifié, car la chaîne Serdes est utilisée dans les paramètres. De plus, en utilisant l'énumération EARLIEST, la table est remplie d'enregistrements au tout début.

Nous pouvons maintenant passer à la dernière étape : la connexion.

Connecter les mises à jour d'actualités aux données sur le nombre de transactions

Créer une connexion n’est pas difficile. Nous utiliserons une jointure gauche au cas où il n'y aurait pas d'actualité boursière pour le secteur concerné (le code nécessaire se trouve dans le fichier src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (List 5.10).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Cet opérateur leftJoin est assez simple. Contrairement aux jointures du chapitre 4, la méthode JoinWindow n'est pas utilisée car lors de l'exécution d'une jointure KStream-KTable, il n'y a qu'une seule entrée dans la KTable pour chaque clé. Une telle connexion n'est pas limitée dans le temps : l'enregistrement est soit dans la KTable, soit absent. La principale conclusion : en utilisant les objets KTable, vous pouvez enrichir KStream avec des données de référence moins fréquemment mises à jour.

Nous allons maintenant examiner un moyen plus efficace d'enrichir les événements de KStream.

5.3.4. Objets GlobalKTable

Comme vous pouvez le constater, il est nécessaire d’enrichir les flux d’événements ou de leur ajouter du contexte. Au chapitre 4 vous avez vu les connexions entre deux objets KStream, et dans la section précédente vous avez vu la connexion entre un KStream et une KTable. Dans tous ces cas, il est nécessaire de repartitionner le flux de données lors du mappage des clés vers un nouveau type ou une nouvelle valeur. Parfois, le partitionnement est effectué explicitement, et parfois Kafka Streams le fait automatiquement. Un repartitionnement est nécessaire car les clés ont changé et les enregistrements doivent se retrouver dans de nouvelles sections, sinon la connexion sera impossible (cela a été abordé au chapitre 4, dans la section « Repartitionnement des données » de la sous-section 4.2.4).

Le repartitionnement a un coût

Le repartitionnement nécessite des coûts - coûts de ressources supplémentaires pour créer des sujets intermédiaires, stocker des données en double dans un autre sujet ; cela signifie également une latence accrue due à l'écriture et à la lecture de ce sujet. De plus, si vous devez effectuer une jointure sur plusieurs aspects ou dimensions, vous devez chaîner les jointures, mapper les enregistrements avec de nouvelles clés et réexécuter le processus de repartitionnement.

Connexion à des ensembles de données plus petits

Dans certains cas, le volume de données de référence à connecter est relativement faible, de sorte que des copies complètes de celles-ci peuvent facilement s'adapter localement sur chaque nœud. Pour des situations comme celle-ci, Kafka Streams fournit la classe GlobalKTable.

Les instances GlobalKTable sont uniques car l'application réplique toutes les données sur chacun des nœuds. Et comme toutes les données sont présentes sur chaque nœud, il n'est pas nécessaire de partitionner le flux d'événements par clé de données de référence afin qu'il soit disponible pour toutes les partitions. Vous pouvez également effectuer des jointures sans clé à l'aide d'objets GlobalKTable. Revenons à l'un des exemples précédents pour illustrer cette fonctionnalité.

Connexion d'objets KStream aux objets GlobalKTable

Dans la sous-section 5.3.2, nous avons effectué une agrégation par fenêtre des transactions d'échange par acheteurs. Les résultats de cette agrégation ressemblaient à ceci :

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Bien que ces résultats aient été utiles, il aurait été plus utile que le nom du client et le nom complet de l'entreprise soient également affichés. Pour ajouter le nom du client et le nom de l'entreprise, vous pouvez effectuer des jointures normales, mais vous devrez effectuer deux mappages de clés et un repartitionnement. Avec GlobalKTable, vous pouvez éviter le coût de telles opérations.

Pour ce faire, nous utiliserons l'objet countStream du listing 5.11 (le code correspondant se trouve dans src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) et le connecterons à deux objets GlobalKTable.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Nous en avons déjà discuté auparavant, je ne le répéterai donc pas. Mais je note que le code de la fonction toStream().map est résumé dans un objet fonction au lieu d'une expression lambda en ligne pour des raisons de lisibilité.

L'étape suivante consiste à déclarer deux instances de GlobalKTable (le code affiché se trouve dans le fichier src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"

Veuillez noter que les noms de sujets sont décrits à l'aide de types énumérés.

Maintenant que tous les composants sont prêts, il ne reste plus qu'à écrire le code de la connexion (qui se trouve dans le fichier src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Bien qu'il y ait deux jointures dans ce code, elles sont chaînées car aucun de leurs résultats n'est utilisé séparément. Les résultats sont affichés à la fin de toute l'opération.

Lorsque vous exécutez l'opération de jointure ci-dessus, vous obtiendrez des résultats comme celui-ci :

{customer='Barney, Smith' company="Exxon", transactions= 17}

L’essentiel n’a pas changé, mais ces résultats paraissent plus clairs.

Si vous comptez jusqu'au chapitre 4, vous avez déjà vu plusieurs types de connexions en action. Ils sont répertoriés dans le tableau. 5.2. Ce tableau reflète les capacités de connectivité à partir de la version 1.0.0 de Kafka Streams ; Quelque chose pourrait changer dans les prochaines versions.

Le livre « Kafka Streams in Action. Applications et microservices pour le travail en temps réel"
Pour conclure, récapitulons les bases : vous pouvez connecter des flux d'événements (KStream) et mettre à jour des flux (KTable) en utilisant l'état local. Alternativement, si la taille des données de référence n'est pas trop grande, vous pouvez utiliser l'objet GlobalKTable. GlobalKTables réplique toutes les partitions sur chaque nœud d'application Kafka Streams, garantissant que toutes les données sont disponibles quelle que soit la partition à laquelle correspond la clé.

Nous verrons ensuite la fonctionnalité Kafka Streams, grâce à laquelle nous pouvons observer les changements d'état sans consommer les données d'un sujet Kafka.

5.3.5. État interrogeable

Nous avons déjà effectué plusieurs opérations impliquant l'état et affichons toujours les résultats sur la console (à des fins de développement) ou les écrivons dans un sujet (à des fins de production). Lorsque vous écrivez des résultats dans un sujet, vous devez utiliser un consommateur Kafka pour les afficher.

La lecture des données de ces sujets peut être considérée comme un type de vues matérialisées. Pour nos besoins, nous pouvons utiliser la définition d'une vue matérialisée de Wikipédia : « …un objet de base de données physique contenant les résultats d'une requête. Par exemple, il peut s'agir d'une copie locale de données distantes, ou d'un sous-ensemble de lignes et/ou de colonnes d'une table ou de résultats de jointure, ou encore d'un tableau récapitulatif obtenu par agrégation » (https://en.wikipedia.org/wiki /Vue_matérialisée).

Kafka Streams vous permet également d'exécuter des requêtes interactives sur les magasins d'état, vous permettant de lire directement ces vues matérialisées. Il est important de noter que la requête adressée au magasin d'état est une opération en lecture seule. Cela garantit que vous n'avez pas à vous soucier de rendre accidentellement un état incohérent pendant que votre application traite des données.

La possibilité d’interroger directement les magasins d’état est importante. Cela signifie que vous pouvez créer des applications de tableau de bord sans avoir à récupérer au préalable les données du consommateur Kafka. Cela augmente également l'efficacité de l'application, du fait qu'il n'est pas nécessaire de réécrire les données :

  • grâce à la localisation des données, elles sont rapidement accessibles ;
  • la duplication des données est éliminée, car elles ne sont pas écrites sur un stockage externe.

La principale chose dont je veux que vous vous souveniez est que vous pouvez directement interroger l’état depuis votre application. Les opportunités que cela vous offre ne peuvent être surestimées. Au lieu de consommer des données de Kafka et de stocker des enregistrements dans une base de données pour l'application, vous pouvez interroger les magasins d'état avec le même résultat. Les requêtes directes vers les magasins d'état signifient moins de code (pas de consommateur) et moins de logiciels (pas besoin d'une table de base de données pour stocker les résultats).

Nous avons abordé pas mal de sujets dans ce chapitre, nous allons donc laisser pour l'instant notre discussion sur les requêtes interactives sur les magasins d'État. Mais ne vous inquiétez pas : au chapitre 9, nous créerons une application de tableau de bord simple avec des requêtes interactives. Il utilisera certains des exemples de ce chapitre et des chapitres précédents pour démontrer les requêtes interactives et comment vous pouvez les ajouter aux applications Kafka Streams.

Résumé

  • Les objets KStream représentent des flux d'événements, comparables à des insertions dans une base de données. Les objets KTable représentent des flux de mise à jour, plus comme des mises à jour d'une base de données. La taille de l'objet KTable n'augmente pas, les anciens enregistrements sont remplacés par de nouveaux.
  • Les objets KTable sont requis pour les opérations d'agrégation.
  • À l’aide d’opérations de fenêtrage, vous pouvez diviser les données agrégées en intervalles de temps.
  • Grâce aux objets GlobalKTable, vous pouvez accéder aux données de référence n'importe où dans l'application, quel que soit le partitionnement.
  • Des connexions entre les objets KStream, KTable et GlobalKTable sont possibles.

Jusqu'à présent, nous nous sommes concentrés sur la création d'applications Kafka Streams à l'aide du KStream DSL de haut niveau. Bien que l’approche de haut niveau vous permette de créer des programmes soignés et concis, son utilisation représente un compromis. Travailler avec DSL KStream signifie augmenter la concision de votre code en réduisant le degré de contrôle. Dans le chapitre suivant, nous examinerons l'API du nœud de gestionnaire de bas niveau et tenterons d'autres compromis. Les programmes seront plus longs qu'avant, mais nous pourrons créer presque tous les nœuds de gestion dont nous pourrions avoir besoin.

→ Plus de détails sur le livre peuvent être trouvés sur site de l'éditeur

→ Pour Habrozhiteli 25% de réduction en utilisant le coupon - Flux Kafka

→ Dès paiement de la version papier du livre, un livre électronique vous sera envoyé par e-mail.

Source: habr.com

Ajouter un commentaire