Spark schemaEvolution ในทางปฏิบัติ

เรียนผู้อ่าน ขอให้เป็นวันที่ดี!

ในบทความนี้ ที่ปรึกษาชั้นนำของธุรกิจ Big Data Solutions ของ Neoflex จะอธิบายรายละเอียดเกี่ยวกับตัวเลือกสำหรับการสร้างการแสดงโครงสร้างแบบแปรผันโดยใช้ Apache Spark

ในฐานะที่เป็นส่วนหนึ่งของโครงการวิเคราะห์ข้อมูล มักจะมีงานสร้างหน้าร้านโดยยึดตามข้อมูลที่มีโครงสร้างหลวมๆ

โดยปกติจะเป็นบันทึกหรือการตอบกลับจากระบบต่างๆ ที่บันทึกเป็น JSON หรือ XML ข้อมูลถูกอัปโหลดไปยัง Hadoop จากนั้นคุณต้องสร้างหน้าร้านจากข้อมูลเหล่านั้น เราสามารถจัดระเบียบการเข้าถึงตู้โชว์ที่สร้างขึ้นได้ เช่น ผ่านอิมพาลา

ในกรณีนี้ จะไม่ทราบสคีมาของหน้าร้านเป้าหมายล่วงหน้า นอกจากนี้ โครงร่างไม่สามารถร่างล่วงหน้าได้ เนื่องจากขึ้นอยู่กับข้อมูล และเรากำลังจัดการกับข้อมูลที่มีโครงสร้างหลวมๆ เหล่านี้

ตัวอย่างเช่น วันนี้ การตอบสนองต่อไปนี้ถูกบันทึก:

{source: "app1", error_code: ""}

และพรุ่งนี้จากระบบเดียวกันมีคำตอบต่อไปนี้:

{source: "app1", error_code: "error", description: "Network error"}

เป็นผลให้ควรเพิ่มอีกหนึ่งฟิลด์ในตู้โชว์ - คำอธิบายและไม่มีใครรู้ว่าจะมาหรือไม่

งานในการสร้างหน้าร้านบนข้อมูลดังกล่าวค่อนข้างเป็นมาตรฐาน และ Spark มีเครื่องมือมากมายสำหรับสิ่งนี้ สำหรับการแยกวิเคราะห์ข้อมูลต้นทาง มีการรองรับทั้ง JSON และ XML และสำหรับสคีมาที่ไม่รู้จักก่อนหน้านี้ จะมีการสนับสนุนสคีมาเอโวลูชัน

เมื่อมองแวบแรก วิธีแก้ปัญหาจะดูเรียบง่าย คุณต้องใช้โฟลเดอร์ที่มี JSON และอ่านลงในดาต้าเฟรม Spark จะสร้างสคีมา เปลี่ยนข้อมูลที่ซ้อนกันเป็นโครงสร้าง นอกจากนี้ จำเป็นต้องบันทึกทุกอย่างลงในไม้ปาร์เก้ ซึ่งรองรับใน Impala ด้วย โดยลงทะเบียนหน้าร้านใน Hive metastore

ทุกอย่างดูเหมือนจะเรียบง่าย

อย่างไรก็ตาม ยังไม่ชัดเจนจากตัวอย่างสั้นๆ ในเอกสารประกอบว่าจะทำอย่างไรกับปัญหาต่างๆ ในทางปฏิบัติ

เอกสารอธิบายวิธีการที่จะไม่สร้างหน้าร้าน แต่เพื่ออ่าน JSON หรือ XML ลงในดาต้าเฟรม

กล่าวคือ มันแสดงวิธีการอ่านและแยกวิเคราะห์ JSON:

df = spark.read.json(path...)

นี่เพียงพอที่จะทำให้ข้อมูลพร้อมใช้งานสำหรับ Spark

ในทางปฏิบัติ สคริปต์มีความซับซ้อนมากกว่าการอ่านไฟล์ JSON จากโฟลเดอร์และสร้าง dataframe สถานการณ์มีลักษณะดังนี้: มีหน้าร้านอยู่แล้ว มีข้อมูลใหม่เข้ามาทุกวัน จำเป็นต้องเพิ่มในหน้าร้าน โดยไม่ลืมว่าโครงร่างอาจแตกต่างกัน

รูปแบบปกติสำหรับการสร้างตู้โชว์มีดังนี้:

ขั้นตอนที่ 1 ข้อมูลจะถูกโหลดลงใน Hadoop พร้อมกับการโหลดซ้ำรายวันและเพิ่มลงในพาร์ติชันใหม่ มันกลายเป็นโฟลเดอร์ที่มีข้อมูลเริ่มต้นแบ่งตามวัน

ขั้นตอนที่ 2 ระหว่างการโหลดครั้งแรก โฟลเดอร์นี้จะถูกอ่านและแยกวิเคราะห์โดย Spark ดาต้าเฟรมที่ได้จะถูกบันทึกในรูปแบบที่แยกวิเคราะห์ได้ เช่น ในปาร์เกต์ ซึ่งสามารถนำเข้าอิมพาลาได้ สิ่งนี้จะสร้างการแสดงเป้าหมายพร้อมข้อมูลทั้งหมดที่สะสมจนถึงจุดนี้

ขั้นตอนที่ 3 มีการสร้างการดาวน์โหลดที่จะอัพเดทหน้าร้านทุกวัน
มีคำถามเกี่ยวกับการโหลดที่เพิ่มขึ้น ความจำเป็นในการแบ่งส่วนตู้โชว์ และคำถามเกี่ยวกับการรักษารูปแบบทั่วไปของตู้โชว์

ลองมาเป็นตัวอย่าง สมมติว่าขั้นตอนแรกของการสร้างพื้นที่เก็บข้อมูลได้รับการดำเนินการแล้ว และไฟล์ JSON ถูกอัปโหลดไปยังโฟลเดอร์

การสร้าง dataframe จากพวกเขาแล้วบันทึกเป็นโชว์เคสนั้นไม่ใช่ปัญหา นี่เป็นขั้นตอนแรกที่สามารถพบได้ง่ายในเอกสารประกอบของ Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

ทุกอย่างดูเหมือนจะดี

เราอ่านและแยกวิเคราะห์ JSON จากนั้นบันทึก dataframe เป็น parquet ลงทะเบียนใน Hive ด้วยวิธีใดก็ได้ที่สะดวก:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

เราได้หน้าต่าง

แต่ในวันถัดไป มีการเพิ่มข้อมูลใหม่จากแหล่งที่มา เรามีโฟลเดอร์ที่มี JSON และตู้โชว์ที่สร้างขึ้นจากโฟลเดอร์นี้ หลังจากโหลดข้อมูลชุดถัดไปจากแหล่งข้อมูลแล้ว data mart ขาดข้อมูลหนึ่งวัน

วิธีแก้ไขปัญหาคือแบ่งพาร์ติชันหน้าร้านตามวัน ซึ่งจะทำให้เพิ่มพาร์ติชันใหม่ได้ทุกวัน กลไกนี้เป็นที่รู้จักกันดี Spark ช่วยให้คุณสามารถเขียนพาร์ติชันแยกกันได้

ขั้นแรก เราทำการโหลดเริ่มต้น บันทึกข้อมูลตามที่อธิบายไว้ข้างต้น เพิ่มเฉพาะการแบ่งพาร์ติชัน การดำเนินการนี้เรียกว่าการเริ่มต้นหน้าร้านและดำเนินการเพียงครั้งเดียว:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

ในวันถัดไป เราโหลดพาร์ติชันใหม่เท่านั้น:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

สิ่งที่เหลืออยู่คือการลงทะเบียนใหม่ใน Hive เพื่ออัปเดตสคีมา
อย่างไรก็ตามนี่คือปัญหาที่เกิดขึ้น

ปัญหาแรก ไม่ช้าก็เร็ว ไม้ปาร์เก้ที่ได้จะไม่สามารถอ่านได้ นี่เป็นเพราะวิธีการที่ parquet และ JSON ปฏิบัติต่อช่องว่างต่างกัน

ลองพิจารณาสถานการณ์ทั่วไป ตัวอย่างเช่น เมื่อวานนี้ JSON มาถึง:

День 1: {"a": {"b": 1}},

และวันนี้ JSON เดียวกันมีลักษณะดังนี้:

День 2: {"a": null}

สมมติว่าเรามีสองพาร์ติชันที่แตกต่างกัน แต่ละพาร์ติชันมีหนึ่งบรรทัด
เมื่อเราอ่านแหล่งข้อมูลทั้งหมด Spark จะสามารถระบุประเภทได้ และจะเข้าใจว่า "a" เป็นฟิลด์ประเภท "โครงสร้าง" โดยมีฟิลด์ซ้อน "b" เป็นประเภท INT แต่ถ้าแต่ละพาร์ติชันถูกบันทึกแยกกัน เราจะได้ปาร์เก้ที่มีโครงร่างพาร์ติชันที่เข้ากันไม่ได้:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

สถานการณ์นี้เป็นที่ทราบกันดี ดังนั้นจึงมีการเพิ่มตัวเลือกพิเศษ - เมื่อแยกวิเคราะห์แหล่งข้อมูล ให้ลบฟิลด์ว่างออก:

df = spark.read.json("...", dropFieldIfAllNull=True)

ในกรณีนี้ไม้ปาร์เก้จะประกอบด้วยพาร์ติชันที่สามารถอ่านร่วมกันได้
แม้ว่าผู้ที่ทำสิ่งนี้ในทางปฏิบัติจะยิ้มอย่างขมขื่นที่นี่ ทำไม ใช่ เพราะน่าจะมีอีกสองสถานการณ์ หรือสาม หรือสี่. สิ่งแรกที่จะเกิดขึ้นอย่างแน่นอนคือประเภทตัวเลขจะดูแตกต่างกันในไฟล์ JSON ที่แตกต่างกัน ตัวอย่างเช่น {intField: 1} และ {intField: 1.1} หากพบช่องดังกล่าวในพาร์ติชันเดียว การผสานสคีมาจะอ่านทุกอย่างถูกต้อง ซึ่งจะนำไปสู่ประเภทที่ถูกต้องที่สุด แต่ถ้าอยู่คนละอัน อันหนึ่งจะมี intField: int และอีกอันจะมี intField: double

มีแฟล็กต่อไปนี้เพื่อจัดการกับสถานการณ์นี้:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

ตอนนี้เรามีโฟลเดอร์ที่มีพาร์ติชันที่สามารถอ่านได้ใน dataframe เดียวและปาร์เก้ที่ถูกต้องของทั้งโชว์เคส ใช่? เลขที่

เราต้องจำไว้ว่าเราลงทะเบียนตารางใน Hive Hive ไม่คำนึงถึงตัวพิมพ์เล็กและใหญ่ในชื่อฟิลด์ ในขณะที่ parquet คำนึงถึงตัวพิมพ์เล็กและใหญ่ ดังนั้น พาร์ติชันที่มี schema: field1: int และ Field1: int จะเหมือนกันสำหรับ Hive แต่ไม่ใช่สำหรับ Spark อย่าลืมแปลงชื่อฟิลด์เป็นตัวพิมพ์เล็ก

หลังจากนั้นทุกอย่างดูเหมือนจะดี

อย่างไรก็ตามไม่ใช่เรื่องง่ายทั้งหมด มีปัญหาที่สองซึ่งเป็นที่รู้จักกันดีเช่นกัน เนื่องจากพาร์ติชันใหม่แต่ละพาร์ติชันถูกบันทึกแยกกัน โฟลเดอร์พาร์ติชันจะมีไฟล์บริการ Spark ตัวอย่างเช่น แฟล็กความสำเร็จของการดำเนินการ _SUCCESS ซึ่งจะส่งผลให้เกิดข้อผิดพลาดเมื่อพยายามปาร์เก้ เพื่อหลีกเลี่ยงปัญหานี้ คุณต้องกำหนดคอนฟิกูเรชันเพื่อป้องกันไม่ให้ Spark เพิ่มไฟล์บริการลงในโฟลเดอร์:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

ดูเหมือนว่าทุกๆ วันจะมีการเพิ่มพาร์ติชันไม้ปาร์เก้ใหม่ไปยังโฟลเดอร์โชว์เคสเป้าหมาย ซึ่งเป็นที่ตั้งของข้อมูลที่แยกวิเคราะห์สำหรับวันนั้นๆ เราดูแลล่วงหน้าว่าไม่มีพาร์ติชันที่มีความขัดแย้งของประเภทข้อมูล

แต่เรามีปัญหาที่สาม ตอนนี้ยังไม่ทราบสคีมาทั่วไป ยิ่งกว่านั้น ตารางในไฮฟ์มีสคีมาไม่ถูกต้อง เนื่องจากพาร์ติชันใหม่แต่ละพาร์ติชันมักจะแนะนำการบิดเบือนในสคีมา

คุณต้องลงทะเบียนตารางใหม่ สิ่งนี้สามารถทำได้ง่ายๆ: อ่าน parquet ของหน้าร้านอีกครั้ง ใช้สคีมาและสร้าง DDL ตามนั้น เพื่อลงทะเบียนโฟลเดอร์ใน Hive อีกครั้งเป็นตารางภายนอก อัปเดตสคีมาของหน้าร้านเป้าหมาย

เรามีปัญหาที่สี่ เมื่อเราลงทะเบียนตารางเป็นครั้งแรก เราอาศัย Spark ตอนนี้เราทำด้วยตัวเองและเราต้องจำไว้ว่าช่องไม้ปาร์เก้สามารถขึ้นต้นด้วยอักขระที่ไม่อนุญาตสำหรับ Hive ตัวอย่างเช่น Spark โยนบรรทัดที่ไม่สามารถแยกวิเคราะห์ในช่อง "corrupt_record" ฟิลด์ดังกล่าวไม่สามารถลงทะเบียนใน Hive ได้หากไม่ถูก Escape

เมื่อรู้สิ่งนี้เราจะได้รับโครงร่าง:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

รหัส ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",, ",`").replace("อาร์เรย์<`", "อาร์เรย์<") ทำให้ DDL ปลอดภัย เช่น แทนที่จะเป็น:

create table tname (_field1 string, 1field string)

ด้วยชื่อฟิลด์ เช่น "_field1, 1field" DDL ที่ปลอดภัยจะถูกสร้างขึ้นโดยมีการ Escape ชื่อฟิลด์: สร้างตาราง `tname` (สตริง `_field1`, สตริง `1field`)

คำถามเกิดขึ้น: จะรับดาต้าเฟรมด้วยสคีมาที่สมบูรณ์ได้อย่างไร (ในรหัส pf) วิธีรับ pf นี้ นี่คือปัญหาที่ห้า อ่านโครงร่างของพาร์ติชันทั้งหมดอีกครั้งจากโฟลเดอร์ที่มีไฟล์ปาร์เก้ของตู้โชว์เป้าหมายหรือไม่ วิธีนี้ปลอดภัยที่สุด แต่ยาก

สคีมาอยู่ในไฮฟ์แล้ว คุณสามารถรับสคีมาใหม่ได้โดยการรวมสคีมาของทั้งตารางและพาร์ติชันใหม่ ดังนั้นคุณต้องใช้สคีมาของตารางจาก Hive และรวมเข้ากับสคีมาของพาร์ติชันใหม่ ซึ่งสามารถทำได้โดยการอ่านข้อมูลเมตาทดสอบจาก Hive บันทึกลงในโฟลเดอร์ชั่วคราว และใช้ Spark เพื่ออ่านพาร์ติชันทั้งสองพร้อมกัน

ในความเป็นจริงมีทุกสิ่งที่คุณต้องการ: สคีมาตารางเดิมใน Hive และพาร์ติชันใหม่ เรายังมีข้อมูล ยังคงเป็นเพียงเพื่อรับสคีมาใหม่ที่รวมสคีมาหน้าร้านและฟิลด์ใหม่จากพาร์ติชันที่สร้างขึ้น:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

ต่อไป เราสร้างการลงทะเบียนตาราง DDL เช่นเดียวกับในตัวอย่างก่อนหน้า
หากห่วงโซ่ทั้งหมดทำงานได้อย่างถูกต้อง กล่าวคือ มีการเริ่มต้นการโหลด และตารางถูกสร้างขึ้นอย่างถูกต้องใน Hive เราจะได้รับสคีมาของตารางที่อัปเดต

และปัญหาสุดท้ายคือคุณไม่สามารถเพิ่มพาร์ติชันลงในตารางไฮฟ์ได้ เพราะมันจะเสียหาย คุณต้องบังคับให้ Hive แก้ไขโครงสร้างพาร์ติชัน:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

งานง่าย ๆ ในการอ่าน JSON และสร้างหน้าร้านตามนั้นส่งผลให้สามารถเอาชนะความยากลำบากโดยนัยจำนวนมาก ซึ่งเป็นวิธีแก้ปัญหาที่คุณต้องค้นหาแยกต่างหาก และแม้ว่าวิธีแก้ปัญหาเหล่านี้จะง่าย แต่ก็ต้องใช้เวลามากในการค้นหา

ในการดำเนินการก่อสร้างตู้โชว์ ฉันต้อง:

  • เพิ่มพาร์ติชันในตู้โชว์ กำจัดไฟล์บริการ
  • จัดการกับช่องว่างในแหล่งข้อมูลที่ Spark พิมพ์
  • แปลงประเภทอย่างง่ายเป็นสตริง
  • แปลงชื่อฟิลด์เป็นตัวพิมพ์เล็ก
  • แยกการอัปโหลดข้อมูลและการลงทะเบียนตารางใน Hive (การสร้าง DDL)
  • อย่าลืมที่จะหลีกเลี่ยงชื่อฟิลด์ที่อาจเข้ากันไม่ได้กับไฮฟ์
  • เรียนรู้วิธีอัปเดตการลงทะเบียนตารางใน Hive

สรุปแล้ว เราทราบว่าการตัดสินใจสร้างหน้าต่างร้านค้านั้นเต็มไปด้วยข้อผิดพลาดมากมาย ดังนั้นในกรณีที่มีปัญหาในการใช้งาน จะเป็นการดีกว่าที่จะติดต่อพันธมิตรที่มีประสบการณ์และมีความเชี่ยวชาญที่ประสบความสำเร็จ

ขอขอบคุณที่อ่านบทความนี้ เราหวังว่าคุณจะพบว่าข้อมูลมีประโยชน์

ที่มา: will.com

เพิ่มความคิดเห็น