Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“ Hallo, Khabro-Bewohner! Dieses Buch ist für jeden Entwickler geeignet, der die Thread-Verarbeitung verstehen möchte. Wenn Sie die verteilte Programmierung verstehen, können Sie Kafka und Kafka Streams besser verstehen. Es wäre schön, das Kafka-Framework selbst zu kennen, aber das ist nicht notwendig: Ich werde Ihnen alles sagen, was Sie brauchen. Erfahrene Kafka-Entwickler und Neulinge erfahren in diesem Buch gleichermaßen, wie sie mithilfe der Kafka Streams-Bibliothek interessante Stream-Verarbeitungsanwendungen erstellen. Fortgeschrittene und fortgeschrittene Java-Entwickler, die bereits mit Konzepten wie Serialisierung vertraut sind, lernen, ihre Fähigkeiten anzuwenden, um Kafka Streams-Anwendungen zu erstellen. Der Quellcode des Buches ist in Java 8 geschrieben und nutzt in erheblichem Maße die Lambda-Ausdruckssyntax von Java 8. Daher ist es hilfreich zu wissen, wie man mit Lambda-Funktionen arbeitet (auch in einer anderen Programmiersprache).

Auszug. 5.3. Aggregations- und Fensteroperationen

In diesem Abschnitt werden wir uns mit den vielversprechendsten Teilen von Kafka Streams befassen. Bisher haben wir die folgenden Aspekte von Kafka Streams behandelt:

  • Erstellen einer Verarbeitungstopologie;
  • Verwenden des Status in Streaming-Anwendungen;
  • Durchführen von Datenstromverbindungen;
  • Unterschiede zwischen Ereignisströmen (KStream) und Aktualisierungsströmen (KTable).

In den folgenden Beispielen werden wir alle diese Elemente zusammenführen. Außerdem erfahren Sie etwas über Windowing, eine weitere großartige Funktion von Streaming-Anwendungen. Unser erstes Beispiel wird eine einfache Aggregation sein.

5.3.1. Aggregation der Lagerverkäufe nach Branchen

Aggregation und Gruppierung sind wichtige Werkzeuge bei der Arbeit mit Streaming-Daten. Die Prüfung einzelner Unterlagen beim Eingang reicht oft nicht aus. Um zusätzliche Informationen aus Daten zu extrahieren, ist es notwendig, diese zu gruppieren und zu kombinieren.

In diesem Beispiel schlüpfen Sie in die Rolle eines Daytraders, der das Verkaufsvolumen von Aktien von Unternehmen verschiedener Branchen verfolgen muss. Konkret interessieren Sie sich für die fünf Unternehmen mit den größten Aktienumsätzen in jeder Branche.

Eine solche Aggregation erfordert die folgenden Schritte, um die Daten in die gewünschte Form zu übersetzen (allgemein gesprochen).

  1. Erstellen Sie eine themenbasierte Quelle, die Rohinformationen zum Aktienhandel veröffentlicht. Wir müssen ein Objekt vom Typ StockTransaction einem Objekt vom Typ ShareVolume zuordnen. Der Punkt ist, dass das StockTransaction-Objekt Verkaufsmetadaten enthält, wir aber nur Daten über die Anzahl der verkauften Aktien benötigen.
  2. Gruppieren Sie ShareVolume-Daten nach Aktiensymbol. Nach der Gruppierung nach Symbol können Sie diese Daten in Zwischensummen der Lagerverkaufsmengen zusammenfassen. Es ist erwähnenswert, dass die Methode KStream.groupBy eine Instanz vom Typ KGroupedStream zurückgibt. Und Sie können eine KTable-Instanz erhalten, indem Sie die Methode KGroupedStream.reduce weiter aufrufen.

Was ist die KGroupedStream-Schnittstelle?

Die Methoden KStream.groupBy und KStream.groupByKey geben eine Instanz von KGroupedStream zurück. KGroupedStream ist eine Zwischendarstellung eines Ereignisstroms nach der Gruppierung nach Schlüsseln. Es ist überhaupt nicht für die direkte Arbeit damit gedacht. Stattdessen wird KGroupedStream für Aggregationsoperationen verwendet, die immer zu einer KTable führen. Und da das Ergebnis von Aggregationsvorgängen eine KTable ist und sie einen Statusspeicher verwenden, ist es möglich, dass nicht alle resultierenden Aktualisierungen weiter unten in der Pipeline gesendet werden.

Die Methode KTable.groupBy gibt eine ähnliche KGroupedTable zurück – eine Zwischendarstellung des Aktualisierungsstroms, neu gruppiert nach Schlüssel.

Machen wir eine kurze Pause und schauen uns Abb. an. 5.9, die zeigt, was wir erreicht haben. Diese Topologie dürfte Ihnen bereits sehr vertraut sein.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Schauen wir uns nun den Code für diese Topologie an (er befindet sich in der Datei src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Der gegebene Code zeichnet sich durch seine Kürze und den großen Umfang der in mehreren Zeilen ausgeführten Aktionen aus. Möglicherweise fällt Ihnen im ersten Parameter der Methode builder.stream etwas Neues auf: ein Wert vom Aufzählungstyp AutoOffsetReset.EARLIEST (es gibt auch LATEST), der mit der Methode Consumed.withOffsetResetPolicy festgelegt wird. Dieser Aufzählungstyp kann verwendet werden, um eine Offset-Reset-Strategie für jeden KStream oder KTable anzugeben und hat Vorrang vor der Offset-Reset-Option aus der Konfiguration.

GroupByKey und GroupBy

Die KStream-Schnittstelle verfügt über zwei Methoden zum Gruppieren von Datensätzen: GroupByKey und GroupBy. Beide geben eine KGroupedTable zurück. Sie fragen sich also vielleicht, was der Unterschied zwischen ihnen ist und wann welche verwendet werden sollte?

Die GroupByKey-Methode wird verwendet, wenn die Schlüssel im KStream bereits nicht leer sind. Und was am wichtigsten ist: Das Flag „Neupartitionierung erforderlich“ wurde nie gesetzt.

Die GroupBy-Methode geht davon aus, dass Sie die Gruppierungsschlüssel geändert haben, sodass das Neupartitionierungsflag auf „true“ gesetzt ist. Das Durchführen von Verknüpfungen, Aggregationen usw. nach der GroupBy-Methode führt zu einer automatischen Neupartitionierung.
Zusammenfassung: Wann immer möglich, sollten Sie GroupByKey anstelle von GroupBy verwenden.

Es ist klar, was die Methoden „mapValues“ und „groupBy“ bewirken. Werfen wir also einen Blick auf die Methode „sum()“ (zu finden in src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Die Methode ShareVolume.sum gibt die laufende Summe des Aktienverkaufsvolumens zurück und das Ergebnis der gesamten Berechnungskette ist ein KTable-Objekt . Jetzt verstehen Sie die Rolle, die KTable spielt. Wenn ShareVolume-Objekte eintreffen, speichert das entsprechende KTable-Objekt das neueste aktuelle Update. Es ist wichtig zu bedenken, dass alle Aktualisierungen in der vorherigen shareVolumeKTable widergespiegelt werden, aber nicht alle weitergesendet werden.

Anschließend verwenden wir diese KTable zur Aggregation (nach Anzahl der gehandelten Aktien), um zu den fünf Unternehmen mit den höchsten gehandelten Aktienvolumina in jeder Branche zu gelangen. Unsere Aktionen werden in diesem Fall denen für die erste Aggregation ähneln.

  1. Führen Sie einen weiteren „groupBy“-Vorgang aus, um einzelne ShareVolume-Objekte nach Branche zu gruppieren.
  2. Beginnen Sie mit der Zusammenfassung von ShareVolume-Objekten. Diesmal ist das Aggregationsobjekt eine Prioritätswarteschlange mit fester Größe. In dieser Warteschlange mit fester Größe werden nur die fünf Unternehmen mit den meisten verkauften Aktien beibehalten.
  3. Ordnen Sie die Warteschlangen aus dem vorherigen Absatz einem Zeichenfolgenwert zu und geben Sie die fünf am häufigsten gehandelten Aktien nach Anzahl und Branche zurück.
  4. Schreiben Sie die Ergebnisse in Stringform zum Thema.

In Abb. Abbildung 5.10 zeigt das Diagramm der Datenflusstopologie. Wie Sie sehen, ist die zweite Verarbeitungsrunde recht einfach.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Nachdem wir nun ein klares Verständnis der Struktur dieser zweiten Verarbeitungsrunde haben, können wir uns dem Quellcode zuwenden (Sie finden ihn in der Datei src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Dieser Initialisierer enthält eine Variable „fixedQueue“. Dies ist ein benutzerdefiniertes Objekt, das ein Adapter für java.util.TreeSet ist, der verwendet wird, um die Top-N-Ergebnisse in absteigender Reihenfolge der gehandelten Aktien zu verfolgen.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Sie haben die Aufrufe „groupBy“ und „mapValues“ bereits gesehen, daher werden wir nicht näher darauf eingehen (wir rufen die Methode KTable.toStream auf, da die Methode KTable.print veraltet ist). Aber Sie haben die KTable-Version vonaggregat() noch nicht gesehen, deshalb werden wir uns etwas Zeit nehmen, darüber zu diskutieren.

Wie Sie sich erinnern, besteht der Unterschied zwischen KTable darin, dass Datensätze mit denselben Schlüsseln als Aktualisierungen betrachtet werden. KTable ersetzt den alten Eintrag durch einen neuen. Die Aggregation erfolgt auf ähnliche Weise: Die neuesten Datensätze mit demselben Schlüssel werden aggregiert. Wenn ein Datensatz eintrifft, wird er mithilfe eines Addierers (zweiter Parameter im Aggregatmethodenaufruf) zur Instanz der FixedSizePriorityQueue-Klasse hinzugefügt. Wenn jedoch bereits ein anderer Datensatz mit demselben Schlüssel vorhanden ist, wird der alte Datensatz mithilfe eines Subtrahierers (dritter Parameter in) entfernt der aggregierte Methodenaufruf).

Dies alles bedeutet, dass unser Aggregator, FixedSizePriorityQueue, nicht alle Werte mit einem Schlüssel aggregiert, sondern eine gleitende Summe der Mengen der N am häufigsten gehandelten Aktienarten speichert. Jeder eingehende Eintrag enthält die Gesamtzahl der bisher verkauften Aktien. KTable liefert Ihnen Informationen darüber, welche Aktien der Unternehmen derzeit am meisten gehandelt werden, ohne dass jede Aktualisierung fortlaufend zusammengefasst werden muss.

Wir haben gelernt, zwei wichtige Dinge zu tun:

  • Gruppenwerte in KTable nach einem gemeinsamen Schlüssel;
  • Führen Sie nützliche Vorgänge wie Rollup und Aggregation für diese gruppierten Werte durch.

Zu wissen, wie diese Vorgänge ausgeführt werden, ist wichtig, um die Bedeutung der Daten zu verstehen, die durch eine Kafka Streams-Anwendung übertragen werden, und um zu verstehen, welche Informationen sie enthalten.

Wir haben auch einige der Schlüsselkonzepte zusammengestellt, die weiter oben in diesem Buch besprochen wurden. In Kapitel 4 haben wir darüber gesprochen, wie wichtig ein fehlertoleranter lokaler Zustand für eine Streaming-Anwendung ist. Das erste Beispiel in diesem Kapitel zeigte, warum der lokale Status so wichtig ist – er gibt Ihnen die Möglichkeit, den Überblick darüber zu behalten, welche Informationen Sie bereits gesehen haben. Durch den lokalen Zugriff werden Netzwerkverzögerungen vermieden, wodurch die Anwendung leistungsfähiger und fehlerresistenter wird.

Wenn Sie einen Rollup- oder Aggregationsvorgang durchführen, müssen Sie den Namen des Statusspeichers angeben. Die Rollup- und Aggregationsvorgänge geben eine KTable-Instanz zurück, und KTable verwendet den Statusspeicher, um alte Ergebnisse durch neue zu ersetzen. Wie Sie gesehen haben, werden nicht alle Aktualisierungen über die Pipeline gesendet. Dies ist wichtig, da Aggregationsvorgänge darauf ausgelegt sind, zusammenfassende Informationen zu erstellen. Wenn Sie den lokalen Status nicht anwenden, leitet KTable alle Aggregations- und Rollup-Ergebnisse weiter.

Als nächstes betrachten wir die Durchführung von Vorgängen wie der Aggregation innerhalb eines bestimmten Zeitraums – sogenannte Fensteroperationen.

5.3.2. Fensteroperationen

Im vorherigen Abschnitt haben wir die gleitende Faltung und Aggregation eingeführt. Die Anwendung führte eine kontinuierliche Zusammenfassung des Aktienverkaufsvolumens durch, gefolgt von einer Aggregation der fünf am häufigsten gehandelten Aktien an der Börse.

Manchmal ist eine solche kontinuierliche Aggregation und Zusammenfassung der Ergebnisse erforderlich. Und manchmal müssen Sie Operationen nur über einen bestimmten Zeitraum durchführen. Berechnen Sie beispielsweise, wie viele Umtauschtransaktionen mit Aktien eines bestimmten Unternehmens in den letzten 10 Minuten getätigt wurden. Oder wie viele Nutzer in den letzten 15 Minuten auf ein neues Werbebanner geklickt haben. Eine Anwendung kann solche Vorgänge mehrmals ausführen, allerdings mit Ergebnissen, die nur für bestimmte Zeiträume (Zeitfenster) gelten.

Zählung der Umtauschtransaktionen nach Käufer

Im nächsten Beispiel verfolgen wir Aktientransaktionen mehrerer Händler – entweder großer Organisationen oder intelligenter einzelner Finanziers.

Es gibt zwei mögliche Gründe für diese Nachverfolgung. Einer davon ist die Notwendigkeit zu wissen, was Marktführer kaufen/verkaufen. Wenn diese großen Player und erfahrenen Investoren Chancen sehen, ist es sinnvoll, ihrer Strategie zu folgen. Der zweite Grund ist der Wunsch, mögliche Anzeichen für illegalen Insiderhandel zu erkennen. Dazu müssen Sie den Zusammenhang zwischen großen Umsatzspitzen und wichtigen Pressemitteilungen analysieren.

Eine solche Nachverfolgung besteht aus den folgenden Schritten:

  • Erstellen eines Streams zum Lesen des Themas Aktientransaktionen;
  • Gruppierung eingehender Datensätze nach Käufer-ID und Aktiensymbol. Der Aufruf der Methode „groupBy“ gibt eine Instanz der Klasse „KGroupedStream“ zurück;
  • Die Methode KGroupedStream.windowedBy gibt einen auf ein Zeitfenster begrenzten Datenstrom zurück, der eine Fensteraggregation ermöglicht. Abhängig vom Fenstertyp wird entweder ein TimeWindowedKStream oder ein SessionWindowedKStream zurückgegeben;
  • Transaktionsanzahl für den Aggregationsvorgang. Der gefensterte Datenfluss bestimmt, ob ein bestimmter Datensatz bei dieser Zählung berücksichtigt wird;
  • Schreiben von Ergebnissen zu einem Thema oder Ausgeben dieser Ergebnisse an die Konsole während der Entwicklung.

Die Topologie dieser Anwendung ist einfach, aber ein klares Bild davon wäre hilfreich. Werfen wir einen Blick auf Abb. 5.11.

Als nächstes schauen wir uns die Funktionalität von Fensteroperationen und den entsprechenden Code an.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“

Fenstertypen

Es gibt drei Arten von Fenstern in Kafka Streams:

  • Sitzung;
  • „stolpern“ (stolpern);
  • rutschen/hüpfen.

Welche Sie wählen sollten, hängt von Ihren Geschäftsanforderungen ab. Wechsel- und Sprungfenster sind zeitlich begrenzt, während Sitzungsfenster durch die Benutzeraktivität begrenzt sind – die Dauer der Sitzung(en) wird ausschließlich davon bestimmt, wie aktiv der Benutzer ist. Beachten Sie vor allem, dass alle Fenstertypen auf den Datums-/Zeitstempeln der Einträge basieren, nicht auf der Systemzeit.

Als nächstes implementieren wir unsere Topologie mit jedem der Fenstertypen. Der vollständige Code wird nur im ersten Beispiel angegeben; für andere Fenstertypen ändert sich nichts außer der Art der Fensteroperation.

Sitzungsfenster

Sitzungsfenster unterscheiden sich stark von allen anderen Fenstertypen. Sie sind weniger durch die Zeit als vielmehr durch die Aktivität des Benutzers (oder die Aktivität der Entität, die Sie verfolgen möchten) begrenzt. Sitzungsfenster werden durch Zeiträume der Inaktivität begrenzt.

Abbildung 5.12 veranschaulicht das Konzept von Sitzungsfenstern. Die kleinere Sitzung wird mit der Sitzung links davon zusammengeführt. Und die Sitzung auf der rechten Seite wird separat sein, da sie auf eine lange Zeit der Inaktivität folgt. Sitzungsfenster basieren auf der Benutzeraktivität, verwenden jedoch Datums-/Zeitstempel von Einträgen, um zu bestimmen, zu welcher Sitzung der Eintrag gehört.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“

Verwendung von Sitzungsfenstern zur Verfolgung von Aktientransaktionen

Lassen Sie uns Sitzungsfenster verwenden, um Informationen über Börsentransaktionen zu erfassen. Die Implementierung von Sitzungsfenstern wird in Listing 5.5 gezeigt (zu finden in src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Sie haben die meisten Vorgänge in dieser Topologie bereits gesehen, sodass es nicht nötig ist, sie hier noch einmal anzusehen. Aber auch hier gibt es einige neue Elemente, die wir nun besprechen werden.

Jeder „groupBy“-Vorgang führt normalerweise eine Art Aggregationsvorgang aus (Aggregation, Rollup oder Zählung). Sie können entweder eine kumulative Aggregation mit einer laufenden Summe oder eine Fensteraggregation durchführen, bei der Datensätze innerhalb eines bestimmten Zeitfensters berücksichtigt werden.

Der Code in Listing 5.5 zählt die Anzahl der Transaktionen innerhalb von Sitzungsfenstern. In Abb. 5.13 werden diese Aktionen Schritt für Schritt analysiert.

Durch den Aufruf von windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) erstellen wir ein Sitzungsfenster mit einem Inaktivitätsintervall von 20 Sekunden und einem Persistenzintervall von 15 Minuten. Ein Leerlaufintervall von 20 Sekunden bedeutet, dass die Anwendung jeden Eintrag, der innerhalb von 20 Sekunden nach Ende oder Start der aktuellen Sitzung eintrifft, in die aktuelle (aktive) Sitzung einfügt.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Als Nächstes geben wir an, welcher Aggregationsvorgang im Sitzungsfenster ausgeführt werden muss – in diesem Fall count. Wenn ein eingehender Eintrag außerhalb des Inaktivitätsfensters (auf beiden Seiten des Datums-/Zeitstempels) liegt, erstellt die Anwendung eine neue Sitzung. Unter Aufbewahrungsintervall versteht man das Aufrechterhalten einer Sitzung für einen bestimmten Zeitraum und ermöglicht verspätete Daten, die über den Inaktivitätszeitraum der Sitzung hinausgehen, aber dennoch angehängt werden können. Darüber hinaus entsprechen Beginn und Ende der neuen Sitzung, die sich aus der Zusammenführung ergibt, dem frühesten und neuesten Datums-/Zeitstempel.

Schauen wir uns einige Einträge der Zählmethode an, um zu sehen, wie Sitzungen funktionieren (Tabelle 5.1).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Wenn Datensätze eintreffen, suchen wir nach vorhandenen Sitzungen mit demselben Schlüssel, einer Endzeit, die kleiner als der aktuelle Datums-/Zeitstempel – Inaktivitätsintervall ist, und einer Startzeit, die größer als der aktuelle Datums-/Zeitstempel + Inaktivitätsintervall ist. Unter Berücksichtigung dessen vier Einträge aus der Tabelle. 5.1 werden wie folgt zu einer einzigen Sitzung zusammengeführt.

1. Datensatz 1 kommt zuerst an, daher ist die Startzeit gleich der Endzeit und beträgt 00:00:00.

2. Als nächstes kommt Eintrag 2 und wir suchen nach Sitzungen, die nicht früher als 23:59:55 enden und spätestens 00:00:35 beginnen. Wir finden Datensatz 1 und kombinieren die Sitzungen 1 und 2. Wir nehmen die Startzeit von Sitzung 1 (früher) und die Endzeit von Sitzung 2 (später), sodass unsere neue Sitzung um 00:00:00 Uhr beginnt und um 00 Uhr endet: 00:15.

3. Datensatz 3 kommt an, wir suchen nach Sitzungen zwischen 00:00:30 und 00:01:10 und finden keine. Fügen Sie eine zweite Sitzung für den Schlüssel 123-345-654,FFBE hinzu, die um 00:00:50 beginnt und endet.

4. Datensatz 4 kommt und wir suchen nach Sitzungen zwischen 23:59:45 und 00:00:25. Diesmal werden beide Sitzungen 1 und 2 gefunden. Alle drei Sitzungen werden zu einer zusammengefasst, mit einer Startzeit von 00:00:00 und einer Endzeit von 00:00:15.

Bei dem, was in diesem Abschnitt beschrieben wird, ist es wichtig, sich an die folgenden wichtigen Nuancen zu erinnern:

  • Sitzungen sind keine Fenster mit fester Größe. Die Dauer einer Sitzung wird durch die Aktivität innerhalb eines bestimmten Zeitraums bestimmt;
  • Die Datums-/Zeitstempel in den Daten bestimmen, ob das Ereignis in eine bestehende Sitzung oder in eine Leerlaufzeit fällt.

Als nächstes besprechen wir den nächsten Fenstertyp – „umkippende“ Fenster.

„Umkippende“ Fenster

Taumelnde Fenster erfassen Ereignisse, die in einen bestimmten Zeitraum fallen. Stellen Sie sich vor, Sie müssten alle 20 Sekunden alle Aktientransaktionen eines bestimmten Unternehmens erfassen, also erfassen Sie alle Ereignisse in diesem Zeitraum. Am Ende des 20-Sekunden-Intervalls rollt das Fenster um und wechselt zu einem neuen 20-Sekunden-Beobachtungsintervall. Abbildung 5.14 veranschaulicht diese Situation.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Wie Sie sehen, werden alle in den letzten 20 Sekunden empfangenen Ereignisse im Fenster angezeigt. Nach Ablauf dieser Zeitspanne wird ein neues Fenster erstellt.

Listing 5.6 zeigt Code, der die Verwendung von Taumelfenstern zur Erfassung von Aktientransaktionen alle 20 Sekunden demonstriert (zu finden in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Mit dieser kleinen Änderung am TimeWindows.of-Methodenaufruf können Sie ein rollierendes Fenster verwenden. In diesem Beispiel wird die Methode Until() nicht aufgerufen, daher wird das Standardaufbewahrungsintervall von 24 Stunden verwendet.

Schließlich ist es an der Zeit, mit der letzten Fensteroption fortzufahren – dem „Hüpfen“ von Fenstern.

Schiebefenster („springende“) Fenster

Schiebe-/Sprungfenster ähneln Kippfenstern, weisen jedoch einen kleinen Unterschied auf. Schiebefenster warten nicht bis zum Ende des Zeitintervalls, bevor sie ein neues Fenster erstellen, um aktuelle Ereignisse zu verarbeiten. Sie starten neue Berechnungen nach einer Wartezeit, die kürzer als die Fensterdauer ist.

Um die Unterschiede zwischen Tumbling- und Jumping-Fenstern zu verdeutlichen, kehren wir zum Beispiel der Zählung von Börsentransaktionen zurück. Unser Ziel besteht immer noch darin, die Anzahl der Transaktionen zu zählen, aber wir möchten nicht die ganze Zeit warten, bevor wir den Zähler aktualisieren. Stattdessen werden wir den Zähler in kürzeren Abständen aktualisieren. Beispielsweise zählen wir weiterhin alle 20 Sekunden die Anzahl der Transaktionen, aktualisieren den Zähler jedoch alle 5 Sekunden, wie in Abb. 5.15. In diesem Fall erhalten wir drei Ergebnisfenster mit überlappenden Daten.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Listing 5.7 zeigt den Code zum Definieren von Schiebefenstern (zu finden in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Ein rollierendes Fenster kann in ein springendes Fenster umgewandelt werden, indem ein Aufruf der advancedBy()-Methode hinzugefügt wird. Im gezeigten Beispiel beträgt das Speicherintervall 15 Minuten.

In diesem Abschnitt haben Sie gesehen, wie Sie Aggregationsergebnisse auf Zeitfenster beschränken. Insbesondere möchte ich, dass Sie sich an die folgenden drei Dinge aus diesem Abschnitt erinnern:

  • die Größe der Sitzungsfenster ist nicht durch den Zeitraum, sondern durch die Benutzeraktivität begrenzt;
  • „Taumelnde“ Fenster bieten einen Überblick über Ereignisse innerhalb eines bestimmten Zeitraums;
  • Die Dauer der springenden Fenster ist festgelegt, sie werden jedoch regelmäßig aktualisiert und können überlappende Einträge in allen Fenstern enthalten.

Als Nächstes lernen wir, wie man eine KTable für eine Verbindung wieder in einen KStream umwandelt.

5.3.3. Verbinden von KStream- und KTable-Objekten

In Kapitel 4 haben wir die Verbindung zweier KStream-Objekte besprochen. Jetzt müssen wir lernen, wie man KTable und KStream verbindet. Dies kann aus dem folgenden einfachen Grund erforderlich sein. KStream ist ein Stream von Datensätzen und KTable ist ein Stream von Datensatzaktualisierungen, aber manchmal möchten Sie dem Datensatzstream möglicherweise zusätzlichen Kontext hinzufügen, indem Sie Aktualisierungen aus der KTable verwenden.

Nehmen wir die Daten zur Anzahl der Börsentransaktionen und kombinieren sie mit den Börsennachrichten der relevanten Branchen. Hier ist, was Sie tun müssen, um dies zu erreichen, vorausgesetzt, Sie haben bereits Code.

  1. Konvertieren Sie ein KTable-Objekt mit Daten zur Anzahl der Aktientransaktionen in einen KStream und ersetzen Sie anschließend den Schlüssel durch den Schlüssel, der den Industriesektor angibt, der diesem Aktiensymbol entspricht.
  2. Erstellen Sie ein KTable-Objekt, das Daten aus einem Thema mit Börsennachrichten liest. Diese neue KTable wird nach Branchen kategorisiert.
  3. Verbinden Sie aktuelle Nachrichten mit Informationen zur Anzahl der Börsentransaktionen nach Branchen.

Sehen wir uns nun an, wie dieser Aktionsplan umgesetzt wird.

Konvertieren Sie KTable in KStream

Um KTable in KStream zu konvertieren, müssen Sie Folgendes tun.

  1. Rufen Sie die Methode KTable.toStream() auf.
  2. Ersetzen Sie durch Aufrufen der KStream.map-Methode den Schlüssel durch den Branchennamen und rufen Sie dann das TransactionSummary-Objekt aus der Windowed-Instanz ab.

Wir werden diese Operationen wie folgt miteinander verketten (der Code befindet sich in der Datei src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Da wir eine KStream.map-Operation ausführen, wird die zurückgegebene KStream-Instanz automatisch neu partitioniert, wenn sie in einer Verbindung verwendet wird.

Wir haben den Konvertierungsprozess abgeschlossen. Als nächstes müssen wir ein KTable-Objekt zum Lesen von Börsennachrichten erstellen.

Erstellung von KTable für Börsennachrichten

Glücklicherweise ist zum Erstellen eines KTable-Objekts nur eine Codezeile erforderlich (der Code befindet sich in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Es ist zu beachten, dass keine Serde-Objekte angegeben werden müssen, da in den Einstellungen String-Serdes verwendet werden. Durch die Verwendung der EARLIEST-Enumeration wird die Tabelle außerdem gleich zu Beginn mit Datensätzen gefüllt.

Jetzt können wir zum letzten Schritt übergehen – der Verbindung.

Verknüpfung von Nachrichtenaktualisierungen mit Transaktionszahldaten

Eine Verbindung herzustellen ist nicht schwierig. Für den Fall, dass es für die entsprechende Branche keine Börsennachrichten gibt, nutzen wir einen Left-Join (den nötigen Code finden Sie in der Datei src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Dieser leftJoin-Operator ist recht einfach. Im Gegensatz zu den Joins in Kapitel 4 wird die JoinWindow-Methode nicht verwendet, da es bei der Durchführung eines KStream-KTable-Joins für jeden Schlüssel nur einen Eintrag in der KTable gibt. Eine solche Verbindung ist zeitlich nicht begrenzt: Der Datensatz befindet sich entweder in der KTable oder fehlt. Die wichtigste Schlussfolgerung: Mithilfe von KTable-Objekten können Sie KStream mit weniger häufig aktualisierten Referenzdaten anreichern.

Jetzt schauen wir uns eine effizientere Möglichkeit an, Ereignisse aus KStream anzureichern.

5.3.4. GlobalKTable-Objekte

Wie Sie sehen, besteht die Notwendigkeit, Ereignisströme anzureichern oder ihnen Kontext hinzuzufügen. In Kapitel 4 haben Sie die Verbindungen zwischen zwei KStream-Objekten gesehen, und im vorherigen Abschnitt haben Sie die Verbindung zwischen einem KStream und einer KTable gesehen. In all diesen Fällen ist es erforderlich, den Datenstrom neu zu partitionieren, wenn die Schlüssel einem neuen Typ oder Wert zugeordnet werden. Manchmal erfolgt die Neupartitionierung explizit und manchmal erfolgt dies automatisch durch Kafka Streams. Eine Neupartitionierung ist notwendig, da sich die Schlüssel geändert haben und die Datensätze in neuen Abschnitten landen müssen, andernfalls ist die Verbindung unmöglich (dies wurde in Kapitel 4 im Abschnitt „Neupartitionierung von Daten“ in Unterabschnitt 4.2.4 besprochen).

Eine Neupartitionierung ist mit Kosten verbunden

Eine Neupartitionierung erfordert Kosten – zusätzliche Ressourcenkosten für die Erstellung von Zwischenthemen und die Speicherung doppelter Daten in einem anderen Thema; Dies bedeutet auch eine erhöhte Latenz beim Schreiben und Lesen dieses Themas. Wenn Sie außerdem über mehr als einen Aspekt oder eine Dimension hinweg verknüpfen müssen, müssen Sie die Verknüpfungen verketten, die Datensätze neuen Schlüsseln zuordnen und den Neupartitionierungsprozess erneut ausführen.

Verbindung zu kleineren Datensätzen herstellen

In einigen Fällen ist die Menge der zu verbindenden Referenzdaten relativ gering, sodass vollständige Kopien davon problemlos lokal auf jedem Knoten Platz finden. Für Situationen wie diese stellt Kafka Streams die GlobalKTable-Klasse bereit.

GlobalKTable-Instanzen sind einzigartig, da die Anwendung alle Daten auf jeden der Knoten repliziert. Und da alle Daten auf jedem Knoten vorhanden sind, besteht keine Notwendigkeit, den Ereignisstrom nach Referenzdatenschlüssel zu partitionieren, damit er für alle Partitionen verfügbar ist. Sie können auch schlüssellose Verknüpfungen mithilfe von GlobalKTable-Objekten durchführen. Kehren wir zu einem der vorherigen Beispiele zurück, um diese Funktion zu demonstrieren.

KStream-Objekte mit GlobalKTable-Objekten verbinden

In Unterabschnitt 5.3.2 haben wir eine Fensteraggregation von Börsentransaktionen durch Käufer durchgeführt. Die Ergebnisse dieser Aggregation sahen etwa so aus:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Obwohl diese Ergebnisse ihren Zweck erfüllten, wäre es sinnvoller gewesen, wenn auch der Name des Kunden und der vollständige Firmenname angezeigt worden wären. Um den Kundennamen und den Firmennamen hinzuzufügen, können Sie normale Verknüpfungen durchführen, müssen jedoch zwei Schlüsselzuordnungen und eine Neupartitionierung durchführen. Mit GlobalKTable können Sie die Kosten für solche Operationen vermeiden.

Dazu verwenden wir das countStream-Objekt aus Listing 5.11 (den entsprechenden Code finden Sie in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) und verbinden es mit zwei GlobalKTable-Objekten.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Wir haben das bereits besprochen, deshalb werde ich es nicht wiederholen. Ich stelle jedoch fest, dass der Code in der Funktion toStream().map aus Gründen der Lesbarkeit in ein Funktionsobjekt statt in einen Inline-Lambda-Ausdruck abstrahiert ist.

Der nächste Schritt besteht darin, zwei Instanzen von GlobalKTable zu deklarieren (der angezeigte Code befindet sich in der Datei src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“

Bitte beachten Sie, dass Themennamen mithilfe von Aufzählungstypen beschrieben werden.

Nachdem wir nun alle Komponenten bereit haben, müssen wir nur noch den Code für die Verbindung schreiben (zu finden in der Datei src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Obwohl es in diesem Code zwei Joins gibt, sind sie verkettet, da keines ihrer Ergebnisse separat verwendet wird. Die Ergebnisse werden am Ende des gesamten Vorgangs angezeigt.

Wenn Sie den oben genannten Join-Vorgang ausführen, erhalten Sie folgende Ergebnisse:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Das Wesentliche hat sich nicht geändert, aber diese Ergebnisse sehen klarer aus.

Wenn Sie bis Kapitel 4 herunterzählen, haben Sie bereits mehrere Arten von Verbindungen in Aktion gesehen. Sie sind in der Tabelle aufgeführt. 5.2. Diese Tabelle spiegelt die Konnektivitätsfunktionen ab Version 1.0.0 von Kafka Streams wider; In zukünftigen Versionen kann sich etwas ändern.

Das Buch „Kafka Streams in Action. Anwendungen und Microservices für die Echtzeitarbeit“
Lassen Sie uns zum Abschluss noch einmal die Grundlagen zusammenfassen: Sie können Ereignisströme (KStream) und Aktualisierungsströme (KTable) mithilfe des lokalen Status verbinden. Wenn die Referenzdaten nicht zu groß sind, können Sie alternativ das GlobalKTable-Objekt verwenden. GlobalKTables repliziert alle Partitionen auf jeden Kafka Streams-Anwendungsknoten und stellt so sicher, dass alle Daten verfügbar sind, unabhängig davon, welcher Partition der Schlüssel entspricht.

Als nächstes sehen wir uns die Kafka Streams-Funktion an, mit der wir Zustandsänderungen beobachten können, ohne Daten aus einem Kafka-Thema zu verbrauchen.

5.3.5. Abfragbarer Zustand

Wir haben bereits mehrere Operationen mit Status durchgeführt und geben die Ergebnisse immer auf der Konsole aus (für Entwicklungszwecke) oder schreiben sie in ein Thema (für Produktionszwecke). Wenn Sie Ergebnisse zu einem Thema schreiben, müssen Sie einen Kafka-Consumer verwenden, um diese anzuzeigen.

Das Lesen von Daten aus diesen Themen kann als eine Art materialisierter Ansichten betrachtet werden. Für unsere Zwecke können wir die Definition einer materialisierten Ansicht aus Wikipedia verwenden: „... ein physisches Datenbankobjekt, das die Ergebnisse einer Abfrage enthält. Es könnte sich beispielsweise um eine lokale Kopie entfernter Daten oder eine Teilmenge der Zeilen und/oder Spalten einer Tabelle oder Join-Ergebnisse oder eine durch Aggregation erhaltene Übersichtstabelle handeln“ (https://en.wikipedia.org/wiki /Materialized_view).

Mit Kafka Streams können Sie außerdem interaktive Abfragen in staatlichen Stores ausführen und so diese materialisierten Ansichten direkt lesen. Es ist wichtig zu beachten, dass die Abfrage an den Statusspeicher ein schreibgeschützter Vorgang ist. Dadurch wird sichergestellt, dass Sie sich keine Sorgen machen müssen, dass der Status versehentlich inkonsistent wird, während Ihre Anwendung Daten verarbeitet.

Die Möglichkeit, staatliche Geschäfte direkt abzufragen, ist wichtig. Das bedeutet, dass Sie Dashboard-Anwendungen erstellen können, ohne zunächst Daten vom Kafka-Consumer abrufen zu müssen. Es erhöht auch die Effizienz der Anwendung, da kein erneutes Schreiben von Daten erforderlich ist:

  • Dank der Lokalität der Daten ist ein schneller Zugriff möglich.
  • Eine Duplizierung der Daten entfällt, da diese nicht auf einen externen Speicher geschrieben werden.

Ich möchte Sie vor allem daran erinnern, dass Sie den Status direkt aus Ihrer Anwendung heraus abfragen können. Die Möglichkeiten, die sich Ihnen dadurch bieten, können nicht genug betont werden. Anstatt Daten von Kafka zu nutzen und Datensätze in einer Datenbank für die Anwendung zu speichern, können Sie Statusspeicher mit demselben Ergebnis abfragen. Direkte Abfragen an staatliche Speicher bedeuten weniger Code (kein Verbraucher) und weniger Software (keine Datenbanktabelle zum Speichern der Ergebnisse erforderlich).

Da wir in diesem Kapitel schon einiges behandelt haben, lassen wir die Erörterung interaktiver Abfragen an staatliche Geschäfte vorerst. Aber keine Sorge: In Kapitel 9 erstellen wir eine einfache Dashboard-Anwendung mit interaktiven Abfragen. Anhand einiger Beispiele aus diesem und früheren Kapiteln werden interaktive Abfragen veranschaulicht und erläutert, wie Sie diese zu Kafka Streams-Anwendungen hinzufügen können.

Zusammenfassung

  • KStream-Objekte stellen Ereignisströme dar, vergleichbar mit Einfügungen in eine Datenbank. KTable-Objekte stellen Aktualisierungsströme dar, eher wie Aktualisierungen einer Datenbank. Die Größe des KTable-Objekts wächst nicht, alte Datensätze werden durch neue ersetzt.
  • Für Aggregationsvorgänge sind KTable-Objekte erforderlich.
  • Mithilfe von Fensteroperationen können Sie aggregierte Daten in Zeitfenster aufteilen.
  • Dank GlobalKTable-Objekten können Sie unabhängig von der Partitionierung überall in der Anwendung auf Referenzdaten zugreifen.
  • Verbindungen zwischen KStream-, KTable- und GlobalKTable-Objekten sind möglich.

Bisher haben wir uns auf die Erstellung von Kafka Streams-Anwendungen mit dem High-Level-KStream-DSL konzentriert. Obwohl der High-Level-Ansatz es Ihnen ermöglicht, übersichtliche und prägnante Programme zu erstellen, stellt die Verwendung einen Kompromiss dar. Wenn Sie mit DSL KStream arbeiten, erhöhen Sie die Prägnanz Ihres Codes, indem Sie den Grad der Kontrolle verringern. Im nächsten Kapitel werden wir uns die Low-Level-Handler-Knoten-API ansehen und andere Kompromisse ausprobieren. Die Programme werden länger sein als zuvor, aber wir werden in der Lage sein, fast jeden Handler-Knoten zu erstellen, den wir benötigen.

→ Weitere Details zum Buch finden Sie unter Website des Verlags

→ Für Habrozhiteli 25 % Rabatt mit Gutschein - Kafka-Bäche

→ Nach Bezahlung der Papierversion des Buches wird ein elektronisches Buch per E-Mail verschickt.

Source: habr.com

Kommentar hinzufügen