Spark schemaEvolution in practice

ืงื•ืจืื™ื ื™ืงืจื™ื, ื™ื•ื ื˜ื•ื‘!

ื‘ืžืืžืจ ื–ื”, ื”ื™ื•ืขืฅ ื”ืžื•ื‘ื™ืœ ืฉืœ ืชื—ื•ื ื”ืขืกืงื™ื Big Data Solutions ืฉืœ Neoflex ืžืชืืจ ื‘ืคื™ืจื•ื˜ ืืช ื”ืืคืฉืจื•ื™ื•ืช ืœื‘ื ื™ื™ืช ื—ืœื•ื ื•ืช ืจืื•ื•ื” ืฉืœ ืžื‘ื ื” ืžืฉืชื ื” ื‘ืืžืฆืขื•ืช Apache Spark.

As part of a data analytics project, the task of building data marts on top of loosely structured data comes up quite often.

Usually these are logs, or responses from various systems, stored as JSON or XML. The data is uploaded to Hadoop, and then a data mart has to be built from it. Access to the resulting mart can be provided, for example, through Impala.

In this case, the schema of the target mart is not known in advance. Moreover, it cannot even be drawn up aheadead of time, since it depends on the data, and we are dealing with precisely this kind of loosely structured data.

For example, today the following response is logged:

{source: "app1", error_code: ""}

ื•ืžื—ืจ ืžืื•ืชื” ืžืขืจื›ืช ืžื’ื™ืขื” ื”ืชืฉื•ื‘ื” ื”ื‘ืื”:

{source: "app1", error_code: "error", description: "Network error"}

ื›ืชื•ืฆืื” ืžื›ืš ืฆืจื™ืš ืœื”ื•ืกื™ืฃ ืขื•ื“ ืฉื“ื” ืื—ื“ ืœื•ื•ื™ื˜ืจื™ื ื” - ืชื™ืื•ืจ, ื•ืืฃ ืื—ื“ ืœื ื™ื•ื“ืข ืื ื”ื•ื ื™ื’ื™ืข ืื• ืœื.

The task of building a mart on such data is fairly standard, and Spark has several tools for it. Both JSON and XML are supported for parsing the source data, and for schemas not known in advance there is schemaEvolution support.

ื‘ืžื‘ื˜ ืจืืฉื•ืŸ, ื”ืคืชืจื•ืŸ ื ืจืื” ืคืฉื•ื˜. ืืชื” ืฆืจื™ืš ืœืงื—ืช ืชื™ืงื™ื” ืขื JSON ื•ืœืงืจื•ื ืื•ืชื” ืœืชื•ืš ืžืกื’ืจืช ื ืชื•ื ื™ื. Spark ื™ืฆื•ืจ ืกื›ืžื”, ื™ื”ืคื•ืš ื ืชื•ื ื™ื ืžืงื•ื ื ื™ื ืœืžื‘ื ื™ื. ื™ืชืจื” ืžื›ืš, ื”ื›ืœ ืฆืจื™ืš ืœื”ื™ืฉืžืจ ื‘ืคืจืงื˜, ื”ื ืชืžืš ื’ื ื‘ืื™ืžืคืœื”, ืขืœ ื™ื“ื™ ืจื™ืฉื•ื ื—ืœื•ืŸ ื”ืจืื•ื•ื” ื‘-Hive metastore.

ื”ื›ืœ ื ืจืื” ืคืฉื•ื˜.

ืขื ื–ืืช, ืœื ื‘ืจื•ืจ ืžื”ื“ื•ื’ืžืื•ืช ื”ืงืฆืจื•ืช ื‘ืชื™ืขื•ื“ ืžื” ืœืขืฉื•ืช ืขื ืžืกืคืจ ื‘ืขื™ื•ืช ื‘ืคื•ืขืœ.

ื”ืชื™ืขื•ื“ ืžืชืืจ ื’ื™ืฉื” ืœื ืœื™ืฆื•ืจ ื—ื–ื™ืช ื—ื ื•ืช, ืืœื ืœืงืจื•ื JSON ืื• XML ืœืชื•ืš ืžืกื’ืจืช ื ืชื•ื ื™ื.

ื›ืœื•ืžืจ, ื–ื” ืคืฉื•ื˜ ืžืจืื” ื›ื™ืฆื“ ืœืงืจื•ื ื•ืœื ืชื— JSON:

df = spark.read.json(path...)

That is enough to make the data available to Spark.

In practice, the script is much more complicated than just reading JSON files from a folder and creating a dataframe. The situation looks like this: there is already a mart, new data arrives every day, and it has to be added to the mart, without forgetting that the schema may differ.

The usual scheme for building a mart is as follows:

Step 1. The data is loaded into Hadoop, with daily incremental loads afterwards, each added as a new partition. The result is a folder of raw data partitioned by day.

Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in an analyzable format, for example parquet, which can then be imported into Impala. This creates a target mart with all the data accumulated up to this point.

Step 3. A load is created that updates the mart every day.
This raises the question of incremental loading, the need to partition the mart, and the question of maintaining the mart's overall schema.

Let's take an example. Suppose the first step of building the repository has been implemented, and JSON files are being uploaded to a folder.

Creating a dataframe from them, and then saving it as a mart, is not a problem. This is the very first step, easily found in the Spark documentation:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)

ื”ื›ืœ ื ืจืื” ื‘ืกื“ืจ.

ืงืจืื ื• ื•ืคืจื—ื ื• ืืช JSON, ื•ืื– ืื ื—ื ื• ืฉื•ืžืจื™ื ืืช ื”-dataframe ื›ืคืจืงื˜, ื•ืจื•ืฉืžื™ื ืื•ืชื• ื‘-Hive ื‘ื›ืœ ื“ืจืš ื ื•ื—ื”:

df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')

We get a mart.

But the next day, new data arrives from the source. We have a folder with JSON, and a mart created from that folder. After the next batch of data is loaded from the source, the mart is missing one day's worth of data.

The logical solution is to partition the mart by day, which allows a new partition to be added every following day. The mechanism for this is also well known: Spark allows partitions to be written separately.

ืจืืฉื™ืช, ืื ื• ืžื‘ืฆืขื™ื ื˜ืขื™ื ื” ืจืืฉื•ื ื™ืช, ืฉื•ืžืจื™ื ืืช ื”ื ืชื•ื ื™ื ื›ืžืชื•ืืจ ืœืขื™ืœ, ื•ืžื•ืกื™ืคื™ื ืจืง ื—ืœื•ืงื” ืœืžื—ื™ืฆื•ืช. ืคืขื•ืœื” ื–ื• ื ืงืจืืช ืืชื—ื•ืœ ื—ื–ื™ืช ื”ื—ื ื•ืช ื•ื”ื™ื ื ืขืฉื™ืช ืคืขื ืื—ืช ื‘ืœื‘ื“:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

ืœืžื—ืจืช, ืื ื• ื˜ื•ืขื ื™ื ืจืง ืžื—ื™ืฆื” ื—ื“ืฉื”:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

All that remains is to re-register the table in Hive to update the schema.
However, this is where the problems begin.

ื‘ืขื™ื” ืจืืฉื•ื ื”. ื‘ืžื•ืงื“ื ืื• ื‘ืžืื•ื—ืจ, ื”ืคืจืงื˜ ืฉื™ืชืงื‘ืœ ื™ื”ื™ื” ื‘ืœืชื™ ืงืจื™ื. ื–ื” ื ื•ื‘ืข ืžื”ืื•ืคืŸ ืฉื‘ื• ืคืจืงื˜ ื•-JSON ืžืชื™ื™ื—ืกื™ื ืื—ืจืช ืœืฉื“ื•ืช ืจื™ืงื™ื.

Consider a typical situation. For example, yesterday this JSON arrives:

Day 1: {"a": {"b": 1}},

ื•ื”ื™ื•ื ืื•ืชื• JSON ื ืจืื” ื›ืš:

Day 2: {"a": null}

Suppose we have two different partitions, each containing one row.
When all the source data is read at once, Spark can determine the type, and will understand that "a" is a field of type "struct" with a nested field "b" of type INT. But if each partition was saved separately, the result is a parquet with incompatible partition schemas:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

This situation is well known, so a special option was added: when parsing the source data, drop empty fields:

df = spark.read.json("...", dropFieldIfAllNull=True)

In this case, the parquet will consist of partitions that can be read together.
Although anyone who has done this in practice will smile bitterly here. Why? Because there are likely to be two more situations. Or three. Or four. The first, which will almost certainly occur, is that numeric types will look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such fields end up in one partition, schema merging will read everything correctly and resolve to the most precise type. But if they end up in different ones, one partition will have intField: int and the other will have intField: double.
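A toy model (plain Python, not Spark's actual type inference) of the divergence just described: read together, int and double widen to double; written as separate partitions, the schemas end up conflicting.

```python
def infer_numeric_type(value):
    # Per-record inference: 1 -> "int", 1.1 -> "double".
    return "double" if isinstance(value, float) else "int"

def merge_types(a, b):
    # Schema merge widens to the most precise common type.
    return "double" if "double" in (a, b) else "int"

rec1, rec2 = {"intField": 1}, {"intField": 1.1}

# Partitions written separately keep their own, conflicting types.
t1 = infer_numeric_type(rec1["intField"])
t2 = infer_numeric_type(rec2["intField"])
print(t1, t2)  # int double

# Read together, the merged schema settles on double.
print(merge_types(t1, t2))  # double
```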

ื™ืฉ ืืช ื”ื“ื’ืœ ื”ื‘ื ืœื˜ื™ืคื•ืœ ื‘ืžืฆื‘ ื–ื”:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Now we have a folder with partitions that can be read into a single dataframe, and a valid parquet of the whole mart. Right? No.

We must remember that we registered the table in Hive. Hive is case-insensitive in field names, while parquet is case-sensitive. Therefore, partitions with schemas field1: int and Field1: int are identical for Hive, but not for Spark. Don't forget to convert the field names to lowercase.
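A minimal sketch of that normalization (the helper name is ours): lowercase all column names before writing, and fail loudly if two names collapse into the same Hive column. In Spark it could be applied as df.toDF(*lowercase_columns(df.columns)).

```python
def lowercase_columns(columns):
    """Lowercase column names; field1 and Field1 are the same
    column for Hive, so a collision here must be an error."""
    lowered = [c.lower() for c in columns]
    if len(set(lowered)) != len(lowered):
        raise ValueError("names collide after lowercasing: %s" % columns)
    return lowered

print(lowercase_columns(["Source", "error_code", "Description"]))
# ['source', 'error_code', 'description']
```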

ืื—ืจื™ ื–ื” ื ืจืื” ืฉื”ื›ืœ ื‘ืกื“ืจ.

ืขื ื–ืืช, ืœื ื”ื›ืœ ื›ืœ ื›ืš ืคืฉื•ื˜. ื™ืฉ ื‘ืขื™ื” ืฉื ื™ื™ื”, ื’ื ื™ื“ื•ืขื”. ืžื›ื™ื•ื•ืŸ ืฉื›ืœ ืžื—ื™ืฆื” ื—ื“ืฉื” ื ืฉืžืจืช ื‘ื ืคืจื“, ืชื™ืงื™ื™ืช ื”ืžื—ื™ืฆื” ืชื›ื™ืœ ืงื‘ืฆื™ ืฉื™ืจื•ืช Spark, ืœื“ื•ื’ืžื”, ื“ื’ืœ ื”ืฆืœื—ืช ื”ืคืขื•ืœื” _SUCCESS. ื–ื” ื™ื’ืจื•ื ืœืฉื’ื™ืื” ื‘ืขืช ื ื™ืกื™ื•ืŸ ืœืคืจืงื˜. ื›ื“ื™ ืœื”ื™ืžื ืข ืžื›ืš, ืขืœื™ืš ืœื”ื’ื“ื™ืจ ืืช ื”ืชืฆื•ืจื” ื›ื“ื™ ืœืžื ื•ืข ืž-Spark ืœื”ื•ืกื™ืฃ ืงื‘ืฆื™ ืฉื™ืจื•ืช ืœืชื™ืงื™ื”:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

ื ืจืื” ืฉื›ืขืช ื›ืœ ื™ื•ื ืžืชื•ื•ืกืคืช ืžื—ื™ืฆืช ืคืจืงื˜ ื—ื“ืฉื” ืœืชื™ืงื™ื™ืช ื—ืœื•ืŸ ื”ืจืื•ื•ื” ื”ื™ืขื“, ื‘ื” ื ืžืฆืื™ื ื”ื ืชื•ื ื™ื ื”ืžื ื•ืชื—ื™ื ืฉืœ ื”ื™ื•ื. ื“ืื’ื ื• ืžืจืืฉ ืฉืœื ื™ื”ื™ื• ืžื—ื™ืฆื•ืช ืขื ื”ืชื ื’ืฉื•ืช ื‘ืกื•ื’ ื”ื ืชื•ื ื™ื.

But we have a third problem. Now the overall schema is unknown; moreover, the table in Hive has a wrong schema, since each new partition has most likely introduced a distortion into the schema.

The table must be re-registered. This can be done simply: read the mart's parquet again, take its schema and generate a DDL based on it, with which to re-register the folder in Hive as an external table, updating the schema of the target mart.

We have a fourth problem. When we registered the table the first time, we relied on Spark. Now we do it ourselves, and we must remember that parquet fields can start with characters not allowed by Hive. For example, Spark puts lines it failed to parse into the "_corrupt_record" field. Such a field cannot be registered in Hive without being escaped.

Knowing this, we arrive at this code:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

The code ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") makes the DDL safe, that is, instead of:

create table tname (_field1 string, 1field string)

ืขื ืฉืžื•ืช ืฉื“ื•ืช ื›ืžื• "_field1, 1field", ื ื•ืฆืจ DDL ื‘ื˜ื•ื— ื‘ืžืงื•ื ืฉื‘ื• ืฉืžื•ืช ื”ืฉื“ื•ืช ืžื•ื—ืœืคื™ื: ืฆื•ืจ ื˜ื‘ืœื” `tname` (ืžื—ืจื•ื–ืช `_field1`, ืžื—ืจื•ื–ืช `1field`).

ื ืฉืืœืช ื”ืฉืืœื”: ื›ื™ืฆื“ ืœื”ืฉื™ื’ ื›ืจืื•ื™ Dataframe ืขื ืกื›ื™ืžื” ืžืœืื” (ื‘ืงื•ื“ pf)? ืื™ืš ืžืฉื™ื’ื™ื ืืช ื”-PF ื”ื–ื”? ื–ื• ื”ื‘ืขื™ื” ื”ื—ืžื™ืฉื™ืช. ืœืงืจื•ื ืžื—ื“ืฉ ืืช ื”ืกื›ื™ืžื” ืฉืœ ื›ืœ ื”ืžื—ื™ืฆื•ืช ืžื”ืชื™ืงื™ื™ื” ืขื ืงื‘ืฆื™ ืคืจืงื˜ ืฉืœ ื—ืœื•ืŸ ื”ืจืื•ื•ื” ื”ื™ืขื“? ืฉื™ื˜ื” ื–ื• ื”ื™ื ื”ื‘ื˜ื•ื—ื” ื‘ื™ื•ืชืจ, ืืš ืงืฉื”.

The schema is already in Hive. A new schema can be obtained by combining the schema of the whole table with that of the new partition. So you need to take the table schema from Hive and merge it with the schema of the new partition. This can be done by reading a test row from Hive, saving it to a temporary folder, and using Spark to read both partitions at once.

In fact, everything needed is at hand: the original table schema in Hive and the new partition. We also have the data. It only remains to obtain a new schema that combines the mart schema and the new fields from the created partition:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

ืœืื—ืจ ืžื›ืŸ, ืื ื• ื™ื•ืฆืจื™ื ืืช ื”-DDL ืฉืœ ืจื™ืฉื•ื ื”ื˜ื‘ืœื”, ื›ืžื• ื‘ืงื˜ืข ื”ืงื•ื“ื.
ืื ื›ืœ ื”ืฉืจืฉืจืช ืขื•ื‘ื“ืช ื›ืžื• ืฉืฆืจื™ืš, ื›ืœื•ืžืจ, ื”ื™ื” ืขื•ืžืก ืืชื—ื•ืœ, ื•ื”ื˜ื‘ืœื” ื ื•ืฆืจื” ื‘ืฆื•ืจื” ื ื›ื•ื ื” ื‘-Hive, ืื– ื ืงื‘ืœ ืกื›ื™ืžืช ื˜ื‘ืœื” ืžืขื•ื“ื›ื ืช.

ื•ื”ื‘ืขื™ื” ื”ืื—ืจื•ื ื” ื”ื™ื ืฉืื™ ืืคืฉืจ ืคืฉื•ื˜ ืœื”ื•ืกื™ืฃ ืžื—ื™ืฆื” ืœื˜ื‘ืœื” ืฉืœ Hive, ื›ื™ ื”ื™ื ืชื™ืฉื‘ืจ. ืืชื” ืฆืจื™ืš ืœื”ื›ืจื™ื— ืืช Hive ืœืชืงืŸ ืืช ืžื‘ื ื” ื”ืžื—ื™ืฆื” ืฉืœื”:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

The simple task of reading JSON and creating a mart from it turns into overcoming a number of implicit difficulties, whose solutions have to be sought out separately. And although these solutions are simple, finding them takes a lot of time.

To implement the building of the mart, I had to:

  • add partitions to the mart and get rid of service files
  • deal with empty fields in the source data that Spark had typed
  • cast simple types to string
  • convert field names to lowercase
  • separate the data upload from the table registration in Hive (DDL generation)
  • remember to escape field names that may be incompatible with Hive
  • learn how to update the table registration in Hive

ืœืกื™ื›ื•ื, ื ืฆื™ื™ืŸ ื›ื™ ื”ื”ื—ืœื˜ื” ืœื‘ื ื•ืช ื—ืœื•ื ื•ืช ืจืื•ื•ื” ื˜ื•ืžื ืช ื‘ื—ื•ื‘ื” ืžืœื›ื•ื“ื•ืช ืจื‘ื•ืช. ืœื›ืŸ, ื‘ืžืงืจื” ืฉืœ ืงืฉื™ื™ื ื‘ื™ื™ืฉื•ื, ืขื“ื™ืฃ ืœืคื ื•ืช ืœืฉื•ืชืฃ ืžื ื•ืกื” ืขื ืžื•ืžื—ื™ื•ืช ืžื•ืฆืœื—ืช.

ืชื•ื“ื” ืฉืงืจืืช ืžืืžืจ ื–ื”, ืื ื• ืžืงื•ื•ื™ื ืฉืชืžืฆื ืืช ื”ืžื™ื“ืข ืฉื™ืžื•ืฉื™.

Source: www.habr.com
