Comment Kafka est devenu réalité

Comment Kafka est devenu réalité

Hé Habr !

Je travaille dans l'équipe Tinkoff, qui développe son propre centre de notification. Je développe principalement en Java en utilisant Spring Boot et résous divers problèmes techniques qui surviennent dans un projet.

La plupart de nos microservices communiquent entre eux de manière asynchrone via un courtier de messages. Auparavant, nous utilisions IBM MQ comme courtier, qui ne pouvait plus faire face à la charge, mais disposait en même temps de garanties de livraison élevées.

En remplacement, on nous a proposé Apache Kafka, qui a un potentiel d'évolutivité élevé, mais qui nécessite malheureusement une approche presque individuelle de la configuration pour différents scénarios. De plus, le mécanisme de livraison au moins une fois qui fonctionne par défaut dans Kafka ne permettait pas de maintenir le niveau de cohérence requis dès le départ. Ensuite, je partagerai notre expérience dans la configuration de Kafka, en particulier, je vous expliquerai comment configurer et vivre avec exactement une seule livraison.

Livraison garantie et plus encore

Les paramètres décrits ci-dessous aideront à éviter un certain nombre de problèmes avec les paramètres de connexion par défaut. Mais je voudrais d’abord faire attention à un paramètre qui facilitera un éventuel débogage.

CA aidera identité du client pour le producteur et le consommateur. À première vue, vous pouvez utiliser le nom de l'application comme valeur, et dans la plupart des cas, cela fonctionnera. Cependant, la situation dans laquelle une application utilise plusieurs consommateurs et que vous leur attribuez le même client.id entraîne l'avertissement suivant :

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Si vous souhaitez utiliser JMX dans une application avec Kafka, cela pourrait poser un problème. Dans ce cas, il est préférable d'utiliser une combinaison du nom de l'application et, par exemple, du nom du sujet comme valeur client.id. Le résultat de notre configuration peut être vu dans la sortie de la commande groupes-de-consommateurs-kafka à partir des utilitaires de Confluent :

Comment Kafka est devenu réalité

Examinons maintenant le scénario de livraison garantie des messages. Kafka Producer a un paramètre acques, qui vous permet de configurer après combien d'accusés de réception le responsable du cluster doit considérer que le message a été écrit avec succès. Ce paramètre peut prendre les valeurs suivantes :

  • 0 — l’accusé de réception ne sera pas pris en compte.
  • 1 est le paramètre par défaut, une seule réplique est nécessaire pour accuser réception.
  • −1 — un accusé de réception de toutes les répliques synchronisées est requis (configuration du cluster min.insync.replicas).

D'après les valeurs répertoriées, il est clair que les accusés de réception égaux à -1 donnent la plus forte garantie que le message ne sera pas perdu.

Comme nous le savons tous, les systèmes distribués ne sont pas fiables. Pour se protéger contre les pannes transitoires, Kafka Producer offre la possibilité tentatives, qui vous permet de définir le nombre de tentatives de renvoi dans un délai livraison.timeout.ms. Étant donné que le paramètre retries a une valeur par défaut de Integer.MAX_VALUE (2147483647), le nombre de tentatives de message peut être ajusté en modifiant uniquement delivery.timeout.ms.

Nous nous dirigeons vers une livraison en une seule fois

Les paramètres répertoriés permettent à notre producteur de transmettre des messages avec une garantie élevée. Parlons maintenant de la façon de s'assurer qu'une seule copie d'un message est écrite dans un sujet Kafka ? Dans le cas le plus simple, pour ce faire, vous devez définir le paramètre sur Producer activer.idempotence à vrai. L'idempotence garantit qu'un seul message est écrit sur une partition spécifique d'un sujet. La condition préalable pour permettre l’idempotence réside dans les valeurs accusés de réception = tous, réessayer > 0, max.in.flight.requests.per.connection ≤ 5. Si ces paramètres ne sont pas spécifiés par le développeur, les valeurs ci-dessus seront automatiquement définies.

Lorsque l'idempotence est configurée, il faut s'assurer que les mêmes messages finissent à chaque fois dans les mêmes partitions. Cela peut être fait en définissant la clé et le paramètre partitioner.class sur Producer. Commençons par la clé. Il doit être le même pour chaque soumission. Cela peut facilement être réalisé en utilisant l’un des identifiants d’entreprise de la publication d’origine. Le paramètre partitioner.class a une valeur par défaut - Partitionneur par défaut. Avec cette stratégie de partitionnement, par défaut nous agissons comme ceci :

  • Si la partition est explicitement spécifiée lors de l'envoi du message, alors nous l'utilisons.
  • Si la partition n'est pas spécifiée, mais que la clé est spécifiée, sélectionnez la partition par le hachage de la clé.
  • Si la partition et la clé ne sont pas spécifiées, sélectionnez les partitions une par une (round robin).

Aussi, utilisation d'une clé et envoi idempotent avec un paramètre max.in.flight.requests.per.connection = 1 vous offre un traitement rationalisé des messages sur le consommateur. Il convient également de rappeler que si le contrôle d'accès est configuré sur votre cluster, vous aurez alors besoin des droits pour écrire de manière idempotente sur un sujet.

Si soudainement vous n'avez pas les capacités d'envoi idempotent par clé ou si la logique du côté producteur nécessite de maintenir la cohérence des données entre les différentes partitions, alors les transactions viendront à la rescousse. De plus, à l'aide d'une transaction en chaîne, vous pouvez synchroniser conditionnellement un enregistrement dans Kafka, par exemple avec un enregistrement dans la base de données. Pour permettre l'envoi transactionnel au Producteur, il doit être idempotent et en outre défini transactionnel.id. Si votre cluster Kafka dispose d'un contrôle d'accès configuré, alors un enregistrement transactionnel, comme un enregistrement idempotent, aura besoin d'autorisations d'écriture, qui peuvent être accordées par masque en utilisant la valeur stockée dans transactional.id.

Formellement, n'importe quelle chaîne, telle que le nom de l'application, peut être utilisée comme identifiant de transaction. Mais si vous lancez plusieurs instances de la même application avec le même transactional.id, alors la première instance lancée sera arrêtée avec une erreur, puisque Kafka la considérera comme un processus zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Pour résoudre ce problème, nous ajoutons un suffixe au nom de l'application sous la forme du nom d'hôte, que nous obtenons à partir des variables d'environnement.

Le producteur est configuré, mais les transactions sur Kafka contrôlent uniquement la portée du message. Quel que soit l'état de la transaction, le message est immédiatement dirigé vers le sujet, mais possède des attributs système supplémentaires.

Pour éviter que de tels messages ne soient lus à l'avance par le Consommateur, celui-ci doit définir le paramètre niveau d'isolement à la valeur read_committee. Un tel consommateur pourra lire les messages non transactionnels comme auparavant, et les messages transactionnels uniquement après une validation.
Si vous avez défini tous les paramètres répertoriés précédemment, vous avez configuré exactement une seule livraison. Toutes nos félicitations!

Mais il y a encore une nuance. Transactional.id, que nous avons configuré ci-dessus, est en fait le préfixe de la transaction. Sur le gestionnaire de transactions, un numéro de séquence y est ajouté. L'identifiant reçu est délivré à transactionnel.id.expiration.ms, qui est configuré sur un cluster Kafka et a une valeur par défaut de « 7 jours ». Si pendant ce temps l'application n'a reçu aucun message, alors lorsque vous tenterez le prochain envoi transactionnel, vous recevrez InvalidPidMappingException. Le coordinateur de transaction émettra alors un nouveau numéro de séquence pour la prochaine transaction. Cependant, le message peut être perdu si l'InvalidPidMappingException n'est pas gérée correctement.

Au lieu de totaux

Comme vous pouvez le constater, il ne suffit pas d’envoyer des messages à Kafka. Vous devez choisir une combinaison de paramètres et être prêt à apporter des modifications rapides. Dans cet article, j'ai essayé de montrer en détail la configuration de la livraison exactement une fois et j'ai décrit plusieurs problèmes avec les configurations client.id et transactional.id que nous avons rencontrés. Vous trouverez ci-dessous un résumé des paramètres Producteur et Consommateur.

Producteur:

  1. accusés de réception = tous
  2. tentatives > 0
  3. activer.idempotence = vrai
  4. max.in.flight.requests.per.connection ≤ 5 (1 pour un envoi ordonné)
  5. transactional.id = ${nom-application}-${nom d'hôte}

Consommateur:

  1. isolation.level = read_commit

Pour minimiser les erreurs dans les applications futures, nous avons créé notre propre wrapper sur la configuration Spring, où les valeurs de certains des paramètres répertoriés sont déjà définies.

Voici quelques documents pour l'auto-apprentissage :

Source: habr.com

Ajouter un commentaire