《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務” 哈布羅居民大家好! 本書適合任何想要了解線程處理的開發人員。 了解分散式程式設計將幫助您更好地理解 Kafka 和 Kafka Streams。 如果了解 Kafka 框架本身就好了,但這不是必要的:我會告訴你你需要的一切。 經驗豐富的 Kafka 開發人員和新手都將在本書中學習如何使用 Kafka Streams 函式庫建立有趣的串流處理應用程式。 已經熟悉序列化等概念的中級和高級 Java 開發人員將學習運用他們的技能來創建 Kafka Streams 應用程式。 本書的原始碼是用 Java 8 編寫的,並大量使用了 Java 8 lambda 表達式語法,因此了解如何使用 lambda 函數(甚至使用其他程式語言)將會派上用場。

摘抄。 5.3. 聚合和視窗操作

在本節中,我們將繼續探索 Kafka Streams 最有前途的部分。 到目前為止,我們已經介紹了 Kafka Streams 的以下幾個方面:

  • 創建處理拓撲;
  • 在串流應用程式中使用狀態;
  • 執行資料流連接;
  • 事件流 (KStream) 和更新流 (KTable) 之間的差異。

在下面的範例中,我們將把所有這些元素結合在一起。 您還將了解視窗化,這是串流媒體應用程式的另一個重要功能。 我們的第一個例子是一個簡單的聚合。

5.3.1. 按行業分類的股票銷售匯總

聚合和分組是處理流程資料時的重要工具。 對收到的個人記錄進行檢查往往是不夠的。 為了從數據中提取附加信息,有必要將它們分組和組合。

在此範例中,您將扮演一名日間交易員,他需要追蹤多個行業公司股票的銷售量。 具體來說,您對每個行業中份額銷售額最大的五家公司感興趣。

這種聚合將需要以下幾個步驟將資料轉換為所需的形式(籠統地說)。

  1. 建立基於主題的來源來發布原始股票交易資訊。 我們必須將 StockTransaction 類型的物件對應到 ShareVolume 類型的物件。 關鍵是 StockTransaction 物件包含銷售元數據,但我們只需要有關正在出售的股票數量的數據。
  2. 按股票代碼對 ShareVolume 資料進行分組。 按代碼分組後,您可以將此數據折疊為股票銷售量小計。 值得注意的是,KStream.groupBy 方法傳回一個 KGroupedStream 類型的實例。 進一步呼叫KGroupedStream.reduce方法即可取得KTable實例。

什麼是 KGroupedStream 接口

KStream.groupBy 和 KStream.groupByKey 方法傳回 KGroupedStream 的實例。 KGroupedStream 是按鍵分組後事件流的中間表示。 它根本不是為了直接使用它而設計的。 相反,KGroupedStream 用於聚合操作,這始終會產生 KTable。 由於聚合操作的結果是一個 KTable 並且它們使用狀態存儲,因此可能並非所有更新結果都會進一步沿著管道發送。

KTable.groupBy 方法傳回一個類似的 KGroupedTable - 更新流的中間表示,按鍵重新分組。

讓我們稍微休息一下,看一下圖。 5.9,這顯示了我們所取得的成就。 您應該已經非常熟悉這種拓撲結構。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
現在讓我們來看看這個拓樸的程式碼(可以在檔案 src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java 中找到)(清單 5.2)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
給定的程式碼以其簡潔性和在多行中執行的大量操作而聞名。 您可能會注意到 builder.stream 方法的第一個參數中有一些新內容:枚舉類型 AutoOffsetReset.EARLIEST(還有一個 LATEST)的值,使用 Consumed.withOffsetResetPolicy 方法設定。 此枚舉類型可用於為每個 KStream 或 KTable 指定偏移重置策略,並優先於配置中的偏移重置選項。

GroupByKey 和 GroupBy

KStream 介面有兩種對記錄進行分組的方法:GroupByKey 和 GroupBy。 兩者都會傳回一個 KGroupedTable,因此您可能想知道它們之間有什麼區別以及何時使用哪一個?

當 KStream 中的鍵已經非空時,使用 GroupByKey 方法。 最重要的是,「需要重新分區」標誌從未設定。

GroupBy 方法假定您已變更分組鍵,因此重新分割區標誌設為 true。 在 GroupBy 方法之後執行聯結、聚合等將導致自動重新分區。
摘要: 只要有可能,您應該使用 GroupByKey 而不是 GroupBy。

很清楚mapValues和groupBy方法的作用,所以讓我們來看看sum()方法(位於src/main/java/bbejeck/model/ShareVolume.java)(清單5.3)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
ShareVolume.sum方法傳回股票銷售的運行總計,整個計算鏈的結果是一個KTable對象。 現在您了解了 KTable 所扮演的角色。 當ShareVolume物件到達時,對應的KTable物件儲存最新的目前更新。 重要的是要記住,所有更新都會反映在先前的 shareVolumeKTable 中,但並非所有更新都會進一步發送。

然後,我們使用此 KTable 進行匯總(按交易股票數量),得出每個行業中股票交易量最高的五家公司。 在這種情況下,我們的操作將與第一次聚合的操作類似。

  1. 執行另一個 groupBy 操作以按行業對各個 ShareVolume 物件進行分組。
  2. 開始總結 ShareVolume 物件。 這次聚合物件是一個固定大小的優先權佇列。 在這個固定大小的隊列中,僅保留出售股票數量最多的五家公司。
  3. 將上一段中的佇列對應到字串值,並按行業數量傳回交易量排名前五的股票。
  4. 將結果以字串形式寫入主題。

在圖中。 資料流拓樸圖如圖5.10所示。 正如您所看到的,第二輪處理非常簡單。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
現在我們已經清楚地了解了第二輪處理的結構,我們可以轉向它的源代碼(您可以在文件 src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java 中找到它)(清單 5.4) 。

該初始化程序包含一個fixedQueue 變數。 這是一個自訂對象,它是 java.util.TreeSet 的適配器,用於追蹤按交易股票降序排列的前 N ​​個結果。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
您已經看到了 groupBy 和 mapValues 調用,因此我們不會深入討論它們(我們調用 KTable.toStream 方法是因為 KTable.print 方法已被棄用)。 但是您還沒有看到aggregate() 的KTable 版本,所以我們將花一些時間討論它。

正如您所記得的,KTable 的不同之處在於具有相同鍵的記錄被視為更新。 KTable 以新條目取代舊條目。 聚合以類似的方式發生:聚合具有相同鍵的最新記錄。 當一筆記錄到達時,會使用加法器(聚合方法呼叫中的第二個參數)將其新增至FixSizePriorityQueue 類別實例中,但如果已存在具有相同鍵的另一筆記錄,則使用減法器(聚合方法調用中的第三個參數)刪除舊記錄。聚合方法呼叫)。

這都意味著我們的聚合器FixedSizePriorityQueue並不是用一個鍵聚合所有值,而是儲存N種交易最多的股票類型的數量的移動總和。 每個傳入條目包含迄今為止已售出的股票總數。 KTable 將為您提供有關哪些公司的股票當前交易量最大的信息,而不需要每次更新的滾動聚合。

我們學會了做兩件重要的事情:

  • 透過公共鍵將 KTable 中的值分組;
  • 對這些分組值執行有用的操作,例如總和和聚合。

了解如何執行這些操作對於理解透過 Kafka Streams 應用程式移動的資料的含義以及它攜帶的資訊非常重要。

我們也匯集了本書前面討論的一些關鍵概念。 在第 4 章中,我們討論了容錯、本機狀態對於串流應用程式的重要性。 本章的第一個範例示範了為什麼本地狀態如此重要——它使您能夠追蹤您已經看到的資訊。 本地存取避免了網路延遲,使應用程式效能更高且更不易出錯。

執行任何匯總或聚合操作時,必須指定狀態儲存的名稱。 匯總和聚合操作傳回一個 KTable 實例,KTable 使用狀態儲存將舊結果替換為新結果。 正如您所看到的,並非所有更新都會沿著管道發送,這一點很重要,因為聚合操作旨在產生摘要資訊。 如果不套用本機狀態,KTable 將轉送所有聚合和總計結果。

接下來,我們將研究在特定時間段內執行聚合等操作 - 所謂的視窗操作。

5.3.2. 視窗操作

在上一節中,我們介紹了滑動卷積和聚合。 該應用程式連續匯總股票銷量,然後匯總交易所交易量最高的五隻股票。

有時,這種連續聚合和匯總結果是必要的。 有時您只需要在給定的時間段內執行操作。 例如,計算過去 10 分鐘內特定公司股票的交易量。 或者有多少用戶在過去 15 分鐘內點擊了新的廣告橫幅。 應用程式可以多次執行此類操作,但結果僅適用於指定的時間段(時間視窗)。

以買家計算外匯交易

在下一個範例中,我們將追蹤多個交易者(大型組織或聰明的個人金融家)之間的股票交易。

這種跟蹤有兩個可能的原因。 其中之一是需要了解市場領導者正在購買/出售什麼。 如果這些大公司和成熟的投資者看到了機會,那麼遵循他們的策略就有意義。 第二個原因是希望發現任何可能的非法內線交易跡象。 為此,您需要分析大幅銷售高峰與重要新聞稿的相關性。

此類追蹤包括以下步驟:

  • 建立一個用於讀取股票交易主題的串流;
  • 依買家 ID 和股票代碼對傳入記錄進行分組。 呼叫groupBy方法傳回KGroupedStream類別的實例;
  • KGroupedStream.windowedBy 方法傳回僅限於時間視窗的資料流,這允許視窗聚合。 根據視窗類型,返回 TimeWindowedKStream 或 SessionWindowedKStream;
  • 聚合操作的交易計數。 視窗資料流決定是否在此計數中考慮特定記錄;
  • 在開發過程中將結果寫入主題或將結果輸出到控制台。

該應用程式的拓撲結構很簡單,但清晰的描述會很有幫助。 我們來看一下圖。 5.11.

接下來我們來看看視窗操作的功能以及對應的程式碼。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”

窗戶類型

Kafka Streams 中有三種類型的視窗:

  • 會期;
  • 「翻滾」(翻滾);
  • 滑動/跳躍。

選擇哪一種取決於您的業務需求。 翻滾和跳躍視窗是有時間限制的,而會話視窗則受到使用者活動的限制——會話的持續時間僅取決於使用者的活躍程度。 要記住的主要事情是所有視窗類型都基於條目的日期/時間戳,而不是系統時間。

接下來,我們使用每種視窗類型實作拓樸。 僅在第一個範例中給出完整的程式碼;對於其他類型的窗口,除了窗口操作的類型之外,不會發生任何變化。

會話視窗

會話視窗與所有其他類型的視窗有很大不同。 它們不僅受時間限制,還受使用者活動(或您想要追蹤的實體的活動)的限制。 會話視窗由不活動時間段界定。

圖 5.12 說明了會話視窗的概念。 較小的會話將與其左側的會話合併。 右側的會話將是單獨的,因為它是在長時間不活動之後進行的。 會話視窗是基於使用者活動,但使用條目中的日期/時間戳來確定條目屬於哪個會話。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”

使用會話視窗追蹤股票交易

讓我們使用會話視窗來捕獲有關交換交易的資訊。 會話視窗的實作如清單 5.5 所示(可在 src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java 中找到)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
您已經了解了此拓撲中的大部分操作,因此無需在此處再次查看它們。 但這裡還有一些新元素,我們現在將討論這些新元素。

任何 groupBy 操作通常都會執行某種聚合操作(聚合、總計或計數)。 您可以使用運行總計執行累積聚合,也可以執行視窗聚合,後者考慮指定時間視窗內的記錄。

清單 5.5 中的程式碼計算會話視窗內的交易數量。 在圖中。 5.13 這些行動是逐步分析的。

透過呼叫 windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) 我們建立一個會話窗口,其不活動間隔為 20 秒,持久間隔為 15 分鐘。 20 秒的空閒間隔表示應用程式會將目前會話結束或開始後 20 秒內到達的任何條目包含到目前(活動)會話中。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
接下來,我們指定需要在會話視窗中執行哪個聚合操作 - 在本例中為 count。 如果傳入條目超出非活動視窗(日期/時間戳記的任一側),應用程式將建立新會話。 保留間隔意味著將會話維持一定的時間,並允許超出會話非活動期但仍可以附加的最新資料。 此外,合併產生的新會話的開始和結束對應於最早和最晚的日期/時間戳記。

讓我們來看看 count 方法中的一些條目,看看會話是如何運作的(表 5.1)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
當記錄到達時,我們會尋找具有相同金鑰、結束時間小於目前日期/時間戳記 - 不活動間隔、開始時間大於目前日期/時間戳記 + 不活動間隔的現有會話。 考慮到這一點,表中有四個條目。 5.1 合併為單一會話,如下所示。

1. 記錄1先到達,因此開始時間等於結束時間,皆為00:00:00。

2. 接下來,條目 2 到達,我們查找結束時間不早於 23:59:55 且開始時間不晚於 00:00:35 的會話。 我們找到記錄1 並合併會話1 和2。我們取得會話1 的開始時間(較早)和會話2 的結束時間(較晚),以便我們的新會話於00:00:00 開始並於00:00結束: 15:XNUMX。

3. 記錄 3 到達,我們查找 00:00:30 到 00:01:10 之間的會話,但沒有找到任何會話。 為金鑰 123-345-654,FFBE 新增第二個會話,開始和結束時間為 00:00:50。

4. 記錄 4 到達,我們正在尋找 23:59:45 到 00:00:25 之間的會話。 這次同時找到了會話 1 和會話 2。這三個會話合而為一,開始時間為 00:00:00,結束時間為 00:00:15。

根據本節的描述,值得記住以下重要的細微差別:

  • 會話不是固定大小的視窗。 會話的持續時間由給定時間段內的活動決定;
  • 資料中的日期/時間戳記會確定事件是在現有會話中還是在空閒期間。

接下來我們將討論下一種類型的窗戶—「翻滾」窗戶。

「翻滾」的窗戶

翻滾視窗可捕捉特定時間段內發生的事件。 想像一下,您需要每 20 秒捕獲某個公司的所有股票交易,因此您收集了該時間段內的所有事件。 在 20 秒間隔結束時,視窗會翻轉並移至新的 20 秒觀察間隔。 圖 5.14 說明了這種情況。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
如您所見,過去 20 秒內收到的所有事件都包含在視窗中。 這段時間結束後,會建立一個新視窗。

清單 5.6 顯示的程式碼示範如何使用捲動視窗每 20 秒擷取一次股票交易(位於 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
透過對 TimeWindows.of 方法呼叫的這個小更改,您可以使用滾動視窗。 本範例沒有呼叫until()方法,因此將使用預設的保留間隔24小時。

最後,是時候轉到最後一個視窗選項 - “跳躍”視窗。

滑動(“跳躍”)窗戶

滑動/跳躍視窗與翻滾視窗類似,但略有不同。 滑動視窗不會等到時間間隔結束才建立新視窗來處理最近的事件。 他們在小於視窗持續時間的等待間隔後開始新的計算。

為了說明滾動視窗和跳躍視窗之間的差異,讓我們回到計算股票交易交易的範例。 我們的目標仍然是計算交易數量,但我們不想在更新計數器之前等待整個時間。 相反,我們將以更短的時間間隔更新計數器。 例如,我們仍然每 20 秒計算一次交易數量,但每 5 秒更新一次計數器,如圖 5.15 所示。 XNUMX。 在這種情況下,我們最終得到三個具有重疊資料的結果視窗。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
清單 5.7 顯示了定義滑動視窗的程式碼(位於 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
透過新增對 advanceBy() 方法的調用,可以將翻滾視窗轉換為跳躍視窗。 在所示範例中,儲存間隔為 15 分鐘。

您在本節中了解如何將聚合結果限制為時間視窗。 我特別希望您記住本節的以下三件事:

  • 會話視窗的大小不受時間段限制,而是受使用者活動限制;
  • 「翻滾」視窗提供給定時間內事件的概覽;
  • 跳躍視窗的持續時間是固定的,但它們更新頻繁,並且可能在所有視窗中包含重疊的條目。

接下來,我們將學習如何將 KTable 轉換回 KStream 以進行連線。

5.3.3. 連線 KStream 和 KTable 對象

在第 4 章中,我們討論了連接兩個 KStream 物件。 現在我們要學習如何連接KTable和KStream。 出於以下簡單原因可能需要這樣做。 KStream 是記錄流,KTable 是記錄更新流,但有時您可能想要使用 KTable 中的更新向記錄流添加其他上下文。

讓我們取得證券交易所交易數量的數據,並將其與相關產業的證券交易所新聞結合。 鑑於您已有的程式碼,您需要執行以下操作才能實現此目的。

  1. 將包含股票交易數量資料的 KTable 物件轉換為 KStream,然後將 key 替換為該股票代碼對應的行業板塊的 key。
  2. 建立一個 KTable 對象,該物件從包含證券交易所新聞的主題中讀取資料。 這個新的KTable將按行業部門進行分類。
  3. 將新聞更新與按行業劃分的證券交易所交易數量資訊連結。

現在讓我們看看如何實施這個行動計畫。

將 KTable 轉換為 KStream

要將 KTable 轉換為 KStream,您需要執行下列操作。

  1. 呼叫 KTable.toStream() 方法。
  2. 透過呼叫 KStream.map 方法,將 key 替換為產業名稱,然後從 Windowed 實例中檢索 TransactionSummary 物件。

我們將這些操作連結在一起,如下所示(程式碼可以在檔案 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中找到)(清單 5.8)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
因為我們正在執行 KStream.map 操作,所以傳回的 KStream 實例在連線中使用時會自動重新分區。

我們已經完成了轉換過程,接下來我們需要建立一個KTable物件來讀取股票新聞。

建立股票新聞KTable

幸運的是,建立 KTable 物件只需要一行程式碼(程式碼可以在 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中找到)(清單 5.9)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
值得注意的是,不需要指定 Serde 對象,因為設定中使用了字串 Serdes。 此外,透過使用 EARLIEST 列舉,表格會在最開始處填入記錄。

現在我們可以繼續最後一步 - 連接。

將新聞更新與交易計數數據連接起來

創建連結並不困難。 如果沒有相關產業的股票新聞,我們將使用左連線(必要的程式碼可以在檔案 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中找到)(清單 5.10)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
這個 leftJoin 運算子非常簡單。 與第 4 章中的聯結不同,不使用 JoinWindow 方法,因為在執行 KStream-KTable 聯結時,每個鍵在 KTable 中只有一個條目。 這樣的連線沒有時間限制:記錄要麼在KTable中,要麼不存在。 主要結論:使用 KTable 對象,您可以透過更新頻率較低的參考資料來豐富 KStream。

現在我們將研究一種更有效的方法來豐富 KStream 中的事件。

5.3.4. GlobalKTable 對象

正如您所看到的,需要豐富事件流或為其添加上下文。 在第 4 章中,您看到了兩個 KStream 物件之間的連接,在上一節中,您看到了 KStream 和 KTable 之間的連接。 在所有這些情況下,在將鍵對應到新類型或值時,有必要重新分割資料流。 有時重新分區是明確完成的,有時 Kafka Streams 會自動完成。 重新分區是必要的,因為鍵已經改變,記錄必須以新的部分結束,否則連接將是不可能的(這在第 4 章 4.2.4 小節的「重新分區資料」一節中討論過)。

重新分區是有成本的

重新分區需要成本——創建中間主題、在另一個主題中儲存重複資料的額外資源成本; 它也意味著由於從該主題寫入和讀取而導致延遲增加。 此外,如果您需要跨多個方面或維度進行聯接,則必須連結聯接、使用新鍵映射記錄,然後再次執行重新分區程序。

連接到較小的數據集

在某些情況下,要連接的參考資料量相對較小,因此可以輕鬆地將其完整副本安裝在每個節點上。 對於這種情況,Kafka Streams 提供了 GlobalKTable 類別。

GlobalKTable 實例是唯一的,因為應用程式將所有資料複製到每個節點。 由於所有資料都存在於每個節點上,因此無需透過引用資料鍵對事件流進行分區,以便它可供所有分區使用。 您也可以使用 GlobalKTable 物件進行無鍵連接。 讓我們回到前面的一個範例來示範此功能。

將 KStream 物件連接到 GlobalKTable 對象

在第 5.3.2 小節中,我們對買家的交易所交易進行了視窗聚合。 聚合的結果如下圖所示:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

雖然這些結果達到了目的,但如果也顯示客戶的姓名和完整的公司名稱,那就更有用了。 若要新增客戶名稱和公司名稱,您可以執行普通連接,但需要執行兩個鍵映射和重新分區。 使用 GlobalKTable,您可以避免此類操作的成本。

為此,我們將使用清單 5.11 中的 countStream 物件(對應的程式碼可以在 src/main/java/bbejeck/chapter_5/GlobalKTableExample.java 中找到)並將其連接到兩個 GlobalKTable 物件。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
我們之前已經討論過這個問題,所以我不再重複。 但我注意到,為了可讀性,toStream().map 函數中的程式碼被抽象化為函數對象,而不是內聯 lambda 表達式。

下一步是宣告 GlobalKTable 的兩個實例(顯示的程式碼可以在檔案 src/main/java/bbejeck/chapter_5/GlobalKTableExample.java 中找到)(清單 5.12)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”

請注意,主題名稱是使用枚舉類型描述的。

現在我們已經準備好了所有元件,剩下的就是編寫連接程式碼(可以在檔案 src/main/java/bbejeck/chapter_5/GlobalKTableExample.java 中找到)(清單 5.13)。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
儘管此程式碼中有兩個聯接,但它們是連結的,因為它們的結果都沒有單獨使用。 結果顯示在整個操作結束時。

當您執行上述連接操作時,您將得到以下結果:

{customer='Barney, Smith' company="Exxon", transactions= 17}

本質沒有改變,但這些結果看起來更清晰。

如果倒數到第 4 章,您已經看到了幾種實際的連接類型。 它們列在表中。 5.2. 此表反映了 Kafka Streams 1.0.0 版本以來的連線能力; 未來版本中可能會發生一些變化。

《卡夫卡流在行動》一書。 用於實時工作的應用程式和微服務”
總而言之,讓我們回顧一下基礎知識:您可以使用本機狀態連線事件流 (KStream) 和更新流 (KTable)。 或者,如果參考資料的大小不太大,則可以使用 GlobalKTable 物件。 GlobalKTables 將所有分區複製到每個 Kafka Streams 應用程式節點,確保無論鍵對應哪個分區,所有資料都可用。

接下來我們將看到 Kafka Streams 功能,借助該功能,我們可以觀察狀態變化,而無需使用 Kafka 主題中的資料。

5.3.5。 可查詢狀態

我們已經執行了一些涉及狀態的操作,並且總是將結果輸出到控制台(用於開發目的)或將它們寫入主題(用於生產目的)。 將結果寫入主題時,您必須使用 Kafka 消費者來查看它們。

從這些主題中讀取資料可以被視為一種物化視圖。 出於我們的目的,我們可以使用維基百科中物化視圖的定義:「......包含查詢結果的實體資料庫物件。 例如,它可以是遠端資料的本機副本,或資料表或聯結結果的行和/或列的子集,或透過聚合獲得的總表」(https://en.wikipedia.org/wiki /物化檢視) 。

Kafka Streams 還允許您在狀態儲存上執行互動式查詢,從而允許您直接讀取這些物化視圖。 需要注意的是,對狀態儲存的查詢是唯讀操作。 這確保您不必擔心應用程式處理資料時意外地導致狀態不一致。

直接查詢狀態儲存的能力很重要。 這意味著您可以建立儀表板應用程序,而無需先從 Kafka 用戶獲取資料。 由於不需要再次寫入數據,它還提高了應用程式的效率:

  • 由於資料的本地化,可以快速存取它們;
  • 由於資料不會寫入外部存儲,因此消除了資料重複。

我希望您記住的主要事情是您可以直接從應用程式內查詢狀態。 這帶給您的機會怎麼強調都不為過。 您可以查詢狀態儲存以獲得相同的結果,而不是使用來自 Kafka 的資料並將記錄儲存在應用程式的資料庫中。 對狀態儲存的直接查詢意味著更少的程式碼(沒有消費者)和更少的軟體(不需要資料庫表來儲存結果)。

我們在本章中介紹了相當多的內容,因此我們暫時停止對狀態儲存的互動式查詢的討論。 但不用擔心:在第 9 章中,我們將建立一個具有互動式查詢的簡單儀表板應用程式。 它將使用本章和前面章節中的一些範例來示範互動式查詢以及如何將它們新增到 Kafka Streams 應用程式中。

總結

  • KStream 物件表示事件流,類似於資料庫中的插入。 KTable 物件代表更新流,更像是對資料庫的更新。 KTable 物件的大小不會成長,舊記錄會被新記錄取代。
  • 聚合操作需要 KTable 物件。
  • 使用視窗操作,您可以將聚合資料拆分為時間段。
  • 使用 GlobalKTable 對象,您可以在應用程式中的任何位置存取參考資料,而不管分區如何。
  • KStream、KTable 和 GlobalKTable 物件之間的連線是可能的。

到目前為止,我們專注於使用高級 KStream DSL 來建立 Kafka Streams 應用程式。 儘管高級方法允許您創建簡潔的程序,但使用它需要權衡。 使用 DSL KStream 意味著透過減少控製程度來提高程式碼的簡潔性。 在下一章中,我們將研究低階處理程序節點 API 並嘗試其他權衡。 這些程式將比以前更長,但我們將能夠創建幾乎任何我們可能需要的處理程序節點。

→ 有關本書的更多詳細信息,請訪問 出版商的網站

→ 對於 Habrozhiteli 使用優惠券可享 25% 折扣 - 卡夫卡流

→ 支付紙本書籍的費用後,將透過電子郵件發送電子書。

來源: www.habr.com

添加評論