ืืืืขืจืข ืืืืขื ืขืจ, ื ืืืื ืืื!
ืืื ืืขื ืึทืจืืืงื, ืื ืืืืื ื ืงืึธื ืกืืืืึทื ื ืคืื Neoflex ืก ืืื ืืึทืืึท ืกืึทืืืฉืึทื ื ืืขืฉืขืคื ืืขืื ื ืืืฉืจืืืื ืืื ืืขืืึทื ืื ืึธืคึผืฆืืขืก ืคึฟืึทืจ ืื ืื ืืืขืจืืึทืืึทื ืกืืจืืงืืืจ ืฉืึธืืงืืืกืื ื ืืฆื Apache Spark.
ืืื ืึท ืืืื ืคืื ืึท ืืึทืื ืึทื ืึทืืืกืืก ืคึผืจืืืขืงื, ืื ืึทืจืืขื ืคืื ืืืืขื ืกืืึธืจืคืจืึทื ืฅ ืืืืืจื ืืืืฃ ืืืกืื ืกืืจืึทืงืืฉืขืจื ืืึทืื ืึธืคื ืขืจืืืืึทื.
ืืืืฉืึทืืืึทืื ืืึธืก ืืขื ืขื ืืึธืืก, ืึธืืขืจ ืจืขืกืคึผืึธื ืกืขืก ืคืื ืคืึทืจืฉืืื ืกืืกืืขืืขื, ืืขืจืืืขืืืขื ืืื JSON ืึธืืขืจ XML. ืื ืืึทืื ืืขื ืขื ืืคึผืืึธืึทืืขื ืฆื Hadoop, ืืื ืืืจ ืืึทืจืคึฟื ืฆื ืืืืขื ืึท ืกืืึธืจืคืจืึทื ื ืคึฟืื ืืื. ืืืจ ืงืขื ืขื ืึธืจืืึทื ืืืืจื ืึทืงืกืขืก ืฆื ืื ืืืฉืืคื ืืืืืจืื ืข, ืืืฉื, ืืืจื ืืืคึผืึทืืึท.
ืืื ืืขื ืคืึทื, ืื ืกืืขืืข ืคืื โโืื ืฆืื ืกืืึธืจืคืจืึทื ื ืืื ื ืืฉื ืืึทืืืืกื ืคืจืืขืจ. ืืขืจืฆื, ืื ืกืืขืืข ืืืื ืงืขื ืขื ื ืื ืืืื ืฆืืขื ืึทืจืืืฃ ืืื ืฉืืืึทืื, ืืืืึทื ืขืก ืืขืคึผืขื ืืก ืืืืฃ ืื ืืึทืื, ืืื ืืืจ ืืขื ืขื ืืืืื ื ืืื ืื ืืืืขืจ ืืืกืื ืกืืจืึทืงืืฉืขืจื ืืึทืื.
ืคึฟืึทืจ ืืืึทืฉืคึผืื, ืืืึทื ื ืื ืคืืืืขื ืืข ืขื ืืคืขืจ ืืื ืืึธืื:
{source: "app1", error_code: ""}
ืืื ืืึธืจืื ืคืื ืืขืจ ืืขืืืืงืขืจ ืกืืกืืขื ืงืืื ืื ืคืืืืขื ืืข ืขื ืืคืขืจ:
{source: "app1", error_code: "error", description: "Network error"}
ืืื ืึท ืจืขืืืืืึทื, ื ืึธื ืืืื ืคืขืื ืืึธื ืืืื ืฆืืืขืืขืื ืฆื ืื ืืืืืจืื ืข - ืืึทืฉืจืืึทืืื ื, ืืื ืงืืื ืืืื ืขืจ ืืืืืกื ืฆื ืขืก ืืืขื ืงืืืขื ืึธืืขืจ ื ืืฉื.
ืื ืึทืจืืขื ืคืื ืงืจืืืืืื ื ืึท ืกืืึธืจืคืจืึทื ื ืืืืฃ ืึทืืึท ืืึทืื ืืื ืืึทื ืฅ ื ืึธืจืืึทื, ืืื ืกืคึผืึทืจืง ืืื ืึท ื ืืืขืจ ืคืื ืืืฉืืจืื ืคึฟืึทืจ ืืขื. ืคึฟืึทืจ ืคึผืึทืจืกืื ื ืื ืืงืืจ ืืึทืื, ืขืก ืืื ืฉืืืฆื ืคึฟืึทืจ ืืืืืข JSON ืืื XML, ืืื ืคึฟืึทืจ ืึท ืคืจืืขืจ ืืืืืึทืงืึทื ื ืกืืฉืขืืึท, ืฉืืืฆื ืคึฟืึทืจ schemaEvolution ืืื ืฆืืืขืฉืืขืื.
ืืื ืขืจืฉืืขืจ ืืืืง, ืื ืืืืืื ื ืงืืงื ืคึผืฉืื. ืืืจ ืืึทืจืคึฟื ืฆื ื ืขืืขื ืึท ืืขืงืข ืืื JSON ืืื ืืืืขื ืขื ืขืก ืืื ืึท ืืึทืืึทืคืจืึทืืข. ืกืคึผืึทืจืง ืืืขื ืืึทืื ืึท ืกืืขืืข, ืืืขื ืื ื ืขืกืืขื ืืึทืื ืืื ืกืืจืึทืงืืฉืขืจื. ืืืืึทืืขืจ, ืึทืืฅ ืืึทืจืฃ ืืืื ืืขืจืืืขืืืขื ืืื ืคึผืึทืจืงืื, ืืืึธืก ืืื ืืืื ืืขืฉืืืฆื ืืื ืืืคึผืึทืืึท, ืืืจื ืจืขืืืฉืืกืืขืจืื ื ืื ืกืืึธืจืคืจืึทื ื ืืื ืื ืืืืืข ืืขืืึทืกืืึธืจ.
ืึทืืฅ ืืืื ื ืฆื ืืืื ืคึผืฉืื.
ืึธืืขืจ, ืขืก ืืื ื ืืฉื ืงืืึธืจ ืคืื ืื ืงืืจืฅ ืืืืฉืคืืื ืืื ืื ืืึทืงืืืืขื ืืืืฉืึทื ืืืึธืก ืฆื ืืึธื ืืื ืึท ื ืืืขืจ ืคืื ืคืจืืืืขืืขื ืืื ืคืืจ.
ืื ืืึทืงืืืืขื ืืืืฉืึทื ืืืฉืจืืืื ืึท ืฆืืืึทื ื ื ืืฉื ืฆื ืฉืึทืคึฟื ืึท ืกืืึธืจืคืจืึทื ื, ืึธืืขืจ ืฆื ืืืืขื ืขื JSON ืึธืืขืจ XML ืืื ืึท ืืึทืืึทืคืจืึทืืข.
ื ืืืืื, ืขืก ืคืฉืื ืืืืืื ืืื ืฆื ืืืืขื ืขื ืืื ืคึผืึทืจืก JSON:
df = spark.read.json(path...)
ืืึธืก ืืื ืืขื ืื ืฆื ืืึทืื ืื ืืึทืื ืื ืืืฆื ืฆื Spark.
ืืื ืคืืจ, ืื ืฉืจืืคื ืืื ืคืื ืืขืจ ืงืึธืืคึผืืืฆืืจื ืืื ื ืึธืจ ืืืืขื ืขื JSON ืืขืงืขืก ืคึฟืื ืึท ืืขืงืข ืืื ืฉืึทืคึฟื ืึท ืืึทืืึทืคืจืึทืืข. ืืขืจ ืืฆื ืืขื ืืืืก ืืืื: ืขืก ืืื ืฉืืื ืื ื ืืขืืืืกืข ืกืืึธืจืคืจืื ื, ื ืืืข ืืึทืื ืงืืืขื ืืขืื ืืื ืืจืืื, ืืขื ืืืจืฃ ืฆืืืขืื ืืื ืฆืื ืกืืึธืจืคืจืื ื, ื ืืฉื ืคืืจืืขืกื ืื ืื ืกืืขืืข ืงืขื ืืืื ืื ืืขืจืฉ.
ืืขืจ ื ืึธืจืืึทื ืกืืขืืข ืคึฟืึทืจ ืื ืื ืึท ืืืืืจืื ืข ืืื ืืื ืืืื:
ืฉืจืื ืงืกื ืืืงืก. ืื ืืึทืื ืืขื ืขื ืืึธืืืื ืืื Hadoop ืืื ืกืึทืืกืึทืงืืืึทื ื ืืขืืืขื ืจืืืึธืืืื ื ืืื ืฆืืืขืืขืื ืฆื ืึท ื ืืึทืข ืฆืขืืืืืื ื. ืขืก ืืืจื ืก ืืืืก ืึท ืืขืงืข ืืื ืขืจืฉื ืืึทืื ืคึผืึทืจืืืฉืึทื ื ืืืจื ืืึธื.
ืฉืจืื ืงืกื ืืืงืก. ืืขืฉืึทืก ืืขืจ ืขืจืฉื ืืึทืกืข, ืืขื ืืขืงืข ืืื ืืืืขื ืขื ืืื ืคึผืึทืจืกืขื ืืืจื Spark. ืื ืจืืืึทืืืื ื ืืึทืืึทืคืจืึทืืข ืืื ืืขืจืืืขืืืขื ืืื ืึท ืคึผืึทืจืกืึทืืืข ืคึฟืึธืจืืึทื, ืคึฟืึทืจ ืืืึทืฉืคึผืื, ืืื ืคึผืึทืจืงืื, ืืืึธืก ืงืขื ืขื ืืืื ืืืคึผืึธืจืืื ืืื ืืืคึผืึทืืึท. ืืึธืก ืงืจืืืืฅ ืึท ืฆืื ืืืืืจืื ืข ืืื ืึทืืข ืื ืืึทืื ืืืึธืก ืืึธืื ืึทืงืืืืืึทืืืืืื ืืื ืืขื ืคืื ื.
ืฉืจืื ืงืกื ืืืงืก. ื ืืจืืคืงืืคืืข ืืื ืืืฉืืคื ืืืึธืก ืืืขื ืืขืจืืืึทื ืืืงื ืื ืกืืึธืจืคืจืึทื ื ืืขืืขืจ ืืึธื.
ืขืก ืืื ืึท ืงืฉืื ืคืื ืื ืงืจืึทืืขื ืืึทื ืืึธืืืื ื, ืื ื ืืื ืฆื ืฆืขืืืืื ืื ืืืืืจืื ืข, ืืื ืื ืงืฉืื ืคืื ืืืื ืืืื ืื ื ืื ืึทืืืขืืืื ืข ืกืืขืืข ืคืื โโโโืื ืืืืืจืื ืข.
ืืืืืจ ื ืขืืขื ื ืืืืฉืคืื. ืืื ืก ืืึธืื ืึทื ืืขืจ ืขืจืฉืืขืจ ืฉืจืื ืคืื ืื ืื ืึท ืจืืคึผืึทืืึทืืึธืจื ืืื ืืืคึผืืึทืืขื ืึทื, ืืื JSON ืืขืงืขืก ืืขื ืขื ืืคึผืืึธืึทืืขื ืฆื ืึท ืืขืงืข.
ืงืจืืืืืื ื ืึท ืืึทืืึทืคืจืึทืืข ืคึฟืื ืืื, ืืื ืฉืคึผืึธืจื ืขืก ืืื ืึท ืืืืืจืื ืข, ืืื ื ืืฉื ืึท ืคึผืจืึธืืืขื. ืืึธืก ืืื ืืขืจ ืขืจืฉืืขืจ ืฉืจืื ืืืึธืก ืงืขื ืขื ืืืื ืืขืคึฟืื ืขื ืืืืื ืืื ืื Spark ืืึทืงืืืืขื ืืืืฉืึทื:
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
ืึทืืฅ ืืืื ื ืฆื ืืืื ืคืืึทื.
ืืืจ ืืืืขื ืขื ืืื ืคึผืึทืจืกืขื JSON, ืืขืืึธืื ืืืจ ืจืึทืืขืืืขื ืื ืืึทืืึทืคืจืึทืืข ืืื ืึท ืคึผืึทืจืงืื, ืจืขืืืฉืืกืืขืจืื ื ืขืก ืืื ืืืืืข ืืื ืงืืื ืืึทืงืืืขื ืืืขื:
df.write.format(โparquetโ).option('path','<External Table Path>').saveAsTable('<Table Name>')
ืืืจ ืืึทืงืืืขื ืึท ืคึฟืขื ืฆืืขืจ.
ืึธืืขืจ, ืืขืจ ืืืืึทืืขืจ ืืึธื, ื ืืึท ืืึทืื ืคืื ืื ืืงืืจ ืืื ืฆืืืขืืขืื. ืืืจ ืืึธืื ืึท ืืขืงืข ืืื JSON ืืื ืึท ืืืืืจืื ืข ืืืฉืืคื ืคึฟืื ืืขื ืืขืงืข. ื ืึธื ืืึธืืืื ื ืื ืืืืึทืืขืจ ืคึผืขืงื ืคืื ืืึทืื ืคืื ืื ืืงืืจ, ืื ืืึทืื ืืึทืจื ืคืขืื ืืืง ืืืื ืืึธื ืก ืืืขืจื ืคืื ืืึทืื.
ืื ืืึทืืืฉืืงืึทื ืืืืืื ื ืืืึธืื ืืืื ืฆื ืฆืขืืืืื ืื ืกืืึธืจืคืจืึทื ื ืืืจื ืืึธื, ืืืึธืก ืืืขื ืืึธืื ืึทืืื ื ืึท ื ืืึทืข ืฆืขืืืืืื ื ืืขืืขืจ ืืืืึทืืขืจ ืืึธื. ืืขืจ ืืขืงืึทื ืืืึทื ืคึฟืึทืจ ืืขื ืืื ืืืื ืืึทืืืืกื, ืกืคึผืึทืจืง ืึทืืึทืื ืืืจ ืฆื ืฉืจืืึทืื ืคึผืึทืจืืืฉืึทื ื ืกืขืคึผืขืจืึทืืื.
ืขืจืฉืืขืจ, ืืืจ ืืึธื ืึทื ืขืจืฉื ืืึทืกืข, ืฉืคึผืึธืจื ืื ืืึทืื ืืื ืืืกืงืจืืืื ืืืืื, ืึทืืื ื ืืืืื ืคึผืึทืจืืืฉืึทื ืื ื. ืืขืจ ืงืึทืืฃ ืืื ืืขืจืืคึฟื ืกืืึธืจืคืจืึทื ื ืื ืืืืึทืืืืึทืืืึธื ืืื ืืื ืืขืืื ืืืืื ืึทืืึธื:
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
ืืขืจ ืืืืึทืืขืจ ืืึธื, ืืืจ ืืึธืื ืืืืื ืึท ื ืืึทืข ืฆืขืืืืืื ื:
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
ืึทืืข ืืืึธืก ืืืืืื ืืื ืฆื ืฉืืึทืขื-ืจืขืืืกืืจืืจื ืืื ืืืืืข ืฆื ืืขืจืืืึทื ืืืงื ืื ืกืืฉืขืืึท.
ืึธืืขืจ, ืืึธืก ืืื ืืื ืคึผืจืึธืืืขืืก ืืืืคืฉืืืื.
ืขืจืฉืืขืจ ืคึผืจืึธืืืขื. ืืืืขืจ ืึธืืขืจ ืฉืคึผืขืืขืจ, ืื ืจืืืึทืืืื ื ืคึผืึทืจืงืื ืืืขื ืืืื ืึทื ืจืืืึทืืึทื. ืืึธืก ืืื ืจืขืื ืฆื ืืขื ืืื ืคึผืึทืจืงืื ืืื JSON ืืืึทืื ืืืืืืง ืคืขืืืขืจ ืึทื ืืขืจืฉ.
ืืื ืก ืืึทืืจืึทืืื ืึท ืืืคึผืืฉ ืกืืืืึทืฆืืข. ืฆืื ืืืืฉืคึผืื, ื ืขืืื ืงืืื JSON:
ะะตะฝั 1: {"a": {"b": 1}},
ืืื ืืืึทื ื ืืขืจ ืืขืืืืงืขืจ JSON ืงืืงื ืืื ืืึธืก:
ะะตะฝั 2: {"a": null}
ืืื ืก ืืึธืื ืืืจ ืืึธืื ืฆืืืื ืคืึทืจืฉืืืขื ืข ืคึผืึทืจืืืฉืึทื ื, ืืขืืขืจ ืืื ืืืื ืฉืืจื.
ืืืขื ืืืจ ืืืืขื ืขื ืื ืืื ืฆืข ืืงืืจ ืืึทืื, ืกืคึผืึทืจืง ืืืขื ืงืขื ืขื ืฆื ืืึทืฉืืืืขื ืืขื ืืืคึผ, ืืื ืืืขื ืคึฟืึทืจืฉืืืื ืึทื "ืึท" ืืื ืึท ืคืขืื ืคืื ืืืคึผ "ืกืืจืืงืืืจ", ืืื ืึท ื ืขืกืืขื ืคืขืื "ื" ืคืื ืืืคึผ INT. ืึธืืขืจ, ืืืื ืืขืืขืจ ืฆืขืืืืืื ื ืืื ืืขืจืืืขืืืขื ืกืขืคึผืขืจืึทืืื, ืืืจ ืืึทืงืืืขื ืึท ืคึผืึทืจืงืื ืืื ืื ืงืึทืืคึผืึทืืึทืืึทื ืฆืขืืืืืื ื ืกืงืืื:
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
ืื ืกืืืืึทืฆืืข ืืื ืืึทืืืืกื, ืึทืืื ืึทื ืึธืคึผืฆืืข ืืื ืกืคึผืขืฆืืขื ืฆืืืขืืขืื - ืืืขื ืคึผืึทืจืกืื ื ืื ืืงืืจ ืืึทืื, ืึทืจืึธืคึผื ืขืืขื ืืืืืืง ืคืขืืืขืจ:
df = spark.read.json("...", dropFieldIfAllNull=True)
ืืื ืืขื ืคืึทื, ืื ืคึผืึทืจืงืื ืืืขื ืฆืื ืืืคืฉืืขืื ืืื ืคืื ืคึผืึทืจืืืฉืึทื ื ืืืึธืก ืงืขื ืขื ืืืื ืืืืขื ืขื ืฆืืืึทืืขื.
ืืึธืืฉ ืื ืืืึธืก ืืึธืื ืืึธืก ืืขืืึธื ืืื ืคืืจ ืืืขืื ืืึธ ืืืืขืจ ืฉืืืืืืขื. ืคืืจืืืืก? ืืึธ, ืืืืึทื ืขืก ืืขื ืขื ืืกืชึผืื ืฆื ืืืื ืฆืืืื ืืขืจ ืกืืืืึทืืืึธื ืก. ืึธืืขืจ ืืจืืึท. ืึธืืขืจ ืคืืจ. ืืขืจ ืขืจืฉืืขืจ, ืืืึธืก ืืืขื ืึผืืขื ืืืืขืจ ืคึผืึทืกืืจื, ืืื ืึทื ื ืืืขืจืืง ืืืืคึผืก ืืืขื ืงืืงื ืึทื ืืขืจืฉ ืืื ืคืึทืจืฉืืืขื ืข JSON ืืขืงืขืก. ืคึฟืึทืจ ืืืึทืฉืคึผืื, {ืื ืืคืืขืื: 1} ืืื {ืื ืืคืืขืื: 1.1}. ืืืื ืึทืืึท ืคืขืืืขืจ ืืขื ืขื ืืขืคึฟืื ืขื ืืื ืืืื ืฆืขืืืืืื ื, ืื ืกืืฉืขืืึท ืฆืื ืืืคืืืกื ืืืขื ืืืืขื ืขื ืึทืืฅ ืจืืืืืง, ืืืึธืก ืืืขื ืคืืจื ืฆื ืื ืืขืจืกื ืคึผืื ืืืขื ืืืคึผ. ืืืขืจ ืืืื ืืื ืคืึทืจืฉืืืขื ืข ืึธื ืขืก, ืืืื ืขืจ ืืืขื ืืึธืื intField: int, ืืื ืื ืื ืืขืจืข ืืืขื ืืึธืื intField: ืืึธืคึผื.
ืขืก ืืื ืื ืคืืืืขื ืืข ืคืึธื ืฆื ืืึทื ืืืขื ืืื ืืขื ืกืืืืึทืฆืืข:
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
ืืืฆื ืืืจ ืืึธืื ืึท ืืขืงืข ืืื ืขืก ืืขื ืขื ืคึผืึทืจืืืฉืึทื ื ืืืึธืก ืงืขื ืขื ืืืื ืืืืขื ืขื ืืื ืึท ืืืื ืืึทืืึทืคืจืึทืืข ืืื ืึท ืืืืืืง ืคึผืึทืจืงืื ืคืื ืื ืืื ืฆืข ืืืืืจืื ืข. ืื? ื ืืื.
ืืืจ ืืืื ืืขืืขื ืงืขื ืึทื ืืืจ ืจืขืืืกืืจืืจื ืื ืืืฉ ืืื ืืืืืข. ืืืืืข ืืื ื ืืฉื ืคืึทื-ืฉืคึผืืจืขืืืืืง ืืื ืคืขืื ื ืขืืขื, ืืฉืขืช ืคึผืึทืจืงืื ืืื ืคืึทื-ืฉืคึผืืจืขืืืืืง. ืืขืจืืืขืจ, ืคึผืึทืจืืืฉืึทื ื ืืื ืกืืฉืขืืึทืก: field1: int ืืื Field1: int ืืขื ืขื ืื ืืขืืืข ืคึฟืึทืจ ืืืืืข, ืึธืืขืจ ื ืืฉื ืคึฟืึทืจ ืกืคึผืึทืจืง. ืื ืืืืกื ื ืืฉื ืคืึทืจืืขืกื ืฆื ืืขืจ ืื ืคืขืื ื ืขืืขื ืฆื ื ืืืขืจืืงืขืจ ืคืึทื.
ื ืึธื ืืขื, ืึทืืฅ ืืืื ื ืฆื ืืืื ืืื.
ืึธืืขืจ, ื ืื ืึทืืข ืึทืืื ืคึผืฉืื. ืขืก ืืื ืื ื ืฆืืืืืืข, ืืืื ืืืงืื ืืข ืคืจืืืืขื. ืืื ื ืืขืืขืจ ื ืืึท ืฆืขืืืืืื ื ืืื ืืขืจืืืขืืืขื ืกืขืคึผืขืจืึทืืื, ืื ืฆืขืืืืืื ื ืืขืงืข ืืืขื ืึทื ืืืึทืืื Spark ืกืขืจืืืืก ืืขืงืขืก, ืคึฟืึทืจ ืืืึทืฉืคึผืื, ืื _SUCCESS ืึธืคึผืขืจืึทืฆืืข ืืฆืืื ืคืึธื. ืืขื ืืืขื ืจืขืืืืืึทื ืืื ืึท ืืขืืช ืืืขื ืืจืืื ื ืฆื ืคึผืึทืจืงืื. ืฆื ืืืกืืืืื ืืขื, ืืืจ ืืึทืจืคึฟื ืฆื ืงืึทื ืคืืืืขืจ ืื ืงืึทื ืคืืืืขืจืืืฉืึทื ืฆื ืคืึทืจืืืึทืื ืกืคึผืึทืจืง ืฆื ืืืืื ืกืขืจืืืืก ืืขืงืขืก ืฆื ืืขืจ ืืขืงืข:
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
ืขืก ืืืื ื ืึทื ืืืฆื ืืขืืขืจ ืืึธื ืึท ื ืืึท ืคึผืึทืจืงืื ืฆืขืืืืืื ื ืืื ืืืกืืฃ ืฆื ืื ืฆืื ืืืืืจืื ืข ืืขืงืข, ืืื ืื ืคึผืึทืจืกืขื ืืึทืื ืคึฟืึทืจ ืืขื ืืึธื ืืื ืืืื. ืืืจ ืืึธืื ืืื ืฉืืืึทืื ืืึธืจืื ืึทื ืขืก ืืขื ืขื ืงืืื ืคึผืึทืจืืืฉืึทื ื ืืื ืึท ืืึทืื ืืืคึผ ืงืึธื ืคืืืงื.
ืึธืืขืจ, ืืืจ ืืึธืื ืึท ืืจืื ืคึผืจืึธืืืขื. ืืขืฆื ืืื ื ืืฉื ืืืงืื ื ืื ืืืืขืืืื ืข ืกืืขืืข, ืืขืจืฆื ืืื ืืขืจ ืืืฉ ืืื ืืืื ืื ืืืืจืขืื ืกืืขืืข, ืืืืืืื ืืขืืข ื ืืืข ืฆืขืืืืืื ื ืืื ืืขืจืกืื ืก ืืจืืื ืืขืืจืขื ืื ื ืืืกืืึธืจืฉืึทื ืืื ืื ืกืืขืืข.
ืืืจ ืืึทืจืคึฟื ืฆื ืฉืืึทืขื-ืจืขืืืกืืจืืจื ืื ืืืฉ. ืืึธืก ืงืขื ืืืื ืืขืืื ืคืฉืื: ืืืืขื ืขื ืื ืคึผืึทืจืงืื ืคืื ืื ืกืืึธืจืคืจืึทื ื ืืืืืขืจ, ื ืขืืขื ืื ืกืืฉืขืืึท ืืื ืฉืึทืคึฟื ืึท ืืื ืืืืืจื ืืืืฃ ืขืก, ืืื ืืืึธืก ืฆื ืจืข-ืจืขืืืกืืจืืจื ืื ืืขืงืข ืืื ืืืืืข ืืื ืึท ืคืื ืืจืืืกื ืืืง ืืืฉ, ืึทืคึผืืืืืื ื ืื ืกืืฉืขืืึท ืคืื ืื ืฆืื ืกืืึธืจืคืจืึทื ื.
ืืืจ ืืึธืื ืึท ืคืขืจื ืคึผืจืึธืืืขื. ืืืขื ืืืจ ืจืขืืืกืืจืืจื ืื ืืืฉ ืคึฟืึทืจ ืื ืขืจืฉืืขืจ ืืึธื, ืืืจ ืคืึทืจืืึธืื ืืื ืืืืฃ ืกืคึผืึทืจืง. ืืืฆื ืืืจ ืืึธื ืขืก ืืื, ืืื ืืืจ ืืึทืจืคึฟื ืฆื ืืขืืขื ืงืขื ืึทื ืคึผืึทืจืงืื ืคืขืืืขืจ ืงืขื ืขื ืึธื ืืืืื ืืื ืืืชืืืช ืืืึธืก ืืขื ืขื ื ืืฉื ืขืจืืืืื ืคึฟืึทืจ ืืืืืข. ืคึฟืึทืจ ืืืึทืฉืคึผืื, ืกืคึผืึทืจืง ืืืืจืคื ืขืจ ืืืืก ืฉืืจืืช ืืืึธืก ืขืก ืงืขื ื ืืฉื ืคึผืึทืจืกืืจื ืืื ืื "ืงืึธืจืจืืคึผื_ืจืขืงืึธืจื" ืคืขืื. ืึทืืึท ืคืขืื ืงืขื ืขื ื ืื ืืืื ืจืขืืืกืืจืืจื ืืื ืืืืืข ืึธื ืืืื ืื ืืจืื ืขื.
ืืืืื ืืึธืก, ืืืจ ืืึทืงืืืขื ืื ืกืืขืืข:
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
ืงืึธืืขืงืก ("_corrupt_record", "`_corrupt_record`") + "" + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("ืืขื ืืข <`", "ืืขื ืืข <") ืืืื ืืืืขืจ DDL, ื"ื ืึทื ืฉืืึธื ืคืื:
create table tname (_field1 string, 1field string)
ืืื ืคืขืื ื ืขืืขื ืืื "_field1, 1field", ืืืืขืจ DDL ืืื ืืขืืืื ืืื ืื ืคืขืื ื ืขืืขื ืืขื ืขื ืื ืืจืื ืขื: ืฉืึทืคึฟื ืืืฉ `ืื ืึทืืข` (`_field1` ืฉืืจืืงื, `1field` ืฉืืจืืงื).
ืื ืงืฉืื ืขืจืืืืึทื: ืืื ืฆื ืืึทืงืืืขื ืึท ืืึทืืึทืคืจืึทืืข ืืื ืึท ืืึทื ืฅ ืกืืขืืข (ืืื ืคึผืฃ ืงืึธื)? ืืื ืฆื ืืึทืงืืืขื ืืขื ืคึผืฃ? ืืึธืก ืืื ืืขืจ ืคืื ืคืืขืจ ืคึผืจืึธืืืขื. ืจืืืืืขื ืขื ืื ืกืืขืืข ืคืื โโโโืึทืืข ืคึผืึทืจืืืฉืึทื ื ืคืื ืืขืจ ืืขืงืข ืืื ืคึผืึทืจืงืื ืืขืงืขืก ืคืื ืื ืฆืื ืืืืืจืื ืข? ืืขื ืืืคึฟื ืืื ืื ืกืืืคืึทืกื, ืึธืืขืจ ืฉืืืขืจ.
ืื ืกืืขืืข ืืื ืฉืืื ืืื ืืืืืข. ืืืจ ืงืขื ืขื ืืึทืงืืืขื ืึท ื ืืึทืข ืกืืฉืขืืึท ืืืจื ืงืึทืืืืื ืื ื ืื ืกืืฉืขืืึท ืคืื ืื ืืื ืฆืข ืืืฉ ืืื ืื ื ืืึทืข ืฆืขืืืืืื ื. ืึทืืื ืืืจ ืืึทืจืคึฟื ืฆื ื ืขืืขื ืื ืืืฉ ืกืืฉืขืืึท ืคึฟืื ืืืืืข ืืื ืคืึทืจืืื ืื ืขืก ืืื ืื ืกืืฉืขืืึท ืคืื ืื ื ืืึทืข ืฆืขืืืืืื ื. ืืึธืก ืงืขื ืืืื ืืขืืื ืืืจื ืืืืขื ืขื ืื ืคึผืจืึธืืข ืืขืืึทืืึทืืึท ืคืื ืืืืืข, ืฉืคึผืึธืจื ืขืก ืืื ืึท ืฆืืึทืืืืืึทืืืง ืืขืงืข, ืืื ื ืืฆื ืกืคึผืึทืจืง ืฆื ืืืืขื ืขื ืืืืืข ืคึผืึทืจืืืฉืึทื ื ืืื ืึทืืึธื.
ืืื ืคืึทืงื, ืขืก ืืื ืึทืืฅ ืืืจ ืืึทืจืคึฟื: ืืขืจ ืึธืจืืืื ืขื ืืืฉ ืกืืฉืขืืึท ืืื ืืืืืข ืืื ืื ื ืืึทืข ืฆืขืืืืืื ื. ืืืจ ืืืื ืืึธืื ืืึทืื. ืขืก ืืืืืื ื ืึธืจ ืฆื ืืึทืงืืืขื ืึท ื ืืึทืข ืกืืฉืขืืึท ืืืึธืก ืงืึทืืืืื ื ืื ืกืืึธืจืคืจืึทื ื ืกืืฉืขืืึท ืืื ื ืืึทืข ืคืขืืืขืจ ืคึฟืื ืื ืืืฉืืคื ืฆืขืืืืืื ื:
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
ืืืืึทืืขืจ, ืืืจ ืืึทืื ืื ืืืฉ ืจืขืืืกืืจืึทืฆืืข DDL, ืืื ืืื ืื ืคืจืืขืจืืืงืข ืกื ืืคึผืึทื.
ืืืื ืื ืืื ืฆืข ืงืืื ืึทืจืืขื ืจืืืืืง, ื ืืืืื, ืขืก ืืื ืืขืืืขื ืึท ืื ืืืืึทืืืืื ื ืืึทืกืข, ืืื ืื ืืืฉ ืืื ืืืฉืืคื ืจืืืืืง ืืื ืืืืืข, ืืืจ ืืึทืงืืืขื ืึท ืืขืจืืืึทื ืืืงื ืืืฉ ืกืืฉืขืืึท.
ืืื ืื ืืขืฆืืข ืคึผืจืึธืืืขื ืืื ืึทื ืืืจ ืงืขื ืขื ื ืืฉื ื ืึธืจ ืืืืื ืึท ืฆืขืืืืืื ื ืฆื ืึท ืืืืืข ืืืฉ, ืืืืึทื ืขืก ืืืขื ืืืื ืฆืขืืจืืื. ืืืจ ืืึทืจืคึฟื ืฆื ืฆืืืื ืืขื ืืืืืข ืฆื ืคืึทืจืจืืืื ืืืึทื ืฆืขืืืืืื ื ืกืืจืืงืืืจ:
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
ืื ืคึผืฉืื ืึทืจืืขื ืคืื ืืืืขื ืขื JSON ืืื ืฉืึทืคึฟื ืึท ืกืืึธืจืคืจืึทื ื ืืืืืจื ืืืืฃ ืขืก ืจืืืึทืืืื ืืื ืึธืืืืขืจืงืึทืืื ื ืึท ื ืืืขืจ ืคืื ืืืคึผืืืกืึทื ืฉืืืขืจืืงืืืื, ืกืึทืืืฉืึทื ื ืคึฟืึทืจ ืืืึธืก ืืืจ ืืึธืื ืฆื ืงืืงื ืคึฟืึทืจ ืกืขืคึผืขืจืึทืืื. ืืื ืืึธืืฉ ืื ืกืึทืืืฉืึทื ื ืืขื ืขื ืคึผืฉืื, ืขืก ื ืขืื ืึท ืคึผืืึทืฅ ืคืื ืฆืืื ืฆื ืืขืคึฟืื ืขื ืืื.
ืฆื ืื ืกืืจืืืขื ื ืื ืงืึทื ืกืืจืึทืงืฉืึทื ืคืื ืื ืืืืืจืื ืข, ืืื ืืขืืื ืฆื:
- ืืืื ืคึผืึทืจืืืฉืึทื ื ืฆื ืื ืืืืืจืื ืข, ืืึทืงืืืขื ืืึทืคืจืืึทืขื ืคืื ืกืขืจืืืืก ืืขืงืขืก
- ืืึทื ืืืขื ืืื ืืืืืืง ืคืขืืืขืจ ืืื ืืงืืจ ืืึทืื ืืืึธืก ืกืคึผืึทืจืง ืืื ืืืืคึผื
- ืืืึทืจืคื ืคึผืฉืื ืืืืคึผืก ืฆื ืึท ืฉืืจืืงื
- ืืขืจ ืคืขืื ื ืขืืขื ืฆื ืืึธืืืขืจืงืึทืกืข
- ืืึทืืื ืืขืจ ืืึทืื ืืคึผืืึธืึทื ืืื ืืืฉ ืจืขืืืกืืจืึทืฆืืข ืืื ืืืืืข (DDL ืืืจ)
- ืื ืืืืกื ื ืืฉื ืคืึทืจืืขืกื ืฆื ืึทื ืืืืืคื ืคืขืื ื ืขืืขื ืืืึธืก ืงืขื ืืืื ืื ืงืึทืืคึผืึทืืึทืืึทื ืืื ืืืืืข
- ืืขืจื ืขื ืืื ืฆื ืืขืจืืืึทื ืืืงื ืืืฉ ืจืขืืืกืืจืึทืฆืืข ืืื ืืืืืข
ืกืึทืืื ื ืึทืจืืืฃ, ืืืจ ืืึธื ืึทื ืืขืจ ืืึทืฉืืืก ืฆื ืืืืขื ืงืจืึธื ืคึฟืขื ืฆืืขืจ ืืื ืคืจืึธื ืืื ืคืืืข ืคึผืืืคืึธืื. ืืขืจืืืขืจ, ืืื ืคืึทื ืคืื ืืืคึผืืึทืืขื ืืืืฉืึทื ืฉืืืขืจืืงืืืื, ืขืก ืืื ืืขืกืขืจ ืฆื ืงืึธื ืืึทืงื ืึท ืืงืกืคึผืืจืืึทื ืกื ืฉืืืขืฃ ืืื ืืฆืืื ืขืงืกืคึผืขืจืืื.
ืืื ืง ืืืจ ืคึฟืึทืจ ืืืืขื ืขื ืืขื ืึทืจืืืงื, ืืืจ ืืึธืคึฟื ืืืจ ืืขืคึฟืื ืขื ืื ืืื ืคึฟืึธืจืืึทืฆืืข ื ืืฆืืง.
ืืงืืจ: www.habr.com