カフカはどのようにして現実になったのか

カフカはどのようにして現実になったのか

おい、ハブル!

私は独自の通知センターを開発している Tinkoff チームで働いています。 主に Spring Boot を使用して Java で開発し、プロジェクト内で発生するさまざまな技術的問題を解決します。

マイクロサービスのほとんどは、メッセージ ブローカーを通じて非同期に相互通信します。 以前は、IBM MQ をブローカーとして使用していましたが、負荷に対処できなくなりましたが、同時に高い配信保証がありました。

代わりに、Apache Kafka が提供されました。これは高い拡張性を備えていますが、残念なことに、さまざまなシナリオに合わせて構成するにはほぼ個別のアプローチが必要です。 さらに、Kafka でデフォルトで機能する少なくとも XNUMX 回の配信メカニズムでは、そのままでは必要なレベルの一貫性を維持できませんでした。 次に、Kafka 構成に関する経験を共有します。特に、XNUMX 回限りの配信を構成して使用する方法について説明します。

配達保証など

以下で説明する設定は、デフォルトの接続設定に伴う多くの問題を防ぐのに役立ちます。 ただし、最初に、デバッグを容易にする XNUMX つのパラメータに注目したいと思います。

これは役に立ちます クライアントID 生産者と消費者にとって。 一見すると、アプリケーション名を値として使用でき、ほとんどの場合、これで機能します。 ただし、アプリケーションが複数のコンシューマーを使用し、それらに同じ client.id を指定すると、次の警告が表示されます。

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Kafka を使用するアプリケーションで JMX を使用する場合、これが問題になる可能性があります。 この場合、アプリケーション名とトピック名などを組み合わせて client.id 値として使用するのが最善です。 構成の結果はコマンド出力で確認できます。 kafka-消費者グループ Confluent のユーティリティから:

カフカはどのようにして現実になったのか

次に、メッセージ配信を保証するシナリオを見てみましょう。 Kafka プロデューサーにはパラメーターがあります ACKこれにより、クラスター リーダーがメッセージが正常に書き込まれたとみなすために必要な確認応答の数を構成できます。 このパラメータには次の値を指定できます。

  • 0 — 確認応答は考慮されません。
  • 1 はデフォルトのパラメータであり、確認応答に必要なレプリカは 1 つだけです。
  • −1 — すべての同期されたレプリカからの確認応答が必要です (クラスターのセットアップ) min.insync.レプリカ).

リストされた値から、acks が -1 に等しい場合、メッセージが失われないという最も強力な保証が与えられることは明らかです。

周知のとおり、分散システムは信頼性が低いです。 一時的な障害から保護するために、Kafka プロデューサーには次のオプションが用意されています。 再試行、再送信試行回数を設定できます。 配信.タイムアウト.ms。 retries パラメーターのデフォルト値は Integer.MAX_VALUE (2147483647) であるため、メッセージの再試行回数はdelivery.timeout.ms のみを変更することで調整できます。

正確に XNUMX 回の配信を目指して進んでいます

リストされている設定により、プロデューサーは高い保証でメッセージを配信できます。 ここで、メッセージのコピーが XNUMX つだけ Kafka トピックに書き込まれるようにする方法について話しましょう。 最も単純なケースでは、これを行うには、プロデューサーにパラメーターを設定する必要があります。 冪等性を有効にする 本当のこと。 冪等性により、XNUMX つのトピックの特定のパーティションに XNUMX つのメッセージだけが書き込まれることが保証されます。 冪等性を有効にするための前提条件は値です。 acks = すべて、再試行 > 0、接続ごとの最大 in.flight.requests ≤ 5。 これらのパラメータが開発者によって指定されていない場合、上記の値が自動的に設定されます。

冪等性が設定されている場合は、同じメッセージが毎回同じパーティションに到達するようにする必要があります。 これを行うには、partitioner.class キーとパラメーターをプロデューサーに設定します。 鍵から始めましょう。 各提出物で同じである必要があります。 これは、元の投稿のビジネス ID を使用することで簡単に実現できます。 Partitioner.classパラメータにはデフォルト値があります- デフォルトパーティショナ。 このパーティショニング戦略では、デフォルトで次のように動作します。

  • メッセージの送信時にパーティションが明示的に指定されている場合は、それが使用されます。
  • パーティションが指定されていないが、キーが指定されている場合は、キーのハッシュによってパーティションを選択します。
  • パーティションとキーが指定されていない場合は、パーティションを XNUMX つずつ選択します (ラウンドロビン)。

また、キーを使用し、パラメーターを使用して冪等で送信する 接続ごとの最大フライトリクエスト数 = 1 Consumer でのメッセージ処理を合理化します。 また、クラスターにアクセス制御が構成されている場合は、トピックに冪等に書き込む権限が必要になることにも注意してください。

突然、キーによる冪等送信機能がなくなったり、プロデューサー側のロジックで異なるパーティション間でデータの一貫性を維持する必要がある場合には、トランザクションが役に立ちます。 さらに、チェーン トランザクションを使用すると、たとえば Kafka のレコードをデータベースのレコードと条件付きで同期できます。 プロデューサへのトランザクション送信を有効にするには、べき等であり、さらに設定する必要があります。 トランザクションID。 Kafka クラスターにアクセス制御が構成されている場合、冪等レコードなどのトランザクション レコードには書き込みアクセス許可が必要になります。これは、transactional.id に格納されている値を使用してマスクによって付与できます。

正式には、アプリケーション名などの任意の文字列をトランザクション識別子として使用できます。 ただし、同じtransactional.idを使用して同じアプリケーションの複数のインスタンスを起動すると、最初に起動したインスタンスはエラーで停止します。これは、Kafkaがそれをゾンビプロセスと見なすためです。

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

この問題を解決するには、環境変数から取得したホスト名の形式でアプリケーション名にサフィックスを追加します。

プロデューサは構成されていますが、Kafka 上のトランザクションはメッセージのスコープのみを制御します。 トランザクションのステータスに関係なく、メッセージはすぐにトピックに移動しますが、追加のシステム属性があります。

このようなメッセージがコンシューマによって事前に読み取られるのを防ぐには、パラメータを設定する必要があります。 分離レベル read_committed 値に。 このようなコンシューマは、以前と同様に非トランザクション メッセージを読み取ることができますが、トランザクション メッセージはコミット後にのみ読み取ることができます。
前にリストしたすべての設定を行っている場合は、XNUMX 回限りの配信が構成されています。 おめでとう!

しかし、もう一つニュアンスがあります。 上記で構成した Transactional.id は、実際にはトランザクションのプレフィックスです。 トランザクション マネージャーでは、シーケンス番号が追加されます。 受信した識別子は次のように発行されます。 transactional.id.expiration.msこれは Kafka クラスター上に構成されており、デフォルト値は「7 日」です。 この間にアプリケーションがメッセージを受信しなかった場合、次のトランザクション送信を試行すると、次のメッセージが送信されます。 InvalidPidMappingException。 トランザクション コーディネーターは、次のトランザクションの新しいシーケンス番号を発行します。 ただし、InvalidPidMappingException が正しく処理されない場合、メッセージが失われる可能性があります。

合計の代わりに

ご覧のとおり、単に Kafka にメッセージを送信するだけでは十分ではありません。 パラメータの組み合わせを選択し、すぐに変更できるように準備する必要があります。 この記事では、XNUMX 回限りの配信設定を詳細に示し、遭遇した client.id およびtransactional.id 設定に関するいくつかの問題について説明しました。 以下は、プロデューサーとコンシューマーの設定の概要です。

プロデューサー:

  1. ACK = すべて
  2. 再試行回数 > 0
  3. イネーブル.冪等性 = true
  4. max.in.flight.requests.per.connection ≤ 5 (順序どおりに送信する場合は 1)
  5. transactional.id = ${アプリケーション名}-${ホスト名}

消費者:

  1. isolation.level = read_committed

将来のアプリケーションでのエラーを最小限に抑えるために、リストされたパラメータの一部の値がすでに設定されているスプリング構成に対する独自のラッパーを作成しました。

独習用の教材をいくつか紹介します。

出所: habr.com

コメントを追加します