Delta: Data Synchronization and Enrichment Platform



Overview

We will discuss a fairly common pattern in which applications use several data stores, each serving its own purpose: for example, storing the canonical form of the data (MySQL, etc.), providing advanced search capabilities (ElasticSearch, etc.), caching (Memcached, etc.), and so on. Typically, when multiple data stores are used, one of them acts as the primary store and the others as derived stores. The problem then is how to keep these data stores synchronized.

We examined a number of patterns that attempt to solve the problem of synchronizing multiple stores, such as dual writes, distributed transactions, and so on. However, these approaches have significant limitations in terms of real-life use, reliability, and maintenance. Beyond data synchronization, some applications also need to enrich data by calling external services.

Delta was developed to solve these problems. Delta is an eventually consistent, event-driven platform for data synchronization and enrichment.

Existing solutions

Dual writes

To keep two data stores in sync, you can use dual writes: write to one store and then immediately write to the other. The first write can be retried, and the second can be aborted if the first fails after the retry attempts are exhausted. However, the two data stores can become out of sync if the write to the second store fails. This problem is usually solved by creating a recovery procedure that periodically re-transfers data from the first store to the second, or does so only when differences are found in the data.
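
As a minimal sketch of the pattern (the Store interface and retry budget below are illustrative assumptions, not code from the article):

```java
// A minimal sketch of dual writes with retries. The Store interface and the
// retry budget are illustrative assumptions, not from the article.
public class DualWriteSketch {

    interface Store { void write(String key, String value) throws Exception; }

    private static final int MAX_ATTEMPTS = 3;
    private final Store primary;  // e.g. MySQL, the canonical store
    private final Store derived;  // e.g. ElasticSearch, the derived store

    DualWriteSketch(Store primary, Store derived) {
        this.primary = primary;
        this.derived = derived;
    }

    void save(String key, String value) throws Exception {
        writeWithRetry(primary, key, value); // failure here aborts the whole save
        // If this second write exhausts its retries, the stores are now out of
        // sync, and only a separate recovery procedure can reconcile them.
        writeWithRetry(derived, key, value);
    }

    private static void writeWithRetry(Store store, String key, String value) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                store.write(key, value);
                return;
            } catch (Exception e) {
                last = e; // retry until the attempt budget is exhausted
            }
        }
        throw last;
    }
}
```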

Problems:

Implementing the recovery procedure is a one-off job that cannot be reused. In addition, the data between the stores remains out of sync until the recovery procedure completes. The solution becomes more complicated if more than two data stores are used. Finally, the recovery procedure can add load to the original data source.

Change log table

When changes occur to a set of tables (such as inserting, updating, or deleting records), change records are added to a log table as part of the same transaction. Another thread or process constantly polls the log table for events and writes them to one or more data stores, deleting events from the log table once the write is confirmed by all stores.
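
A minimal sketch of the write side of this pattern with plain JDBC, assuming illustrative table and column names:

```java
// A minimal sketch of the change log (outbox) pattern using plain JDBC.
// Table and column names are illustrative, not from the article.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ChangeLogSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/movies", "user", "password")) {
            conn.setAutoCommit(false);
            try (PreparedStatement update = conn.prepareStatement(
                         "UPDATE movie SET title = ? WHERE id = ?");
                 PreparedStatement log = conn.prepareStatement(
                         "INSERT INTO change_log (entity, entity_id, op, payload) VALUES (?, ?, ?, ?)")) {
                // 1. The business change...
                update.setString(1, "New Title");
                update.setLong(2, 42L);
                update.executeUpdate();
                // 2. ...and the change record, in the SAME transaction.
                log.setString(1, "movie");
                log.setLong(2, 42L);
                log.setString(3, "UPDATE");
                log.setString(4, "{\"title\":\"New Title\"}");
                log.executeUpdate();
                conn.commit();
            } catch (Exception e) {
                conn.rollback();
                throw e;
            }
        }
        // A separate poller reads change_log, writes the events to the derived
        // stores, and deletes rows once all stores have acknowledged them.
    }
}
```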

Problems:

This pattern should be implemented as a library, ideally without changing the code of the application that uses it. In a polyglot environment, an implementation of such a library must exist for every language in use, and ensuring consistent functionality and behavior across languages is very difficult.

Another problem is handling schema changes on systems that do not support transactional schema changes [1][2], such as MySQL. On such systems, the pattern of making a change (for example, a schema change) and transactionally writing it to the change log table will not always work.

Distributed Transactions

Distributed transactions can be used to span a transaction across multiple heterogeneous data stores, so that the transaction is either committed to all of the stores involved or to none of them.
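
A minimal sketch of what this looks like with JTA, assuming a container-managed transaction manager and two XA-capable DataSources registered in JNDI (the JNDI names and SQL are illustrative):

```java
// A minimal sketch of a distributed (XA) transaction via JTA. Assumes a
// container-managed transaction manager and two XA-capable DataSources
// registered in JNDI; the JNDI names and SQL are illustrative.
import java.sql.Connection;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;

public class XaTransactionSketch {
    public static void main(String[] args) throws Exception {
        InitialContext ctx = new InitialContext();
        UserTransaction utx = (UserTransaction) ctx.lookup("java:comp/UserTransaction");
        DataSource primary = (DataSource) ctx.lookup("java:comp/env/jdbc/primary");
        DataSource derived = (DataSource) ctx.lookup("java:comp/env/jdbc/derived");

        utx.begin();
        try (Connection c1 = primary.getConnection();
             Connection c2 = derived.getConnection()) {
            c1.createStatement().executeUpdate(
                    "UPDATE movie SET title = 'New Title' WHERE id = 42");
            c2.createStatement().executeUpdate(
                    "UPDATE movie_index SET title = 'New Title' WHERE id = 42");
            utx.commit();   // two-phase commit across both resources
        } catch (Exception e) {
            try { utx.rollback(); } catch (Exception ignore) {}
            throw e;        // neither store sees the change
        }
    }
}
```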

Problems:

Distributed transactions are very problematic for heterogeneous data stores. By their nature, they can only rely on the lowest common denominator of the systems involved. For example, XA transactions block execution if the application process fails during the prepare phase. XA also provides no deadlock detection and does not support optimistic concurrency control schemes. Furthermore, some systems, such as ElasticSearch, do not support XA or any other heterogeneous transaction model. Thus, ensuring the atomicity of writes across different data storage technologies remains a very difficult task for applications [3].

Delta

Delta was designed to remove the limitations of existing data synchronization solutions while also allowing data to be enriched on the fly. Our goal was to abstract all of this complexity away from application developers so that they can focus fully on implementing business functionality. In what follows, we describe "Movie Search", an actual use case of Delta at Netflix.

Netflix uses a microservice architecture extensively, and each microservice typically manages one type of data. The core information about a movie lives in a microservice called Movie Service, while related data, such as information about producers, actors, and vendors, is managed by several other microservices (namely Deal Service, Talent Service, and Vendor Service).
Business users at Netflix Studios often need to search for movies across multiple criteria, which is why it is important for them to be able to search across all movie-related data.

Before Delta, the movie search team had to fetch data from multiple microservices before indexing movie data. The team also had to build a system that periodically updated the search index by polling other microservices for changes, even when there were none. This system quickly became complex and difficult to maintain.

Figure 1. The polling system before Delta
After adopting Delta, the system was simplified to an event-driven one, as shown in the following figure. CDC (Change-Data-Capture) events are sent to Keystone Kafka topics by the Delta-Connector. A Delta application built with the Delta Stream Processing Framework (based on Flink) receives the CDC events from a topic, enriches them by calling other microservices, and finally passes the enriched data to the search index in Elasticsearch. The whole process happens in near real time: as soon as changes are committed to the data store, the search indexes are updated.

Figure 2. Data pipeline using Delta
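
As a rough sketch of the Figure 2 pipeline (not Netflix's actual code; the topic name, Kafka address, and enrichment call are assumptions, and print() stands in for the Elasticsearch sink to keep the sketch self-contained):

```java
// A rough sketch of a CDC -> enrich -> index pipeline on Flink.
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CdcEnrichmentJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumed address
        props.setProperty("group.id", "delta-movie-search");  // assumed group id

        env.addSource(new FlinkKafkaConsumer<>(
                        "movie-cdc-events",                    // assumed topic name
                        new SimpleStringSchema(), props))
           // Enrich each CDC event; the real pipeline calls other microservices here.
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String cdcEvent) {
                   return cdcEvent + " + enrichment";          // stand-in for service calls
               }
           })
           .print();                                           // stand-in for the Elasticsearch sink

        env.execute("cdc-enrichment");
    }
}
```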
In the following sections, we describe the Delta-Connector, which connects to a store and publishes CDC events to the transport layer, a real-time data transfer infrastructure that routes CDC events to Kafka topics. At the very end, we cover the Delta stream processing framework, which application developers can use for their processing and enrichment logic.

CDC (Change-Data-Capture)

We developed a CDC service called Delta-Connector that captures committed changes from a data store in real time and writes them to a stream. Real-time changes are taken from the transaction log and from storage dumps. Dumps are used because transaction logs usually do not hold the full history of changes. Changes are usually serialized as Delta events, so a receiver does not have to worry about where a change came from.
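
The article does not spell out the event schema. Purely as an illustration of a source-agnostic envelope, a serialized CDC event might carry fields like these (the field set is an assumption, not Delta's actual schema):

```java
// Hypothetical envelope for a serialized CDC ("Delta") event; the field set
// is an assumption for illustration, not Delta's actual schema.
public record DeltaEvent(
        String source,             // e.g. "mysql" or "cassandra"; receivers can ignore it
        String table,              // originating table
        String operation,          // INSERT / UPDATE / DELETE
        String keyJson,            // primary key of the changed row
        String beforeJson,         // row image before the change (null for INSERT)
        String afterJson,          // row image after the change (null for DELETE)
        long commitTimestampMillis // when the change was committed
) {}
```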

Delta-Connector supports several additional features such as:

  • Ability to write to custom outputs in addition to Kafka.
  • Ability to trigger manual dumps at any time for all tables, a specific table, or specific primary keys.
  • Dumps can be taken in chunks, so there is no need to start over from the beginning after a failure (see the sketch after this list).
  • There is no need to put locks on the tables, which is very important so that write traffic to the database is never blocked by our service.
  • High availability due to standby instances in AWS Availability Zones.
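
As a rough sketch of how chunked, resumable dumps can work (the key-range and checkpoint scheme below is an assumption for illustration, not Delta-Connector's implementation):

```java
// Illustrative only: dump a table in primary-key chunks, persisting the last
// completed key so a restart resumes instead of starting over.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ChunkedDumpSketch {
    static final int CHUNK_SIZE = 1000;

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/movies", "user", "password")) {
            long lastKey = loadCheckpoint();  // 0 on the first run
            while (true) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, payload FROM movie WHERE id > ? ORDER BY id LIMIT " + CHUNK_SIZE)) {
                    ps.setLong(1, lastKey);
                    try (ResultSet rs = ps.executeQuery()) {
                        long maxKey = lastKey;
                        int rows = 0;
                        while (rs.next()) {
                            publish(rs.getLong("id"), rs.getString("payload")); // emit as an event
                            maxKey = rs.getLong("id");
                            rows++;
                        }
                        if (rows == 0) return;   // dump complete
                        saveCheckpoint(maxKey);  // resume point after a failure
                        lastKey = maxKey;
                    }
                }
            }
        }
    }

    static long loadCheckpoint() { return 0L; }     // stub: read the persisted key
    static void saveCheckpoint(long key) {}         // stub: persist the key durably
    static void publish(long id, String payload) {} // stub: write to the stream
}
```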

We currently support MySQL and Postgres, including deployments on AWS RDS and Aurora. We also support Cassandra (multi-master). More details about the Delta-Connector can be found in this blog post.

Kafka and the transport layer

The transport layer for Delta events is built on top of the messaging service of the Keystone platform.

Historically, messaging at Netflix has been optimized for availability rather than durability (see the previous article). The trade-off is potentially inconsistent broker data in various edge scenarios. For example, an unclean leader election can cause a receiver to see duplicated or lost events.

With Delta, we wanted stronger durability guarantees to ensure that CDC events are delivered to the derived stores. To this end, we provisioned a purpose-built Kafka cluster as a first-class citizen. Some of the broker settings are shown in the table below:

Table. Broker settings of the durable Kafka cluster

In Keystone Kafka clusters, unclean leader election is usually enabled to ensure publisher availability. This can lead to message loss if an out-of-sync replica is elected leader. In the new, highly available Kafka cluster, unclean leader election is disabled to prevent message loss.

We also increased the replication factor from 2 to 3 and the minimum number of in-sync replicas from 1 to 2. Publishers writing to this cluster require acks from all replicas, which guarantees that 2 out of 3 replicas hold the most recent messages sent by the publisher.
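
A sketch of these durability settings using the standard Kafka admin and producer clients (the topic name, partition count, and addresses are assumptions; the configuration keys themselves are standard Kafka settings):

```java
// Sketch of the durability settings described above: replication factor 3,
// min.insync.replicas 2, unclean leader election off, and acks=all producers.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DurableKafkaSetup {
    public static void main(String[] args) throws Exception {
        Properties admin = new Properties();
        admin.put("bootstrap.servers", "kafka:9092"); // assumed address

        try (AdminClient client = AdminClient.create(admin)) {
            NewTopic topic = new NewTopic("delta-cdc-events", 8, (short) 3) // assumed name/partitions
                    .configs(Map.of(
                            "min.insync.replicas", "2",
                            "unclean.leader.election.enable", "false"));
            client.createTopics(Collections.singletonList(topic)).all().get();
        }

        Properties producer = new Properties();
        producer.put("bootstrap.servers", "kafka:9092");
        producer.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer.put("acks", "all"); // wait for all in-sync replicas (2 of 3) to acknowledge

        try (KafkaProducer<String, String> p = new KafkaProducer<>(producer)) {
            p.send(new ProducerRecord<>("delta-cdc-events", "movie-42", "{...}")).get();
        }
    }
}
```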

When a broker instance terminates, a new instance replaces it. However, the new broker must catch up on the out-of-sync replicas, which can take several hours. To reduce the recovery time in this scenario, we started using block storage (Amazon Elastic Block Store) instead of local broker disks. When a new instance replaces a terminated broker instance, it attaches the EBS volume of the terminated instance and starts catching up on new messages. This reduces the backlog catch-up time from hours to minutes, because the new instance no longer has to replicate from an empty state. In general, separating the storage and broker lifecycles greatly reduces the impact of broker replacement.

To further strengthen the data delivery guarantee, we use a message tracing system to detect any message loss under extreme conditions (for example, a clock out of sync on the partition leader).

Stream Processing Framework

The processing layer in Delta is built on top of the Netflix SPaaS platform, which provides Apache Flink integration with the Netflix ecosystem. The platform provides a user interface that manages the deployment of Flink jobs and the orchestration of Flink clusters on top of our Titus container management platform. The interface also manages job configurations and allows users to make configuration changes dynamically without having to recompile Flink jobs.

Delta provides a stream processing framework based on Flink and SPaaS that uses an annotation-based DSL (Domain Specific Language) to abstract away technical details. For example, to define the step at which events are enriched by calling external services, users write the following DSL, and the framework builds a model from it that is executed by Flink.

Figure 3. An example of DSL enrichment in Delta
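
Purely as a hypothetical illustration of the shape such an annotation-based DSL can take (every name below is invented; this is not Delta's actual API):

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical only: every name here is invented to show the *shape* of an
// annotation-based enrichment DSL, not Delta's actual API.
public class EnrichmentDslSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @interface EnrichmentStep { String source(); }   // invented annotation

    record MovieEvent(String movieId, String title) {}
    record EnrichedMovie(MovieEvent event, String talent) {}

    interface TalentClient { String findByMovieId(String movieId); }

    @EnrichmentStep(source = "movie-cdc-events")     // assumed topic name
    EnrichedMovie enrich(MovieEvent event, TalentClient talent) {
        // The framework would discover this method via the annotation, turn it
        // into a DAG step, and execute it as part of the Flink job.
        return new EnrichedMovie(event, talent.findByMovieId(event.movieId()));
    }
}
```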

The processing framework not only reduces the learning curve, but also provides common stream processing functions such as deduplication and schematization, as well as the flexibility and resiliency needed to solve common operational problems.

The Delta Stream Processing Framework consists of two key modules: the DSL & API module and the Runtime module. The DSL & API module provides the DSL and the UDF (User-Defined-Function) API, so users can write their own processing logic (such as filtering or transformations). The Runtime module provides an implementation of a DSL parser that builds an internal representation of the processing steps as DAG models. The Execution component interprets the DAG models to initialize the actual Flink operators and ultimately run the Flink application. The framework's architecture is illustrated in the following figure.

Figure 4. Architecture of the Delta Stream Processing Framework
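
To make the two-module split concrete, here is a deliberately tiny, hypothetical sketch (invented names, not Delta's code): a user-facing UDF type standing in for the DSL & API module, and an interpreter for an ordered chain of steps standing in for the Runtime module's DAG execution:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical only: a user-facing UDF type (DSL & API module) and a tiny
// interpreter for an ordered chain of steps (Runtime module). Real DAGs are
// graphs executed as Flink operators; a linear chain keeps the sketch short.
public class RuntimeSketch {

    interface Udf<I, O> extends Function<I, O> {}  // user-defined processing step

    static <T> T run(T input, List<Udf<T, T>> steps) {
        for (Udf<T, T> step : steps) {
            input = step.apply(input);             // execute each step in order
        }
        return input;
    }

    public static void main(String[] args) {
        List<Udf<String, String>> steps = List.of(
                e -> e.trim(),                     // a transformation step
                e -> e.toUpperCase());             // another transformation step
        System.out.println(run("  cdc-event  ", steps)); // prints "CDC-EVENT"
    }
}
```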

This approach has several advantages:

  • Users can focus on their business logic without having to delve into Flink specifics or the SPaaS framework.
  • Optimization can be performed transparently to users, and bugs can be fixed without requiring any changes to user code (UDFs).
  • Delta applications are simplified for users as the platform provides flexibility and resiliency out of the box and collects many detailed metrics that can be used for alerts.

Production use

Delta has been in production for over a year and plays a key role in many Netflix Studio applications. It has helped teams implement use cases such as search indexing, data storage, and event-driven workflows. Below is an overview of the high-level architecture of the Delta platform.

Figure 5. Delta high-level architecture.

Acknowledgements

We would like to thank the following people who were involved in the creation and development of Delta at Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang, and Zhenzhong Xu.

Sources

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Online Event Processing. Commun. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527


Source: habr.com
