Delta : Plateforme de synchronisation et d'enrichissement des données

En prévision du lancement d'un nouveau flux au rythme Ingénieur de données Nous avons préparé une traduction de matériel intéressant.

Delta : Plateforme de synchronisation et d'enrichissement des données

vue d'ensemble

Nous parlerons d'un modèle assez populaire selon lequel les applications utilisent plusieurs magasins de données, où chaque magasin est utilisé à ses propres fins, par exemple pour stocker la forme canonique des données (MySQL, etc.), fournir des capacités de recherche avancées (ElasticSearch, etc.) .), la mise en cache (Memcached, etc.) et autres. Généralement, lorsque vous utilisez plusieurs magasins de données, l’un d’eux fait office de magasin principal et les autres de magasins dérivés. Le seul problème est de savoir comment synchroniser ces magasins de données.

Nous avons examiné un certain nombre de modèles différents qui tentaient de résoudre le problème de la synchronisation de plusieurs magasins, tels que les doubles écritures, les transactions distribuées, etc. Cependant, ces approches présentent des limites importantes en termes d’utilisation réelle, de fiabilité et de maintenance. Outre la synchronisation des données, certaines applications doivent également enrichir les données en faisant appel à des services externes.

Delta a été développé pour résoudre ces problèmes. Delta fournit en fin de compte une plateforme cohérente et basée sur les événements pour la synchronisation et l'enrichissement des données.

Solutions existantes

double saisie

Pour synchroniser deux magasins de données, vous pouvez utiliser la double écriture, qui écrit dans un magasin, puis écrit dans l'autre immédiatement après. Le premier enregistrement peut être réessayé et le second peut être interrompu si le premier échoue une fois le nombre de tentatives épuisé. Toutefois, les deux magasins de données peuvent devenir désynchronisés si l'écriture dans le deuxième magasin échoue. Ce problème est généralement résolu en créant une procédure de récupération capable de retransférer périodiquement les données du premier stockage vers le second, ou de le faire uniquement si des différences sont détectées dans les données.

Problèmes:

Effectuer une procédure de récupération est une tâche spécifique qui ne peut pas être réutilisée. De plus, les données entre les emplacements de stockage restent désynchronisées jusqu'à ce que la procédure de restauration ait lieu. La solution devient plus complexe si plus de deux magasins de données sont utilisés. Enfin, la procédure de restauration peut ajouter une charge à la source de données d'origine.

Tableau du journal des modifications

Lorsque des modifications surviennent dans un ensemble de tables (telles que l'insertion, la mise à jour et la suppression d'un enregistrement), les enregistrements modifiés sont ajoutés à la table du journal dans le cadre de la même transaction. Un autre thread ou processus demande constamment des événements à la table journal et les écrit dans un ou plusieurs magasins de données, si nécessaire, supprimant les événements de la table journal une fois que l'enregistrement a été confirmé par tous les magasins.

Problèmes:

Ce modèle doit être implémenté sous forme de bibliothèque, et idéalement sans modifier le code de l'application qui l'utilise. Dans un environnement polyglotte, une implémentation d'une telle bibliothèque devrait exister dans n'importe quel langage nécessaire, mais il est très difficile d'assurer la cohérence des fonctionnalités et du comportement entre les langages.

Un autre problème réside dans l'obtention de modifications de schéma dans des systèmes qui ne prennent pas en charge les modifications de schéma transactionnelles [1][2], comme MySQL. Par conséquent, le modèle consistant à effectuer une modification (par exemple, une modification de schéma) et à l'enregistrer de manière transactionnelle dans la table du journal des modifications ne fonctionnera pas toujours.

Transactions distribuées

Les transactions distribuées peuvent être utilisées pour diviser une transaction sur plusieurs magasins de données hétérogènes afin que l'opération soit soit validée dans tous les magasins de données utilisés, soit non validée dans aucun d'entre eux.

Problèmes:

Les transactions distribuées constituent un très gros problème pour les magasins de données hétérogènes. De par leur nature, ils ne peuvent s’appuyer que sur le plus petit commun dénominateur des systèmes concernés. Par exemple, les transactions XA bloquent l'exécution si le processus de candidature échoue pendant la phase de préparation. De plus, XA ne fournit pas de détection de blocage ni ne prend en charge les schémas de contrôle de concurrence optimistes. De plus, certains systèmes comme ElasticSearch ne prennent pas en charge XA ou tout autre modèle de transaction hétérogène. Ainsi, assurer l’atomicité d’écriture dans diverses technologies de stockage de données reste une tâche très difficile pour les applications [3].

Delta

Delta a été conçu pour répondre aux limites des solutions de synchronisation de données existantes et permet également un enrichissement des données à la volée. Notre objectif était de soustraire toutes ces complexités aux développeurs d'applications afin qu'ils puissent se concentrer pleinement sur la mise en œuvre des fonctionnalités métier. Nous décrirons ensuite la « Recherche de films », le cas d'utilisation réel du Delta de Netflix.

Netflix utilise largement une architecture de microservices, et chaque microservice sert généralement un type de données. Les informations de base sur le film sont contenues dans un microservice appelé Movie Service, et les données associées telles que les informations sur les producteurs, les acteurs, les vendeurs, etc. sont gérées par plusieurs autres microservices (à savoir Deal Service, Talent Service et Vendor Service).
Les utilisateurs professionnels des studios Netflix doivent souvent effectuer des recherches sur différents critères de films, c'est pourquoi il est très important pour eux de pouvoir effectuer une recherche sur toutes les données liées aux films.

Avant Delta, l'équipe de recherche de films devait extraire des données de plusieurs microservices avant d'indexer les données du film. De plus, l'équipe a dû développer un système qui mettrait périodiquement à jour l'index de recherche en demandant des modifications à d'autres microservices, même s'il n'y avait aucun changement. Ce système est rapidement devenu complexe et difficile à maintenir.

Delta : Plateforme de synchronisation et d'enrichissement des données
Figure 1. Système de vote vers Delta
Après avoir utilisé Delta, le système a été simplifié en un système piloté par événements, comme le montre la figure suivante. Les événements CDC (Change-Data-Capture) sont envoyés aux sujets Keystone Kafka à l'aide de Delta-Connector. Une application Delta construite à l'aide du Delta Stream Processing Framework (basé sur Flink) reçoit les événements CDC d'un sujet, les enrichit en appelant d'autres microservices et transmet enfin les données enrichies à un index de recherche dans Elasticsearch. L'ensemble du processus se déroule presque en temps réel, c'est-à-dire que dès que les modifications sont validées dans l'entrepôt de données, les index de recherche sont mis à jour.

Delta : Plateforme de synchronisation et d'enrichissement des données
Figure 2. Pipeline de données utilisant Delta
Dans les sections suivantes, nous décrirons le fonctionnement du Delta-Connector, qui se connecte au stockage et publie les événements CDC vers la couche transport, qui est une infrastructure de transmission de données en temps réel qui achemine les événements CDC vers les sujets Kafka. Et à la toute fin, nous parlerons du framework de traitement de flux Delta, que les développeurs d'applications peuvent utiliser pour le traitement des données et la logique d'enrichissement.

CDC (Changement-Data-Capture)

Nous avons développé un service CDC appelé Delta-Connector, qui peut capturer les modifications validées du magasin de données en temps réel et les écrire dans un flux. Les modifications en temps réel sont extraites du journal des transactions et des vidages de stockage. Les vidages sont utilisés car les journaux de transactions ne stockent généralement pas l'intégralité de l'historique des modifications. Les modifications sont généralement sérialisées sous forme d'événements Delta, de sorte que le destinataire n'a pas à se soucier de l'origine de la modification.

Delta-Connector prend en charge plusieurs fonctionnalités supplémentaires telles que :

  • Possibilité d'écrire des données de sortie personnalisées après Kafka.
  • Possibilité d'activer les dumps manuels à tout moment pour toutes les tables, une table spécifique ou pour des clés primaires spécifiques.
  • Les dumps peuvent être récupérés en morceaux, il n'est donc pas nécessaire de tout recommencer en cas d'échec.
  • Il n'est pas nécessaire de placer des verrous sur les tables, ce qui est très important pour garantir que le trafic d'écriture de la base de données ne soit jamais bloqué par notre service.
  • Haute disponibilité grâce aux instances redondantes dans les zones de disponibilité AWS.

Nous prenons actuellement en charge MySQL et Postgres, y compris les déploiements sur AWS RDS et Aurora. Nous prenons également en charge Cassandra (multi-maître). Vous pouvez trouver plus de détails sur Delta-Connector ici блоге.

Kafka et la couche transport

La couche de transport d'événements de Delta est construite sur le service de messagerie de la plateforme Clé de voûte.

Historiquement, la publication sur Netflix a été optimisée pour l’accessibilité plutôt que pour la longévité (voir ci-dessous). article précédent). Le compromis était une incohérence potentielle des données des courtiers dans divers scénarios de pointe. Par exemple, élection d'un chef impur est responsable du fait que le destinataire ait potentiellement des événements en double ou perdus.

Avec Delta, nous souhaitions des garanties de durabilité plus solides pour garantir la livraison des événements CDC aux magasins dérivés. À cette fin, nous avons proposé un cluster Kafka spécialement conçu comme objet de première classe. Vous pouvez consulter certains paramètres du courtier dans le tableau ci-dessous :

Delta : Plateforme de synchronisation et d'enrichissement des données

Dans les clusters Keystone Kafka, élection d'un chef impur généralement inclus pour garantir l’accessibilité de l’éditeur. Cela peut entraîner la perte de messages si une réplique non synchronisée est élue comme leader. Pour un nouveau cluster Kafka haute disponibilité, l'option élection d'un chef impur désactivé pour éviter la perte de messages.

Nous avons également augmenté facteur de réplication de 2 à 3 et nombre minimum de réplicas insynchrones 1 à 2. Les éditeurs écrivant sur ce cluster nécessitent des accusés de réception de tous les autres, garantissant que 2 réplicas sur 3 contiennent les messages les plus récents envoyés par l'éditeur.

Lorsqu'une instance de courtier se termine, une nouvelle instance remplace l'ancienne. Toutefois, le nouveau courtier devra rattraper les réplicas non synchronisés, ce qui peut prendre plusieurs heures. Pour réduire le temps de récupération dans ce scénario, nous avons commencé à utiliser le stockage de données par blocs (Amazon Elastic Block Store) au lieu de disques de courtier locaux. Lorsqu'une nouvelle instance remplace une instance de courtier terminée, elle attache le volume EBS que possédait l'instance terminée et commence à rattraper les nouveaux messages. Ce processus réduit le temps de traitement du retard de quelques heures à quelques minutes, car la nouvelle instance n'a plus besoin d'être répliquée à partir d'un état vide. En général, des cycles de vie séparés pour le stockage et les courtiers réduisent considérablement l’impact du changement de courtier.

Pour augmenter encore la garantie de livraison des données, nous avons utilisé système de suivi des messages pour détecter toute perte de message dans des conditions extrêmes (par exemple, désynchronisation de l'horloge dans le leader de la partition).

Cadre de traitement de flux

La couche de traitement de Delta est construite sur la plate-forme Netflix SPaaS, qui permet l'intégration d'Apache Flink à l'écosystème Netflix. La plateforme fournit une interface utilisateur qui gère le déploiement des tâches Flink et l'orchestration des clusters Flink au-dessus de notre plateforme de gestion de conteneurs Titus. L'interface gère également les configurations des tâches et permet aux utilisateurs d'apporter des modifications à la configuration de manière dynamique sans avoir à recompiler les tâches Flink.

Delta fournit un cadre de traitement de flux basé sur Flink et SPaaS qui utilise basé sur des annotations DSL (Domain Specific Language) pour résumer les détails techniques. Par exemple, pour définir l'étape à laquelle les événements seront enrichis en appelant des services externes, les utilisateurs doivent écrire le DSL suivant, et le framework créera un modèle basé sur celui-ci, qui sera exécuté par Flink.

Delta : Plateforme de synchronisation et d'enrichissement des données
Figure 3. Exemple d'enrichissement sur DSL dans Delta

Le cadre de traitement réduit non seulement la courbe d'apprentissage, mais fournit également des fonctionnalités communes de traitement de flux telles que la déduplication, la schématisation, ainsi que la flexibilité et la résilience pour résoudre les problèmes opérationnels courants.

Delta Stream Processing Framework se compose de deux modules clés, le module DSL & API et le module Runtime. Le module DSL & API fournit des API DSL et UDF (User-Defined-Function) afin que les utilisateurs puissent écrire leur propre logique de traitement (comme le filtrage ou les transformations). Le module Runtime fournit une implémentation d'un analyseur DSL qui construit une représentation interne des étapes de traitement dans les modèles DAG. Le composant d'exécution interprète les modèles DAG pour initialiser les instructions Flink réelles et finalement exécuter l'application Flink. L'architecture du framework est illustrée dans la figure suivante.

Delta : Plateforme de synchronisation et d'enrichissement des données
Figure 4. Architecture du cadre de traitement de flux Delta

Cette approche a de nombreux avantages:

  • Les utilisateurs peuvent se concentrer sur leur logique métier sans avoir à se plonger dans les spécificités de Flink ou de la structure SPaaS.
  • L'optimisation peut être effectuée de manière transparente pour les utilisateurs et les erreurs peuvent être corrigées sans nécessiter de modification du code utilisateur (UDF).
  • L'expérience de l'application Delta est simplifiée pour les utilisateurs car la plateforme offre flexibilité et résilience dès le départ et collecte une variété de mesures détaillées qui peuvent être utilisées pour les alertes.

Utilisation en production

Delta est en production depuis plus d'un an et joue un rôle clé dans de nombreuses applications Netflix Studio. Elle a aidé les équipes à mettre en œuvre des cas d'utilisation tels que l'indexation de recherche, le stockage de données et les workflows événementiels. Vous trouverez ci-dessous un aperçu de l'architecture de haut niveau de la plateforme Delta.

Delta : Plateforme de synchronisation et d'enrichissement des données
Figure 5. Architecture de haut niveau de Delta.

Remerciements

Nous tenons à remercier les personnes suivantes qui ont participé à la création et au développement de Delta chez Netflix : Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti. Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang et Zhenzhong Xu.

sources

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen : Traitement des événements en ligne. Commun. ACM 62(5) : 43-49 (2019). EST CE QUE JE: est ce que je.org/10.1145/3312527

Inscrivez-vous à un webinaire gratuit: « Outil de création de données pour le stockage Amazon Redshift. »

Source: habr.com

Ajouter un commentaire