カフカはどのようにして現実になったのか

カフカはどのようにして現実になったのか

おい、ハブル!

私はTinkoffチームで働いており、独自の通知センターを開発しています。主にJavaとSpring Bootを使って開発を行い、プロジェクトで発生する様々な技術的問題を解決しています。

当社のマイクロサービスのほとんどは、メッセージブローカーを介して非同期的に通信しています。以前はIBM MQをブローカーとして使用していましたが、負荷に耐えられなくなっていましたが、同時に高い配信保証も備えていました。

代替案として、高いスケーラビリティを秘めたApache Kafkaを提案されましたが、残念ながら、シナリオごとにほぼ個別に設定を行う必要がありました。さらに、Kafkaでデフォルトで動作する「少なくとも1回」の配信メカニズムでは、必要なレベルの一貫性をそのまま維持することができませんでした。以下では、Kafkaの設定方法について、特に「1回限りの配信」の設定方法と運用方法について解説します。

配達保証など

以下で説明するパラメータは、デフォルトの接続設定で発生する可能性のある多くの問題を防ぐのに役立ちます。しかしまず、デバッグを容易にするパラメータを1つご紹介します。

これは役に立つ クライアントID ProducerとConsumerについて。一見すると、アプリケーション名を値として使用できるように見えますが、ほとんどの場合、これで問題ありません。しかし、アプリケーションが複数のConsumerを使用し、それらに同じclient.idを設定すると、次の警告が表示されます。

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Kafka を使ったアプリケーションで JMX を使用する場合、これが問題になる可能性があります。この場合、client.id の値として、アプリケーション名とトピック名などの組み合わせを使用するのが最適です。設定の結果は、コマンドの出力で確認できます。 Kafka コンシューマーグループ Confluent ユーティリティから:

カフカはどのようにして現実になったのか

それでは、保証されたメッセージ配信のシナリオを見てみましょう。Kafkaプロデューサーにはパラメータがあります。 ACKクラスタリーダーがメッセージの書き込みに成功したと判断する確認応答回数を設定できます。このパラメータには以下の値を指定できます。

  • 0 - 確認応答は考慮されません。
  • デフォルトのパラメータは 1 で、確認応答が必要なレプリカは 1 つだけです。
  • -1 — すべての同期レプリカからの確認応答が必要(クラスタ設定) min.insync.レプリカ).

リストされた値から、acks が -1 に等しい場合、メッセージが失われないことが最も強く保証されることがわかります。

ご存知の通り、分散システムは信頼性に欠けます。一時的な障害から保護するために、Kafka Producerは次のパラメータを提供しています。 再試行再試行回数を設定できます。 配信タイムアウト(ミリ秒)retriesパラメータのデフォルト値はInteger.MAX_VALUE(2147483647)なので、delivery.timeout.msを変更するだけでメッセージの再送信回数を調整できます。

1回限りの配信に向けて

上記の設定により、プロデューサーは高い保証でメッセージを配信できます。では、Kafkaトピックに書き込まれるメッセージのコピーが1つだけであることを保証する方法について説明しましょう。最も単純なケースでは、プロデューサーのパラメータを設定する必要があります。 有効にするべき等性 をtrueに設定します。冪等性により、1つのトピックの特定のパーティションには1つのメッセージのみが書き込まれるようになります。冪等性を有効にするには、次の値が前提条件となります。 確認応答 = すべて、再試行 > 0、接続あたりの最大飛行中リクエスト数 ≤ 5開発者がこれらのパラメータを設定しない場合は、上記の値が自動的に設定されます。

冪等性を確立したら、同じメッセージが毎回同じパーティションに送信されるよう保証する必要があります。これは、プロデューサーのキーとpartitioner.classパラメータを設定することで実現できます。まずはキーから見ていきましょう。キーは送信ごとに同じである必要があります。これは、元のメッセージのビジネス識別子を使用することで簡単に実現できます。partitioner.classパラメータのデフォルト値は デフォルトパーティショナーこのデフォルトのパーティション分割戦略では、次のように進めます。

  • メッセージを送信するときにパーティションが明示的に指定されている場合は、それを使用します。
  • パーティションが指定されていないが、キーが指定されている場合は、キーのハッシュによってパーティションを選択します。
  • パーティションとキーが指定されていない場合は、パーティションを 1 つずつ選択します (ラウンドロビン)。

また、キーとべき等送信をパラメータとともに使用すると 接続あたりの最大飛行中リクエスト数 = 1 Consumer上でメッセージを整然と処理できます。なお、クラスターでアクセス制御が設定されている場合は、トピックへの冪等書き込み権限が必要になることにご注意ください。

キーによる冪等送信が突然不可能になったり、プロデューサー側のロジックで異なるパーティション間のデータ一貫性を維持する必要がある場合、トランザクションが役に立ちます。さらに、チェーントランザクションを使用することで、例えばKafka内のレコードをDB内のレコードと条件付きで同期できます。プロデューサーへのトランザクション送信を有効にするには、プロデューサーが冪等性を備え、さらに以下の指定が必要です。 トランザクションIDKafka クラスターにアクセス制御が設定されている場合、冪等な書き込みなどのトランザクション書き込みには書き込み権限が必要になり、これは transactional.id に格納されている値を使用してマスクによって付与できます。

技術的には、アプリケーション名など、任意の文字列をトランザクション識別子として使用できます。ただし、同じアプリケーションのインスタンスを同じtransactional.idで複数起動した場合、最初に起動されたインスタンスはKafkaによってゾンビプロセスとみなされ、エラーで終了されます。

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

この問題を解決するには、環境変数から取得したホスト名をアプリケーション名に接尾辞として追加します。

プロデューサーは設定されていますが、Kafka 上のトランザクションはメッセージのスコープのみを制御します。トランザクションのステータスに関わらず、メッセージはすぐにトピックに追加されますが、追加のシステム属性が付与されます。

このようなメッセージが事前にコンシューマによって読み取られるのを防ぐには、パラメータを設定する必要があります。 分離レベル read_committed 値に設定されます。このようなコンシューマは、非トランザクションメッセージはこれまで通り読み取り可能となり、トランザクションメッセージはコミット後にのみ読み取り可能となります。
上記の設定をすべて完了すると、1回限りの配信の設定が完了します。おめでとうございます!

しかし、もう一つ微妙な違いがあります。上で設定したTransactional.idは、実際にはトランザクションのプレフィックスです。トランザクションマネージャでは、これにシーケンス番号が追加されます。結果として得られる識別子は、 トランザクションIDの有効期限これはKafkaクラスタ上で設定されており、デフォルト値は「7日間」です。この期間にアプリケーションがメッセージを受信しない場合、次のトランザクションを送信しようとすると、次のメッセージが送信されます。 無効なPidMappingExceptionトランザクションコーディネータは、次のトランザクションのために新しいシーケンス番号を発行します。ただし、InvalidPidMappingException が正しく処理されない場合、メッセージが失われる可能性があります。

合計の代わりに

ご覧のとおり、Kafka にメッセージを送信するだけでは不十分です。パラメータの組み合わせを選択し、迅速な変更に対応できるように準備しておく必要があります。この記事では、1 回限りの配信設定の詳細を示し、実際に発生した client.id と transactional.id の設定に関する問題についていくつか説明しました。以下は、プロデューサーとコンシューマーの設定の概要です。

プロデューサー:

  1. acks = すべて
  2. 再試行 > 0
  3. enable.idempotence = true
  4. 接続あたりの最大飛行リクエスト数 ≤ 5 (順序付き送信の場合は 1)
  5. transactional.id = ${アプリケーション名}-${ホスト名}

消費者:

  1. 分離レベル = read_committed

将来のアプリケーションでのエラーを最小限に抑えるために、リストされているパラメーターの一部の値がすでに設定されているスプリング構成の上に独自のラッパーを作成しました。

ここに自主学習用の教材をいくつか紹介します。

出所: habr.com

DDoS 保護機能を備えた信頼性の高いサイト用ホスティング、VPS VDS サーバーを購入する 🔥 DDoS攻撃対策付きの信頼性の高いウェブサイトホスティング、VPS/VDSサーバーを購入しましょう | ProHoster