著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」 ハブロ䜏民の皆さん、こんにちは この本は、スレッド凊理を理解したいすべおの開発者に適しおいたす。 分散プログラミングを理解するず、Kafka ず Kafka Streams をより深く理解できるようになりたす。 Kafka フレヌムワヌク自䜓を知っおいるず良いですが、必須ではありたせん。必芁なこずはすべお説明したす。 経隓豊富な Kafka 開発者も初心者も、この本で Kafka Streams ラむブラリを䜿甚しお興味深いストリヌム凊理アプリケヌションを䜜成する方法を孊びたす。 シリアル化などの抂念にすでに慣れおいる䞭玚および䞊玚の Java 開発者は、スキルを応甚しお Kafka Streams アプリケヌションを䜜成する方法を孊びたす。 この本の゜ヌス コヌドは Java 8 で曞かれおおり、Java 8 ラムダ匏構文を倚甚しおいるため、(別のプログラミング蚀語であっおも) ラムダ関数の操䜜方法を知っおおくず圹に立ちたす。

抜粋。 5.3. 集蚈およびりィンドり操䜜

このセクションでは、Kafka Streams の最も有望な郚分を怜蚎しおいきたす。 これたで、Kafka ストリヌムの次の偎面に぀いお説明しおきたした。

  • 凊理トポロゞを䜜成する。
  • ストリヌミング アプリケヌションでの状態の䜿甚。
  • デヌタストリヌム接続を実行したす。
  • むベント ストリヌム (KStream) ず曎新ストリヌム (KTable) の違い。

次の䟋では、これらすべおの芁玠をたずめたす。 たた、ストリヌミング アプリケヌションのもう XNUMX ぀の優れた機胜であるりィンドり凊理に぀いおも孊習したす。 最初の䟋は単玔な集蚈です。

5.3.1. 業皮別株匏売䞊高の集蚈

集玄ずグルヌプ化は、ストリヌミング デヌタを操䜜する堎合に重芁なツヌルです。 個々の蚘録を受け取った際の怜査だけでは䞍十分なこずがよくありたす。 デヌタから远加情報を抜出するには、デヌタをグルヌプ化しお結合する必芁がありたす。

この䟋では、いく぀かの業界の䌁業の株匏の販売量を远跡する必芁があるデむトレヌダヌのコスチュヌムを着たす。 具䜓的には、各業界で最倧のシェア売䞊高を誇る XNUMX 瀟に興味がありたす。

このような集蚈では、デヌタを目的の圢匏に倉換するために次のいく぀かの手順が必芁になりたす (䞀般的に蚀えば)。

  1. 生の株取匕情報を公開するトピックベヌスの゜ヌスを䜜成したす。 StockTransaction 型のオブゞェクトを ShareVolume 型のオブゞェクトにマップする必芁がありたす。 重芁なのは、StockTransaction オブゞェクトには販売メタデヌタが含たれおいたすが、必芁なのは販売されおいる株匏数に関するデヌタだけであるずいうこずです。
  2. ShareVolume デヌタを銘柄蚘号ごずにグルヌプ化したす。 シンボルごずにグルヌプ化するず、このデヌタを株匏販売量の小蚈に折りたたむこずができたす。 KStream.groupBy メ゜ッドは KGroupedStream 型のむンスタンスを返すこずに泚意しおください。 さらに、KGroupedStream.reduce メ゜ッドを呌び出すこずで、KTable むンスタンスを取埗できたす。

KGroupedStream むンタヌフェむスずは

KStream.groupBy メ゜ッドず KStream.groupByKey メ゜ッドは、KGroupedStream のむンスタンスを返したす。 KGroupedStream は、キヌによっおグルヌプ化された埌のむベントのストリヌムの䞭間衚珟です。 これは、盎接䜜業するこずを目的ずしたものではありたせん。 代わりに、KGroupedStream が集蚈操䜜に䜿甚され、その結果は垞に KTable になりたす。 たた、集蚈操䜜の結果は KTable であり、状態ストアを䜿甚するため、結果ずしおのすべおの曎新がパむプラむンのさらに䞋流に送信されるわけではない可胜性がありたす。

KTable.groupBy メ゜ッドは、同様の KGroupedTable (キヌによっお再グルヌプ化された、曎新ストリヌムの䞭間衚珟) を返したす。

少し䌑憩しお図を芋おみたしょう。 5.9 は、私たちが達成したこずを瀺しおいたす。 このトポロゞはすでによく知られおいるはずです。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
次に、このトポロゞヌのコヌドを芋おみたしょう (ファむル src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java にありたす) (リスト 5.2)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
指定されたコヌドは、その簡朔さず、数行で実行される倧量のアクションによっお区別されたす。 builder.stream メ゜ッドの最初のパラメヌタヌに新しい点があるこずに気づくかもしれたせん。列挙型 AutoOffsetReset.EARLIEST (LATEST もありたす) の倀は、Consumed.withOffsetResetPolicy メ゜ッドを䜿甚しお蚭定されたす。 この列挙型は、各 KStream たたは KTable のオフセット リセット戊略を指定するために䜿甚でき、構成のオフセット リセット オプションよりも優先されたす。

GroupByKey ず GroupBy

KStream むンタヌフェむスには、レコヌドをグルヌプ化するための XNUMX ぀のメ゜ッド、GroupByKey ず GroupBy がありたす。 どちらも KGroupedTable を返すので、䞡者の違いは䜕なのか、い぀どちらを䜿甚すればよいのか疑問に思われるかもしれたせん。

GroupByKey メ゜ッドは、KStream 内のキヌがすでに空でない堎合に䜿甚されたす。 そしお最も重芁なこずは、「再パヌティション化が必芁」フラグが蚭定されおいないこずです。

GroupBy メ゜ッドは、グルヌプ化キヌが倉曎されたこずを前提ずしおいるため、再分割フラグが true に蚭定されおいたす。 GroupBy メ゜ッドの埌に結合や集蚈などを実行するず、自動的に再パヌティション化されたす。
抂芁: 可胜な限り、GroupBy ではなく GroupByKey を䜿甚する必芁がありたす。

mapValues メ゜ッドず groupBy メ゜ッドが䜕を行うかは明らかなので、sum() メ゜ッド (src/main/java/bbejeck/model/ShareVolume.java にありたす) を芋おみたしょう (リスト 5.3)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
ShareVolume.sum メ゜ッドは圚庫販売量の环蚈を返し、䞀連の蚈算党䜓の結果が KTable オブゞェクトになりたす。 。 これで、KTable が果たす圹割が理解できたした。 ShareVolume オブゞェクトが到着するず、察応する KTable オブゞェクトに最新の曎新が保存されたす。 すべおの曎新は前の shareVolumeKTable に反映されたすが、すべおがさらに送信されるわけではないこずに泚意するこずが重芁です。

次に、この KTable を䜿甚しお (取匕株匏数ごずに) 集蚈し、各業界で最も取匕高の倚い XNUMX 瀟を導き出したす。 この堎合のアクションは、最初の集蚈の堎合ず同様になりたす。

  1. 別の groupBy 操䜜を実行しお、個々の ShareVolume オブゞェクトを業界ごずにグルヌプ化したす。
  2. ShareVolume オブゞェクトの芁玄を開始したす。 今回の集玄オブゞェクトは固定サむズの優先キュヌです。 この固定サむズのキュヌでは、売华された株匏の量が最も倚い XNUMX 瀟だけが保持されたす。
  3. 前の段萜のキュヌを文字列倀にマップし、業界ごずに最も取匕されおいる䞊䜍 XNUMX ぀の銘柄を数倀で返したす。
  4. 結果を文字列圢匏でトピックに曞き蟌みたす。

図では、 図 5.10 にデヌタ フロヌ トポロゞ グラフを瀺したす。 ご芧のずおり、XNUMX 回目の凊理は非垞に単玔です。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
この 5 回目の凊理の構造を明確に理解したので、゜ヌス コヌドに移りたす (ファむル src/main/java/bbejeck/chapter_5.4/AggregationsAndReducingExample.java にありたす) (リスト XNUMX)。 。

このむニシャラむザには、fixedQueue 倉数が含たれおいたす。 これは、取匕された株匏の降順で䞊䜍 N 件の結果を远跡するために䜿甚される java.util.TreeSet のアダプタヌであるカスタム オブゞェクトです。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
groupBy 呌び出しず mapValues 呌び出しに぀いおはすでに説明したので、それらに぀いおは説明したせん (KTable.print メ゜ッドは非掚奚であるため、KTable.toStream メ゜ッドを呌び出しおいたす)。 ただし、aggregate() の KTable バヌゞョンをただ芋おいないので、少し時間をかけお説明したす。

芚えおいるずおり、KTable が異なるのは、同じキヌを持぀レコヌドが曎新ず芋なされるこずです。 KTable は叀い゚ントリを新しい゚ントリに眮き換えたす。 集蚈も同様の方法で行われ、同じキヌを持぀最新のレコヌドが集蚈されたす。 レコヌドが到着するず、加算噚 (集玄メ゜ッド呌び出しの XNUMX 番目のパラメヌタヌ) を䜿甚しお、FixedSizePriorityQueue クラスのむンスタンスに远加されたすが、同じキヌを持぀別のレコヌドが既に存圚する堎合は、枛算噚 (集玄メ゜ッド呌び出しの XNUMX 番目のパラメヌタヌ) を䜿甚しお叀いレコヌドが削陀されたす。集玄メ゜ッド呌び出し)。

これは、アグリゲヌタヌであるFixedSizePriorityQueueが、すべおの倀をXNUMX぀のキヌで集蚈するのではなく、最も取匕されおいるN皮類の株の数量の移動合蚈を保存するこずを意味したす。 受信した各゚ントリには、これたでに販売された株匏の総数が含たれおいたす。 KTable は、各曎新のロヌリング集蚈を必芁ずせずに、珟圚どの䌁業の株が最も取匕されおいるかに関する情報を提䟛したす。

私たちは XNUMX ぀の重芁なこずを行うこずを孊びたした。

  • KTable 内の倀を共通キヌでグルヌプ化したす。
  • これらのグルヌプ化された倀に察しおロヌルアップや集蚈などの䟿利な操䜜を実行したす。

これらの操䜜の実行方法を知るこずは、Kafka Streams アプリケヌションを介しお移動するデヌタの意味を理解し、デヌタがどのような情報を運ぶかを理解するために重芁です。

たた、本曞の前半で説明した重芁な抂念のいく぀かもたずめたした。 第 4 章では、ストリヌミング アプリケヌションにずっおフォヌルト トレラントなロヌカル状態がいかに重芁であるかに぀いお説明したした。 この章の最初の䟋では、ロヌカル状態がなぜ非垞に重芁であるかを説明したした。ロヌカル状態により、すでに芋た情報を远跡できるようになりたす。 ロヌカル アクセスによりネットワヌク遅延が回避され、アプリケヌションのパフォヌマンスが向䞊し、゚ラヌ耐性が向䞊したす。

ロヌルアップたたは集蚈操䜜を実行するずきは、状態ストアの名前を指定する必芁がありたす。 ロヌルアップおよび集蚈操䜜は KTable むンスタンスを返し、KTable は状態ストレヌゞを䜿甚しお叀い結果を新しい結果に眮き換えたす。 これたで芋おきたように、すべおの曎新がパむプラむンに送信されるわけではありたせん。集蚈操䜜は抂芁情報を生成するように蚭蚈されおいるため、これは重芁です。 ロヌカル状態を適甚しない堎合、KTable はすべおの集蚈結果ずロヌルアップ結果を転送したす。

次に、特定の期間内での集蚈などの操䜜、いわゆるりィンドり操䜜の実行に぀いお芋おいきたす。

5.3.2. りィンドり操䜜

前のセクションでは、スラむディング畳み蟌みず集蚈に぀いお玹介したした。 このアプリケヌションは、株匏販売量の継続的なロヌルアップを実行し、その埌、取匕所で最も取匕されおいる XNUMX ぀の株匏を集蚈したした。

堎合によっおは、このような継続的な集蚈ず結果のロヌルアップが必芁になるこずがありたす。 たた、特定の期間にのみ操䜜を実行する必芁がある堎合もありたす。 たずえば、過去 10 分間に特定の䌁業の株匏に察しお行われた為替取匕の数を蚈算したす。 たたは、過去 15 分間に新しい広告バナヌをクリックしたナヌザヌの数。 アプリケヌションはこのような操䜜を耇数回実行する堎合がありたすが、その結果は指定された期間 (タむム りィンドり) にのみ適甚されたす。

買い手ごずの為替取匕のカりント

次の䟋では、耇数のトレヌダヌ (倧芏暡な組織たたは賢明な個人投資家) にわたる株匏取匕を远跡したす。

この远跡には XNUMX ぀の理由が考えられたす。 その XNUMX ぀は、垂堎リヌダヌが䜕を売買しおいるかを知る必芁性です。 これらの倧手䌁業や掗緎された投資家がチャンスを芋出したのであれば、圌らの戊略に埓うのは理にかなっおいたす。 XNUMX 番目の理由は、違法なむンサむダヌ取匕の可胜性のある兆候を芋぀けたいずいう欲求です。 これを行うには、倧芏暡な売䞊の急増ず重芁なプレス リリヌスの盞関関係を分析する必芁がありたす。

このような远跡は次の手順で構成されたす。

  • 株匏取匕トピックから読み取るためのストリヌムを䜜成したす。
  • 受信レコヌドを賌入者 ID ず銘柄蚘号ごずにグルヌプ化したす。 groupBy メ゜ッドを呌び出すず、KGroupedStream クラスのむンスタンスが返されたす。
  • KGroupedStream.windowedBy メ゜ッドは、時間りィンドりに限定されたデヌタ ストリヌムを返したす。これにより、りィンドり集蚈が可胜になりたす。 りィンドりの皮類に応じお、TimeWindowedKStream たたは SessionWindowedKStream が返されたす。
  • 集蚈操䜜のトランザクション数。 りィンドり化されたデヌタ フロヌによっお、このカりントで特定のレコヌドが考慮されるかどうかが決たりたす。
  • 開発䞭に結果をトピックに曞き蟌むか、コン゜ヌルに出力したす。

このアプリケヌションのトポロゞは単玔ですが、明確な図があるず圹立ちたす。 図を芋おみたしょう。 5.11。

次に、りィンドり操䜜の機胜ず察応するコヌドを芋おいきたす。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」

窓の皮類

Kafka Streams には XNUMX 皮類のりィンドりがありたす。

  • セッション的;
  • 「タンブリング」タンブリング。
  • スラむディング/ホッピング。

どちらを遞択するかは、ビゞネス芁件によっお異なりたす。 タンブリング りィンドりずゞャンピング りィンドりは時間制限がありたすが、セッション りィンドりはナヌザヌのアクティビティによっお制限されたす。セッションの継続時間はナヌザヌがどれだけアクティブであるかによっおのみ決たりたす。 芚えおおくべき䞻な点は、すべおのりィンドり タむプはシステム時間ではなく、゚ントリの日付/タむム スタンプに基づいおいるずいうこずです。

次に、各りィンドり タむプでトポロゞを実装したす。 完党なコヌドは最初の䟋でのみ瀺されおおり、他のタむプのりィンドりの堎合は、りィンドり操䜜のタむプ以倖は䜕も倉わりたせん。

セッションりィンドり

セッション りィンドりは、他の皮類のりィンドりずは倧きく異なりたす。 これらは、時間ではなく、ナヌザヌのアクティビティ (たたは远跡する゚ンティティのアクティビティ) によっお制限されたす。 セッション りィンドりは、非アクティブな期間によっお区切られたす。

図 5.12 にセッション りィンドりの抂念を瀺したす。 小さいセッションは、その巊偎のセッションずマヌゞされたす。 右偎のセッションは、長期間非アクティブな状態が続くため、分離されたす。 セッション りィンドりはナヌザヌ アクティビティに基づいおいたすが、゚ントリの日付/時刻スタンプを䜿甚しお、゚ントリがどのセッションに属しおいるかを刀断したす。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」

セッションりィンドりを䜿甚しお株匏取匕を远跡する

セッション りィンドりを䜿甚しお、為替トランザクションに関する情報を取埗しおみたしょう。 セッションりィンドりの実装をリスト 5.5 に瀺したす (これは src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java にありたす)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
このトポロゞのほずんどの操䜜はすでに芋おきたので、ここで再床確認する必芁はありたせん。 ただし、ここにはいく぀かの新しい芁玠もありたすので、それに぀いおはこれから説明したす。

通垞、groupBy 操䜜は、ある皮の集蚈操䜜 (集蚈、ロヌルアップ、たたはカりント) を実行したす。 珟圚たでの合蚈を䜿甚した环積集蚈、たたは指定した時間枠内のレコヌドを考慮するりィンドり集蚈のいずれかを実行できたす。

リスト 5.5 のコヌドは、セッション りィンドり内のトランザクションの数をカりントしたす。 図では、 5.13 これらのアクションは段階的に分析されたす。

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) を呌び出すこずで、非アクティブ期間が 20 秒、持続期間が 15 分のセッション りィンドりを䜜成したす。 20 秒のアむドル間隔は、アプリケヌションが珟圚のセッションの終了たたは開始から 20 秒以内に到着した゚ントリを珟圚の (アクティブな) セッションに含めるこずを意味したす。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
次に、セッション りィンドりでどの集蚈操䜜を実行する必芁があるかを指定したす (この堎合は count)。 受信゚ントリが非アクティブ期間 (日付/タむムスタンプのどちらかの偎) の倖偎にある堎合、アプリケヌションは新しいセッションを䜜成したす。 保持間隔ずは、セッションを䞀定時間維持するこずを意味し、セッションの非アクティブ期間を超えおも接続できる遅延デヌタを蚱可したす。 さらに、マヌゞの結果ずしお埗られる新しいセッションの開始ず終了は、最も叀い日付/タむム スタンプず最新の日付/タむム スタンプに察応したす。

count メ゜ッドのいく぀かの゚ントリを芋お、セッションがどのように機胜するかを芋おみたしょう (è¡š 5.1)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
レコヌドが到着するず、同じキヌ、珟圚の日付/タむムスタンプ - 非アクティブ期間よりも短い終了時刻、および珟圚の日付/タむムスタンプ + 非アクティブ期間よりも倧きい開始時刻を持぀既存のセッションが怜玢されたす。 これを考慮するず、衚から 5.1 ぀の゚ントリが埗られたす。 XNUMX は次のように XNUMX ぀のセッションにマヌゞされたす。

1. レコヌド 1 が最初に到着するため、開始時刻は終了時刻ず等しく、00:00:00 になりたす。

2. 次に、゚ントリ 2 が到着し、23:59:55 以降に終了し、00:00:35 以降に開始するセッションを探したす。 レコヌド 1 を芋぀けお、セッション 1 ず 2 を結合したす。セッション 1 の開始時刻 (早い) ずセッション 2 の終了時刻 (遅い) を取埗し、新しいセッションが 00:00:00 に開始し、00 に終了するようにしたす。 00:15。

3. レコヌド 3 が到着するず、00:00:30 ず 00:01:10 の間のセッションを探したすが、芋぀かりたせん。 キヌ 123-345-654,FFBE の 00 番目のセッションを远加し、00:50:XNUMX に開始しお終了したす。

4. レコヌド 4 が到着し、23:59:45 から 00:00:25 たでのセッションを探しおいたす。 今回はセッション 1 ず 2 の䞡方が芋぀かり、開始時刻が 00:00:00、終了時刻が 00:00:15 の XNUMX ぀のセッションが XNUMX ぀に結合されたす。

このセクションの説明から、次の重芁なニュアンスを芚えおおく䟡倀がありたす。

  • セッションは固定サむズのりィンドりではありたせん。 セッションの継続時間は、指定された期間内のアクティビティによっお決たりたす。
  • デヌタ内の日付/時刻スタンプによっお、むベントが既存のセッション内に該圓するか、アむドル期間内に該圓するかが決たりたす。

次に、次のタむプの窓である「タンブリング」窓に぀いお説明したす。

「タンブリング」りィンドり

タンブリング りィンドりは、特定の期間内に発生するむベントをキャプチャしたす。 特定の䌁業のすべおの株匏取匕を 20 秒ごずにキャプチャする必芁があり、その期間䞭のすべおのむベントを収集するずしたす。 20 秒の間隔が終了するず、りィンドりが切り替わり、新しい 20 秒の芳察間隔に移動したす。 図 5.14 はこの状況を瀺しおいたす。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
ご芧のずおり、過去 20 秒間に受信したすべおのむベントがりィンドりに含たれおいたす。 この期間が終了するず、新しいりィンドりが䜜成されたす。

リスト 5.6 は、タンブリング りィンドりを䜿甚しお 20 秒ごずに株匏取匕をキャプチャするコヌドを瀺しおいたす (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java にありたす)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
TimeWindows.of メ゜ッド呌び出しに察するこの小さな倉曎により、タンブリング りィンドりを䜿甚できるようになりたす。 この䟋では until() メ゜ッドを呌び出しおいないため、デフォルトの保持間隔である 24 時間が䜿甚されたす。

最埌に、りィンドり オプションの最埌の「ホッピング」りィンドりに進みたす。

スラむド (「ゞャンピング」) りィンドり

スラむディング/ホッピング りィンドりはタンブリング りィンドりに䌌おいたすが、若干の違いがありたす。 スラむディング りィンドりは、最近のむベントを凊理するための新しいりィンドりを䜜成する前に、時間間隔が終了するたで埅機したせん。 りィンドり期間よりも短い埅機間隔の埌に、新しい蚈算が開​​始されたす。

タンブリング りィンドりずゞャンピング りィンドりの違いを説明するために、蚌刞取匕所の取匕をカりントする䟋に戻りたしょう。 私たちの目暙は䟝然ずしおトランザクション数をカりントするこずですが、カりンタヌを曎新するたでずっず埅機する必芁はありたせん。 代わりに、より短い間隔でカりンタヌを曎新したす。 たずえば、図に瀺すように、トランザクション数は匕き続き 20 秒ごずにカりントしたすが、カりンタは 5 秒ごずに曎新されたす。 5.15。 この堎合、最終的にデヌタが重耇する XNUMX ぀の結果りィンドりが衚瀺されたす。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
リスト 5.7 は、スラむディング りィンドりを定矩するコヌドを瀺しおいたす (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java にありたす)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
タンブリング りィンドりは、advancedBy() メ゜ッドぞの呌び出しを远加するこずでホッピング りィンドりに倉換できたす。 瀺されおいる䟋では、保存間隔は 15 分です。

このセクションでは、集蚈結果を時間枠に制限する方法を説明したした。 このセクションで特に次の XNUMX ぀のこずを芚えおおいおください。

  • セッション りィンドりのサむズは、期間ではなく、ナヌザヌのアクティビティによっお制限されたす。
  • 「タンブリング」りィンドりは、䞀定期間内のむベントの抂芁を提䟛したす。
  • ゞャンプ りィンドりの期間は固定されおいたすが、頻繁に曎新されるため、すべおのりィンドりに重耇する゚ントリが含たれる堎合がありたす。

次に、接続のために KTable を KStream に倉換する方法を孊びたす。

5.3.3. KStream オブゞェクトず KTable オブゞェクトの接続

第 4 章では、XNUMX ぀の KStream オブゞェクトの接続に぀いお説明したした。 次に、KTable ず KStream を接続する方法を孊習する必芁がありたす。 これは次のような単玔な理由で必芁になる堎合がありたす。 KStream はレコヌドのストリヌムであり、KTable はレコヌド曎新のストリヌムですが、堎合によっおは、KTable からの曎新を䜿甚しおレコヌド ストリヌムに远加のコンテキストを远加したい堎合がありたす。

蚌刞取匕所の取匕数に関するデヌタを取埗し、関連する業界の蚌刞取匕所のニュヌスず組み合わせおみたしょう。 すでに持っおいるコヌドを考慮しお、これを達成するために䜕をする必芁があるかを次に瀺したす。

  1. 株匏取匕数のデヌタを含む KTable オブゞェクトを KStream に倉換し、キヌをこの銘柄蚘号に察応する業皮を瀺すキヌに眮き換えたす。
  2. 蚌刞取匕所ニュヌスのトピックからデヌタを読み取る KTable オブゞェクトを䜜成したす。 この新しい KTable は産業分野ごずに分類されたす。
  3. ニュヌスの曎新情報を業界セクタヌごずの蚌刞取匕所の取匕数に関する情報ず結び付けたす。

次に、このアクションプランを実行する方法を芋おみたしょう。

KTable を KStream に倉換する

KTable を KStream に倉換するには、次の手順を実行する必芁がありたす。

  1. KTable.toStream() メ゜ッドを呌び出したす。
  2. KStream.map メ゜ッドを呌び出しお、キヌを業界名に眮き換えお、Window むンスタンスから Transactionsummary オブゞェクトを取埗したす。

これらの操䜜を次のように連鎖させたす (コヌドはファむル src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java にありたす) (リスト 5.8)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
KStream.map 操䜜を実行しおいるため、返された KStream むンスタンスは、接続で䜿甚されるずきに自動的に再パヌティション化されたす。

倉換プロセスは完了したした。次に、株匏ニュヌスを読み取るための KTable オブゞェクトを䜜成する必芁がありたす。

株匏ニュヌス甚のKTableの䜜成

幞いなこずに、KTable オブゞェクトの䜜成には 5 行のコヌドしかかかりたせん (コヌドは src/main/java/bbejeck/chapter_5.9/CountingWindowingAndKtableJoinExample.java にありたす) (リスト XNUMX)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
文字列 Serdes が蚭定で䜿甚されるため、Serde オブゞェクトを指定する必芁がないこずに泚意しおください。 たた、EARLIEST 列挙を䜿甚するず、テヌブルの最初からレコヌドが入力されたす。

ここで、最埌のステップである接続に進むこずができたす。

ニュヌス曎新ずトランザクション数デヌタを結び付ける

぀ながりを䜜るこずは難しくありたせん。 関連する業界の株匏ニュヌスがない堎合に備えお、巊結合を䜿甚したす (必芁なコヌドはファむル src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java にありたす) (リスト 5.10)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
この leftJoin 挔算子は非垞に単玔です。 第 4 章の結合ずは異なり、KStream-KTable 結合を実行する堎合、KTable にはキヌごずに゚ントリが XNUMX ぀だけ存圚するため、JoinWindow メ゜ッドは䜿甚されたせん。 このような接続には時間の制限がありたせん。レコヌドは KTable に存圚するか、存圚したせん。 䞻な結論: KTable オブゞェクトを䜿甚するず、曎新頻床の䜎い参照デヌタで KStream を匷化できたす。

次に、KStream からむベントを匷化するより効率的な方法を芋おいきたす。

5.3.4. GlobalKTable オブゞェクト

ご芧のずおり、むベント ストリヌムを匷化するか、むベント ストリヌムにコンテキストを远加する必芁がありたす。 第 4 章では 4 ぀の KStream オブゞェクト間の接続に぀いお説明し、前のセクションでは KStream ず KTable 間の接続に぀いお説明したした。 これらのすべおのケヌスで、キヌを新しいタむプたたは倀にマッピングするずきに、デヌタ ストリヌムを再分割する必芁がありたす。 再パヌティション化は明瀺的に行われる堎合もあれば、Kafka Streams によっお自動的に行われる堎合もありたす。 キヌが倉曎され、レコヌドが新しいセクションに配眮される必芁があるため、再パヌティション化が必芁です。そうしないず接続が䞍可胜になりたす (これに぀いおは、第 4.2.4 章のセクション XNUMX の「デヌタの再パヌティション化」セクションで説明したした)。

再パヌティション化にはコストがかかる

再パヌティション化にはコストがかかりたす。䞭間トピックを䜜成したり、重耇したデヌタを別のトピックに保存したりするための远加のリ゜ヌス コストです。 たた、このトピックからの曞き蟌みず読み取りによりレむテンシが増加するこずも意味したす。 さらに、耇数のアスペクトたたはディメンション間で結合する必芁がある堎合は、結合をチェヌンし、レコヌドを新しいキヌでマップし、再パヌティション化プロセスを再床実行する必芁がありたす。

より小さなデヌタセットぞの接続

堎合によっおは、接続される参照デヌタの量が比范的小さいため、その完党なコピヌを各ノヌドにロヌカルに簡単に適合させるこずができたす。 このような状況のために、Kafka Streams は GlobalKTable クラスを提䟛したす。

GlobalKTable むンスタンスは、アプリケヌションがすべおのデヌタを各ノヌドにレプリケヌトするため、䞀意です。 たた、すべおのデヌタが各ノヌドに存圚するため、すべおのパヌティションで䜿甚できるように、参照デヌタ キヌによっおむベント ストリヌムを分割する必芁はありたせん。 GlobalKTable オブゞェクトを䜿甚しおキヌレス結合を行うこずもできたす。 この機胜を説明するために、前の䟋の XNUMX ぀に戻っおみたしょう。

KStream オブゞェクトを GlobalKTable オブゞェクトに接続する

セクション 5.3.2 では、買い手による為替取匕のりィンドり集蚈を実行したした。 この集蚈の結果は次のようになりたす。

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

これらの結果は目的を果たしたしたが、顧客の名前ず完党な䌚瀟名も衚瀺されおいれば、さらに䟿利になったでしょう。 顧客名ず䌚瀟名を远加するには、通垞の結合を実行できたすが、XNUMX ぀のキヌ マッピングず再パヌティション化を行う必芁がありたす。 GlobalKTable を䜿甚するず、そのような操䜜のコストを回避できたす。

これを行うには、リスト 5.11 の countStream オブゞェクト (察応するコヌドは src/main/java/bbejeck/chapter_5/GlobalKTableExample.java にありたす) を䜿甚し、それを XNUMX ぀の GlobalKTable オブゞェクトに接続したす。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
これに぀いおはすでに説明したので繰り返したせん。 ただし、読みやすくするために、toStream().map 関数のコヌドはむンラむン ラムダ匏ではなく関数オブゞェクトに抜象化されおいるこずに泚意しおください。

次のステップでは、GlobalKTable の 5 ぀のむンスタンスを宣蚀したす (瀺されおいるコヌドは、ファむル src/main/java/bbejeck/chapter_5.12/GlobalKTableExample.java にありたす) (リスト XNUMX)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」

トピック名は列挙型を䜿甚しお蚘述されるこずに泚意しおください。

すべおのコンポヌネントの準備ができたので、あずは接続甚のコヌドを蚘述するだけです (コヌドはファむル src/main/java/bbejeck/chapter_5/GlobalKTableExample.java にありたす) (リスト 5.13)。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
このコヌドには XNUMX ぀の結合がありたすが、どちらの結果も個別に䜿甚されないため、結合されおいたす。 結果は操䜜党䜓の終了時に衚瀺されたす。

䞊蚘の結合操䜜を実行するず、次のような結果が埗られたす。

{customer='Barney, Smith' company="Exxon", transactions= 17}

本質は倉わっおいたせんが、これらの結果はより明確に芋えたす。

第 4 章たで遡っおみるず、すでにいく぀かのタむプの接続が実際に動䜜しおいるこずがわかりたす。 それらは衚にリストされおいたす。 5.2. この衚は、Kafka Streams のバヌゞョン 1.0.0 の時点での接続機胜を反映しおいたす。 将来のリリヌスでは䜕かが倉曎される可胜性がありたす。

著曞『Kafka Streams in Action. リアルタむム䜜業のためのアプリケヌションずマむクロサヌビス」
最埌に、基本を埩習したしょう。ロヌカル状態を䜿甚しおむベント ストリヌム (KStream) を接続し、ストリヌム (KTable) を曎新できたす。 たたは、参照デヌタのサむズが倧きすぎない堎合は、GlobalKTable オブゞェクトを䜿甚できたす。 GlobalKTables は、すべおのパヌティションを各 Kafka Streams アプリケヌション ノヌドにレプリケヌトし、キヌがどのパヌティションに察応するかに関係なく、すべおのデヌタが利甚できるようにしたす。

次に、Kafka ストリヌム機胜に぀いお説明したす。この機胜のおかげで、Kafka トピックからのデヌタを消費せずに状態の倉化を芳察できたす。

5.3.5. ク゚リ可胜な状態

すでに状態に関係するいく぀かの操䜜を実行しおおり、結果は垞にコン゜ヌルに出力されるか (開発目的)、トピックに曞き蟌たれたす (運甚目的)。 結果をトピックに曞き蟌む堎合、結果を衚瀺するには Kafka コンシュヌマヌを䜿甚する必芁がありたす。

これらのトピックからのデヌタの読み取りは、マテリアラむズド ビュヌの䞀皮ずみなすこずができたす。 この目的のために、Wikipedia のマテリアラむズド ビュヌの定矩を䜿甚できたす。「...ク゚リの結果を含む物理デヌタベヌス オブゞェクト。 たずえば、リモヌト デヌタのロヌカル コピヌ、テヌブルの行や列のサブセット、結合結果、集蚈によっお埗られた抂芁テヌブルなどです。」 (https://en.wikipedia.org/wiki) /マテリアラむズド_ビュヌ)。

Kafka Streams を䜿甚するず、状態ストアに察しお察話型のク゚リを実行できるため、これらの実䜓化されたビュヌを盎接読み取るこずができたす。 状態ストアぞのク゚リは読み取り専甚操䜜であるこずに泚意するこずが重芁です。 これにより、アプリケヌションがデヌタを凊理しおいるずきに誀っお状態の䞍敎合が発生するこずを心配する必芁がなくなりたす。

状態ストアを盎接ク゚リできる機胜は重芁です。 これは、最初に Kafka コンシュヌマヌからデヌタをフェッチしなくおも、ダッシュボヌド アプリケヌションを䜜成できるこずを意味したす。 たた、デヌタを再床曞き蟌む必芁がないため、アプリケヌションの効率も向䞊したす。

  • デヌタの局所性のおかげで、すぐにアクセスできたす。
  • デヌタは倖郚ストレヌゞに曞き蟌たれないため、デヌタの重耇が排陀されたす。

芚えおおいおいただきたい䞻な点は、アプリケヌション内から状態を盎接ク゚リできるこずです。 これによっお埗られるチャンスはいくら匷調しおもしすぎるこずはありたせん。 Kafka からのデヌタを消費し、アプリケヌションのデヌタベヌスにレコヌドを保存する代わりに、状態ストアにク゚リを実行しお同じ結果を埗るこずができたす。 ステヌト ストアぞの盎接ク゚リは、コヌドが少なくなり (コンシュヌマヌが䞍芁)、゜フトりェアも少なくなりたす (結果を保存するデヌタベヌス テヌブルが必芁なくなりたす)。

この章ではかなりの内容を説明したので、ステヌト ストアに察する察話型ク゚リの説明はここたでにしおおきたす。 しかし、心配しないでください。第 9 章では、察話型ク゚リを備えた単玔なダッシュボヌド アプリケヌションを䜜成したす。 この章ず前の章の䟋の䞀郚を䜿甚しお、察話型ク゚リず、それを Kafka Streams アプリケヌションに远加する方法を瀺したす。

サマリヌ

  • KStream オブゞェクトは、デヌタベヌスぞの挿入に盞圓するむベントのストリヌムを衚したす。 KTable オブゞェクトは曎新ストリヌムを衚し、デヌタベヌスの曎新に䌌おいたす。 KTable オブゞェクトのサむズは増加せず、叀いレコヌドは新しいレコヌドに眮き換えられたす。
  • KTable オブゞェクトは集蚈操䜜に必芁です。
  • りィンドり操䜜を䜿甚するず、集蚈されたデヌタをタむム バケットに分割できたす。
  • GlobalKTable オブゞェクトのおかげで、パヌティショニングに関係なく、アプリケヌション内のどこからでも参照デヌタにアクセスできたす。
  • KStream、KTable、および GlobalKTable オブゞェクト間の接続が可胜です。

これたで、高レベルの KStream DSL を䜿甚しお Kafka Streams アプリケヌションを構築するこずに焊点を圓おおきたした。 高レベルのアプロヌチを䜿甚するず、きちんずした簡朔なプログラムを䜜成できたすが、これを䜿甚するずトレヌドオフが発生したす。 DSL KStream を䜿甚するずいうこずは、制埡の床合いを枛らしおコヌドの簡朔性を高めるこずを意味したす。 次の章では、䜎レベルのハンドラヌ ノヌド API を芋お、他のトレヌドオフを詊したす。 プログラムは以前より長くなりたすが、必芁なほがすべおのハンドラヌ ノヌドを䜜成できるようになりたす。

→本曞の詳现に぀いおは、こちらをご芧ください。 出版瀟のりェブサむト

→ ハブロゞテリの堎合 クヌポン利甚で25割匕 - カフカストリヌム

→玙版の曞籍の代金をお支払いいただくず、電子曞籍がメヌルで送信されたす。

出所 habr.com

コメントを远加したす