Wie Kafka Wirklichkeit wurde

Wie Kafka Wirklichkeit wurde

Hey Habr!

Ich arbeite im Tinkoff-Team, das ein eigenes Benachrichtigungscenter entwickelt. Ich entwickle hauptsächlich in Java mit Spring Boot und löse verschiedene technische Probleme, die in einem Projekt auftreten.

Die meisten unserer Microservices kommunizieren asynchron über einen Message Broker miteinander. Zuvor nutzten wir IBM MQ als Broker, der der Auslastung nicht mehr gewachsen war, gleichzeitig aber über hohe Liefergarantien verfügte.

Als Ersatz wurde uns Apache Kafka angeboten, das über ein hohes Skalierungspotenzial verfügt, aber leider einen nahezu individuellen Konfigurationsansatz für unterschiedliche Szenarien erfordert. Darüber hinaus ermöglichte der in Kafka standardmäßig funktionierende Mechanismus zur mindestens einmaligen Zustellung nicht die Aufrechterhaltung des erforderlichen Konsistenzniveaus im Auslieferungszustand. Als nächstes werde ich unsere Erfahrungen mit der Kafka-Konfiguration teilen, insbesondere werde ich Ihnen erklären, wie Sie genau eine Lieferung konfigurieren und damit leben können.

Garantierte Lieferung und mehr

Die unten beschriebenen Einstellungen helfen dabei, eine Reihe von Problemen mit den Standardverbindungseinstellungen zu vermeiden. Zunächst möchte ich jedoch auf einen Parameter achten, der ein mögliches Debuggen erleichtert.

Das wird helfen Kunden ID für Produzenten und Konsumenten. Auf den ersten Blick können Sie den Anwendungsnamen als Wert verwenden, was in den meisten Fällen auch funktioniert. Obwohl eine Situation, in der eine Anwendung mehrere Verbraucher verwendet und Sie ihnen dieselbe client.id zuweisen, zu der folgenden Warnung führt:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Wenn Sie JMX in einer Anwendung mit Kafka verwenden möchten, könnte dies ein Problem darstellen. In diesem Fall ist es am besten, eine Kombination aus dem Anwendungsnamen und beispielsweise dem Themennamen als client.id-Wert zu verwenden. Das Ergebnis unserer Konfiguration ist in der Befehlsausgabe zu sehen Kafka-Verbrauchergruppen von Dienstprogrammen von Confluent:

Wie Kafka Wirklichkeit wurde

Schauen wir uns nun das Szenario für die garantierte Nachrichtenzustellung an. Kafka Producer hat einen Parameter Arsch, wodurch Sie konfigurieren können, nach wie vielen Bestätigungen der Clusterleiter die Nachricht als erfolgreich geschrieben betrachten muss. Dieser Parameter kann folgende Werte annehmen:

  • 0 – Bestätigung wird nicht berücksichtigt.
  • 1 ist der Standardparameter, zur Bestätigung ist nur 1 Replikat erforderlich.
  • −1 – Bestätigung von allen synchronisierten Replikaten ist erforderlich (Cluster-Setup). min.insync.replicas).

Aus den aufgeführten Werten geht hervor, dass Acks gleich −1 die stärkste Garantie dafür bieten, dass die Nachricht nicht verloren geht.

Wie wir alle wissen, sind verteilte Systeme unzuverlässig. Zum Schutz vor vorübergehenden Fehlern bietet Kafka Producer die Option versucht es erneut, mit dem Sie die Anzahl der erneuten Sendeversuche festlegen können Delivery.timeout.ms. Da der Parameter „Retries“ den Standardwert „Integer.MAX_VALUE“ (2147483647) hat, kann die Anzahl der Nachrichtenwiederholungen angepasst werden, indem nur „delivery.timeout.ms“ geändert wird.

Wir streben eine exakt einmalige Lieferung an

Die aufgeführten Einstellungen ermöglichen es unserem Produzenten, Nachrichten mit hoher Garantie zuzustellen. Lassen Sie uns nun darüber sprechen, wie sichergestellt werden kann, dass nur eine Kopie einer Nachricht in ein Kafka-Thema geschrieben wird. Im einfachsten Fall müssen Sie dazu den Parameter im Producer einstellen Idempotenz aktivieren zu wahr. Idempotenz garantiert, dass nur eine Nachricht in eine bestimmte Partition eines Themas geschrieben wird. Voraussetzung für die Aktivierung der Idempotenz sind die Werte acks = all, retry > 0, max.in.flight.requests.per.connection ≤ 5. Wenn diese Parameter nicht vom Entwickler angegeben werden, werden die oben genannten Werte automatisch festgelegt.

Bei der Konfiguration der Idempotenz muss sichergestellt werden, dass immer dieselben Nachrichten in denselben Partitionen landen. Dies kann erreicht werden, indem der Schlüssel und Parameter „partitioner.class“ auf „Producer“ gesetzt wird. Beginnen wir mit dem Schlüssel. Es muss für jede Einreichung gleich sein. Dies kann leicht erreicht werden, indem eine der Geschäfts-IDs aus dem ursprünglichen Beitrag verwendet wird. Der Parameter partitioner.class hat den Standardwert − Standardpartitionierer. Bei dieser Partitionierungsstrategie verhalten wir uns standardmäßig wie folgt:

  • Wenn die Partition beim Senden der Nachricht explizit angegeben wird, verwenden wir sie.
  • Wenn die Partition nicht angegeben ist, aber der Schlüssel angegeben ist, wählen Sie die Partition anhand des Hashs des Schlüssels aus.
  • Wenn die Partition und der Schlüssel nicht angegeben sind, wählen Sie die Partitionen einzeln aus (Round-Robin).

Auch die Verwendung eines Schlüssels und das idempotente Senden mit einem Parameter max.in.flight.requests.per.connection = 1 ermöglicht Ihnen eine optimierte Nachrichtenverarbeitung auf dem Verbraucher. Denken Sie auch daran, dass Sie, wenn die Zugriffskontrolle in Ihrem Cluster konfiguriert ist, Rechte benötigen, um idempotent in ein Thema zu schreiben.

Wenn Ihnen plötzlich die Möglichkeiten des idempotenten Sendens per Schlüssel fehlen oder die Logik auf der Produzentenseite die Aufrechterhaltung der Datenkonsistenz zwischen verschiedenen Partitionen erfordert, dann helfen Transaktionen. Darüber hinaus können Sie mithilfe einer Kettentransaktion einen Datensatz in Kafka beispielsweise bedingt mit einem Datensatz in der Datenbank synchronisieren. Um das transaktionale Senden an den Produzenten zu ermöglichen, muss es idempotent sein und zusätzlich festgelegt werden transaktional.id. Wenn für Ihren Kafka-Cluster eine Zugriffskontrolle konfiguriert ist, benötigt ein Transaktionsdatensatz wie ein idempotenter Datensatz Schreibberechtigungen, die per Maske mithilfe des in „transactional.id“ gespeicherten Werts gewährt werden können.

Formal kann jede Zeichenfolge, beispielsweise der Anwendungsname, als Transaktionskennung verwendet werden. Wenn Sie jedoch mehrere Instanzen derselben Anwendung mit derselben Transaktions-ID starten, wird die erste gestartete Instanz mit einem Fehler gestoppt, da Kafka sie als Zombie-Prozess betrachtet.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Um dieses Problem zu lösen, fügen wir dem Anwendungsnamen ein Suffix in Form des Hostnamens hinzu, den wir aus Umgebungsvariablen erhalten.

Der Produzent ist konfiguriert, aber Transaktionen auf Kafka steuern nur den Umfang der Nachricht. Unabhängig vom Transaktionsstatus geht die Nachricht sofort zum Thema, verfügt jedoch über zusätzliche Systemattribute.

Um zu verhindern, dass solche Nachrichten vorzeitig vom Consumer gelesen werden, muss dieser den Parameter setzen Isolationsstufe zum read_committed-Wert. Ein solcher Verbraucher kann nicht-transaktionale Nachrichten wie zuvor und transaktionale Nachrichten erst nach einem Commit lesen.
Wenn Sie alle zuvor aufgeführten Einstellungen vorgenommen haben, dann haben Sie genau eine Lieferung konfiguriert. Glückwunsch!

Aber es gibt noch eine weitere Nuance. Transactional.id, das wir oben konfiguriert haben, ist eigentlich das Transaktionspräfix. Im Transaktionsmanager wird eine Sequenznummer hinzugefügt. Die empfangene Kennung wird an ausgegeben transactional.id.expiration.ms, das auf einem Kafka-Cluster konfiguriert ist und einen Standardwert von „7 Tage“ hat. Wenn die Anwendung während dieser Zeit keine Nachrichten empfangen hat, erhalten Sie beim nächsten Transaktionsversand eine Nachricht InvalidPidMappingException. Der Transaktionskoordinator vergibt dann eine neue Sequenznummer für die nächste Transaktion. Die Nachricht geht jedoch möglicherweise verloren, wenn die InvalidPidMappingException nicht korrekt behandelt wird.

Anstelle von Summen

Wie Sie sehen, reicht es nicht aus, Kafka einfach nur Nachrichten zu schicken. Sie müssen eine Kombination von Parametern auswählen und auf schnelle Änderungen vorbereitet sein. In diesem Artikel habe ich versucht, die Einrichtung der genau einmaligen Zustellung im Detail darzustellen und mehrere Probleme mit den Konfigurationen „client.id“ und „transactional.id“ beschrieben, auf die wir gestoßen sind. Nachfolgend finden Sie eine Zusammenfassung der Producer- und Consumer-Einstellungen.

Produzent:

  1. acks = alle
  2. Wiederholungsversuche > 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 für ordnungsgemäßes Senden)
  5. transaktional.id = ${application-name}-${hostname}

Verbraucher:

  1. isolation.level = read_committed

Um Fehler in zukünftigen Anwendungen zu minimieren, haben wir einen eigenen Wrapper für die Federkonfiguration erstellt, in dem Werte für einige der aufgeführten Parameter bereits festgelegt sind.

Hier sind ein paar Materialien zum Selbststudium:

Source: habr.com

Kommentar hinzufügen