Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння

Напярэдадні запуску новага патоку па курсе "Data Engineer" падрыхтавалі перавод цікавага матэрыялу.

Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння

Агляд

Мы пагаворым пра досыць папулярны патэрн, з дапамогай якога прыкладанні выкарыстоўваюць некалькі сховішчаў дадзеных, дзе кожнае сховішча выкарыстоўваецца пад свае мэты, напрыклад, для захоўвання кананічнай формы дадзеных (MySQL і г.д.), забеспячэнні пашыраных магчымасцяў пошуку (ElasticSearch і г.д. .), кэшавання (Memcached і г.д.) і іншых. Звычайна пры выкарыстанні некалькіх сховішчаў дадзеных адно з іх працуе як асноўнае сховішча, а іншыя як вытворныя сховішчы. Адзіная праблема заключаецца ў тым, як сінхранізаваць гэтыя сховішчы дадзеных.

Мы разгледзелі шэраг розных патэрнаў, якія спрабавалі вырашыць праблему сінхранізацыі некалькіх сховішчаў, такіх як падвойны запіс, размеркаваныя транзакцыі і г.д. Аднак гэтыя падыходы маюць значныя абмежаванні ў плане выкарыстання ў рэальным жыцці, надзейнасці і тэхнічнага абслугоўвання. Апроч сінхранізацыі дадзеных, некаторым прыкладанням таксама неабходна ўзбагачаць дадзеныя, выклікаючы вонкавыя сэрвісы.

Для вырашэння гэтых праблем і была распрацавана Delta. Delta у канчатковым выніку ўяўляе сабою ўзгодненую, кіраваную падзеямі платформу для сінхранізацыі і ўзбагачэнні дадзеных.

існуючыя рашэння

Падвойны запіс

Каб сінхранізаваць два сховішчы дадзеных, можна выкарыстоўваць падвойны запіс, які выконвае запіс у адно сховішча, а потым адразу пасля гэтага робіць запіс у іншае. Першы запіс можна паўтарыць, а другую - перапыніць, калі першая завершыцца няўдачай пасля вычарпання колькасці спроб. Аднак два сховішчы дадзеных могуць перастаць сінхранізавацца, калі запіс у другое сховішча завершыцца няўдачай. Вырашаецца гэтая праблема звычайна стварэннем працэдуры аднаўлення, якая можа перыядычна паўторна пераносіць дадзеныя з першага сховішчы ў другое ці рабіць гэта толькі ў тым выпадку, калі ў дадзеных выяўляюцца адрозненні.

Праблемы:

Выкананне працэдуры аднаўлення - гэта спецыфічная праца, якую нельга перавыкарыстоўваць. Акрамя таго, дадзеныя паміж сховішчамі застаюцца рассінхранізаванымі да таго часу, пакуль не пройдзе працэдура аднаўлення. Рашэнне ўскладняецца, калі выкарыстоўваецца больш за два сховішчы дадзеных. І, нарэшце, працэдура аднаўлення можа дадаць нагрузку на зыходную крыніцу дадзеных.

Табліца логаў змен

Калі ў наборы табліц адбываюцца змены (напрыклад, устаўка, абнаўленне і выдаленне запісу), запісы змен дадаюцца ў табліцу логаў, як частка той жа транзакцыі. Іншы струмень або працэс увесь час запытвае падзеі з табліцы логаў і запісвае іх у адно або некалькі сховішчаў дадзеных, пры з'яўленні неабходнасці выдаляючы падзеі з табліцы логаў пасля пацверджання запісу ўсімі сховішчамі.

Праблемы:

Гэты патэрн павінен быць рэалізаваны як бібліятэка і ў ідэале без змены кода прыкладання яе які выкарыстоўвае. У асяроддзі-паліглоце рэалізацыя такой бібліятэкі павінна існаваць на любой неабходнай мове, але забяспечыць узгодненасць працы функцый і паводзін паміж мовамі вельмі няпроста.

Іншая праблема крыецца ў атрыманні змен схемы, у тых сістэмах, якія не падтрымліваюць транзакцыйныя змены схемы [1][2], як, напрыклад, MySQL. Таму шаблон выканання змены (напрыклад, змены схемы) і транзакцыйны запіс яго ў табліцу логаў змен не заўсёды будзе працаваць.

Размеркаваныя Транзакцыі

Размеркаваныя транзакцыі можна выкарыстоўваць для таго, каб падзяліць транзакцыю паміж некалькімі разнастайнымі сховішчамі дадзеных так, каб аперацыя альбо фіксавалася ва ўсіх выкарыстоўваных сховішчах, альбо не фіксавалася ні ў адным з іх.

Праблемы:

Размеркаваныя транзакцыі - вельмі вялікая праблема для разнастайных сховішчаў дадзеных. Па сваёй прыродзе яны могуць спадзявацца толькі на найменшы агульны назоўнік якія ўдзельнічаюць сістэм. Напрыклад, XA-транзакцыі блакуюць выкананне, калі падчас прыкладанняў адбываецца збой на этапе падрыхтоўкі. Акрамя таго, XA не забяспечвае выяўленні дзядлокаў і не падтрымлівае аптымістычныя схемы кіравання паралелізмам. Акрамя гэтага, некаторыя сістэмы па тыпе ElasticSearch не падтрымліваюць XA або любую іншую гетэрагенную мадэль транзакцый. Такім чынам, забеспячэнне атамарнасці запісы ў розных тэхналогіях захоўвання дадзеных застаецца для прыкладанняў вельмі складанай задачай [3].

дэльта

Delta была распрацавана для ўхілення абмежаванняў існуючых рашэнняў па сінхранізацыі дадзеных, яна таксама дазваляе ўзбагачаць дадзеныя на лета. Наша мэта складалася ў тым, каб абстрагаваць усе гэтыя складаныя моманты ад распрацоўшчыкаў прыкладанняў, каб яны маглі цалкам засяродзіцца на рэалізацыі бізнес-функцыяналу. Далей мы будзем апісваць "Movie Search", фактычны варыянт выкарыстання Delta ад Netflix.

У Netflix шырока прымяняецца мікрасэрвісная архітэктура і кожны мікрасэрвіс звычайна абслугоўвае па адным тыпе дадзеных. Асноўныя звесткі пра фільм вынесены ў мікрасэрвіс, які называецца Movie Service, а таксама звязаныя з імі дадзеныя, такія як інфармацыя аб прадзюсарах, акцёрах, вендарах і гэтак далей кіруюцца некалькімі іншымі мікрасэрвісамі (а менавіта Deal Service, Talent Service і Vendor Service).
Бізнес-карыстальнікі ў Netflix Studios часта маюць патрэбу ў пошуку па розных крытэрыях фільмаў, менавіта таму для іх вельмі важна мець магчымасць ажыццяўляць пошук па ўсіх дадзеных, злучаным з фільмамі.

Да з'яўлення Delta камандзе пошуку фільмаў трэба было атрымліваць дадзеныя з некалькіх мікрасэрвісаў, перш чым індэксаваць дадзеныя фільмаў. Апроч гэтага, каманда павінна была распрацаваць сістэму, якая перыядычна абнаўляла б пошукавы індэкс, запытваючы змены ў іншых мікрасэрвісаў, нават калі змен не было наогул. Гэтая сістэма вельмі хутка абрасла складанасцямі і яе стала цяжка падтрымліваць.

Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння
Малюнак 1. Сістэма поллінга да Delta
Пасля пачатку выкарыстання Delta, сістэма была спрошчана да сістэмы, якая кіруецца падзеямі, як паказана на наступным малюнку. Падзеі CDC (Change-Data-Capture) адпраўляюцца ў топікі Keystone Kafka з дапамогай Delta-Connector. Дадатак Delta, пабудаванае з выкарыстаннем Delta Stream Processing Framework (заснаванага на Flink), атрымлівае CDC-падзеі з топіка, узбагачае іх, выклікаючы іншыя мікрасэрвісы, і, нарэшце, перадае узбагачаныя дадзеныя ў пошукавы індэкс у Elasticsearch. Увесь працэс праходзіць амаль у рэальным часе, гэта значыць, як толькі змены фіксуюцца ў сховішчы дадзеных, пошукавыя азначнікі абнаўляюцца.

Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння
Малюнак 2. Пайплайн дадзеных пры выкарыстанні Delta
У наступных раздзелах мы апішам працу Delta-Connector, які падлучаецца да сховішча і публікуе CDC-падзеі на транспартным узроўні, які ўяўляе з сябе інфраструктуру перадачы дадзеных у рэальным часе, якая накіроўвае CDC-падзеі ў топікі Kafka. А ў самым канцы мы пагаворым аб структуры апрацоўкі патокаў Delta, якую распрацоўшчыкі прыкладанняў могуць выкарыстоўваць для логікі апрацоўкі і ўзбагачэння дадзеных.

CDC (Change-Data-Capture)

Мы распрацавалі CDC-сэрвіс пад назвай Delta-Connector, які можа фіксаваць закаммічаныя змены са сховішча дадзеных у рэжыме рэальнага часу і пісаць іх у струмень. Змены ў рэальным часе бяруцца з часопіса транзакцый і дампаў сховішчы. Дампы выкарыстоўваюцца паколькі часопісы транзакцый звычайна не захоўваюць усю гісторыю змен. Змены звычайна серыялізуюцца як падзеі Delta, так што атрымальніку не трэба турбавацца аб тым, адкуль з'яўляецца змена.

Delta-Connector падтрымлівае некалькі дадатковых функцый, такіх як:

  • Магчымасць пісаць у кастамныя выходныя дадзеныя міма Kafka.
  • Магчымасць актывацыі ручных дампаў у любы час для ўсіх табліц, нейкай вызначанай табліцы ці для вызначаных першасных ключоў.
  • Дампы можна забіраць чанкамі, таму няма неабходнасці пачынаць усё з пачатку ў выпадку збою.
  • Няма неабходнасці ставіць блакіроўкі на табліцы, што вельмі важна для таго, каб трафік запісу ў базу даных ніколі не блакаваўся нашым сэрвісам.
  • Высокая даступнасць з-за рэзервовых асобнікаў у AWS Availability Zones.

Цяпер мы падтрымліваем MySQL і Postgres, у тым ліку пры разгортванні ў AWS RDS і Aurora. Таксама мы падтрымліваем Cassandra (multi-master). Больш падрабязнасцяў аб Delta-Connector вы можаце даведацца ў гэтым блогу.

Kafka і транспартны ўзровень

Транспартны ўзровень падзей Delta пабудаваны на сэрвісе абмену паведамленнямі платформы Краевугольны камень.

Так гістарычна склалася, што публікацыя паведамленняў у Netflix аптымізавалася з улікам павышэння даступнасці, а не даўгавечнасці. папярэдні артыкул). Кампрамісам аказалася патэнцыйнае неадпаведнасць дадзеных брокера ў розных памежных сцэнарах. Напрыклад, unclean leader election адказвае за тое, што атрымальнік патэнцыйна дублюе ці губляе падзеі.

З Delta нам хацелася атрымаць больш важкія гарантыі даўгавечнасці, каб забяспечыць дастаўку CDC-падзей у вытворныя сховішчы. Для гэтага мы прапанавалі спецыяльна спраектаваны кластар Kafka у якасці аб'екта першага класа. Вы можаце паглядзець на некаторыя наладкі брокера ў табліцы ніжэй:

Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння

У кластарах Keystone Kafka, unclean leader election звычайна уключаны для забеспячэння даступнасці выдаўца. Гэта можа прывесці да страты паведамленняў у выпадку, калі несінхранізаваная рэпліка будзе абрана ў якасці лідэра. Для новага высоканадзейнага кластара Kafka параметр unclean leader election выключаны, каб прадухіліць страту паведамленняў.

Таксама мы павялічылі replication factor з 2 да 3 і minimum insync replicas з 1 да 2. Выдаўцы, якія пішуць у гэты кластар, патрабуюць acks ад усіх іншых, гарантуючы, што 2 з 3 рэплік будуць мець самыя актуальныя паведамленні, дасланыя выдаўцом.

Калі асобнік брокера завяршае працу, новы асобнік замяняе стары. Аднак новаму брокеру трэба будзе дагнаць несінхранізаваныя рэплікі, што можа заняць некалькі гадзін. Каб скараціць час аднаўлення працы гэтага сцэнара, мы пачалі выкарыстоўваць блокавае сховішча дадзеных (Amazon Elastic Block Store) замест лакальных дыскаў брокераў. Калі новы асобнік замяняе які завяршыўся асобнік брокера, ён далучае EBS-том, які быў у які завяршыўся асобніка, і пачынае даганяць новыя паведамленні. Гэты працэс скарачае час ліквідацыі адставання з некалькіх гадзін да некалькіх хвілін, бо новаму асобніку больш не трэба рэплікаваць з пустога стану. У цэлым, асобныя жыццёвыя цыклы сховішчы і брокера значна змяншаюць уплыў эфекту ад змены брокера.

Каб яшчэ больш павялічыць гарантыю дастаўкі дадзеных, мы выкарыстоўвалі сістэму адсочвання паведамленняў для выяўлення любой страты паведамленняў у экстрэмальных умовах (напрыклад, рассінхранізацыя гадзін у лідэры часткі).

Stream Processing Framework

Узровень апрацоўкі ў Delta пабудаваны на базе платформы Netflix SPaaS, якая забяспечвае інтэграцыю Apache Flink з экасістэмай Netflix. Платформа дае карыстацкі інтэрфейс, які кіруе разгортваннем заданняў Flink і аркестрацыяй кластараў Flink па-над нашай платформай кіравання кантэйнерамі Titus. Інтэрфейс таксама кіруе канфігурацыямі заданняў і дазваляе карыстачамі ўносіць змены ў канфігурацыю дынамічна без неабходнасці перакампіляваць заданні Flink.

Delta падае фрэймворк струменевай апрацоўкі (stream processing framework) дадзеных на базе Flink і SPaaS, якая выкарыстоўвае заснаваны на анатацыях DSL (Domain Specific Language), каб абстрагаваць тэхнічныя дэталі. Напрыклад, каб вызначыць крок, з якім будуць узбагачацца падзеі, выклікаючы вонкавыя сэрвісы, карыстачам трэба напісаць наступны DSL, а фрэймворк створыць на аснове яго мадэль, якая выканаецца Flink.

Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння
Малюнак 3. Прыклад узбагачэння на DSL у Delta

Фрэймворк для апрацоўкі не толькі скарачае крывую навучання, але і забяспечвае агульныя функцыі апрацоўкі патоку, такія як дэдуплікацыя, схематызацыя, а таксама гнуткасць і адмоваўстойлівасць для вырашэння агульных праблем у працы.

Delta Stream Processing Framework складаецца з двух ключавых модуляў, модуля DSL & API і модуля Runtime. Модуль DSL & API падае DSL і UDF (User-Defined-Function) API для таго, каб карыстачы маглі напісаць уласную логіку апрацоўкі (напрыклад, фільтраванне ці пераўтварэнні). Модуль Runtime дае рэалізацыю парсера DSL, які будуе ўнутранае ўяўленне крокаў апрацоўкі ў мадэлях DAG. Кампанент Execution інтэрпрэтуе DAG-мадэлі, каб ініцыялізаваць фактычныя аператары Flink і ў канчатковым выніку запусціць прыкладанне Flink. Архітэктура фрэймворка праілюстравана на наступным малюнку.

Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння
Малюнак 4. Архітэктура Delta Stream Processing Framework

У такога падыходу ёсць некалькі пераваг:

  • Карыстальнікі могуць сфакусавацца на сваёй бізнэс-логіцы без неабходнасці паглыбляцца ў спецыфіку Flink ці структуру SPaaS.
  • Аптымізацыя можа выконвацца празрыстым для карыстачоў спосабам, а памылкі могуць быць выпраўленыя без неабходнасці занясення якія-небудзь змен у код карыстача (UDF).
  • Праца прыкладанняў Delta спрошчана для карыстальнікаў, паколькі платформа забяспечвае гнуткасць і адмоваўстойлівасць са скрынкі і збірае мноства падрабязных метрык, якія можна выкарыстоўваць для абвестак.

Выкарыстанне на прадакшэне

Delta працуе на прадакшэне ўжо больш за год і гуляе ключавую ролю ў шматлікіх прыкладаннях Netflix Studio. Яна дапамагла камандам рэалізаваць такія варыянты выкарыстання, як індэксацыя пошуку, захоўванне дадзеных і працоўныя працэсы, якія кіруюцца падзеямі. Ніжэй прадстаўлены агляд высокаўзроўневай архітэктуры платформы Delta.

Delta: Платформа сінхранізацыі дадзеных і ўзбагачэння
Малюнак 5. Высокаўзроўневая архітэктура Delta.

падзякі

Мы хацелі б падзякаваць наступным людзям, якія ўдзельнічалі ў стварэнні і развіцці Delta ў Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Sriniva , Steven Wu, Tharanga Gamaethige, Yun Wang і Zhenzhong Xu.

крыніцы

  1. dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, Boerge Svingen: Online event processing. Commun. ACM 62(5): 43-49 (2019). DOI: doi.org/10.1145/3312527

Запісацца на бясплатны вэбінар: "Data Build Tool для сховішча Amazon Redshift".

Крыніца: habr.com

Дадаць каментар