卡夫卡如何成为现实

卡夫卡如何成为现实

嘿哈布尔!

我在 Tinkoff 团队工作,该团队正在开发自己的通知中心。 我主要使用Spring boot进行Java开发,并解决项目中出现的各种技术问题。

我们的大多数微服务通过消息代理相互异步通信。 之前,我们使用IBM MQ作为代理,它已经无法应对负载,但同时具有很高的交付保证。

作为替代方案,我们获得了 Apache Kafka,它具有很高的扩展潜力,但不幸的是,需要针对不同场景采用几乎单独的配置方法。 此外,默认情况下在 Kafka 中运行的至少一次传递机制不允许维持开箱即用所需的一致性级别。 接下来,我将分享我们在Kafka配置方面的经验,特别是我将告诉您如何配置和使用Exactly Once Delivery。

保证交货及更多

下面讨论的设置将有助于防止默认连接设置出现的许多问题。 但首先我想注意一个有助于可能的调试的参数。

这会有所帮助 客户端.id 对于生产者和消费者。 乍一看,您可以使用应用程序名称作为值,并且在大多数情况下这都是可行的。 尽管当应用程序使用多个 Consumer 并且您为它们提供相同的 client.id 时,会导致以下警告:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

如果您想在带有 Kafka 的应用程序中使用 JMX,那么这可能是一个问题。 对于这种情况,最好使用应用程序名称和主题名称的组合作为 client.id 值。 我们的配置结果可以在命令输出中看到 卡夫卡消费者组 来自 Confluence 的实用程序:

卡夫卡如何成为现实

现在让我们看一下保证消息传递的场景。 Kafka Producer有一个参数 确认,它允许您配置在多少次确认后集群领导者需要考虑消息已成功写入。 该参数可以采用以下值:

  • 0 — 不考虑确认。
  • 1 是默认参数,只需要 1 个副本即可确认。
  • −1 — 需要所有同步副本的确认(集群设置 最小同步副本数).

从列出的值可以清楚地看出,acks等于-1给出了消息不会丢失的最强保证。

众所周知,分布式系统是不可靠的。 为了防止瞬时故障,Kafka Producer 提供了选项 重试,它允许您设置重新发送尝试的次数 交付.超时.ms。 由于retries参数的默认值为Integer.MAX_VALUE (2147483647),因此可以通过仅更改delivery.timeout.ms来调整消息重试次数。

我们正朝着一次性交付的方向迈进

列出的设置允许我们的生产者以高保证传递消息。 现在我们来谈谈如何确保一条消息只有一份副本写入到Kafka主题中? 在最简单的情况下,要做到这一点,你需要在Producer上设置参数 启用幂等性 为真。 幂等性保证只有一条消息写入一个主题的特定分区。 启用幂等性的前提是值 acks = 全部,重试 > 0,max.in.flight.requests.per.connection ≤ 5。 如果开发者没有指定这些参数,则会自动设置上述值。

配置幂等性时,需要保证相同的消息每次都出现在相同的分区中。 这可以通过将partitioner.class键和参数设置为Producer来完成。 让我们从钥匙开始吧。 每次提交的内容必须相同。 这可以通过使用原始帖子中的任何企业 ID 轻松实现。 Partitioner.class 参数有一个默认值 - 默认分区器。 使用这种分区策略,默认情况下我们的行为如下:

  • 如果发送消息时明确指定了分区,那么我们就使用它。
  • 如果未指定分区,但指定了键,则通过键的哈希值选择分区。
  • 如果未指定分区和键,则一一选择分区(循环)。

另外,使用密钥和带参数的幂等发送 每个连接的最大飞行请求数 = 1 为您提供简化的消费者消息处理。 还值得记住的是,如果在集群上配置了访问控制,那么您将需要幂等写入主题的权限。

如果你突然缺乏按 key 幂等发送的能力,或者 Producer 端的逻辑需要维护不同分区之间的数据一致性,那么事务就会派上用场。 此外,使用链式事务,您可以有条件地同步Kafka中的记录,例如与数据库中的记录同步。 要启用向生产者的事务发送,它必须是幂等的并且另外设置 交易ID。 如果您的 Kafka 集群配置了访问控制,则事务记录(如幂等记录)将需要写入权限,可以使用 transactional.id 中存储的值通过掩码授予该权限。

形式上,任何字符串(例如应用程序名称)都可以用作事务标识符。 但是,如果您使用相同的 transactional.id 启动同一应用程序的多个实例,那么第一个启动的实例将因错误而停止,因为 Kafka 会将其视为僵尸进程。

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

为了解决这个问题,我们以主机名的形式为应用程序名称添加后缀,该主机名是从环境变量中获取的。

生产者已配置,但 Kafka 上的事务仅控制消息的范围。 无论事务状态如何,消息都会立即转到主题,但具有附加的系统属性。

为了防止此类消息被Consumer提前读取到,需要设置参数 隔离级别 读取已提交值。 这样的消费者将能够像以前一样读取非事务性消息,并且只有在提交之后才能读取事务性消息。
如果您已经设置了前面列出的所有设置,那么您就配置了一次交付。 恭喜!

但还有一个细微差别。 Transactional.id,我们上面配置的,实际上就是交易前缀。 在事务管理器上,会添加一个序列号。 收到的标识符被发送到 transactional.id.expiration.ms,在 Kafka 集群上配置,默认值为“7 天”。 如果在此期间应用程序没有收到任何消息,那么当您尝试下一次事务发送时,您将收到 无效PidMapping异常。 然后,事务协调器将为下一个事务发出新的序列号。 但是,如果未正确处理 InvalidPidMappingException,消息可能会丢失。

代替总数

正如您所看到的,仅仅向 Kafka 发送消息是不够的。 您需要选择参数组合并准备好进行快速更改。 在本文中,我尝试详细展示一次性交付设置,并描述了我们遇到的 client.id 和 transactional.id 配置的几个问题。 以下是生产者和消费者设置的摘要。

制片人:

  1. 确认=全部
  2. 重试 > 0
  3. 启用.幂等性 = true
  4. max.in.flight.requests.per.connection ≤ 5(1为有序发送)
  5. transactional.id = ${应用程序名称}-${主机名}

消费者:

  1. 隔离级别=已提交读

为了最大限度地减少未来应用程序中的错误,我们在 spring 配置上制作了自己的包装器,其中已经设置了一些列出的参数的值。

以下是一些自学材料:

来源: habr.com

添加评论