Delta: Платформа синхронізації даних та збагачення

Напередодні запуску нового потоку за курсом "Data Engineer" підготували переведення цікавого матеріалу.

Delta: Платформа синхронізації даних та збагачення

Огляд

Ми поговоримо про досить популярний патерн, за допомогою якого програми використовують кілька сховищ даних, де кожне сховище використовується під свої цілі, наприклад, для зберігання канонічної форми даних (MySQL і т.д.), забезпечення розширених можливостей пошуку (ElasticSearch і т.д.) .), кешування (Memcached і т.д.) та інших. Зазвичай при використанні кількох сховищ даних одне з них працює як основне сховище, інші як похідні сховища. Єдина проблема полягає в тому, як синхронізувати ці сховища даних.

Ми розглянули низку різних патернів, які намагалися вирішити проблему синхронізації кількох сховищ, таких як подвійний запис, розподілені транзакції тощо. Однак ці підходи мають суттєві обмеження щодо використання в реальному житті, надійності та технічного обслуговування. Крім синхронізації даних, деяким додаткам також потрібно збагачувати дані, викликаючи зовнішні послуги.

Для вирішення цих проблем і було розроблено Delta. Delta в кінцевому підсумку являє собою узгоджену, керовану подіями платформу для синхронізації та збагачення даних.

Існуючі рішення

Подвійний запис

Щоб синхронізувати два сховища даних, можна використовувати подвійний запис, який виконує запис в одне сховище, а потім відразу після цього робить запис в інше. Перший запис можна повторити, а другий – перервати, якщо перший завершиться невдачею після вичерпання кількості спроб. Однак два сховища даних можуть перестати синхронізуватись, якщо запис у друге сховище завершиться невдачею. Вирішується ця проблема зазвичай створенням процедури відновлення, яка може періодично повторно переносити дані з першого сховища на друге або робити це тільки в тому випадку, якщо в даних виявляються відмінності.

Проблеми:

Виконання процедури відновлення – це специфічна робота, яку не можна перевикористовувати. Крім того, дані між сховищами залишаються розсинхронізованими доти, доки не пройде процедура відновлення. Рішення ускладнюється, якщо використовується більше двох сховищ даних. І, нарешті, процедура відновлення може додати навантаження вихідне джерело даних.

Таблиця логів змін

Коли в наборі таблиць відбуваються зміни (наприклад, вставка, оновлення та видалення запису), записи змін додаються до таблиці логів, як частина тієї ж транзакції. Інший потік або процес постійно запитує події з таблиці логів і записує їх в одне або кілька сховищ даних, при необхідності видаляючи події з таблиці логів після підтвердження запису всіма сховищами.

Проблеми:

Цей патерн має бути реалізований як бібліотека та в ідеалі без зміни коду програми, що її використовує. У середовищі-поліглоті реалізація такої бібліотеки має існувати будь-якою необхідною мовою, але забезпечити узгодженість роботи функцій та поведінки між мовами дуже непросто.

Інша проблема полягає в отриманні змін схеми, в тих системах, які не підтримують зміни транзакцій схеми [1][2], як, наприклад, MySQL. Тому шаблон виконання зміни (наприклад, зміни схеми) та транзакційного запису їх у таблицю логів змін не завжди працюватиме.

Розподілені транзакції

Розподілені транзакції можна використовувати для того, щоб розділити транзакцію між декількома різнорідними сховищами даних так, щоб операція або фіксувалася у всіх використовуваних сховищах, або не фіксувалася в жодному з них.

Проблеми:

Розподілені транзакції – дуже велика проблема для різноманітних сховищ даних. За своєю природою вони можуть покладатися лише з найменший загальний знаменник систем, що беруть участь. Наприклад, XA-транзакції блокують виконання, якщо в процесі застосування відбувається збій на етапі підготовки. Крім того, XA не забезпечує виявлення дідлок і не підтримує оптимістичні схеми управління паралелізмом. Крім цього, деякі системи типу ElasticSearch не підтримують XA або будь-яку іншу гетерогенну модель транзакцій. Таким чином, забезпечення атомарності запису в різних технологіях зберігання даних залишається для додатків складним завданням [3].

Дельта

Delta була розроблена для усунення обмежень існуючих рішень щодо синхронізації даних, вона також дозволяє збагачувати дані на льоту. Наша мета полягала в абстрагуванні всіх цих складних моментів від розробників додатків, щоб вони могли повністю зосередитися на реалізації бізнес-функціоналу. Далі ми описуватимемо «Movie Search», фактичний варіант використання Delta від Netflix.

У Netflix широко застосовується мікросервісна архітектура і кожен мікросервіс зазвичай обслуговує по одному типу даних. Основні відомості про фільм винесені в мікросервіс, який називається Movie Service, а також пов'язані з ними дані, такі як інформація про продюсерів, акторів, вендорів і так далі керуються декількома іншими мікросервісами (а саме Deal Service, Talent Service та Vendor Service).
Бізнес-користувачі в Netflix Studios часто потребують пошуку за різними критеріями фільмів, тому для них дуже важливо мати можливість здійснювати пошук за всіма даними, пов'язаними з фільмами.

До появи Delta команді пошуку фільмів потрібно було отримувати дані з кількох мікросервісів, перш ніж індексувати дані фільмів. Крім цього, команда повинна була розробити систему, яка періодично оновлювала пошуковий індекс, запитуючи зміни в інших мікросервісів, навіть якщо змін не було взагалі. Ця система дуже швидко обросла складнощами та її стало важко підтримувати.

Delta: Платформа синхронізації даних та збагачення
Малюнок 1. Система полінгу до Delta
Після початку використання Delta система була спрощена до системи, керованої подіями, як показано на наступному малюнку. Події CDC (Change-Data-Capture) відправляються в топіки Keystone Kafka за допомогою Delta-Connector. Програма Delta, побудована з використанням Delta Stream Processing Framework (заснованого на Flink), отримує CDC-події з топіка, збагачує їх, викликаючи інші мікросервіси, і, нарешті, передає збагачені дані до пошукового індексу в Elasticsearch. Весь процес проходить майже в реальному часі, тобто, як зміни фіксуються в сховищі даних, пошукові індекси оновлюються.

Delta: Платформа синхронізації даних та збагачення
Малюнок 2. Пайплайн даних під час використання Delta
У наступних розділах ми опишемо роботу Delta-Connector, який підключається до сховища і публікує CDC-події на транспортному рівні, який є інфраструктурою передачі даних в реальному часі, що направляє CDC-події в топики Kafka. А в самому кінці ми поговоримо про структуру обробки потоків Delta, яку розробники програм можуть використовувати для логіки обробки та збагачення даних.

CDC (Change-Data-Capture)

Ми розробили CDC-сервіс під назвою Delta-Connector, який може фіксувати закоммічені зміни зі сховища даних у режимі реального часу та писати їх у потік. Зміни в реальному часі беруться з журналу транзакцій та дампів сховища. Дампи використовуються, оскільки журнали транзакцій зазвичай не зберігають всю історію змін. Зміни зазвичай серіалізуються як події Delta, тому одержувачу не потрібно турбуватися про те, звідки з'являється зміна.

Delta-Connector підтримує кілька додаткових функцій, таких як:

  • Можливість писати в кастомні вихідні дані повз Kafka.
  • Можливість активації ручних дампів у будь-який час для всіх таблиць, певної таблиці або для певних первинних ключів.
  • Дампи можна забирати чанками, тому немає необхідності починати все спочатку у разі збою.
  • Немає необхідності ставити блокування на таблиці, що дуже важливо для того, щоб трафік запису до бази даних ніколи не блокувався нашим сервісом.
  • Висока доступність через резервні екземпляри в AWS Availability Zones.

Зараз ми підтримуємо MySQL та Postgres, у тому числі при розгортанні в AWS RDS та Aurora. Також ми підтримуємо Cassandra (multi-master). Більше подробиць про Delta-Connector ви можете дізнатися в цьому блозі.

Kafka та транспортний рівень

Транспортний рівень подій Delta побудований на сервісі обміну повідомленнями платформи Наріжний камінь.

Так історично склалося, що публікація повідомлень у Netflix оптимізувалась з урахуванням підвищення доступності, а не довговічності (див. попередня стаття). Компромісом виявилася потенційна невідповідність даних брокера у різних прикордонних сценаріях. Наприклад, unclean leader election відповідає за те, що одержувач потенційно дублює чи втрачає події.

З Delta нам хотілося отримати більш вагомі гарантії довговічності, щоб забезпечити доставку CDC-подій до похідних сховищ. Для цього ми запропонували спеціально спроектований кластер Kafka як об'єкт першого класу. Ви можете переглянути деякі налаштування брокера в таблиці нижче:

Delta: Платформа синхронізації даних та збагачення

У кластерах Keystone Kafka, unclean leader election зазвичай увімкнено для забезпечення доступності видавця. Це може призвести до втрати повідомлень у випадку, якщо несинхронізована репліка буде обрана лідером. Для нового високонадійного кластера Kafka параметр unclean leader election вимкнено, щоб запобігти втраті повідомлень.

Також ми збільшили replication factor з 2 до 3 та minimum insync replicas з 1 до 2. Видавці, які пишуть у цей кластер, вимагають acks від усіх інших, гарантуючи, що 2 з 3 реплік матимуть найактуальніші повідомлення, надіслані видавцем.

Коли екземпляр брокера завершує роботу, новий екземпляр замінює старий. Однак новому брокеру потрібно буде наздогнати несинхронізовані репліки, що може тривати кілька годин. Щоб скоротити час відновлення цього сценарію, ми почали використовувати блокове сховище даних (Amazon Elastic Block Store) замість локальних дисків брокерів. Коли новий екземпляр замінює екземпляр брокера, що завершився, він приєднує EBS-том, який був у екземпляра, що завершився, і починає наздоганяти нові повідомлення. Цей процес скорочує час ліквідації відставання з кількох годин до кількох хвилин, оскільки новому екземпляру не потрібно реплікувати з порожнього стану. Загалом, окремі життєві цикли сховища та брокера значно знижують вплив ефекту від зміни брокера.

Щоб ще більше збільшити гарантію доставки даних, ми використали систему відстеження повідомлень для виявлення будь-якої втрати повідомлень в екстремальних умовах (наприклад, розсинхронізація годинника в лідері розділу).

Stream Processing Framework

Рівень обробки Delta побудований на базі платформи Netflix SPaaS, яка забезпечує інтеграцію Apache Flink з екосистемою Netflix. Платформа надає інтерфейс користувача, який керує розгортанням завдань Flink і оркестрацією кластерів Flink поверх нашої платформи управління контейнерами Titus. Інтерфейс також управляє конфігураціями завдань і дозволяє користувачам вносити зміни до конфігурації динамічно без необхідності перекомпілювати завдання Flink.

Delta надає фреймворк потокової обробки (stream processing framework) даних на базі Flink та SPaaS, яка використовує заснований на інструкціях DSL (Domain Specific Language), щоб абстрагувати технічні деталі. Наприклад, щоб визначити крок, з яким збагачуватимуться події, викликаючи зовнішні сервіси, користувачам потрібно написати наступний DSL, а фреймворк створить на основі нього модель, яка виконається Flink.

Delta: Платформа синхронізації даних та збагачення
Рисунок 3. Приклад збагачення на DSL у Delta

Фреймворк для обробки не тільки скорочує криву навчання, а й забезпечує загальні функції обробки потоку, такі як дедуплікація, схематизація, а також гнучкість та стійкість до відмови для вирішення загальних проблем у роботі.

Delta Stream Processing Framework складається із двох ключових модулів, модуля DSL & API та модуля Runtime. Модуль DSL & API надає DSL та UDF (User-Defined-Function) API для того, щоб користувачі могли написати власну логіку обробки (наприклад, фільтрацію або перетворення). Модуль Runtime надає реалізацію парсера DSL, який будує внутрішнє уявлення кроків обробки у моделях DAG. Компонент Execution інтерпретує DAG-моделі, щоб ініціалізувати фактичні оператори Flink та зрештою запустити програму Flink. Архітектура фреймворку проілюстрована на наступному малюнку.

Delta: Платформа синхронізації даних та збагачення
Рисунок 4. Архітектура Delta Stream Processing Framework

Такий підхід має кілька переваг:

  • Користувачі можуть сфокусуватися на своїй бізнес-логіці без необхідності заглиблюватись у специфіку Flink або структуру SPaaS.
  • Оптимізація може виконуватися прозорим для користувачів способом, а помилки можуть бути виправлені без необхідності внесення будь-яких змін до коду користувача (UDF).
  • Робота програм Delta спрощена для користувачів, оскільки платформа забезпечує гнучкість і відмовостійкість з коробки і збирає безліч докладних метрик, які можна використовувати для сповіщень.

Використання на продакшені

Delta працює на продакшені вже більше року і відіграє ключову роль у багатьох програмах Netflix Studio. Вона допомогла командам реалізувати такі варіанти використання, як індексація пошуку, зберігання даних та робочі процеси, керовані подіями. Нижче наведено огляд високорівневої архітектури платформи Delta.

Delta: Платформа синхронізації даних та збагачення
Рисунок 5. Високорівнева архітектура Delta.

Подяки

Ми хотіли б подякувати наступним людям, які брали участь у створенні та розвитку Delta в Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Sriniva , Steven Wu, Tharanga Gamaethige, Yun Wang та Zhenzhong Xu.

Джерела

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Online event processing. Commun. ACM 62(5): 43–49 (2019). DOI: doi.org/10.1145/3312527

Записатися на безкоштовний вебінар: "Data Build Tool для сховища Amazon Redshift"

Джерело: habr.com

Додати коментар або відгук