重新處理從 Kafka 收到的事件

重新處理從 Kafka 收到的事件

嘿哈布爾。

最近我 分享經驗 關於我們團隊中最常對 Kafka Producer 和 Consumer 使用哪些參數,以便更接近保證交付。 在本文中,我想告訴您我們如何組織重新處理由於外部系統暫時不可用而從 Kafka 收到的事件。

現代應用程序在非常複雜的環境中運行。 業務邏輯封裝在現代技術堆棧中,在由 Kubernetes 或 OpenShift 等編排器管理的 Docker 映像中運行,並通過物理和虛擬路由器鏈與其他應用程序或企業解決方案進行通信。 在這樣的環境中,某些東西總是可能出現故障,因此在外部系統之一不可用的情況下重新處理事件是我們業務流程的重要組成部分。

卡夫卡之前是怎樣的

在項目的早期,我們使用 IBM MQ 進行異步消息傳遞。 如果服務運行期間發生任何錯誤,則可以將接收到的消息放入死信隊列(DLQ)中以供進一步手動解析。 DLQ 在傳入隊列旁邊創建,消息傳輸在 IBM MQ 內部進行。

如果錯誤是暫時的並且我們可以檢測到它(例如,HTTP 調用上的 ResourceAccessException 或 MongoDb 查詢上的 MongoTimeoutException),則重試策略將生效。 無論應用程序邏輯如何分支,原始消息都會傳輸到系統隊列以進行延遲發送,或者傳輸到曾經重新發送消息的單獨應用程序。 在這種情況下,重傳次數寫入消息的標頭中,該標頭與延遲間隔或應用程序級別的策略結尾相關聯。 如果我們已經到達策略的末尾,但是外部系統仍然不可用,那麼消息將被放置在DLQ中以供手動解析。

搜索解決方案

網上搜索,可以發現如下 決定。 簡而言之,建議為每個延遲間隔創建一個主題,並在應用程序端實現Consumer,它將以所需的延遲讀取消息。

重新處理從 Kafka 收到的事件

儘管有大量正面評價,但在我看來,它並不完全成功。 首先,因為開發人員除了實現業務需求之外,還必須花費大量時間來實現所描述的機制。

此外,如果在 Kafka 集群上啟用了訪問控制,那麼您將必須花費一些時間設置主題並提供對它們的必要訪問。 除此之外,您還需要為每個重試主題選擇正確的retention.ms參數,以便消息有時間重新發送並且不會從消息中消失。 對於每個現有的或新的服務,必須重複實施和訪問請求。

現在讓我們看看 spring 和 spring-kafka 為我們提供了哪些重新處理消息的機制。 Spring-kafka 對 spring-retry 有傳遞依賴,它提供了管理不同 BackOffPolicies 的抽象。 這是一個相當靈活的工具,但其顯著缺點是它將重新提交的消息存儲在應用程序的內存中。 這意味著由於更新或服務中錯誤而重新啟動應用程序將導致所有待重新處理的消息丟失。 由於此項對我們的系統至關重要,因此我們沒有進一步考慮它。

spring-kafka 本身提供了 ContainerAwareErrorHandler 的幾種實現,例如 SeekToCurrentErrorHandler,使用它,您可以稍後處理消息,而無需在發生錯誤時移動偏移量。 從 spring-kafka 2.3 開始,可以設置 BackOffPolicy。

這種方法允許重新處理的消息在應用程序重新啟動後繼續存在,但仍然缺少 DLQ 機制。 我們在 2019 年初選擇了這個選項,樂觀地認為不需要 DLQ(我們很幸運,在使用這樣的再處理系統運行應用程序幾個月後,我們確實不需要它)。 臨時錯誤導致 SeekToCurrentErrorHandler 觸發。 剩餘的錯誤被打印到日誌中,導致偏移量offset,並繼續處理下一條消息。

最終決定

基於SeekToCurrentErrorHandler的實現促使我們開發自己的消息重發機制。

首先,我們想利用現有的經驗,並根據應用邏輯進行擴展。 對於線性邏輯應用程序,最好在短時間內停止讀取新消息,並將其設置為重試策略的一部分。 對於其他應用程序,我希望有一個單點來確保重新調用策略的執行。 此外,該單點必須具有適用於兩種方法的 DLQ 功能。

重試策略本身必須存儲在應用程序中,應用程序負責在發生瞬態錯誤時獲取下一個間隔。

停止線性應用程序的使用者

使用 spring-kafka 時,停止 Consumer 的代碼可能如下所示:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

在示例中,retryAt 是重新啟動 MessageListenerContainer(如果它仍在運行)的時間。 重啟將發生在TaskScheduler中運行的單獨線程中,其實現也是由spring提供的。

我們通過以下方式找到retryAt的值:

  1. 查找重複調用計數器的值。
  2. 根據計數器的值,查找當前重試策略中的延遲間隔。 該策略在應用程序本身中聲明;我們選擇 JSON 格式來存儲它。
  3. JSON 數組中的間隔包含秒數,在此之後需要重複處理。 將此秒數添加到當前時間以形成 retryAt 的值。
  4. 如果沒有找到間隔,則retryAt為空,消息被發送到DLQ進行手動解析。

使用這種方法,只需將當前正在處理的每條消息的重試次數存儲在例如應用程序的內存中。 將嘗試計數器保存在內存中對於這種方法並不重要,因為線性邏輯應用程序無法作為一個整體進行處理。 與 spring-retry 不同,重新啟動應用程序不會丟失所有重新處理的消息,而只是重新啟動策略。

此方法有助於減輕因負載過重而可能不可用的外部系統的負載。 也就是說,除了重新處理之外,我們還實現了模式的實現 斷路器.

在我們的例子中,錯誤閾值僅為 1,為了最大限度地減少由於臨時網絡中斷而導致的系統停機時間,我們使用了非常細粒度的重試策略和低延遲間隔。 這可能並不適合所有的群體應用,因此必鬚根據系統的特點來選擇誤差閾值和區間值之間的關係。

用於處理來自具有非確定性邏輯的應用程序的消息的單獨應用程序

以下是向此類應用程序 (Retryer) 發送消息的代碼示例,當到達 RETRY_AT 時間時,該應用程序將重新提交到 DESTINATION 主題:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

該示例顯示大量信息是在標頭中傳輸的。 RETRY_AT 值的查找方式與消費者的停止重試機制相同。 除了 DESTINATION 和 RETRY_AT 之外,我們還傳遞:

  • GROUP_ID,我們通過它對消息進行分組以便於手動分析和更輕鬆的搜索。
  • ORIGINAL_PARTITION 嘗試保留相同的 Consumer 以便重新解析。 該參數可以為空,此時新的分區將從原始消息的record.key()中獲取。
  • 更新了 COUNTER 值以遵循重試策略。
  • SEND_TO 是一個常量,指示當達到 RETRY_AT 時是否發送消息進行重新處理或將其放入 DLQ 中。
  • REASON - 消息處理中斷的原因。

Retryer 在 PostgreSQL 中存儲用於重新提交和手動解析的消息。 計時器啟動一個任務,查找帶有 RETRY_AT 的消息,並使用 record.key() 鍵將它們發送回 DESTINATION 主題的 ORIGINAL_PARTITION。

發送後,消息將從 PostgreSQL 中刪除。 消息的手動解析發生在一個簡單的 UI 中,該 UI 通過 REST API 與 Retryer 交互。 其主要功能是從 DLQ 中重新發送或刪除消息、查看錯誤信息以及搜索消息(例如按錯誤名稱)。

由於我們的集群啟用了訪問控制,因此我們需要額外請求訪問 Retryer 正在偵聽的主題,並允許 Retryer 寫入 DESTINATION 主題。 這很不方便,但是與間隔主題方法不同,我們有一個成熟的 DLQ 和 UI 來管理它。

在某些情況下,傳入的主題會被多個不同的消費者組讀取,這些消費者組的應用程序實現不同的邏輯。 通過重試器為其中一個應用程序重新處理消息將導致在另一個應用程序上產生重複消息。 為了防止這種情況,我們啟動了一個單獨的主題來進行重新處理。 傳入的主題和重試的主題可以由同一個 Consumer 讀取,沒有任何限制。

重新處理從 Kafka 收到的事件

默認情況下,此方法不提供斷路器功能,但可以使用以下命令將其添加到應用程序中: Spring Cloud Netflix 或新的 春雲斷路器,通過將外部服務調用站點包裝在適當的抽像中。 此外,還可以選擇策略 艙壁 模式,這也很有用。 例如,在 spring-cloud-netflix 中,這可能是線程池或信號量。

產量

因此,我們得到了一個單獨的應用程序,允許您在外部系統暫時不可用時重複處理消息。

該應用程序的主要優點之一是它可以被運行在同一 Kafka 集群上的外部系統使用,而無需對其進行重大修改! 這樣的應用程序只需要訪問重試主題,填寫一些 Kafka 標頭並向重試器發送消息。 不需要增加額外的基礎設施。 為了減少從應用程序到 Retryer 並返回的消息數量,我們識別了具有線性邏輯的應用程序,並通過 Consumer stop 重新處理它們。

來源: www.habr.com

添加評論