Занурення в Delta Lake: примусове застосування та еволюція схеми

Привіт, Хабре! Представляю вашій увазі переклад статті "Diving Into Delta Lake: Schema Enforcement & Evolution" авторів Burak Yavuz, Brenner Heintz and Denny Lee, який був підготовлений напередодні старту курсу "Data Engineer" від OTUS.

Занурення в Delta Lake: примусове застосування та еволюція схеми

Дані, як і наш досвід, постійно накопичуються та розвиваються. Щоб не відставати, наші ментальні моделі світу повинні адаптуватися до нових даних, деякі з яких містять нові виміри — нові способи спостерігати речі, про які ми раніше не мали уявлення. Ці ментальні моделі мало чим відрізняються від схем таблиць, що визначають, як ми класифікуємо та обробляємо нову інформацію.

Це підводить нас до питання керування схемами. У міру того, як бізнес завдання та вимоги змінюються з часом, змінюється структура ваших даних. Delta Lake дозволяє легко впроваджувати нові виміри при зміні даних. Користувачі мають доступ до простої семантики для керування схемами своїх таблиць. Ці інструменти включають примусове застосування схеми (Schema Enforcement), яке захищає користувачів від ненавмисного засмічення своїх таблиць помилками або непотрібними даними, а також еволюцію схеми (Schema Evolution), яка дозволяє автоматично додавати нові стовпці з цінними даними у відповідні місця. У цій статті ми заглибимося у використання цих інструментів.

Розуміння схем таблиць

Кожен DataFrame в Apache Spark містить схему, яка визначає форму даних, таку як типи даних, стовпці та метадані. За допомогою Delta Lake схема таблиці зберігається у форматі JSON усередині журналу транзакцій.

Що таке примусове застосування схеми?

Примусове застосування схеми (Schema Enforcement), також відоме як перевірка схеми (Schema Validation), є захисним механізмом Delta Lake, який гарантує якість даних, відхиляючи записи, які не відповідають схемі таблиці. Як і хостес на стійці реєстрації в популярному ресторані, який приймає лише за попередньою бронею, він перевіряє, чи є кожен стовпець даних, що вводяться в таблицю, у відповідному списку очікуваних стовпців (іншими словами, чи є для кожного з них «броня»), та відхиляє будь-які записи зі стовпцями, яких немає у списку.

Як працює примусове застосування схеми?

Delta Lake використовує перевірку схеми під час запису, що означає, що нові записи до таблиці перевіряються на сумісність зі схемою цільової таблиці під час запису. Якщо схема несумісна, Delta Lake повністю скасовує транзакцію (дані не записуються) і створює виняток, щоб повідомити користувача про невідповідність.
Для визначення сумісності запису з таблицею Delta Lake використовує такі правила. Записуваний DataFrame:

  • не може містити додаткові стовпці, яких немає у схемі цільової таблиці. І навпаки, все гаразд, якщо вхідні дані не містять абсолютно всіх стовпців з таблиці — цим стовпцям просто буде присвоєно нульові значення.
  • не може мати типи даних стовпців, які відрізняються від типів даних стовпців у цільовій таблиці. Якщо стовпець цільової таблиці містить дані StringType, але відповідний стовпець DataFrame містить дані IntegerType, примусове застосування схеми викличе виняток і запобіжить виконання операції запису.
  • не може містити імена стовпців, які відрізняються лише регістром. Це означає, що ви не можете мати стовпці з іменами Foo і Foo, визначені в одній таблиці. Хоча Spark можна використовувати в чутливому або нечутливому (за замовчуванням) режимі, Delta Lake зберігає регістр, але нечутливий в рамках зберігання схеми. Parquet чутливий до регістру при зберіганні та поверненні інформації стовпця. Щоб уникнути можливих помилок, пошкодження даних або їх втрати (з чим ми особисто стикалися з Databricks), ми вирішили додати це обмеження.

Щоб проілюструвати це, давайте поглянемо на те, що відбувається в наведеному нижче коді при спробі додати деякі недавно сгенеровані стовпці таблицю Delta Lake, яка ще не налаштована для їх прийняття.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Замість автоматичного додавання нових стовпців, Delta Lake нав'язує схему та зупиняє запис. Щоб допомогти визначити, який стовпець (або їх безліч) є причиною невідповідності, Spark виводить обидві схеми зі стек трейсу для порівняння.

У чому примусове застосування схеми?

Оскільки примусове застосування схеми є досить суворою перевіркою, воно є відмінним інструментом для використання в якості воротаря чистого, повністю перетвореного набору даних, який готовий до виробництва або споживання. Як правило, застосовується до таблиць, які безпосередньо подають дані:

  • Алгоритмам машинного навчання
  • BI дашбордам
  • Аналітика даних та інструментів візуалізації
  • Будь-якій виробничій системі, що вимагає строго структурованих, строго типізованих семантичних схем.

Щоб підготувати свої дані до цього фінального бар'єру, багато користувачів використовують просту “multi-hop” архітектуру, яка поступово вносить структуру їх таблиці. Щоб дізнатися про це більше, ви можете ознайомитись зі статтею Машинне навчання виробничого рівня з Delta Lake.

Звичайно, примусове застосування схеми можна використовувати в будь-якому місці вашого пайплайну, але пам'ятайте, що потоковий запис в таблицю в такому випадку може бути фруструючим, тому що, наприклад, ви забули, що додали ще один стовпець у вхідні дані.

Запобігання розрідження даних

До цього моменту ви можете поставити запитання, через що такий ажіотаж? Зрештою, іноді несподівана помилка "невідповідності схеми" може підставити вам підніжку у вашому робочому процесі, особливо якщо ви новачок у Delta Lake. Чому б просто не дозволити схемі змінитись так, як потрібно для того, щоб я міг записати свій DataFrame, незважаючи ні на що?

Як каже стара приказка, «унція профілактики коштує фунта лікування». У якийсь момент, якщо ви не подбаєте про застосування своєї схеми, піднімуть свої огидні голови проблеми із сумісністю типів даних — на перший погляд однорідні джерела необроблених даних можуть містити прикордонні випадки, пошкоджені стовпці, неправильно сформовані відображення чи інші страшні речі, що сняться. у кошмарах. Найкращий підхід полягає в тому, щоб зупиняти цих ворогів біля воріт — за допомогою примусового застосування схеми — і мати справу з ними на світлі, а не пізніше, коли вони почнуть нишпорити в темних глибинах вашого робочого коду.

Примусове застосування схеми дає впевненість у тому, що схема вашої таблиці не зміниться, якщо ви самі не підтвердите варіант зміни. Це запобігає «розрідженням» (dilution) даних, яке може відбуватися, коли нові стовпці додаються так часто, що раніше цінні, стислі таблиці втрачають своє значення та корисність через повінь даних. Заохочуючи вас бути навмисним, встановлювати високі стандарти та очікувати високої якості, примусове застосування схеми робить саме те, для чого було призначено допомагати вам залишатися сумлінними, а вашим таблицям чистими.

Якщо при подальшому розгляді ви вирішите, що вам насправді потрібно додати новий стовпець - ніяких проблем, нижче наведено однорядковий фікс. Рішення – еволюція схеми!

Що таке еволюція схеми?

Еволюція схеми – це функція, яка дозволяє користувачам легко змінювати поточну схему таблиці відповідно до даних, що змінюються з часом. Найчастіше вона використовується при виконанні операції додавання або перезапису, щоб автоматично адаптувати схему для включення одного або кількох нових стовпців.

Як працює еволюція схеми?

Наслідуючи приклад з попереднього розділу, розробники можуть легко використовувати еволюцію схеми для додавання нових стовпців, які раніше були відхилені через невідповідність до схеми. Еволюція схеми активується шляхом додавання .option('mergeSchema', 'true') до вашої Spark команди .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Для перегляду графіка виконайте наступний Spark SQL запит

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Занурення в Delta Lake: примусове застосування та еволюція схеми
Як альтернатива, ви можете встановити цю опцію для всієї сесії Spark, додавши spark.databricks.delta.schema.autoMerge = True у конфігурацію Spark. Але користуйтеся цим з обережністю, оскільки примусове застосування схеми більше не попереджатиме вас про ненавмисні невідповідності схемі.

Увімкнувши запит параметр mergeSchema, всі стовпці, які присутні в DataFrame, але відсутні в цільовій таблиці, автоматично додаються до кінця схеми в рамках транзакції запису. Також можуть бути додані вкладені поля, і вони також будуть додані до кінця відповідних стовпців структури.

Дата інженери та вчені можуть використовувати цю опцію, щоб додати нові стовпці (можливо, нещодавно відстежувану метрику або стовпець показників продажу цього місяця) у свої існуючі виробничі таблиці машинного навчання, не порушуючи існуючі моделі, засновані на старих стовпцях.

Наступні типи змін схеми допустимі в рамках еволюції схеми під час додавання або перезапису таблиці:

  • Додавання нових стовпців (це найпоширеніший сценарій)
  • Зміна типів даних з NullType -> будь-який інший тип або підвищення з ByteType -> ShortType -> IntegerType

Інші зміни, неприпустимі в рамках еволюції схеми, вимагають, щоб схема та дані були перезаписані шляхом додавання .option("overwriteSchema", "true"). Наприклад, у випадку, коли стовпець Foo спочатку був integer, а нова схема була б рядкового типу даних, тоді всі файли Parquet (data) необхідно було б перезаписати. До таких змін відносяться:

  • видалення стовпця
  • зміна типу даних існуючого стовпця (на місці)
  • перейменування стовпців, які відрізняються лише регістром (наприклад, «Foo» та «foo»)

Нарешті, з наступним релізом Spark 3.0 повністю підтримуватиметься явний DDL (з використанням ALTER TABLE), що дозволить користувачам виконувати наступні дії над схемами таблиць:

  • додавання стовпців
  • зміна коментарів до стовпців
  • налаштування властивостей таблиці, що визначають поведінку таблиці, наприклад, встановлення тривалості зберігання журналу транзакцій.

У чому еволюція схеми?

Еволюцію схеми можна використовувати завжди, коли ви маєте намір змінити схему своєї таблиці (на противагу тим випадкам, коли ви випадково додали у свій DataFrame стовпці, яких там не повинно бути). Це найпростіший спосіб мігрувати вашу схему, тому що він автоматично додає правильні імена стовпців та типи даних без необхідності їхнього явного оголошення.

Висновок

Примусове застосування схеми відхиляє нові стовпці або інші зміни схеми, які не сумісні з вашою таблицею. Встановлюючи та підтримуючи ці високі стандарти, аналітики та інженери можуть покладатися на те, що їхні дані мають найвищий рівень цілісності, розмірковуючи про це чітко та ясно, що дозволяє їм приймати більш ефективні бізнес-рішення.

З іншого боку, еволюція схеми доповнює примусове застосування, спрощуючи гадані автоматичні зміни схеми. Зрештою, це не повинно бути складністю додати стовпець.

Примусове застосування схеми це янь, де еволюції схеми це інь. При спільному використанні ці функції спрощують придушення шуму і налаштування сигналу.

Ми також хотіли б подякувати Мукула Мурті та Пранава Ананда за їх внесок у цю статтю.

Інші статті з цієї серії:

Занурення в Delta Lake: розпакування журналу транзакцій

Статті на тему

Машинне навчання виробничого рівня з Delta Lake

Що таке озеро даних?

Дізнатись про курс детальніше

Джерело: habr.com

Додати коментар або відгук