Delta: Datensynchronisierungs- und Anreicherungsplattform

In Erwartung der Einführung eines neuen Flusses zum Kurs Dateningenieur Wir haben eine Übersetzung interessanten Materials vorbereitet.

Delta: Datensynchronisierungs- und Anreicherungsplattform

Beschreibung

Wir werden über ein ziemlich beliebtes Muster sprechen, bei dem Anwendungen mehrere Datenspeicher verwenden, wobei jeder Speicher für seine eigenen Zwecke verwendet wird, beispielsweise zum Speichern der kanonischen Form von Daten (MySQL usw.), zur Bereitstellung erweiterter Suchfunktionen (ElasticSearch, usw.) .), Caching (Memcached usw.) und andere. Wenn mehrere Datenspeicher verwendet werden, fungiert normalerweise einer davon als Primärspeicher und die anderen als Derivatspeicher. Das einzige Problem besteht darin, wie diese Datenspeicher synchronisiert werden.

Wir haben uns verschiedene Muster angesehen, die versucht haben, das Problem der Synchronisierung mehrerer Speicher zu lösen, z. B. doppelte Schreibvorgänge, verteilte Transaktionen usw. Diese Ansätze weisen jedoch erhebliche Einschränkungen hinsichtlich der realen Nutzung, Zuverlässigkeit und Wartung auf. Zusätzlich zur Datensynchronisierung müssen einige Anwendungen auch Daten durch den Aufruf externer Dienste anreichern.

Delta wurde entwickelt, um diese Probleme zu lösen. Delta bietet letztendlich eine konsistente, ereignisgesteuerte Plattform für die Datensynchronisierung und -anreicherung.

Bestehende Lösungen

doppelter Eintrag

Um zwei Datenspeicher synchron zu halten, können Sie Dual Write verwenden, bei dem in einen Speicher geschrieben wird und unmittelbar danach in den anderen. Die erste Aufzeichnung kann wiederholt werden und die zweite kann abgebrochen werden, wenn die erste fehlschlägt, nachdem die Anzahl der Versuche erschöpft ist. Es kann jedoch sein, dass die beiden Datenspeicher nicht mehr synchron sind, wenn das Schreiben in den zweiten Speicher fehlschlägt. Dieses Problem wird normalerweise durch die Erstellung eines Wiederherstellungsverfahrens gelöst, das Daten regelmäßig vom ersten Speicher auf den zweiten übertragen kann oder dies nur tut, wenn Unterschiede in den Daten festgestellt werden.

Probleme:

Die Durchführung eines Wiederherstellungsvorgangs ist eine spezifische Aufgabe, die nicht wiederverwendet werden kann. Darüber hinaus bleiben Daten zwischen Speicherorten bis zur Durchführung des Wiederherstellungsvorgangs nicht synchron. Komplexer wird die Lösung, wenn mehr als zwei Datenspeicher verwendet werden. Schließlich kann der Wiederherstellungsvorgang die ursprüngliche Datenquelle zusätzlich belasten.

Protokolltabelle ändern

Wenn an einer Reihe von Tabellen Änderungen vorgenommen werden (z. B. das Einfügen, Aktualisieren und Löschen eines Datensatzes), werden die Änderungsdatensätze als Teil derselben Transaktion zur Protokolltabelle hinzugefügt. Ein anderer Thread oder Prozess fordert ständig Ereignisse aus der Protokolltabelle an und schreibt sie bei Bedarf in einen oder mehrere Datenspeicher. Dabei werden Ereignisse aus der Protokolltabelle entfernt, nachdem der Datensatz von allen Speichern bestätigt wurde.

Probleme:

Dieses Muster sollte als Bibliothek implementiert werden und im Idealfall ohne Änderung des Codes der Anwendung, die es verwendet. In einer polyglotten Umgebung sollte eine Implementierung einer solchen Bibliothek in jeder erforderlichen Sprache vorhanden sein, es ist jedoch sehr schwierig, die Konsistenz von Funktionalität und Verhalten über Sprachen hinweg sicherzustellen.

Ein weiteres Problem besteht darin, Schemaänderungen in Systemen zu erhalten, die keine transaktionalen Schemaänderungen unterstützen [1][2], wie beispielsweise MySQL. Daher funktioniert das Muster, eine Änderung (z. B. eine Schemaänderung) vorzunehmen und sie transaktional in der Änderungsprotokolltabelle aufzuzeichnen, nicht immer.

Verteilte Transaktionen

Verteilte Transaktionen können verwendet werden, um eine Transaktion auf mehrere heterogene Datenspeicher aufzuteilen, sodass der Vorgang entweder an alle verwendeten Datenspeicher übergeben wird oder nicht an einen von ihnen.

Probleme:

Verteilte Transaktionen sind ein sehr großes Problem für heterogene Datenspeicher. Sie können naturgemäß nur auf den kleinsten gemeinsamen Nenner der beteiligten Systeme zurückgreifen. Beispielsweise blockieren XA-Transaktionen die Ausführung, wenn der Anwendungsprozess während der Vorbereitungsphase fehlschlägt. Darüber hinaus bietet XA keine Deadlock-Erkennung und unterstützt keine optimistischen Parallelitätskontrollschemata. Darüber hinaus unterstützen einige Systeme wie ElasticSearch weder XA noch andere heterogene Transaktionsmodelle. Daher bleibt die Sicherstellung der Schreibatomizität in verschiedenen Datenspeichertechnologien eine sehr anspruchsvolle Aufgabe für Anwendungen [3].

Delta

Delta wurde entwickelt, um die Einschränkungen bestehender Datensynchronisierungslösungen zu überwinden und ermöglicht auch die spontane Datenanreicherung. Unser Ziel war es, den Anwendungsentwicklern all diese Komplexitäten zu entziehen, damit sie sich voll und ganz auf die Implementierung von Geschäftsfunktionen konzentrieren können. Als nächstes beschreiben wir „Filmsuche“, den eigentlichen Anwendungsfall für Netflix‘ Delta.

Netflix verwendet häufig eine Microservice-Architektur, und jeder Microservice bedient typischerweise einen Datentyp. Grundlegende Informationen zum Film sind in einem Microservice namens Movie Service enthalten, und zugehörige Daten wie Informationen zu Produzenten, Schauspielern, Anbietern usw. werden von mehreren anderen Microservices (nämlich Deal Service, Talent Service und Vendor Service) verwaltet.
Geschäftsanwender bei Netflix Studios müssen häufig nach verschiedenen Filmkriterien suchen, weshalb es für sie sehr wichtig ist, alle filmbezogenen Daten durchsuchen zu können.

Vor Delta musste das Filmsuchteam Daten aus mehreren Microservices abrufen, bevor es die Filmdaten indizieren konnte. Darüber hinaus musste das Team ein System entwickeln, das den Suchindex regelmäßig aktualisiert, indem es Änderungen von anderen Microservices anfordert, auch wenn überhaupt keine Änderungen vorgenommen wurden. Dieses System wurde schnell komplex und schwer zu warten.

Delta: Datensynchronisierungs- und Anreicherungsplattform
Abbildung 1. Umfragesystem für Delta
Nach der Verwendung von Delta wurde das System zu einem ereignisgesteuerten System vereinfacht, wie in der folgenden Abbildung dargestellt. CDC-Ereignisse (Change-Data-Capture) werden mithilfe von Delta-Connector an Keystone-Kafka-Themen gesendet. Eine mit dem Delta Stream Processing Framework (basierend auf Flink) erstellte Delta-Anwendung empfängt CDC-Ereignisse von einem Thema, reichert sie durch den Aufruf anderer Microservices an und übergibt die angereicherten Daten schließlich an einen Suchindex in Elasticsearch. Der gesamte Prozess erfolgt nahezu in Echtzeit, d. h. sobald Änderungen in das Data Warehouse übernommen werden, werden die Suchindizes aktualisiert.

Delta: Datensynchronisierungs- und Anreicherungsplattform
Abbildung 2. Datenpipeline mit Delta
In den folgenden Abschnitten beschreiben wir die Funktionsweise des Delta-Connectors, der eine Verbindung zum Speicher herstellt und CDC-Ereignisse auf der Transportschicht veröffentlicht, einer Echtzeit-Datenübertragungsinfrastruktur, die CDC-Ereignisse an Kafka-Themen weiterleitet. Und ganz zum Schluss sprechen wir über das Delta Stream Processing Framework, das Anwendungsentwickler für die Datenverarbeitung und Anreicherungslogik nutzen können.

CDC (Change-Data-Capture)

Wir haben einen CDC-Dienst namens Delta-Connector entwickelt, der festgeschriebene Änderungen aus dem Datenspeicher in Echtzeit erfassen und in einen Stream schreiben kann. Echtzeitänderungen werden aus dem Transaktionsprotokoll und den Speicherauszügen übernommen. Dumps werden verwendet, da Transaktionsprotokolle normalerweise nicht den gesamten Änderungsverlauf speichern. Änderungen werden in der Regel als Delta-Ereignisse serialisiert, sodass sich der Empfänger keine Gedanken darüber machen muss, woher die Änderung kommt.

Delta-Connector unterstützt mehrere zusätzliche Funktionen wie:

  • Möglichkeit, benutzerdefinierte Ausgabedaten über Kafka hinaus zu schreiben.
  • Die Möglichkeit, jederzeit manuelle Dumps für alle Tabellen, eine bestimmte Tabelle oder für bestimmte Primärschlüssel zu aktivieren.
  • Dumps können in Blöcken abgerufen werden, sodass bei einem Fehler nicht noch einmal von vorne begonnen werden muss.
  • Es ist nicht erforderlich, Tabellen zu sperren. Dies ist sehr wichtig, um sicherzustellen, dass der Datenbank-Schreibverkehr niemals von unserem Dienst blockiert wird.
  • Hohe Verfügbarkeit durch redundante Instanzen in AWS Availability Zones.

Wir unterstützen derzeit MySQL und Postgres, einschließlich Bereitstellungen auf AWS RDS und Aurora. Wir unterstützen auch Cassandra (Multi-Master). Weitere Details zum Delta-Connector finden Sie hier блоге.

Kafka und die Transportschicht

Die Ereignistransportschicht von Delta basiert auf dem Messaging-Dienst der Plattform Keystone.

In der Vergangenheit wurde das Posten auf Netflix eher auf Zugänglichkeit als auf Langlebigkeit optimiert (siehe unten). Vorheriger Artikel). Der Kompromiss bestand in einer möglichen Inkonsistenz der Brokerdaten in verschiedenen Edge-Szenarien. Zum Beispiel, unreine Führerwahl ist dafür verantwortlich, dass der Empfänger möglicherweise doppelte oder verlorene Ereignisse hat.

Mit Delta wollten wir stärkere Haltbarkeitsgarantien, um die Lieferung von CDC-Ereignissen an abgeleitete Geschäfte sicherzustellen. Zu diesem Zweck haben wir einen speziell entwickelten Kafka-Cluster als erstklassiges Objekt vorgeschlagen. In der folgenden Tabelle können Sie sich einige Broker-Einstellungen ansehen:

Delta: Datensynchronisierungs- und Anreicherungsplattform

In Keystone-Kafka-Clustern unreine Führerwahl normalerweise enthalten, um die Zugänglichkeit für Herausgeber sicherzustellen. Dies kann zum Verlust von Nachrichten führen, wenn ein nicht synchronisiertes Replikat als Anführer ausgewählt wird. Für einen neuen hochverfügbaren Kafka-Cluster die Option unreine Führerwahl deaktiviert, um den Verlust von Nachrichten zu verhindern.

Auch wir haben zugenommen Replikationsfaktor von 2 bis 3 und minimale insynchrone Replikate 1 bis 2. Herausgeber, die an diesen Cluster schreiben, benötigen Bestätigungen von allen anderen. Dadurch wird sichergestellt, dass zwei von drei Replikaten über die aktuellsten vom Herausgeber gesendeten Nachrichten verfügen.

Wenn eine Broker-Instanz beendet wird, ersetzt eine neue Instanz die alte. Allerdings muss der neue Broker die nicht synchronisierten Replikate nachholen, was mehrere Stunden dauern kann. Um die Wiederherstellungszeit für dieses Szenario zu verkürzen, haben wir begonnen, Blockdatenspeicher (Amazon Elastic Block Store) anstelle lokaler Broker-Festplatten zu verwenden. Wenn eine neue Instanz eine beendete Broker-Instanz ersetzt, hängt sie das EBS-Volume an, über das die beendete Instanz verfügte, und beginnt, neue Nachrichten einzuholen. Dieser Prozess reduziert die Zeit für die Rückstandsbeseitigung von Stunden auf Minuten, da die neue Instanz nicht mehr aus einem leeren Zustand replizieren muss. Im Allgemeinen reduzieren getrennte Speicher- und Broker-Lebenszyklen die Auswirkungen eines Brokerwechsels erheblich.

Um die Datenliefergarantie weiter zu erhöhen, nutzen wir Nachrichtenverfolgungssystem um jeden Nachrichtenverlust unter extremen Bedingungen zu erkennen (z. B. Taktdesynchronisation im Partitionsleiter).

Stream-Verarbeitungs-Framework

Die Verarbeitungsschicht von Delta basiert auf der Netflix SPaaS-Plattform, die eine Apache Flink-Integration in das Netflix-Ökosystem ermöglicht. Die Plattform bietet eine Benutzeroberfläche, die die Bereitstellung von Flink-Jobs und die Orchestrierung von Flink-Clustern auf unserer Titus-Container-Management-Plattform verwaltet. Die Schnittstelle verwaltet auch Jobkonfigurationen und ermöglicht es Benutzern, Konfigurationsänderungen dynamisch vorzunehmen, ohne Flink-Jobs neu kompilieren zu müssen.

Delta bietet ein Stream-Verarbeitungs-Framework, das auf Flink und SPaaS basiert Annotationsbasiert DSL (Domain Specific Language) zur Abstraktion technischer Details. Um beispielsweise den Schritt zu definieren, in dem Ereignisse durch den Aufruf externer Dienste angereichert werden, müssen Benutzer das folgende DSL schreiben und das Framework erstellt darauf basierend ein Modell, das von Flink ausgeführt wird.

Delta: Datensynchronisierungs- und Anreicherungsplattform
Abbildung 3. Beispiel einer Anreicherung auf DSL in Delta

Das Verarbeitungsframework verkürzt nicht nur die Lernkurve, sondern bietet auch allgemeine Stream-Verarbeitungsfunktionen wie Deduplizierung, Schematisierung sowie Flexibilität und Ausfallsicherheit zur Lösung häufiger Betriebsprobleme.

Das Delta Stream Processing Framework besteht aus zwei Schlüsselmodulen, dem DSL- und API-Modul und dem Runtime-Modul. Das DSL- und API-Modul stellt DSL- und UDF-APIs (User-Defined-Function) bereit, sodass Benutzer ihre eigene Verarbeitungslogik (z. B. Filterung oder Transformationen) schreiben können. Das Runtime-Modul stellt eine Implementierung eines DSL-Parsers bereit, der eine interne Darstellung von Verarbeitungsschritten in DAG-Modellen erstellt. Die Ausführungskomponente interpretiert DAG-Modelle, um die eigentlichen Flink-Anweisungen zu initialisieren und schließlich die Flink-Anwendung auszuführen. Die Architektur des Frameworks ist in der folgenden Abbildung dargestellt.

Delta: Datensynchronisierungs- und Anreicherungsplattform
Abbildung 4. Architektur des Delta Stream Processing Framework

Dieser Ansatz hat mehrere Vorteile:

  • Benutzer können sich auf ihre Geschäftslogik konzentrieren, ohne sich mit den Besonderheiten von Flink oder der SPaaS-Struktur befassen zu müssen.
  • Die Optimierung kann auf für Benutzer transparente Weise erfolgen und Fehler können behoben werden, ohne dass Änderungen am Benutzercode (UDF) erforderlich sind.
  • Das Delta-Anwendungserlebnis wird für Benutzer vereinfacht, da die Plattform sofort Flexibilität und Ausfallsicherheit bietet und eine Vielzahl detaillierter Metriken sammelt, die für Warnungen verwendet werden können.

Produktionseinsatz

Delta ist seit über einem Jahr in Produktion und spielt in vielen Netflix Studio-Anwendungen eine Schlüsselrolle. Sie unterstützte Teams bei der Implementierung von Anwendungsfällen wie Suchindizierung, Datenspeicherung und ereignisgesteuerten Workflows. Nachfolgend finden Sie einen Überblick über die High-Level-Architektur der Delta-Plattform.

Delta: Datensynchronisierungs- und Anreicherungsplattform
Abbildung 5. Deltas High-Level-Architektur.

Danksagung

Wir möchten den folgenden Personen danken, die an der Entstehung und Entwicklung von Delta bei Netflix beteiligt waren: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang und Zhenzhong Xu.

Quellen

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Online-Ereignisverarbeitung. Komm. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527

Melden Sie sich für ein kostenloses Webinar an: „Datenerstellungstool für Amazon Redshift Storage.“

Source: habr.com

Kommentar hinzufügen