α’αααα’αΆαααΆααΈαααα ααααααα’!
αα αααα»αα’αααααααα ααΈααααΉααααΆααΆααα»ααααααααααα’αΆααΈααααα Big Data Solutions αααα Neoflex αα·αααααΆαααα’α·αα’αααΈαααααΎααααααΆααααΆααααΆααα ααΆααααααααα’ααααααααααΎααααΆαα Apache Spark α
ααΆααααααααααααααα·ααΆααα·αααααα ααΆααα·α αα ααααΆααααΆααα»αα αΆααααααα’ααααΎαα·αααααααααααΆααα ααΆααααααααααα»αααΆααΉαααΆααααΎαα‘αΎαα
ααΆααααααΆααΆαααααααΊααΆαααααα ααα» α¬ααΆαααααΎαααααΈαααααααααααααα αααααααΌαααΆααααααΆαα»αααΆ JSON α¬ XMLα αα·ααααααβααααΌαβααΆαβαααα»αβα‘αΎαβαα Hadoop αααααΆααβααβα’αααβααααΌαβααΆαβαααααΎαβαα»αβα αΆαβααΈβαα½αβααβα ααΎαα’αΆα αααα αααΆαα αΌααα ααΆααααααααααΆαααααα αΆααααααΆααααααΎαα§ααΆα αααααΆαααα Impala α
αααα»αααααΈααα αααααααΆααααααα»αα αΆαααααα αα·αααααΌαααΆαααααΉαααΆαα»αααα ααΆαααααα ααα αααααααΆαααααααααα·αα’αΆα ααΌααα»αααΆαα»αααΆαααα αααααααΆα’αΆαααααααΎαα·αααααα α αΎαααΎααααα»ααααααααΆαααΆαα½ααα·αααααααααααΆααα ααΆααααααααααα»αααααΆααααΆαααααα
α§ααΆα ααα ααααααα ααΆαααααΎαααααΆααααααααααΌαααΆααααααααΆα
{source: "app1", error_code: ""}
α αΎαααααααα’αα ααΈαααααααααααα½ααα α ααααΎαααΆααααααα
{source: "app1", error_code: "error", description: "Network error"}
ααΆαααααα ααΆααα½αααααααααααα½αααααααΌαααΆααααααααα αααααααααα αΆα - ααΆααα·αααααΆ α αΎαααααΆαααααΆααααΆααααΉαααΆααΆααΉαααα¬α’αααααααα
ααΆααα·α αα ααααΆααααααΎα storefront ααΎαα·ααααααααααααααΊαα·αααΆαααααααΆαα αΎα Spark ααΆαα§ααααααα½αα ααα½ααααααΆααααΏααααα αααααΆααααΆααα·ααΆαααααααα·αααααα ααΆαααΆαααΆαααααααααΆααααΆαα JSON αα·α XML α αΎααααααΆαααααααααΆαααααααα·αααααΆααααΈαα»α ααΆαααΆαααααααααΆαα schemaEvolution ααααΌαααΆααααααααΌαα
αα glance ααααΌα, αααααααααΆαααΎααα ααΆααααα α’αααααααΌααα Folder ααΆαα½α JSON α αΎαα’αΆαααΆαα αααα»α dataframeα Spark ααΉααααααΎααααααααΆααα αααααααα·αααααααααααΆαααΆααα αΌααα αααα»ααα ααΆααααααααα ααΎαααΈαααααα α’αααΈααααααααΆαααααΌααααααΆαα»ααα αααα»α parquet αααααααΌαααΆαααΆααααααααααα αααα»α Impala αααααΆαα α»αααααααα»αα αΆααα αααα»α Hive metastore α
α’αααΈααααααααΆαα αΆααααΌα
ααΆααΆααααα
αααααΆαααΆαααΆααααα ααΆαα·αα
αααΆααααααΈα§ααΆα αααααααΈααα
αααα»αα―αααΆα α’αααΈαααααααΌαααααΎααΆαα½ααααα αΆαα½αα
ααα½ααααα»αααΆαα’αα»ααααα
α―αααΆααα·αααααΆα’αααΈαα·ααΈααΆααααααα·ααααααΎα storefront ααα»ααααααΎααααΈα’αΆα JSON α¬ XML αα αααα»α dataframe α
αααααΊααΆααααΆαααααααα αΆαααΈααααα’αΆα αα·αααα JSONα
df = spark.read.json(path...)
αααααΊαααααααααΆααααΎααααΈααααΎα±αααα·ααααααααΆααααααΆαα Spark α
αα αααα»αααΆαα’αα»αααα ααααααΈαααΆαααΆααααα»αααααΆαααΆαααΆαα’αΆαα―αααΆα JSON ααΈααα―αααΆα αα·ααααααΎαααα»ααα·ααααααα ααααΆαααΆαααΎααα ααΌα αααα ααΆααα»αα αΆαααΆααααΆαααα½α α αΎα αα·ααααααααααΈα αΌαααααΆαααααΆαααααα αα½αααααααΌααααααααα αα»αα αΆαααααα·αααααα ααΆαααααααΆαααα’αΆα αα»αααααΆα
αααααααΆαααααααααΆαααααΆααααΆαααΆααααααααααααΆαααααα αΆαααΆαααΌα ααΆαααααα:
ααα αΆα 1 α αα·ααααααβααααΌαβααΆαβαααα»αβαα βαααα»α Hadoop ααΆαα½αβααΉαβααΆαβαααα»αβα‘αΎαβαα·αβαααα αΆαβααααβααΆβααααβαααααΆααβαα·αβααΆαβααααααβαα βααΆαβααΆβαβααααΈβαα½αβα ααΆααααα αααααααααΆααα·ααααααααααΌααααα ααααΆαααααα
ααα αΆα 2 α ααα‘α»αααααααα»αααααΌα αααααααααΌαααΆαα’αΆα αα·ααααααα Spark α ααα»ααα·ααααααααααααααααΌαααΆααααααΆαα»αααΆαααααααααα’αΆα αααααΆα α§ααΆα ααααα αααα»α parquet ααααααααΆααααα’αΆα ααααΌαααΆαααΆαα αΌααα αααα»α Impala α ααΆαααααΎαααΆααααα αΆαααααα ααΆαα½αααΉααα·ααααααααΆααα’αααααααΆααααααΌααααα»ααα αΌααααα ααα»α αααα
ααα αΆα 3 α ααΆαααΆαααααααΌαααΆααααααΎαα‘αΎααααααΉαααααΎαα
αα
α»ααααααααΆααα»αα αΆαααΆαααααΆααααααα
ααΆααααα½αααααΆααααααΎαααΆααααα»α αααααΌαααΆααααα»αααΆααααα
ααααααααααΆαααααα αΆα αα·ααααα½αααααΆααααααααΆαααααααΆαααααΌαα
ααααααααααΆαααααα αΆαα
ααΌαααΎαα§ααΆα ααααα½αα α αΌααα·ααΆαααΆααα αΆαααααΌαααααΆααααααΎαααααΆααααααΌαααΆαα’αα»ααααα αΎαα―αααΆα JSON ααααΌαααΆααααα»αα‘αΎααα ααΆααααα―αααΆαα
ααΆααααααΎα dataframe ααΈαα½αααΆ αααααΆαααααααααΆαα»αααΆααΆααΆααααα αΆα αα·ααααααΆαααα αΆααα αααααΊααΆααα αΆαααααΌααααα»ααααα’αΆα ααααΌαααΆαααααΎααααΆαααΆααααα½ααα αααα»αα―αααΆα Sparkα
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
α’αααΈααααααααΆαα αΆααααΌα ααΆααα’α
ααΎαα’αΆα αα·αααα JSON αααααΆααααααΎααααααΆαα»α dataframe ααΆ parquet αααα α»ααααααααΆαα αααα»α Hive ααΆααααααααΆαααΆααααα½αααΆαα½αα
df.write.format(βparquetβ).option('path','<External Table Path>').saveAsTable('<Table Name>')
ααΎαααα½αααΆααααα’α½α α
ααα»αααααα αααααααααΆαα αα·ααααααααααΈααΈαααααααααΌαααΆαααααααα ααΎαβααΆαβααβα―αααΆαβαααβααΆα JSON αα·αβααααΆααβαααα αΆαβαααβααΆαβαααααΎαβααΈβααβαααα αααααΆααααΈαααα»ααα·ααααααααααα»ααααααΆααααΈααααα Data mart ααΆαααα·ααααααααααααα½αααααα
αααααααααΆαα‘αΌααΈααααΊααααΌααααα αααα»αα αΆαααΆααα½ααααα αααααΉαα’αα»ααααΆαα±ααααααααααΆαααΆαααααΈαααααΆαααααααααααΆααα ααααααΆααααααΆααααΆααααααααΌαααΆαααααααΆααααααα Spark α’αα»ααααΆαα±ααα’ααααααααααΆαααΆαααΆα ααααα‘ααα
ααααΌαααΎαααααΎααΆααααα»αααααΌαααααααααΆαα»ααα·ααααααααΌα αααααΆααα·αααααΆααΆαααΎαααααααααααααΆααααα ααααα»αααααα αααααααΆααααααααΌαααΆαααα α ααΆααΆαα αΆααααααΎα storefront α αΎαααααΌαααΆαααααΎαααααααααα
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
αα αααααααααΆαα ααΎααααα»αααααΆαααΆαααααΈααα»αααααα
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
α’αααΈααααα
ααααααααΊααααΌαα
α»ααααααα‘αΎααα·ααα
αααα»α Hive ααΎααααΈααααΎαα
αα
α»ααααααααΆααααααααΆαααα
ααααααΆαααΆααααααααααΊααΆααααααααααααα αΆααΎαα‘αΎαα
αααα αΆααααΌαα αα·αααΌααα·αααΆαα parquet ααααααααΉααα·αα’αΆα α’αΆαααΆαααα αααααΊαααααΆαααααααααα parquet αα·α JSON ααααΆααΆαααΆαααααα»αααααΆα
α αΌαααΎααα·α αΆαααΆα’αααΈααααΆαααΆαααααααΆα α§ααΆα ααα αααα·ααα·α JSON αααααα
ΠΠ΅Π½Ρ 1: {"a": {"b": 1}},
α αΎαααααααα JSON ααΌα ααααΆααΎααα ααΌα αααα
ΠΠ΅Π½Ρ 2: {"a": null}
α§αααΆααΆααΎαααΆαααΆαααΆαααΈααααααααααΆ αααααΈαα½ααααΆααααααΆαααα½αα
αα
ααααααααΎαα’αΆααα·αααααααααααααΆααααΌα Spark ααΉαα’αΆα
ααααααααααα α αΎαααΉααααααΆ "a" ααΊααΆααΆααααααααα "structure" ααΆαα½αααΉαααΆααααααΆαα "b" αααααααα INT α ααα»ααααααααα·αααΎααΆαααΆαααΈαα½ααααααΌαααΆααααααΆαα»ααααα‘ααααΈααααΆαααααΎαααα½αααΆα parquet ααΆαα½αααΉααααααααΆαααααΆααα·αααααααΆ:
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
ααααΆαααΆααααααααΌαααΆαααααααΆαααααΆαα αααΆαα ααΌα αααααααααΎααα½αααααΌαααΆαααααααααΆαα·ααα - αα αααααααα·ααααααααααα ααααΆααααα ααα
df = spark.read.json("...", dropFieldIfAllNull=True)
αααα»αααααΈααα parquet ααΉαααΆαααΆαααΆααααα’αΆα
α’αΆαααΆαα½αααααΆααΆαα
ααααΈαααα·αααα’ααααααααΆαααααΎααΆαα
αααα»αααΆαα’αα»ααααααΉααααΉααααΆαααΌαα
αααα
ααΈααααααααα α ααα»α’αααΈ? ααΆα / α
αΆααααααααααααΆααΆαααααΆαααΆαααΈααααα α¬ααΈα α¬αα½αα ααΈαα½α αααααααΎαααααΉαααΎαα‘αΎα ααΊααΆαααααααααααΉαααΎααα
αα»αααααΆαα
αααα»αα―αααΆα JSON ααααααααααΆα α§ααΆα ααα {intField: 1} αα·α {intField: 1.1} α ααααα·αααΎααΆαααααααααααΌαααΆαααααΎααα
αααα»αααΆαααΆααα½α αααααΆααααα
αΌαααααΆαααααααααΆαααααΉαα’αΆαα’αααΈααααααααΆαααααΉαααααΌααααααΆααα
αααααααααααααΉαααααΌααααα»αα ααα»ααααααααα·αααΎαα
αααα»ααα½ααααααααααΆ ααααα½αααΉαααΆα intField: int α αΎααα½ααααααΉαααΆα intField: ααααα
ααΆααααααΆααααααααΎααααΈαααααααΆαααααΆαααΆααααα
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
α₯α‘αΌααααααΎαααΆααααααααΆαααΆαααΆααααα’αΆα ααααΌαααΆαα’αΆαα αΌααα αααα»α dataframe αααα½ααα·α parquet ααααΉαααααΌααα showcase ααΆααααΌαα ααΆα? αα
ααΎαααααΌαααα αα αΆαααΆααΎαααΆαα α»ααααααααΆααΆααα αααα»α Hive α Hive αα·ααααααΆααα’ααααααΌα αααα αααα»ααααααααΆααα ααααααααα parquet ααΊαααααΆααα’ααααααΌα ααα ααΌα αααα ααΆαααΆααααααΆααααααααΆαααα field1: int αα·α Field1: int ααΊααΌα ααααΆαααααΆαα Hive ααα»αααααα·αααααααααΆαα Spark ααα αα»αααααα ααααααααααααααΆααα ααΆα’ααααααΌα α
αααααΆααααΈαααα’αααΈαα αΆααααΌα ααΆααα’α
ααααααΆαααΆααααααα·ααααααΆααα’ααααΆααααααα ααΆααααα αΆααΈααΈα αααααααααΆαααααααα αααααΆαααΆαααΆαααααΈααΈαα½ααααααΌαααΆααααααΆαα»ααααα‘ααααΈααααΆ ααααΆαααΆαααΉαααΆαα―αααΆαααααΆαααα Spark α§ααΆα ααα αααααααααααααα·ααααα·ααΆα _SUCCESS α αααααΉααααααΆαα±ααααΆαααα α»ααα αααααααΆααΆα parquet α ααΎααααΈαααααΆααααα αΆααα α’αααααααΌαααααααα ααΆαααααααα ααΎααααΈααΆαααΆα Spark ααΈααΆαααααααα―αααΆαααααΆαααααα ααα
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
ααΆα αΆααααΌα ααΆααΆα₯α‘αΌααααααΆαααααΆαααααα ααΆαααΆα parquet ααααΈααααΌαααΆααααααααα ααα―αααΆααααα αΆαααααα ααααα·αααααααααααΆαααααααααΆααααααααααααα·ααα α ααΎαααΆαααα α·ααααα»αααΆααααΆαα»αααΆαα·αααΆαααΆαααΆααααααΆααααααααααααααα·ααααααααα
ααα»ααααααΎαααΆααααα αΆααΈααΈα α₯α‘αΌαααα αααααααΆαααααΌαα αα·αααααΌαααΆαααααΉαααααα ααΎαααΈααα ααΆααΆααα αααα»α Hive ααΆααααααααΆααααα·αααααΉαααααΌα αααααΆαααΆαααΆαααααΈααΈαα½ααααααααΆααααΆαααΆααααααΌα αααααααααΆααα αααα»ααααααααΆαααα
α’αααααααΌαα α»ααααααααΆααΆαα‘αΎααα·αα αααα’αΆα ααααΌαααΆαααααΎαααααΆαααα: α’αΆα parquet αα storefront ααααααα, αααααααααΆααααα·ααααααΎα DDL αααααα’ααααΎααΆ, ααΆαα½αααΉαααΆααααα α»ααααααααααααααααα αααα»α Hive ααΆααΆααΆαααΆααααα , ααααΎαα αα α»ααααααααΆααααααααΆααααα storefront ααααα α
ααΎαααΆααααα αΆααΈαα½αα αα ααααααααΎαααΆαα α»ααααααααΆααΆαααΆααΎαααααΌα ααΎαααΉαααα’ααααΎ Spark α α₯α‘αΌααααααΎαααααΎααΆααααααα½αα―αα αΎαααΎαααααΌαα αΆαααΆααΆα parquet α’αΆα α αΆααααααΎαααΆαα½αααΉααα½α’ααααααααα·αααααΌαααΆαα’αα»ααααΆααααααΆαα Hive α α§ααΆα ααα Spark αααα αααααααΆαααααααΆαα·αα’αΆα ααααα αααα»αααΆα "corrupt_record" α ααΆααααααααα·αα’αΆα ααααΌαααΆαα α»αααααααα αααα»α Hive ααααα·αααααΌαααΆααααααα αααα½αα
αααααΉαααΏααααααΎαααα½αααΆααααααααΆαααα
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
αααααΌα ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").αααα½α("α’αΆαα<`", "α’αΆαα<") ααααΎα±αα DDL ααΆααα»ααααα·ααΆα αααααΊαααα½αα±ααα
create table tname (_field1 string, 1field string)
ααΆαα½αααΉααααααααΆαααΌα ααΆ "_field1, 1field" αα»ααααα·ααΆα DDL ααααΌαααΆααααααΎαα‘αΎααα ααααααααααααααααΆαααααΌαααΆαααα α ααα αααααΎαααΆααΆα `tname` (`_field1` string, `1field` string)α
αααα½αααΎαα‘αΎα: ααααααΎααααΈααα½αααΆαααα»ααα·ααααααα±ααααΆαααααΉαααααΌαααΆαα½αααΉααααααααΆααααααααα (αα αααα»αααΌα pf)? ααΎααααΎααΌα ααααα ααΎααααΈααα½αααΆα pf ααα? αααααΊααΆαααα αΆααΈααααΆαα α’αΆαα‘αΎααα·αααΌααααααααΆαααααααΆαααΆαααΆααα’ααααΈααα―αααΆαααΆαα½αααΉαα―αααΆα parquet ααααααααααΆαααααα αΆαααααα ? αα·ααΈααΆααααααααααΊααΆααα»ααααα·ααΆααααα»α ααα»αααααα·ααΆαα
αααααααΆαααααΆααα½α α αΎααα αααα»α Hive α α’αααα’αΆα ααα½αααΆααααααααΆαααααααΈααααα½ααααα αΌαααααΆααΌααααααααΆαααααααΆααΆαααΆααααΌααα·αααΆαααΆαααααΈα ααΌα αααα α’αααααααΌαααααΆααΆα schema ααΈ Hive α αΎαααααααΆααΆαα½α schema αα partition ααααΈα αααα’αΆα ααααΌαααΆαααααΎαααααΆαα’αΆααα·ααααααααααΆααΆαααααααΈ Hive αααααΆαα»αααΆαα αααα»αααααααααα’αΆαααα α αΎαααααΎ Spark ααΎααααΈα’αΆαααΆαααΆαααΆααααΈααααα»αααααααα½αα
ααΆαααΆααα·αααΆαα’αααΈααααααααΆααααα’αααααααΌαααΆα: αααααααΆαααααΆααΆαααΎααα αααα»α Hive αα·αααΆαααΆαααααΈα ααΎαααααΆααα·αααααααααααα ααΆαα αααααααΎααααΈααα½αααΆααααααααΆαααααααΈααααα½ααααα αΌαααααΆααΌααααααααΆααααα»αα αΆα αα·αααΆαααααΈααΈααΆαααΆααααααΆααααααΎαα
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
αααααΆαααα ααΎααααααΎαααΆααΆαα
α»αααααα DDL ααΌα
αα
αααα»αα’ααααααα»ααααα
ααααα·αααΎαααααααααΆααααΆααααΌαααααΎαααΆαααΆαααααΉαααααΌα αααααΊααΆαααΆαα
αΆααααααΎααααα»α α αΎαααΆααΆαααααΌαααΆααααααΎααααΆαααααΉαααααΌααα
αααα»α Hive αααααΎαααα½αααΆαααΆααΆαααΆααΆααααααΆαααααΎαα
αα
α»ααααααααΆαα
α αΎααααα αΆα α»ααααααααΊααΆα’ααααα·αα’αΆα ααααΆααααααααααααΆαααΆααα ααΆααΆα Hive αααααααααΆααΉαααΌα α α’αααααααΌαααααα Hive ααΎααααΈαα½ααα»ααα ααΆααααααααααΆαααααααΆα
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
ααΆααα·α αα ααΆααααααααΆαα’αΆα JSON αα·ααααααΎααα»αα αΆααααααα’ααααΎααΆααΆαα±ααααααααααΎααΆαααααΆααα½αα ααα½ααααααΆαααααααααΆααααα’αααααααΌαααααΎααααα‘ααααΈααααΆα α αΎααααααΈααΆαααααααααΆαααΆαααααααΆαααααααααΆααααααααα ααααΆααααΌαααΆααααα αααΎαααΎααααΈααααααααα½αααα
ααΎααααΈβα’αα»ααααβααΆαβααΆααααβααααααβααΆααβαααα αΆα αααα»αβααααΌαβα
- ααααααααΆαααΆααα ααααααααΆαααααα αΆα αααα αΆααα―αααΆαααααΆαααα
- αααααααΆαααΆαα½αααΆαααααα αααα»ααα·αααααααααααααα Spark ααΆαααΆααααα αΌα
- αααααααααααΆαααααα ααααα’αααα
- ααααααααααααααΆααα ααΆα’ααααααΌα
- ααΆααααα αΌααα·ααααααααΆα ααααα‘αα αα·αααΆαα α»ααααααααΆααΆααααα»α Hive (ααααΆαα DDL)
- αα»αααααα ααα ααΈαααααααΆααααα’αΆα αα·αααααΌαααααΆααΆαα½α Hive
- ααααααααααΈααααααααΎαα αα α»ααααααααΆαααΆαα α»ααααααααΆααΆααα αααα»α Hive
ααα»ααα ααΎαααααααααΆααααΆααΆααααααα α α·αααααΆαααααααα’α½α α αΆαααΊαααααααααααΆαααααΆαααΆα αααΎαα ααΌα αααααααα»αααααΈααΆαααΆαααααΆααααα»αααΆαα’αα»αααα ααΆααΆααΆααααααΎααααα»αααΆαααΆααααααααΌαααααΆααααα·αααααααΆαα½αα’αααααααΆαααααααα
ααΌαα’ααα»ααααααΆααααΆαα’αΆαα’αααααααα ααΎααααααΉαααΆα’αααααααΎαααααααΆαααΆαααααααααα
ααααα: www.habr.com