卡夫卡如何成為現實

卡夫卡如何成為現實

嘿哈布爾!

我在 Tinkoff 團隊工作,該團隊正在開發自己的通知中心。 我主要使用Spring boot進行Java開發,並解決專案中出現的各種技術問題。

我們的大多數微服務透過訊息代理相互非同步通訊。 之前,我們使用IBM MQ作為代理,它已經無法應對負載,但同時具有很高的交付保證。

作為替代品,我們獲得了 Apache Kafka,它具有很高的擴展潛力,但不幸的是,需要針對不同場景採用幾乎單獨的配置方法。 此外,預設情況下在 Kafka 中運行的至少一次傳遞機制不允許維持開箱即用所需的一致性等級。 接下來,我將分享我們在Kafka配置方面的經驗,特別是我將告訴您如何設定和使用Exactly Once Delivery。

保證交貨及更多

下面討論的設定將有助於防止預設連線設定出現的許多問題。 但首先我想注意一個有助於可能的調試的參數。

這會有所幫助 客戶端ID 對於生產者和消費者。 乍一看,您可以使用應用程式名稱作為值,並且在大多數情況下這都是可行的。 儘管當應用程式使用多個 Consumer 並且您為它們提供相同的 client.id 時,會導致以下警告:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

如果您想在具有 Kafka 的應用程式中使用 JMX,那麼這可能是一個問題。 對於這種情況,最好使用應用程式名稱和主題名稱的組合作為 client.id 值。 我們的配置結果可以在命令輸出中看到 卡夫卡消費者組 來自 Confluence 的實用程式:

卡夫卡如何成為現實

現在讓我們來看看保證訊息傳遞的場景。 Kafka Producer有一個參數 確認,它允許您配置在多少次確認後叢集領導者需要考慮訊息已成功寫入。 此參數可以採用以下值:

  • 0 — 不考慮確認。
  • 1 是預設參數,只需要 1 個副本即可確認。
  • −1 — 需要所有同步副本的確認(叢集設置 最小同步副本數).

從列出的值可以清楚地看出,acks等於-1給出了訊息不會丟失的最強保證。

眾所周知,分散式系統是不可靠的。 為了防止瞬時故障,Kafka Producer 提供了選項 重試,它允許您設定重新發送嘗試的次數 交付.超時.ms。 由於 retries 參數的預設值為 Integer.MAX_VALUE (2147483647),因此可以透過僅變更 payment.timeout.ms 來調整訊息重試次數。

我們正朝著一次性交付的方向邁進

列出的設定允許我們的生產者以高保證傳遞訊息。 現在我們來談談如何確保一則訊息只有一份副本寫入到Kafka主題中? 在最簡單的情況下,要做到這一點,你需要在Producer上設定參數 啟用冪等性 為真。 冪等性保證只有一則訊息寫入主題的特定分區。 啟用冪等性的前提是值 acks = 全部,重試 > 0,max.in.flight.requests.per.connection ≤ 5。 如果開發者沒有指定這些參數,則會自動設定上述值。

配置冪等性時,需要確保相同的訊息每次都出現在相同的分區中。 這可以透過將partitioner.class鍵和參數設為Producer來完成。 讓我們從鑰匙開始。 每次提交的內容必須相同。 這可以透過使用原始貼文中的任何企業 ID 輕鬆實現。 Partitioner.class 參數有一個預設值 - 預設分區器。 使用這種分區策略,預設情況下我們的行為如下:

  • 如果發送訊息時明確指定了分區,那麼我們就使用它。
  • 如果未指定分區,但指定了鍵,則透過鍵的雜湊值選擇分區。
  • 如果未指定分割區和鍵,則一一選擇分割區(循環)。

另外,使用金鑰和帶參數的冪等發送 每個連線的最大飛行請求數 = 1 為您提供簡化的 Consumer 訊息處理。 還值得記住的是,如果在叢集上配置了存取控制,那麼您將需要冪等寫入主題的權限。

如果你突然缺乏以 key 冪等傳送的能力,或是 Producer 端的邏輯需要維護不同分割區之間的資料一致性,那麼交易就會派上用場。 此外,使用鍊式事務,您可以有條件地同步Kafka中的記錄,例如與資料庫中的記錄同步。 要啟用向生產者的事務發送,它必須是冪等的並且另外設置 交易ID。 如果您的 Kafka 叢集配置了存取控制,則交易記錄(如冪等記錄)將需要寫入權限,可以使用 transactional.id 中儲存的值透過遮罩授予該權限。

形式上,任何字串(例如應用程式名稱)都可以用作事務識別碼。 但是,如果您使用相同的 transactional.id 啟動相同應用程式的多個實例,那麼第一個啟動的實例將因錯誤而停止,因為 Kafka 會將其視為殭屍進程。

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

為了解決這個問題,我們以主機名稱的形式為應用程式名稱加上後綴,該主機名稱是從環境變數中取得的。

生產者已配置,但 Kafka 上的交易僅控制訊息的範圍。 無論事務狀態如何,訊息都會立即轉到主題,但具有附加的系統屬性。

為了防止此類訊息被Consumer提前讀取到,需要設定參數 隔離等級 讀取已提交值。 這樣的消費者將能夠像以前一樣讀取非事務性訊息,並且只有在提交之後才能讀取事務性訊息。
如果您已經設置了前面列出的所有設置,那麼您就配置了一次交付。 恭喜!

但還有一個細微差別。 Transactional.id,我們上面配置的,其實就是交易前綴。 在事務管理器上,會新增一個序號。 收到的標識符被發送到 transactional.id.expiration.ms,在 Kafka 叢集上配置,預設值為「7 天」。 如果在此期間應用程式沒有收到任何訊息,那麼當您嘗試下一次事務發送時,您將收到 無效PidMapping異常。 然後,事務協調器將為下一個事務發出新的序號。 但是,如果未正確處理 InvalidPidMappingException,則訊息可能會遺失。

代替總數

正如您所看到的,僅僅向 Kafka 發送訊息是不夠的。 您需要選擇參數組合併準備好進行快速變更。 在本文中,我嘗試詳細展示一次性交付設置,並描述了我們遇到的 client.id 和 transactional.id 配置的幾個問題。 以下是生產者和消費者設定的摘要。

製片人:

  1. 確認=全部
  2. 重試 > 0
  3. 啟用.冪等性 = true
  4. max.in.flight.requests.per.connection ≤ 5(1為有序發送)
  5. transactional.id = ${應用程式名稱}-${主機名稱}

消費者:

  1. 隔離等級=已提交讀

為了最大限度地減少未來應用程式中的錯誤,我們在 spring 配置上製作了自己的包裝器,其中已經設定了一些列出的參數的值。

以下是一些自學資料:

來源: www.habr.com

添加評論