Spark schemaEvolution 實踐

親愛的讀者,美好的一天!

在本文中,Neoflex 大數據解決方案業務領域的首席顧問詳細介紹了使用 Apache Spark 構建可變結構展示的選項。

作為數據分析項目的一部分,經常會出現基於鬆散結構化數據構建店面的任務。

通常這些是日誌或來自不同系統的響應,保存為 JSON 或 XML。 數據上傳到 Hadoop,然後您需要根據它們構建一個店面。 我們可以組織對創建的展示的訪問,例如通過 Impala。

在這種情況下,目標店面的模式事先是未知的。 此外,該方案也無法提前製定,因為它取決於數據,而我們正在處理這些結構非常鬆散的數據。

例如,今天記錄了以下響應:

{source: "app1", error_code: ""}

明天,同一系統將給出以下答案:

{source: "app1", error_code: "error", description: "Network error"}

這樣一來,展示櫃就應該多加一個字段——描述,但沒人知道它會不會來。

在此類數據上創建店面的任務非常標準,Spark 有許多用於此目的的工具。 為了解析源數據,支持 JSON 和 XML,對於以前未知的模式,提供了 schemaEvolution 支持。

乍一看,解決方案看起來很簡單。 您需要獲取一個包含 JSON 的文件夾並將其讀入數據幀。 Spark 將創建一個模式,將嵌套數據轉換為結構。 此外,所有內容都需要保存在 Parquet 中,通過在 Hive 元存儲中註冊店面,Impala 也支持 Parquet。

一切似乎都很簡單。

然而,從文檔中的簡短示例中並不清楚如何處理實踐中的許多問題。

該文檔描述了一種不創建店面,而是將 JSON 或 XML 讀入數據幀的方法。

也就是說,它只是展示瞭如何讀取和解析 JSON:

df = spark.read.json(path...)

這足以使數據可供 Spark 使用。

實際上,該腳本比僅僅從文件夾中讀取 JSON 文件並創建數據幀要復雜得多。 情況是這樣的:已經有某個店面了,每天都有新的數據進來,需要將它們添加到店面中,不要忘記方案可能會有所不同。

通常的展示櫃搭建方案如下:

步驟1。 數據加載到 Hadoop 中,隨後每日重新加載並添加到新分區。 結果是一個包含按天分區的初始數據的文件夾。

步驟2。 在初始加載期間,Spark 會讀取並解析該文件夾。 生成的數據幀以可解析的格式保存,例如 parquet,然後可以將其導入到 Impala 中。 這將創建一個目標展示,其中包含迄今為止積累的所有數據。

步驟3。 創建的下載將每天更新店面。
存在增量加載的問題,需要對展櫃進行分區的問題,以及維護展櫃總體方案的問題。

讓我們舉個例子。 假設構建存儲庫的第一步已經實現,並且 JSON 文件已上傳到文件夾。

從它們創建一個數據框,然後將其保存為展示,不是問題。 這是第一步,可以在 Spark 文檔中輕鬆找到:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

一切似乎都很好。

我們讀取並解析 JSON,然後將數據幀保存為 parquet,並以任何方便的方式將其註冊到 Hive 中:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

我們得到一個窗口。

但是,第二天,添加了來自該源的新數據。 我們有一個包含 JSON 的文件夾,以及從該文件夾創建的展示櫃。 從源加載下一批數據後,數據集市丟失了一天的數據。

合理的解決方案是按天對店面進行分區,這將允許每天添加一個新分區。 這個機制也是眾所周知的,Spark允許你單獨寫入分區。

首先,我們進行初始加載,如上所述保存數據,僅添加分區。 此操作稱為店面初始化,並且僅執行一次:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

第二天,我們只加載一個新分區:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

剩下的就是在 Hive 中重新註冊以更新架構。
然而,這就是出現問題的地方。

第一個問題。 遲早,生成的鑲木地板將無法讀取。 這是因為 parquet 和 JSON 處理空字段的方式不同。

讓我們考慮一個典型的情況。 例如,昨天 JSON 到達:

День 1: {"a": {"b": 1}},

現在同樣的 JSON 看起來像這樣:

День 2: {"a": null}

假設我們有兩個不同的分區,每個分區都有一行。
當我們讀取整個源數據時,Spark將能夠確定類型,並且會理解“a”是“struct”類型的字段,其中嵌套了一個INT類型的字段“b”。 但是,如果每個分區都是單獨保存的,那麼我們會得到一個分區方案不兼容的實木複合地板:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

這種情況眾所周知,所以特意添加了一個選項——解析源數據時,去掉空字段:

df = spark.read.json("...", dropFieldIfAllNull=True)

在這種情況下,鑲木地板將由可以一起讀取的分區組成。
雖然實踐過這樣做的人會在這裡苦笑。 為什麼? 是的,因為很可能還有兩種情況。 或者三個。 或者四個。 第一個幾乎肯定會發生的問題是數字類型在不同的 JSON 文件中看起來會有所不同。 例如,{intField: 1} 和 {intField: 1.1}。 如果在一個分區中找到此類字段,則模式合併將正確讀取所有內容,從而產生最準確的類型。 但如果在不同的字段中,則一個將具有 intField: int,另一個將具有 intField: double。

有以下標誌可以處理這種情況:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

現在我們有一個文件夾,其中有可以讀入單個數據幀的分區和整個展示的有效鑲木地板。 是的? 不。

我們必須記住我們在 Hive 中註冊了該表。 Hive 字段名稱不區分大小寫,而 parquet 區分大小寫。 因此,具有 schemas: field1: int 和 Field1: int 的分區對於 Hive 是相同的,但對於 Spark 則不同。 不要忘記將字段名稱轉換為小寫。

之後,一切似乎都很好。

然而,事情並非如此簡單。 還有第二個也是眾所周知的問題。 由於每個新分區都是單獨保存的,因此分區文件夾中將包含Spark服務文件,例如_SUCCESS操作成功標誌。 這將導致嘗試鑲木地板時出錯。 為了避免這種情況,需要進行配置以阻止 Spark 將服務文件添加到該文件夾​​中:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

似乎現在每天都會將一個新的 parquet 分區添加到目標展示文件夾中,其中包含當天解析的數據。 我們提前註意不存在數據類型沖突的分區。

但是,我們還有第三個問題。 現在,一般模式尚不清楚,此外,Hive 中的表的模式不正確,因為每個新分區很可能會導致模式失真。

您需要重新註冊該表。 這可以簡單地完成:再次讀取店面的鑲木地板,獲取架構並基於它創建一個 DDL,用它在 Hive 中將文件夾重新註冊為外部表,從而更新目標店面的架構。

我們還有第四個問題。 當我們第一次註冊該表時,我們依賴於 Spark。 現在我們自己做,我們需要記住鑲木地板字段可以以 Hive 不允許的字符開頭。 例如,Spark 會拋出它無法在“corrupt_record”字段中解析的行。 這樣的字段如果不進行轉義就無法在 Hive 中註冊。

知道了這一點,我們就得到了方案:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

代碼 ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("數組<`", "數組<") 製作安全的 DDL,即代替:

create table tname (_field1 string, 1field string)

對於像“_field1, 1field”這樣的字段名稱,在字段名稱被轉義的地方進行安全的DDL:創建表`tname`(`_field1`字符串,`1field`字符串)。

問題出現了:如何正確獲取具有完整模式的數據框(在 pf 代碼中)? 如何獲得這個pf? 這是第五個問題。 重新讀取目標展示櫃的鑲木地板文件文件夾中所有分區的方案? 這種方法是最安全的,但是比較困難。

該架構已位於 Hive 中。 您可以通過組合整個表的架構和新分區來獲得新的架構。 因此,您需要從 Hive 獲取表架構,並將其與新分區的架構結合起來。 這可以通過從 Hive 讀取測試元數據、將其保存到臨時文件夾並使用 Spark 一次讀取兩個分區來完成。

事實上,這裡有您需要的一切:Hive 中的原始表架構和新分區。 我們也有數據。 剩下的只是獲取一個新的架構,該架構結合了店面架構和創建的分區中的新字段:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

接下來,我們創建表註冊 DDL,如前面的代碼片段所示。
如果整個鏈工作正常,即存在初始化負載,並且在 Hive 中正確創建了表,那麼我們將獲得更新的表架構。

最後一個問題是你不能只向 Hive 表添加分區,因為它會被破壞。 您需要強制 Hive 修復其分區結構:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

讀取 JSON 並基於它創建店面的簡單任務會克服許多隱含的困難,您必須單獨尋找解決方案。 而且雖然這些解決方案很簡單,但是找到它們卻需要花費大量的時間。

為了實現展示櫃的構建,我必須:

  • 向展示櫃添加分區,擺脫服務文件
  • 處理 Spark 輸入的源數據中的空字段
  • 將簡單類型轉換為字符串
  • 將字段名稱轉換為小寫
  • Hive中單獨的數據上傳和表註冊(DDL生成)
  • 不要忘記轉義可能與 Hive 不兼容的字段名稱
  • 了解如何更新 Hive 中的表註冊

總而言之,我們注意到建造商店櫥窗的決定充滿了許多陷阱。 因此,如果實施過程中遇到困難,最好聯繫具有成功專業知識、經驗豐富的合作夥伴。

感謝您閱讀本文,我們希望您發現這些信息有用。

來源: www.habr.com

添加評論