
嘿哈布爾!
我在 Tinkoff 團隊工作,該團隊正在開發自己的通知中心。我主要使用 Spring boot 在 Java 中進行開發並解決專案中出現的各種技術問題。
我們的大多數微服務透過訊息代理非同步地相互通訊。以前,我們使用 IBM MQ 作為代理,它無法再應對負載,但同時具有較高的交付保證。
作為替代方案,我們獲得了 Apache Kafka,它具有很高的可擴展性潛力,但不幸的是,它需要針對不同場景採用幾乎單獨的配置方法。此外,Kafka 中預設運作的至少一次傳送機制無法維持所需的開箱即用的一致性等級。接下來,我將分享我們在配置 Kafka 的經驗,特別是我將告訴您如何設定和使用精確一次交付。
保證送貨及更多
下面討論的設定將有助於防止預設連線設定出現的許多問題。但首先我想專注於一個有助於調試的參數。
這將有助於 客戶端 ID 針對生產者和消費者。乍一看,似乎可以使用應用程式名稱作為值,並且在大多數情況下,這樣做都是可行的。但是,當應用程式使用多個消費者並將它們設定為相同的client.id時,會導致以下警告:
org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0如果您想在具有 Kafka 的應用程式中使用 JMX,這可能會是一個問題。對於這種情況,最好使用應用程式名稱和主題名稱的組合作為 client.id 值。我們的配置結果可以在命令輸出中看到 kafka-consumer-groups 來自 Confluent 實用程式:

現在讓我們來看看保證訊息傳遞的場景。 Kafka Producer 有一個參數 確認,它允許您配置叢集領導應在多少次確認之後才認為訊息已成功寫入。此參數可以採用以下值:
- 0-不會考慮確認。
- 1 是預設參數,只有 1 個副本需要確認。
- -1 — 需要所有同步副本的確認(叢集設定) 最小同步副本數).
從列出的值可以清楚地看出,acks 等於 -1 可以提供最強的保證,確保訊息不會遺失。
眾所周知,分散式系統不可靠。為了防止臨時故障,Kafka Producer 提供了參數 重試,它允許您設定重試次數 遞送超時毫秒。由於 retries 參數的預設值為 Integer.MAX_VALUE (2147483647),因此只需變更 delivery.timeout.ms 即可調整訊息重新傳送的次數。
邁向「精確一次」交付
列出的設定允許我們的生產者以高保證傳遞訊息。現在讓我們來討論如何確保一則訊息只有一個副本寫入 Kafka 主題?最簡單的情況,需要在Producer上設定參數 啟用冪等性 為真。冪等性保證只有一則訊息寫入單一主題的特定分區。實現冪等性的前提是值 確認數 = 全部,重試次數 > 0,每個連線的最大飛行請求數 ≤ 5。如果開發人員未設定這些參數,則會自動設定上面指定的值。
一旦配置了冪等性,就需要確保相同的訊息每次都出現在相同的分區中。這可以透過在 Producer 上配置 partioner.class 鍵和參數來完成。讓我們從關鍵開始。每批貨物必須相同。透過使用原始訊息中的任何業務標識符即可輕鬆實現這一點。 partioner.class 參數的預設值為 。使用此預設分區策略,我們進行如下操作:
- 如果在發送訊息時明確指定了分區,我們就使用它。
- 如果未指定分區,但指定了鍵,則我們透過鍵的雜湊值來選擇分區。
- 如果未指定分區和鍵,我們將逐一選擇分區(循環)。
此外,使用金鑰和冪等發送參數 每個連線的最大飛行請求數 = 1 為您提供消費者上訊息的有序處理。還值得記住的是,如果您的叢集上配置了存取控制,您將需要對該主題的冪等寫入權限。
如果突然缺少了按鍵冪等發送的能力,或者生產者端的邏輯需要維護不同分區之間的資料一致性,那麼事務將會來救援。此外,使用鍊式事務,您可以有條件地同步 Kafka 中的記錄,例如,與資料庫中的記錄。為了能夠向生產者進行事務發送,它必須具有冪等性,並且另外指定 事務ID。如果您在 Kafka 叢集上配置了存取控制,那麼交易寫入(如冪等寫入)將需要寫入權限,並且可以使用儲存在 transactional.id 中的值透過遮罩授予該權限。
形式上,任何字串(例如應用程式名稱)都可以用作事務識別碼。但是,如果使用相同的 transactional.id 啟動相同應用程式的多個實例,則第一個啟動的實例將因錯誤而停止,因為 Kafka 會將其視為殭屍進程。
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.為了解決這個問題,我們在應用程式名稱後面加上從環境變數中取得的主機名稱。
生產者已配置,但 Kafka 上的交易僅管理訊息範圍。無論交易狀態如何,訊息都會立即轉到主題,但具有額外的系統屬性。
為了防止此類訊息被消費者提前讀取,需要設定參數 隔離等級 為 read_committed 值。這樣的消費者將能夠像以前一樣讀取非事務性訊息,並且只能在提交後讀取事務性訊息。
如果您已設定上面列出的所有設置,那麼您已設定了一次傳送。恭喜!
但還有一個細微差別。上面我們配置的Transactional.id其實就是一個事務前綴。在事務管理器上,會為其新增一個序號。收到的標識符被發給 transactional.id.expiration.ms,此配置在Kafka叢集上,預設值為「7天」。如果應用程式在此期間沒有收到任何訊息,那麼當您嘗試發送下一個交易時,您將收到 無效Pid映射異常。此後,事務協調器將為下一個事務發出新的序號。但是,如果未正確處理 InvalidPidMappingException,則訊息可能會遺失。
代替總數
如您所見,僅僅向 Kafka 發送訊息是不夠的。您需要選擇參數組合併準備好進行快速變更。在本文中,我嘗試詳細展示一次交付設置,並描述我們遇到的一些 client.id 和 transactional.id 配置問題。以下是生產者和消費者設定的摘要。
製片人:
- 確認 = 全部
- 重試次數 > 0
- 啟用.冪等性=真
- max.in.flight.requests.per.connection ≤ 5(1 - 有序發送)
- transactional.id = ${application-name}-${hostname}
消費者:
- 隔離.等級=讀取已提交
為了最大限度地減少未來應用中的錯誤,我們在 spring 配置上製作了自己的包裝器,其中一些列出的參數的值已經設定好了。
以下是一些可供獨立學習的材料:
來源: www.habr.com
