Spark schemaEvolution 实践

亲爱的读者,美好的一天!

在本文中,Neoflex 大数据解决方案业务领域的领先顾问详细描述了使用 Apache Spark 构建可变结构展示的选项。

作为数据分析项目的一部分,经常会出现基于松散结构化数据构建店面的任务。

通常这些是日志或来自各种系统的响应,保存为 JSON 或 XML。 数据被上传到 Hadoop,然后您需要从中构建一个店面。 我们可以组织对创建的展示柜的访问,例如,通过 Impala。

在这种情况下,目标店面的模式事先是未知的。 而且,方案也不能事先拟定,因为它取决于数据,我们处理的是这些结构非常松散的数据。

例如,今天记录了以下响应:

{source: "app1", error_code: ""}

明天来自同一个系统的答案如下:

{source: "app1", error_code: "error", description: "Network error"}

于是,showcase要多一个字段——description,谁也不知道会不会来。

在这些数据上创建店面的任务是非常标准的,Spark 有很多工具可以做到这一点。 对于解析源数据,支持 JSON 和 XML,对于以前未知的模式,提供了对 schemaEvolution 的支持。

乍一看,解决方案看起来很简单。 您需要使用 JSON 获取一个文件夹并将其读入数据框。 Spark 将创建一个模式,将嵌套数据转换为结构。 此外,所有内容都需要保存在 parquet 中,这在 Impala 中也受支持,方法是在 Hive metastore 中注册店面。

一切似乎都很简单。

但是,从文档中的简短示例中并不清楚如何处理实践中的许多问题。

该文档描述了一种不创建店面,而是将 JSON 或 XML 读入数据框的方法。

也就是说,它只是展示了如何读取和解析 JSON:

df = spark.read.json(path...)

这足以使数据可用于 Spark。

实际上,该脚本比仅从文件夹中读取 JSON 文件并创建数据框要复杂得多。 情况是这样的:已经有某个店面,每天都有新的数据进来,需要添加到店面,别忘了方案可能不同。

通常搭建展柜的方案如下:

步骤1。 数据加载到 Hadoop 中,随后每天重新加载并添加到新分区中。 结果是一个文件夹,其中包含按天分区的初始数据。

步骤2。 在初始加载期间,此文件夹由 Spark 读取和解析。 生成的数据帧以可解析的格式保存,例如,parquet,然后可以将其导入 Impala。 这将创建一个目标展示柜,其中包含到目前为止积累的所有数据。

步骤3。 创建一个下载,每天更新店面。
有一个增量加载的问题,需要对展柜进行分区,还有维护展柜总方案的问题。

让我们举个例子。 假设第一步构建仓库已经实现,JSON文件上传到一个文件夹中。

从它们创建数据框,然后将其保存为展示,这不是问题。 这是可以在 Spark 文档中轻松找到的第一步:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

一切似乎都很好。

我们读取并解析 JSON,然后将数据帧保存为镶木地板,以任何方便的方式将其注册到 Hive 中:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

我们得到一个窗口。

但是,第二天,添加了来自源的新数据。 我们有一个包含 JSON 的文件夹,以及一个从该文件夹创建的展示柜。 从源加载下一批数据后,数据集市缺少一天的数据。

合乎逻辑的解决方案是按天对店面进行分区,这将允许每隔一天添加一个新分区。 这方面的机制也是众所周知的,Spark 允许您单独编写分区。

首先,我们进行初始加载,如上所述保存数据,仅添加分区。 此操作称为店面初始化并且只执行一次:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

第二天,我们只加载一个新分区:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

剩下的就是在 Hive 中重新注册以更新模式。
然而,这就是出现问题的地方。

第一个问题。 迟早,生成的镶木地板将不可读。 这是由于 parquet 和 JSON 处理空字段的方式不同。

让我们考虑一个典型的情况。 例如,昨天 JSON 到达:

День 1: {"a": {"b": 1}},

今天相同的 JSON 看起来像这样:

День 2: {"a": null}

假设我们有两个不同的分区,每个分区有一行。
当我们读取整个源数据时,Spark 将能够确定类型,并将理解“a”是一个“结构”类型的字段,嵌套了一个 INT 类型的字段“b”。 但是,如果每个分区都是单独保存的,那么我们会得到一个分区方案不兼容的镶木地板:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

这种情况众所周知,所以特地加了一个选项——解析源数据时,去掉空字段:

df = spark.read.json("...", dropFieldIfAllNull=True)

在这种情况下,镶木地板将由可以一起读取的分区组成。
虽然在实践中这样做过的人在这里会苦笑。 为什么? 是的,因为很可能还有两种情况。 或者三个。 或者四个。 第一个几乎肯定会发生的是,数字类型在不同的 JSON 文件中看起来会有所不同。 例如,{intField: 1} 和 {intField: 1.1}。 如果在一个分区中找到这样的字段,那么架构合并将正确读取所有内容,从而导致最准确的类型。 但是如果在不同的中,那么一个将具有 intField: int,另一个将具有 intField: double。

有以下标志来处理这种情况:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

现在我们有一个文件夹,其中有可以读入单个数据帧的分区和整个展示柜的有效镶木地板。 是的? 不。

我们必须记住我们在 Hive 中注册了表。 Hive 对字段名不区分大小写,而 parquet 区分大小写。 因此,具有 schemas: field1: int 和 Field1: int 的分区对于 Hive 是相同的,但对于 Spark 则不同。 不要忘记将字段名称转换为小写。

在那之后,一切似乎都很好。

然而,并非一切都那么简单。 还有第二个也是众所周知的问题。 由于每个新分区都是单独保存的,所以分区文件夹中会包含Spark服务文件,例如_SUCCESS操作成功标志。 这将在尝试镶木地板时导致错误。 为了避免这种情况,您需要配置配置以防止Spark向该文件夹添加服务文件:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

似乎现在每天都有一个新的镶木地板分区添加到目标展示文件夹中,该文件夹是当天解析的数据所在的位置。 我们事先注意没有数据类型冲突的分区。

但是,我们还有第三个问题。 现在一般的模式是未知的,而且,Hive 中的表有一个不正确的模式,因为每个新分区很可能在模式中引入了扭曲。

您需要重新注册该表。 这可以简单地完成:再次读取店面的 parquet,获取模式并基于它创建一个 DDL,用它在 Hive 中将文件夹重新注册为外部表,更新目标店面的模式。

我们有第四个问题。 当我们第一次注册表时,我们依赖于 Spark。 现在我们自己做,我们需要记住 parquet 字段可以以 Hive 不允许的字符开头。 例如,Spark 会抛出它无法在“corrupt_record”字段中解析的行。 这样的字段不转义是无法注册到Hive中的。

知道了这一点,我们得到了方案:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

代码 ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("数组<`", "数组<") 制作安全的 DDL,即代替:

create table tname (_field1 string, 1field string)

使用像“_field1, 1field”这样的字段名称,在字段名称被转义的地方创建安全的 DDL:create table `tname` (`_field1` string, `1field` string)。

问题出现了:如何正确获取具有完整模式的数据框(在 pf 代码中)? 如何获得这个pf? 这是第五个问题。 从目标展柜的镶木地板文件文件夹中重新读取所有分区的方案? 这种方法最安全,但难度较大。

该架构已在 Hive 中。 您可以通过组合整个表的模式和新分区来获得新的模式。 因此,您需要从 Hive 中获取表架构并将其与新分区的架构结合起来。 这可以通过从 Hive 读取测试元数据,将其保存到临时文件夹,并使用 Spark 一次读取两个分区来完成。

事实上,这里有您需要的一切:Hive 中的原始表模式和新分区。 我们也有数据。 它仍然只是为了获得一个新的模式,它结合了店面模式和来自创建的分区的新字段:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

接下来,我们创建表注册 DDL,如前一个代码片段所示。
如果整个链工作正常,即有一个初始化负载,并且在 Hive 中正确创建了表,那么我们将得到一个更新的表模式。

最后一个问题是你不能只给 Hive 表添加一个分区,因为它会被破坏。 您需要强制 Hive 修复其分区结构:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

读取 JSON 并基于它创建店面的简单任务会导致克服许多隐含的困难,您必须单独寻找解决方案。 而且虽然这些解决方案很简单,但要找到它们却需要花费大量的时间。

为了实施展示柜的构建,我必须:

  • 将分区添加到展示柜,删除服务文件
  • 处理 Spark 键入的源数据中的空字段
  • 将简单类型转换为字符串
  • 将字段名称转换为小写
  • Hive中单独的数据上传和表注册(DDL生成)
  • 不要忘记转义可能与 Hive 不兼容的字段名称
  • 了解如何在 Hive 中更新表注册

总而言之,我们注意到建造商店橱窗的决定充满了许多陷阱。 因此,如果在实施中遇到困难,最好联系具有成功专业知识的经验丰富的合作伙伴。

感谢您阅读本文,我们希望您能找到有用的信息。

来源: habr.com

添加评论