Delta Lake Dive:执法和计划演变

嘿哈布尔! 我向您展示这篇文章的翻译 “深入 Delta Lake:模式执行与演变” 作者 Burak Yavuz、Brenner Heintz 和 Denny Lee 是为课程开始而准备的 数据工程师 来自奥图斯。

Delta Lake Dive:执法和计划演变

数据就像我们的经验一样,不断积累和发展。 为了跟上,我们对世界的心理模型必须适应新的数据,其中一些数据包含新的维度——观察我们以前不知道的事物的新方法。 这些心理模型与决定我们如何分类和处理新信息的表格模式没有太大区别。

这给我们带来了模式管理的问题。 随着业务挑战和需求随着时间的推移而变化,数据的结构也会发生变化。 Delta Lake 可以在数据变化时轻松引入新的测量值。 用户可以访问简单的语义来管理他们的表模式。 这些工具包括架构执行(Schema Enforcement)和架构演进(Schema Evolution),前者可防止用户无意中因错误或不必要的数据而污染其表,后者允许将新的有价值数据列自动添加到适当的位置。 在本文中,我们将深入探讨这些工具的使用。

了解表模式

Apache Spark 中的每个 DataFrame 都包含一个定义数据形式的架构,例如数据类型、列和元数据。 使用 Delta Lake,表架构以 JSON 格式存储在事务日志中。

什么是计划执行?

架构执行,也称为架构验证,是 Delta Lake 中的一种安全机制,通过拒绝与表架构不匹配的记录来确保数据质量。 就像一家受欢迎的预约制餐厅的前台女服务员一样,她检查输入表中的每一列数据是否在相应的预期列列表中(换句话说,是否每个列都有“预订”) ),并拒绝任何列不在列表中的记录。

架构执行如何工作?

Delta Lake 使用写入时架构检查,这意味着对表的所有新写入都会在写入时检查与目标表架构的兼容性。 如果架构不一致,Delta Lake 将完全中止事务(不写入任何数据)并引发异常以通知用户不一致的情况。
Delta Lake 使用以下规则来确定记录是否与表兼容。 可写数据框:

  • 不能包含不在目标表架构中的其他列。 相反,如果传入数据不完全包含表中的所有列,则一切都很好 - 这些列将被简单地分配为空值。
  • 列数据类型不能与目标表中列的数据类型不同。 如果目标表列包含 StringType 数据,但 DataFrame 中的相应列包含 IntegerType 数据,架构强制执行将引发异常并阻止写入操作发生。
  • 不能包含仅大小写不同的列名。 这意味着您不能在同一个表中定义名为“Foo”和“foo”的列。 虽然 Spark 可以在区分大小写或不区分大小写(默认)模式下使用,但 Delta Lake 保留大小写,但在模式存储中不敏感。 Parquet 在存储和返回列信息时区分大小写。 为了避免可能的错误、数据损坏或数据丢失(我们在 Databricks 亲身经历过的事情),我们决定添加此限制。

为了说明这一点,让我们看一下当我们尝试将一些新生成的列添加到尚未配置为接受它们的 Delta Lake 表时,下面的代码中会发生什么。

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Delta Lake 不会自动添加新列,而是强加一个架构并停止写入。 为了帮助确定哪一列(或一组列)导致了差异,Spark 会从堆栈跟踪中输出两个模式以进行比较。

实施模式有什么好处?

由于模式执行是一项相当严格的检查,因此它是一个出色的工具,可用作干净、完全转换的数据集的看门人,以供生产或使用。 通常应用于直接提供数据的表:

  • 机器学习算法
  • 商业智能仪表板
  • 数据分析和可视化工具
  • 任何需要高度结构化、强类型语义模式的生产系统。

为了准备好数据以应对最后的障碍,许多用户使用简单的“多跳”架构,逐渐将结构引入到他们的表中。 要了解更多这方面的信息,您可以查看这篇文章 使用 Delta Lake 进行生产级机器学习。

当然,模式强制可以在管道中的任何位置使用,但请记住,在这种情况下流式传输到表可能会令人沮丧,因为例如,您忘记了向传入数据添加了另一列。

防止数据稀释

现在您可能想知道,这有什么大惊小怪的? 毕竟,有时意外的“架构不匹配”错误可能会让您的工作流程陷入困境,特别是如果您是 Delta Lake 的新手。 为什么不让架构根据需要进行更改,以便无论如何我都可以编写我的 DataFrame?

俗话说:“一分预防胜于一分治疗”。 在某些时候,如果您不注意强制实施您的模式,数据类型兼容性问题就会浮出水面——看似同质的原始数据源可能包含边缘情况、损坏的列、格式错误的映射或其他令人恐惧的事情噩梦。 最好的方法是在门口阻止这些敌人 - 通过模式强制 - 并在光明中处理它们,而不是稍后当它们开始潜伏在生产代码的黑暗深处时。

强制实施架构可以确保表的架构不会更改,除非您批准更改。 这可以防止数据稀释,当频繁添加新列以致以前有价值的压缩表由于数据淹没而失去其意义和有用性时,可能会发生数据稀释。 通过鼓励您有意识、设定高标准并期望高质量,模式实施完全符合其设计目的 - 帮助您保持认真负责并保持电子表格整洁。

如果经过进一步考虑,您决定您确实 必要 添加一个新列 - 没问题,下面是一行修复。 解决方案是电路的进化!

什么是模式演化?

模式演化是一项功能,允许用户根据随时间变化的数据轻松更改当前表模式。 它最常在执行追加或重写操作时使用,以自动调整架构以包含一个或多个新列。

模式演化如何运作?

按照上一节中的示例,开发人员可以轻松地使用模式演化来添加之前因模式不一致而被拒绝的新列。 通过添加激活电路进化 .option('mergeSchema', 'true') 致您的 Spark 团队 .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

要查看该图,请运行以下 Spark SQL 查询

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Delta Lake Dive:执法和计划演变
或者,您可以通过添加以下内容为整个 Spark 会话设置此选项 spark.databricks.delta.schema.autoMerge = True 到 Spark 配置。 但请谨慎使用,因为模式强制将不再提醒您无意的模式不一致。

通过在请求中包含参数 mergeSchema,DataFrame 中存在但目标表中不存在的所有列都会作为写入事务的一部分自动添加到架构的末尾。 还可以添加嵌套字段,这些字段也将添加到相应结构列的末尾。

数据工程师和数据科学家可以使用此选项将新列(可能是最近跟踪的指标或本月的销售业绩列)添加到现有的机器学习生产表中,而不会破坏基于旧列的现有模型。

在表添加或重写期间,允许以下类型的架构更改作为架构演变的一部分:

  • 添加新列(这是最常见的情况)
  • 从 NullType -> 任何其他类型更改数据类型或从 ByteType -> ShortType -> IntegerType 升级

模式演化中不允许的其他更改需要通过添加来重写模式和数据 .option("overwriteSchema", "true")。 例如,如果列“Foo”最初是整数,而新架构是字符串数据类型,则所有 Parquet(data) 文件都需要重写。 这些变化包括:

  • 删除一列
  • 更改现有列的数据类型(就地)
  • 重命名仅大小写不同的列(例如“Foo”和“foo”)

最后,在 Spark 3.0 的下一个版本中,将完全支持显式 DDL(使用 ALTER TABLE),允许用户对表模式执行以下操作:

  • 添加列
  • 更改栏目评论
  • 设置控制表行为的表属性,例如设置事务日志的存储时间长度。

电路进化有什么好处?

模式演化可以在任何时候使用 打算 更改表的架构(而不是当您意外地将不应该存在的列添加到 DataFrame 中时)。 这是迁移架构的最简单方法,因为它会自动添加正确的列名称和数据类型,而无需显式声明它们。

结论

架构强制会拒绝与您的表不兼容的任何新列或其他架构更改。 通过设定和维护这些高标准,分析师和工程师可以相信他们的数据具有最高级别的完整性,清晰明确地传达数据,使他们能够做出更好的业务决策。

另一方面,模式演化通过简化来补充执行 预期 自动架构更改。 毕竟,添加一列应该不难。

方案的强制运用是阳,方案的演化是阴。 当一起使用时,这些功能使噪声抑制和信号调谐比以往更加容易。

我们还要感谢 Mukul Murthy 和 Pranav Anand 对本文的贡献。

本系列的其他文章:

深入 Delta Lake:解压事务日志

相关文章

使用 Delta Lake 进行生产级机器学习

什么是数据湖?

了解有关课程的更多信息

来源: habr.com

添加评论