深入 Delta Lake:架構執行和演化

嘿哈布爾! 我提請您注意這篇文章的翻譯 “潛入 Delta Lake:模式執行與演化” 作者 Burak Yavuz、Brenner Heintz 和 Denny Lee,這是在課程開始前準備的 數據工程師 來自 OTUS。

深入 Delta Lake:架構執行和演化

數據,就像我們的經驗一樣,不斷積累和發展。 為了跟上步伐,我們對世界的心智模型必須適應新數據,其中一些包含新維度——觀察我們以前不知道的事物的新方法。 這些心智模型與定義我們如何分類和處理新信息的表模式沒有太大區別。

這給我們帶來了模式管理的問題。 隨著業務目標和要求隨時間變化,您的數據結構也會隨之變化。 隨著數據的變化,Delta Lake 可以輕鬆實現新的測量。 用戶可以訪問簡單的語義來管理他們的表模式。 這些工具包括 Schema Enforcement 和 Schema Evolution,前者可保護用戶免於因錯誤或不必要的數據無意中污染他們的表,後者允許在適當的位置自動添加新的有價值數據列。 在本文中,我們將深入研究這些工具的使用。

了解表架構

Apache Spark 中的每個 DataFrame 都包含一個模式,該模式定義了數據的形狀,例如數據類型、列和元數據。 使用 Delta Lake,表架構以 JSON 格式存儲在事務日誌中。

什麼是架構執行?

Schema Enforcement,也稱為 Schema Validation,是 Delta Lake 中的一種保護機制,通過拒絕與表模式不匹配的記錄來保證數據質量。 就像一家只接受預訂的熱門餐廳的前台女服務員,他會檢查輸入表中的每一列數據是否在相應的預期列列表中(換句話說,是否有“預訂”他們每個人),並拒絕任何列不在列表中的條目。

架構實施如何工作?

Delta Lake 在寫入時使用模式驗證,這意味著在寫入時檢查所有新寫入到表的內容是否與目標表的模式兼容。 如果模式不一致,Delta Lake 會完全反轉事務(不寫入任何數據)並拋出異常以告知用戶不一致。
Delta Lake 使用以下規則來確定記錄是否與表兼容。 書面數據框:

  • 不能包含不在目標表架構中的其他列。 相反,如果傳入數據不完全包含表中的所有列,則一切都很好——這些列將被簡單地分配為零值。
  • 列數據類型不能與目標表中的列數據類型不同。 如果目標表中的列包含 StringType 數據,但 DataFrame 中相應的列包含 IntegerType 數據,架構強制將拋出異常並阻止寫入操作發生。
  • 不能包含僅大小寫不同的列名稱。 這意味著您不能在同一個表中定義名為“Foo”和“foo”的列。 雖然 Spark 可以在區分大小寫或不區分大小寫(默認)模式下使用,但 Delta Lake 在模式存儲中保留大小寫但不區分大小寫。 Parquet 在存儲和返回列信息時區分大小寫。 為了避免可能的錯誤、數據損壞或數據丟失(我們在 Databricks 中親身經歷過),我們決定添加此限制。

為了說明這一點,讓我們看一下當嘗試將一些新生成的列添加到尚未配置為接受它們的 Delta Lake 表時,下面的代碼會發生什麼。

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Delta Lake 不會自動添加新列,而是強制執行模式並停止記錄。 為了幫助確定是哪一列(或哪一組列)導致了不匹配,Spark 從跟踪堆棧中彈出兩個模式以進行比較。

模式實施的好處是什麼?

由於模式實施是一項相當嚴格的檢查,因此它是一個很好的工具,可以用作準備好生產或使用的干淨、完全轉換的數據集的看門人。 通常應用於直接提供數據的表:

  • 機器學習算法
  • BI 儀錶盤
  • 數據分析和可視化工具
  • 任何需要高度結構化、強類型語義模式的生產系統。

為了為這個最後的障礙準備他們的數據,許多用戶使用一個簡單的“多跳”架構,逐漸將結構引入他們的表中。 要了解更多信息,您可以閱讀文章 使用 Delta Lake 進行生產級機器學習。

當然,可以在管道中的任何地方使用模式實施,但請記住,在這種情況下,流式寫入表可能會令人沮喪,因為例如,您忘記了向傳入數據添加了另一列。

防止數據細化

至此,您可能想知道為什麼要大肆宣傳? 畢竟,有時意外的“模式不匹配”​​錯誤會讓您在工作流程中陷入困境,尤其是如果您是 Delta Lake 的新手。 為什麼不讓模式根據需要改變,這樣我就可以編寫我的 DataFrame 無論如何?

俗話說,“一分預防勝於一磅治療”。 在某些時候,如果您不小心執行您的模式,數據類型兼容性問題就會浮出水面——看似同質的原始數據源可能包含邊緣情況、損壞的列、格式錯誤的映射或您夢寐以求的其他可怕事情. 在噩夢中。 最好的方法是在門口阻止這些敵人——通過模式強制執行——並在光明中對付他們,而不是在他們開始在你的生產代碼的黑暗深處徘徊時。

架構實施讓您確信表的架構不會更改,除非您自己確認更改。 這可以防止在頻繁添加新列時可能發生的數據稀釋,導致以前有價值的壓縮表因數據氾濫而失去價值和用處。 通過鼓勵您有意識、設定高標準並期望高質量,模式實施完全按照其設計目的進行——幫助您保持認真並保持電子表格的清潔。

如果經過進一步考慮,您決定您真的 添加一個新列 - 沒問題,下面是一個單行修復。 解決方案是電路進化!

什麼是模式演化?

模式演變是一項功能,允許用戶輕鬆更改表的當前模式以匹配隨時間變化的數據。 它最常用於執行添加或覆蓋操作以自動調整架構以包含一個或多個新列。

模式演化如何運作?

按照上一節中的示例,開發人員可以輕鬆地使用模式演化來添加之前因模式不一致而被拒絕的新列。 通過添加激活電路進化 .option('mergeSchema', 'true') 給您的 Spark 團隊 .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

要查看圖形,請運行以下 Spark SQL 查詢

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

深入 Delta Lake:架構執行和演化
或者,您可以通過添加來為整個 Spark 會話設置此選項 spark.databricks.delta.schema.autoMerge = True 到 Spark 配置。 但請謹慎使用,因為模式強制將不再警告您無意的模式不一致。

通過在請求中包含一個參數 mergeSchema,DataFrame 中存在但目標表中不存在的所有列都將作為寫事務的一部分自動添加到模式的末尾。 也可以添加嵌套字段,這些字段也將添加到相應結構列的末尾。

日期工程師和數據科學家可以使用此選項將新列(可能是最近跟踪的指標或本月的銷售數字列)添加到他們現有的機器學習生產表中,而不會破壞基於舊列的現有模型。

在添加或覆蓋表時,允許以下類型的模式更改作為模式演變的一部分:

  • 添加新列(這是最常見的情況)
  • 從 NullType 更改數據類型 -> 任何其他類型或從 ByteType -> ShortType -> IntegerType 提升

作為模式演變的一部分不允許的其他更改需要通過添加來覆蓋模式和數據 .option("overwriteSchema", "true"). 例如,如果“Foo”列最初是一個整數,而新模式將是一個字符串數據類型,那麼所有 Parquet(data) 文件都需要被覆蓋。 這些變化包括:

  • 刪除列
  • 更改現有列的數據類型(就地)
  • 重命名僅大小寫不同的列(例如,“Foo”和“foo”)

最後,在 Spark 3.0 的下一個版本中,將完全支持顯式 DDL(使用 ALTER TABLE),允許用戶對錶模式執行以下操作:

  • 添加列
  • 更改列註釋
  • 設置決定表行為方式的表屬性,例如設置事務日誌的保留時間。

模式演進有什麼好處?

任何時候都可以使用示意圖進化 打算 更改表的架構(而不是當您不小心將不應該存在的列添加到 DataFrame 時)。 這是遷移架構的最簡單方法,因為它會自動添加正確的列名和數據類型,而無需顯式聲明它們。

結論

架構實施拒絕任何與您的表不兼容的新列或其他架構更改。 通過設置和維護這些高標準,分析師和工程師可以依靠他們的數據獲得最高級別的完整性,清晰簡潔地對其進行推理,從而做出更好的業務決策。

另一方面,模式演變通過簡化 據稱 自動架構更改。 畢竟,添加一列應該不難。

模式實施是陽,模式演變是陰。 當一起使用時,這些功能使降噪和信號調諧比以往任何時候都更容易。

我們還要感謝 Mukul Murthy 和 Pranav Anand 對本文的貢獻。

本系列其他文章:

深入 Delta Lake:解包事務日誌

有關該主題的文章

使用 Delta Lake 進行生產級機器學習

什麼是數據湖?

了解有關課程的更多信息

來源: www.habr.com

添加評論