ááťá áşáážá á˝áŹááąáŹá áŹáááşáá°ááťáŹá¸á áááşášáááŹáážáááąáŹááąáˇáááşá¸ááŤá
á¤ááąáŹááşá¸ááŤá¸áá˝ááşá Neoflex á Big Data Solutions ááŻááşáááşá¸áááşáááşáĄáá˝ááş áŚá¸ááąáŹááşáĄáááŻááşáááşááśáááş Apache Spark áááŻáĄááŻáśá¸ááźáŻá ááźáąáŹááşá¸áá˛áááŻááşááąáŹáá˝á˛áˇá ááşá¸ááŻáśá áááŻá¸áááŻááşááťááşáážáŹá áŹááťáŹá¸áááşááąáŹááşááźááşá¸áĄáá˝ááş áĄááąá¸á áááşáá˝áąá¸ááťááşá ááŹááťáŹá¸ááᯠááąáŹáşááźááŹá¸ááŤáááşá
ááąááŹáá˝á˛ááźááşá¸á áááşááźáŹáážáŻáááąáŹááťááşááá áşá áááşáá áşáááŻááşá¸áĄááąááźááˇáşá ááťáąáŹáˇáá˛áá˛áááşá¸áááşá¸áá˝á˛áˇá ááşá¸ááŹá¸ááąáŹááąááŹáááŻáĄááźáąááśá áááşá¸ááťááşá¸ááźááážáŻááťáŹá¸áááşááąáŹááşááźááşá¸ááŹáááşáááş áááźáŹááááąáŤáşááąáŤááşááŤáááşá
ááŻáśáážááşáĄáŹá¸ááźááˇáş áááşá¸áááŻáˇáááş JSON áááŻáˇáááŻááş XML ááŻáśá áśááźááˇáş ááááşá¸áááşá¸ááŹá¸ááąáŹ á áá áşáĄááťááŻá¸ááťááŻá¸ááž áážááşáááşá¸ááťáŹá¸ áááŻáˇáááŻááş ááŻáśáˇááźááşáážáŻááťáŹá¸ááźá áşáááşá ááąááŹááᯠHadoop áááŻáˇ áĄááşááŻááşááŻááşááźáŽá¸á áááŻáˇááąáŹááş áááşá¸ááž á áááŻá¸áááŻááşáá áşáᯠáááşááąáŹááşáááş áááŻáĄááşáááşá áĽáááŹáĄáŹá¸ááźááˇáşá Impala áážááááˇáş áááşááŽá¸ááŹá¸ááąáŹ áááŻááşááťááşáážáŹá áŹáááŻáˇ áááşááąáŹááşáá˝ááˇáşááᯠá áŻá ááşá¸áááŻááşáááşá
á¤ááá ášá áá˝ááşá áá áşáážááşá áááŻá¸ááťááşáážáŹá áŹá áĄááźááşáĄáááşááᯠááźááŻááááááŻááşááŤá áááŻáˇáĄááźááşá áááşá¸áááşááąááŹááąáŤáşáá˝ááşáá°áááşááąáŹááźáąáŹááˇáşá áĄá áŽáĄá ááşáááŻááźááŻáááşááąá¸áá˝á˛ááááááˇáşáĄááźááşá á¤áá˝á˛áˇá ááşá¸ááŻáśáĄáá˝ááşáĄáŹá¸áááşá¸ááąáŹááąááŹáááŻááťá˝ááşáŻááşáááŻáˇáááŻááşáá˝ááşááźáąáážááşá¸ááąááŤáááşá
áĽáááŹá áááąáˇáá˝ááş áĄáąáŹááşááŤááŻáśáˇááźááşááťááşááᯠáážááşáááşá¸áááşááŹá¸áááş-
{source: "app1", error_code: ""}
ááááşááźááşáá˝ááş áĄáąáŹááşááŤááŻáśáˇááźááşáážáŻáááş áá°ááŽááąáŹá áá áşááž áá˝ááşááąáŤáşááŹáááş-
{source: "app1", error_code: "error", description: "Network error"}
ááááşáĄááąááźááˇáşá áĄááźáŹá¸áĄáá˝ááşááᯠá áááŻá¸ááťááşáážáŹá Ꮰ- ááąáŹáşááźááťááşáááŻáˇ ááąáŤááşá¸áááˇáşáááˇáşááźáŽá¸ áááşá¸áááş ááŹáááşá áááŹáááşááᯠáááşáá°ááťáž ááááááŻááşááŤá
áááŻáá˛áˇáááŻáˇááąáŹááąááŹááťáŹá¸áá˝ááş mart áá áşááŻáááşááŽá¸ááźááşá¸áááŹáááşáááş á áśááá°ááŹáážáááźáŽá¸ Spark áá˝ááş áááşá¸áĄáá˝ááşááááááŹááťáŹá¸á á˝áŹáážááááşá áĄáááşá¸áĄááźá áşááąááŹáááŻáá˝á˛ááźááşá¸á áááşááźáŹáááşáĄáá˝ááşá JSON áážááˇáş XML áážá áşááŻá ááŻáśá¸áĄáá˝ááş ááśáˇáááŻá¸áážáŻáážáááźáŽá¸ ááááşáááááááąá¸ááąáŹ schema áĄáá˝ááş schemaEvolution ááśáˇáááŻá¸áážáŻááᯠááąá¸ááŹá¸áááşá
ááááá áşááťááşáážáŹ ááźáąáážááşá¸ááťááşá áááŻá¸áážááşá¸ááŤáááşá JSON ááźááˇáş áááŻááşáá˝á˛ááᯠáá°á ááąááŹááąáŹááşáááŻáˇ áááşáááş áááŻáĄááşáááşá Spark áááş schema áá áşááŻááᯠáááşááŽá¸ááźáŽá¸ nested data ááᯠáááşááąáŹááşááŻáśááťáŹá¸áĄááźá áş ááźáąáŹááşá¸áá˛ááąá¸ááŤáááşá áááŻáˇááąáŹááşá Hive metastore áá˝ááş á áááŻá¸áááŻááşááťááşáážáŹá áŹáá˝ááş á áŹáááşá¸áá˝ááşá¸ááźááşá¸ááźááˇáş Impala áá˝ááş ááśáˇáááŻá¸ááŹá¸áááˇáşáĄááŹáĄáŹá¸ááŻáśá¸ááᯠááŤááąá¸ááźááˇáşááááşá¸áááşá¸áááşáááŻáĄááşááŤáááşá
áĄááŹáĄáŹá¸ááŻáśá¸áááŻá¸áážááşá¸ááŻáśááááşá
áááŻáˇááŹáá˝ááşá á
áŹáá˝ááşá
áŹáááşá¸áážá áĽáááŹáááŻááťáŹá¸ááźááˇáş áááşáá˝áąáˇáá˝ááş ááźáżááŹááťáŹá¸á
á˝áŹáážááˇáş ááŹááŻááşááááşááᯠááážááşá¸áááşá¸ááŤá
á áŹáá˝ááşá áŹáááşá¸áááş á áááŻá¸áááŻááşááťááşáážáŹá ᏠáááşááŽá¸ááźááşá¸áĄáá˝ááşáááŻááşáᲠJSON áááŻáˇáááŻááş XML ááᯠááąááŹááąáŹááşáá áşááŻáááŻáˇ áááşááźááşá¸áĄáá˝ááş ááťááşá¸áááşááŻáśááᯠááąáŹáşááźáááşá
ááźáąáŹááááşá JSON ááᯠáááşáááŻáááşááźáŽá¸ áá˝á˛ááźááşá¸á áááşááźáŹáááá˛áááŻáᏠáááŻá¸áážááşá¸á á˝áŹááźááŹá¸ááŤáááş-
df = spark.read.json(path...)
áááşá¸áááş Spark áá˝ááş ááąááŹááážááááŻááşá áąáááş ááŻáśááąáŹááşááŤáááşá
áááşáá˝áąáˇáá˝ááşá áááŻááŤáá áşááŻááž JSON áááŻááşááťáŹá¸ááᯠáááşáážáŻááźáŽá¸ ááąááŹááąáŹááşáá áşáᯠáááşááŽá¸ááŻáśáááş áááŻáááŻáážáŻááşáá˝áąá¸ááŤáááşá áĄááźáąáĄááąáááş á¤áá˛áˇáááŻáˇ ááźá áşáááş- áĄááťááŻáˇááąáŹ ááźáá˝á˛áá áşááŻáážáááąááźáŽá ááąááŹáĄáá áşááťáŹá¸ ááąáˇá ááşááąáŹááşáážáááŹáááşá áááşá¸áááŻáˇááᯠááźáá˝á˛áá˝ááş áááˇáşáá˝ááşá¸áááş áááŻáĄááşáááşá áĄá áŽáĄá ááşáážáŹ áá˝á˛ááźáŹá¸áááŻááşáááşááᯠáááąáˇáááťáąáŹáˇáᲠáážáááąáááşá
áááŻááşááťááşáážáŹá ᏠááąáŹááşááŻááşáááşáĄáá˝ááş ááŻáśáážááşáĄá áŽáĄá ááşáážáŹ áĄáąáŹááşááŤáĄáááŻááşá¸ááźá áşáááşá
1 áĄáááşáˇá ááąááŹááᯠHadoop áá˝ááş áááşááąáŹááşááźáŽá¸ ááąáŹááşáá˝ááş ááąáˇá ááş áááşááąáŹááşá¸áááşááąá¸ááźáŽá¸ partition áĄáá áşáá áşááŻáááŻáˇ ááąáŤááşá¸áááˇáşáááşá ááááşáááş ááąáˇáĄáááŻááş áááŻááşá¸ááźáŹá¸ááŹá¸ááąáŹ áĄáááşá¸áĄááźá áşááąááŹááŤáááˇáş áááŻááşáá˝á˛áá áşááŻááźá áşáááşá
2 áĄáááşáˇá áááŚá¸á áááşááąá ááşáá˝ááşá á¤áááŻááşáá˝á˛ááᯠSpark ááŻáśá¸ááźáŽá¸ áááşááźáŽá¸ áá˝á˛ááźááşá¸á áááşááźáŹááŤáááşá ááááşááąááŹááąáŹááşááᯠImpala áá˛áááŻáˇ áááˇáşáá˝ááşá¸áááŻááşáááˇáş áĽáááŹáĄáŹá¸ááźááˇáş ááŤááąá¸ááźááˇáş áá˝á˛ááźááşá¸á áááşááźáŹáááŻááşááąáŹ ááąáŹáşáááşááźááˇáş ááááşá¸áááşá¸ááŹá¸áááşá áááşá¸áááş á¤áĄááťááşáĄáá á áŻááąáŹááşá¸ááŹá¸áááˇáş ááąááŹáĄáŹá¸ááŻáśá¸áážááˇáşáĄáá° áá áşáážááşá áááŻá¸áááŻááşááᯠáááşááŽá¸ááąá¸áááşá
3 áĄáááşáˇá á
áááŻá¸áááŻááşááťááşáážáŹá
áŹááᯠááąáˇá
ááş áĄááşááááşááŻááşáááˇáş ááąáŤááşá¸ááŻááşáá
áşáᯠáááşááŽá¸ááŹá¸áááşá
áááŻá¸ááźážááşáááşááźááşá¸áááŻááşáᏠááąá¸áá˝ááşá¸á á
áááŻá¸ááťááşáážáŹá
áŹááᯠáááŻááşá¸ááźáŹá¸áááş áááŻáĄááşááźááşá¸áážááˇáş á
áááŻá¸ááťááşáážáŹá
áŹá ááąáá°ááť áĄááźááşáĄáááşááᯠááśáˇáááŻá¸ááąá¸áááş ááąá¸áá˝ááşá¸ááťáŹá¸ ááąáŤáşááąáŤááşááŹáááşá
áĽáááŹáá áşááŻááąá¸ááźááˇáşááĄáąáŹááşá repository áá áşááŻáááşááąáŹááşááźááşá¸á ááááĄáááˇáşááᯠáĄááąáŹááşáĄáááş ááąáŹáşáá˛áˇááźáŽá¸á áááŻááşáá˝á˛áá áşááŻáááŻáˇ JSON áááŻááşááťáŹá¸ áĄááşááŻááşáááşááźááşá¸ááᯠá áŽá ááşáááşáážááşááŹá¸ááŤáááşá
áááşá¸áááŻáˇááśááž ááąááŹááąáŹááşáá áşááŻááᯠáááşááŽá¸ááźáŽá¸ááąáŹááş áááşá¸áááŻáˇááᯠááźáá˝á˛áá áşááŻáĄááźá áş ááááşá¸áááşá¸áááş ááźáżááŹááážáááŤá ááŤá Spark documentation áážáŹ áĄáá˝ááşááá° áážáŹáá˝áąáˇáááŻááşáá˛áˇ áááááŻáśá¸áĄáááˇáşááŤá
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
áĄáŹá¸ááŻáśá¸áĄáááşááźáąááŻáśáááŤáááşá
ááťá˝ááşáŻááşáááŻáˇáááş JSON áááŻáááşááźáŽá¸ áá˝á˛ááźááşá¸á áááşááźáŹááźáŽá¸ááąáŹáĄááŤá ááťá˝ááşáŻááşáááŻáˇáááş ááąááŹááąáŹááşááᯠááŤááąá¸áĄááźá áşááááşá¸áááşá¸áᏠáĄáááşááźáąáááˇáşáááşá¸áááşá¸ááźááˇáş Hive áá˝ááş á áŹáááşá¸áá˝ááşá¸ááŤ-
df.write.format(âparquetâ).option('path','<External Table Path>').saveAsTable('<Table Name>')
áááşá¸ááťááşá¸ááźááážáŻáá áşááŻááážááááşá
áááŻáˇááąáŹáş ááąáŹááşáá áşááąáˇáá˝ááş áĄáááşá¸áĄááźá áşááž ááąááŹáĄáá áş áááşáááˇáşáá˛áˇáááşá ááťá˝ááşáŻááşáááŻáˇáá˝ááş JSON ááŤáááˇáş áááŻááşáá˝á˛áá áşááŻáážáááźáŽá¸ á¤áááŻááşáá˝á˛ááᯠáĄááźáąááśá áááşááŽá¸ááŹá¸ááąáŹ á áááŻá¸áááŻááşáá áşááŻáážááááşá áĄáááşá¸áĄááźá áşááž ááąááŹá ááąáŹááşáĄáááŻááşá¸ááᯠáááşááźáŽá¸ááąáŹááşá á áááŻá¸ááťááşáážáŹá áŹáá˝ááş áá áşáááşáĄáá˝ááş ááŻáśááąáŹááşááąáŹááąááŹááážáááŤá
ááŻáášáááááşááąáŹááźáąáážááşá¸ááťááşáážáŹ á áááŻá¸áááŻááşááťááşáážáŹá áŹáĄáŹá¸ ááąáˇá áĽáşáĄáááŻááşá¸áááŻááşá¸áá˝á˛ááźááşá¸ááźá áşááźáŽá¸ ááąáŹááşááąáˇáá˝ááş áĄáááŻááşá¸áĄáá áşáá áşáᯠáááşáááˇáşáááŻááşá áąáááşááźá áşáááşá áááşá¸áĄáá˝ááş ááášáááŹá¸áááŻáááşá¸ áá°ááááťáŹá¸áááşá Spark áááş áááˇáşáĄáŹá¸ partitions ááťáŹá¸ááᯠááŽá¸ááźáŹá¸á áŽáážááşáááşá¸áááşáááş áá˝ááˇáşááźáŻáááşá
ááááŚá¸á á˝áŹ ááťá˝ááşáŻááşáááŻáˇáááş áĄáááşááąáŹáşááźááŤáĄáááŻááşá¸ ááąááŹááᯠáĄáááŻááşá¸áááŻááşá¸áá˝á˛ááźááşá¸áᏠááąáŤááşá¸áááˇáşáᏠáááŚá¸ loading ááᯠááŻááşááąáŹááşááŤáááşá á¤ááŻááşááąáŹááşááťááşááᯠá áááŻá¸ááťááşáážáŹá ᏠáááŚá¸áááşáážááşááźááşá¸ááŻááąáŤáşááźáŽá¸ áá áşááźáááşáᏠááŻááşááąáŹááşáááş-
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
ááąáŹááşááąáˇáá˝ááş partition áĄáá áşáááŻáᏠááąáŤááşá¸ááŻááşááŻááşááŤá
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
ááťááşáážáááąááąá¸áááşáážáŹ schema áááŻáá˝ááşá¸ááśáááşáĄáá˝ááş Hive áá˝ááşááźááşáááşáážááşááŻáśáááşáááşááźá
áşáááşá
áááŻáˇááąáŹáş á¤ááąááŹáá˝ááş ááźáżááŹááťáŹá¸ ááąáŤáşááŹáááşá
áááááźáżááŹá áááźáŹáᎠáááŻáˇáááŻááş ááąáŹááşáááŻááşá¸áá˝ááş ááážáááŹááąáŹ ááŤááąá¸ááᯠáááşáááááąáŹáˇááŤá ááŤááąá¸áážááˇáş JSON áááş áĄáá˝ááşáááşááťáŹá¸ááᯠáá˝á˛ááźáŹá¸á á˝áŹ áááşááśááŻáśááźáąáŹááˇáşááźá áşáááşá
ááŻáśáážááşáĄááźáąáĄááąáá áşááŻááᯠááŻáśá¸áááşááźááˇáşááĄáąáŹááşá áĽáááŹá áááąáˇá JSON ááąáŹááşááŹáááş-
ĐĐľĐ˝Ń 1: {"a": {"b": 1}},
áááąáˇáá˝ááş áĄááŹá¸áá° JSON áááş á¤áá˛áˇáááŻáˇ ááźá áşáááş-
ĐĐľĐ˝Ń 2: {"a": null}
ááťá˝ááşáŻááşáááŻáˇáá˝ááş ááá°ááŽááąáŹ partition áážá
áşááŻáážááááşá áá
áşááŻá
áŽáá˝ááş áááŻááşá¸áá
áşááŻáážááááş áááŻááźááŤá
ááŻáˇá
áĄáááşá¸áĄááźá
áşááąááŹáá
áşááŻááŻáśá¸ááᯠááťá˝ááşáŻááşáááŻáˇáááşáážáŻáááˇáşáĄá፠Spark áááş áĄááťááŻá¸áĄá
áŹá¸ááᯠááŻáśá¸ááźááşáááŻááşáááşááźá
áşááźáŽá¸ âaâ áááş áĄááťááŻá¸áĄá
áŹá¸ INT á áĄá
áŻáĄááąá¸áĄáá˝ááş âbâ ááźááˇáş áĄááťááŻá¸áĄá
áŹá¸ âáá˝á˛áˇá
ááşá¸ááŻáśâ áááşáááşáá
áşááŻááźá
áşááźáąáŹááşá¸ ááŹá¸áááşááŹáááşááźá
áşáááşá áááŻáˇááąáŹáşá áĄáááşá¸áááˇáşáá
áşááŻá
áŽááᯠááŽá¸ááźáŹá¸á
áŽááááşá¸áááşá¸ááŹá¸ááŤáá ááááşáážáŹ ááááŹááááźá
áşááąáŹ áĄáááŻááşá¸áĄá
áŽáĄá
ááşááťáŹá¸ááźááˇáş ááŤááąá¸áá
áşááŻááźá
áşáááşá
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
á¤áĄááźáąáĄááąááᯠááąáŹááşá¸á á˝áŹááááąáŹááźáąáŹááˇáş áĄáááşá¸áĄááźá áşááąááŹááᯠáá˝á˛ááźááşá¸á áááşááźáŹáááˇáşáĄá፠áááŹáĄáá˝ááşááťáŹá¸ááᯠáááşáážáŹá¸áááş áá˝áąá¸ááťááşáážáŻáá áşááŻááᯠáĄáá°á¸áááˇáşáá˝ááşá¸ááŹá¸áááş-
df = spark.read.json("...", dropFieldIfAllNull=True)
á¤ááá
ášá
áá˝ááşá ááŤááąá¸áááşáĄáá°ááá˝áááşáážáŻáááŻááşááąáŹáĄáááŻááşá¸ááťáŹá¸á፠á ááşááááˇáşáááşá
ááŤááᯠáááşáá˝áąáˇááŻááşáá˛áˇáá°áá˝áąá áážááŻááşááźáŽá¸ááááş ááźáŻáśá¸ááźááááˇáşáááşá áĄáááşááźáąáŹááşáˇ? ááŻááşáááşá ááŹááźáąáŹááˇáşáá˛áááŻááąáŹáˇ ááąáŹááşáááş áĄááźáąáĄááą áážá
áşáᯠáááşááźá
áşááŹáááŻááşáááŻáˇááŤá áááŻáˇáááŻááş ááŻáśá¸ááŻá áááŻáˇáááŻááş ááąá¸ááŻá ááááĄááťááşáážáŹ ááąááťáŹááŻááŽá¸ááŤá¸ááźá
áşáááˇáş ááááşá¸áĄááťááŻá¸áĄá
áŹá¸ááťáŹá¸áááş ááá°ááŽááąáŹ JSON áááŻááşááťáŹá¸áá˝ááş áá˝á˛ááźáŹá¸ááąáááş ááźá
áşáááşá áĽáááŹá {intField: 1} áážááˇáş {intField: 1.1}á áááŻáááŻáˇááąáŹáĄáá˝ááşááťáŹá¸áááş áá
áşááŻááşáá˝ááş ááąáŤáşááŹááŤáá schema ááąáŤááşá¸á
ááşá¸áážáŻáááş áĄááŹáĄáŹá¸ááŻáśá¸ááᯠáážááşáááşá
á˝áŹáááşáááŻááşááźáŽá¸ áĄááááťááŻáśá¸áĄááťááŻá¸áĄá
áŹá¸áááŻáˇ áŚá¸áááşáá˝áŹá¸áááşááźá
áşáááşá ááŤááąáááˇáş ááá°áá°á¸áááŻáááş áá
áşááŻá intField: int áážááááşá ááąáŹááşáá
áşááŻá intField: áážá
áşááážááááşá
á¤áĄááźáąáĄááąááᯠáááŻááşáá˝ááşáááş áĄáąáŹááşááŤáĄááś áážáááŤáááşá
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
ááᯠááťá˝ááşáŻááşáááŻáˇáá˝ááş áĄáááşá¸áááˇáşááťáŹá¸áááşáážááᏠáááŻááşáá˝á˛áá áşááŻáážáááźáŽá¸á ááąááŹááąáŹááşáá áşááŻáááşá¸áážááˇáş á áááŻá¸áááŻááşááťááşáážáŹá áŹáá áşááŻááŻáśá¸á áááŹá¸áááşááŤááąá¸áá áşááŻáĄááźá áş áááşáááŻááşáááşá ááŻááşááŹá¸? ááážá
ááťá˝ááşáŻááşáááŻáˇáááş Hive áá˝ááş áááŹá¸ááᯠá áŹáááşá¸áá˝ááşá¸ááŹá¸ááźáąáŹááşá¸ áááááááŤáááşá Hive áááş áĄáá˝ááşáĄáááşááťáŹá¸áá˝ááş áĄááąá¸áĄáá˝á˛áááŻááşááąáŹáşáááşá¸ ááŤááąá¸ááźá áşáááşá áááŻáˇááźáąáŹááˇáşá schemas ááŤáážáááąáŹ partitions ááťáŹá¸áááş field1: int áážááˇáş Field1: int áááŻáˇáááş Hive áĄáá˝ááşáá°ááŽááąáŹáşáááşá¸ Spark áĄáá˝ááşáááŻááşááŤá áĄáá˝ááşáĄáááşááťáŹá¸ááᯠá áŹááŻáśá¸áĄááąá¸áááŻáˇ ááźáąáŹááşá¸áááş áááąáˇááŤáážááˇáşá
ááŽááąáŹááşáážáŹááąáŹáˇ áĄáŹá¸ááŻáśá¸áĄáááşááźáąáá˝áŹá¸ááŻáśáááŤáááşá
áááŻáˇááąáŹáşá á¤ááťážááąáŹááşááááŻá¸áážááşá¸ááŤá ááŻáááá áá°ááááťáŹá¸ááąáŹááźáżááŹáá áşááŻááąáŤáşááŹáááşá áĄáááŻááşá¸áĄáá áşáá áşááŻá áŽááᯠááŽá¸ááźáŹá¸á áŽááááşá¸áááşá¸ááŹá¸ááąáŹááźáąáŹááˇáş áĄáááşá¸áááˇáşáááŻááşáá˝á˛áá˝ááş Spark áááşááąáŹááşáážáŻáááŻááşááťáŹá¸ áĽáááŹá _SUCCESS ááŻááşááąáŹááşááťááş áĄáąáŹááşááźááşáážáŻáĄááśááŤáážááááşá áááşá¸áááş ááŤááąá¸áááşá¸áááş ááźááŻá¸áááşá¸ááąáŹáĄá፠áĄáážáŹá¸áĄáá˝ááşá¸ ááźá áşááąáŤáşááŹááááˇáşáááşá áááşá¸áááŻáážáąáŹááşáážáŹá¸áááş Spark áááŻááŤáááŻáˇáááşááąáŹááşáážáŻáááŻááşááťáŹá¸áááˇáşááźááşá¸áážááŹá¸ááŽá¸ááźááşá¸ááźááˇáş configuration ááᯠconfigure ááŻááşáááşáááŻáĄááşáááş-
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
ááᯠááąáˇáááŻááşá¸ parquet partition áĄáá áşáá áşááŻáááş ááąáˇá áĽáşáá˝á˛ááźááşá¸á áááşááźáŹááŹá¸ááąáŹááąááŹáááşáážááᏠáá áşáážááşá áááŻá¸ááťááşáážáŹá áŹáááŻááŤáááŻáˇ ááąáˇáááŻááşá¸áááˇáşááąááŻáśááááşá ááąááŹáĄááťááŻá¸áĄá áŹá¸áá˝á˛áá˝á˛áážáŻááťáŹá¸áážááááˇáş áĄáááŻááşá¸áá˝á˛ááťáŹá¸ááážáá áąáááşáĄáá˝ááş ááťá˝ááşáŻááşáááŻáˇ ááźááŻáááşáááŻá ááŻááşááŹá¸ááŤáááşá
ááŤááąáááˇáş ááááááźáżááŹáá˛áˇ áááşáááŻááşááąááááşá áááŻáĄááŤáá˝ááş ááąááŻááť schema ááᯠáááááááˇáşáĄááźááşá Hive áá˝ááş áááŹá¸áá˝ááşáĄáá áşáá áşááŻá áŽáááş ááŻáśááťááşáá˝áŹá¸áá˝ááşáĄáážáááŻáśá¸ schema áá˝ááş ááŻáśááťááşááąáááŻááşááąáŹááźáąáŹááˇáş áááŹá¸áá˝ááş áážáŹá¸áá˝ááşá¸ááąáŹ schema áážáááąááŤáááşá
áááŹá¸ááᯠááźááşáááşáážááşááŻáśáááşáááş áááŻáĄááşáááşá áááşá¸áááŻáááŻá¸áážááşá¸á á˝áŹááŻááşááąáŹááşáááŻááşáááş- á áááŻá¸áááŻááşááťááşáážáŹá áŹáááŤááąá¸áááŻáááşááśáááşáážáŻááŤá schema áááŻáá°á áááşá¸áĄááąáŤáşáĄááźáąááśá DDL áá áşááŻáááşááŽá¸ááŤá áááşá¸áááş Hive áá˝ááş folder áááŻááźááşááááŹá¸áĄááźá áşááźááşáááşáážááşááŻáśáááşáááŻááşááźáŽá¸á áá áşáážááşá áááŻá¸ááťááşáážáŹá áŹá schema áááŻáá˝ááşá¸ááśááźááşá¸á
á ááŻáášáááźáżááŹááᯠááťá˝ááşáŻááşáááŻáˇ áááşáááŻááşááááşá áááŹá¸ááᯠááááĄááźáááş á áŹáááşá¸áá˝ááşá¸ááŻááşá¸á Spark ááᯠáĄáŹá¸áááŻá¸áááşá áááŻááťá˝ááşáŻááşáááŻáˇáááŻááşáááŻááşááźáŻááŻááşááźáŽá¸ááŤááąá¸áá˝ááşáááşááťáŹá¸áááş Hive áážáá˝ááˇáşáááźáŻááąáŹá áŹááŻáśá¸ááťáŹá¸ááźááˇáşáĄá ááźáŻáááŻááşááźáąáŹááşá¸áááááááşáááŻáĄááşáááşá áĽáááŹáĄáŹá¸ááźááˇáşá Spark áááş âcorrupt_recordâ áĄáá˝ááşáá˝ááş áá˝á˛ááźááşá¸á áááşááźáŹáááááąáŹ áááŻááşá¸ááťáŹá¸ááᯠááŻááşáá˝ážááşáááşá áááŻáááŻáˇááąáŹáĄáá˝ááşáááş ááá˝ááşááźáąáŹááşáᲠHive áá˝ááş á áŹáááşá¸áá˝ááşá¸áááááŤá
ááŤáááŻáááááş diagram áááŻááááŻááşáááşá
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
ááŻááş ("_corrupt_record", "`_corrupt_record`") + "" + f[1].replace(":", "`:").replace("<","<`").replace(",", ",`").áĄá áŹá¸áááŻá¸("array<`","array<") DDL ááᯠááąá¸áááşá¸á áąáááşá áááŻáááŻáááşáážáŹá áĄá áŹá¸á
create table tname (_field1 string, 1field string)
â_field1á 1fieldâ áá˛áˇáááŻáˇááąáŹ áĄáá˝ááşáĄáááşááťáŹá¸ááźááˇáşá áĄáá˝ááşáĄáááşááťáŹá¸ááᯠáá˝ááşáááşá¸áááˇáşááąááŹáá˝ááş ááŻáśááźáŻáśááąáŹ DDL áá áşááŻááᯠááźáŻááŻááşáááş- áááŹá¸ `tname` (`_field1` á áŹááźáąáŹááşá¸á `1field` á áŹááźáąáŹááşá¸) ááᯠáááşááŽá¸ááŤá
ááąá¸áá˝ááşá¸ááąáŤáşááŹáááş- ááźáŽá¸ááźááˇáşá áŻáśááąáŹ schema (pf ááŻááşáá˝ááş) ááźááˇáş dataframe áááŻáááşáááŻáˇáážááşáááşá á˝áŹááááŻááşáááşáááşá¸á áᎠpf áááŻáááşáááŻááááŻááşááá˛á ááŤá ááášá áááźáżááŹááŤá áá áşáážááşá áááŻá¸ááťááşáážáŹá áŹá ááŤááąá¸áááŻááşááťáŹá¸ááŤáááˇáş áááŻááşáá˝á˛ááž áĄáááşá¸áááˇáşáĄáŹá¸ááŻáśá¸á ááŻáśááźááşá¸ááᯠááźááşáááşáááŹá¸á ááŽáááşá¸áááşá¸á áĄááŻáśááźáŻáśááŻáśá¸ááźá áşááąáááˇáş áááşáá˛ááŤáááşá
áĄá áŽáĄá ááşáááş Hive áá˝ááş áážááážááˇáşááźáŽá¸ááźá áşáááşá áááŹá¸áá áşááŻááŻáśá¸á schema áážááˇáş partition áĄáá áşááᯠááąáŤááşá¸á ááşááźááşá¸ááźááˇáş schema áĄáá áşáá áşááŻááᯠáááşááááŻááşáááşá áááŻáááŻáááşáážáŹ áááşáááş Hive ááž table schema áááŻáá°á partition áĄáá áşá schema áážááˇáşááąáŤááşá¸á ááşáááşáááŻáĄááşáááşáᯠáááŻáááŻáááşá Hive ááž á ááşá¸áááşáážáŻ áááşááŹááąááŹááᯠáááşááźááşá¸á áááşá¸ááᯠááŹááŽáááŻááşáá˝á˛áá áşááŻáá˝ááş ááááşá¸áááşá¸áᏠSpark ááᯠáĄááŻáśá¸ááźáŻá áĄáááşá¸áááˇáşáážá áşááŻááŻáśá¸ááᯠáá áşááźááŻááşáááşáááşááźááşá¸ááźááˇáş áááşá¸ááᯠááŻááşááąáŹááşáááŻááşáááşá
áĄááźáąááśáĄáŹá¸ááźááˇáşá áááşáááŻáĄááşááąáŹáĄááŹáĄáŹá¸ááŻáśá¸áážááááş- Hive áážááá°áááşá¸áááŹá¸ááŻáśá áśáážááˇáş áĄáááşá¸áááˇáşáĄáá áşá ááŤáááŻáˇáážáŹáááşá¸ ááąááŹáážááááşá ááťááşáážáááąááąá¸áááşáážáŹ á áááŻá¸áááŻááşááťááşáážáŹá áŹááŻáśá áśáážááˇáş áá˝ááşáááşáĄáá áşááťáŹá¸ááᯠáááşááŽá¸ááŹá¸ááąáŹ áĄáááŻááşá¸áá˝á˛ááž ááąáŤááşá¸á ááşááŹá¸ááąáŹ á ááąá¸áĄáá áşááᯠááá°áááşááźá áşáááşá
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
áááŻáˇááąáŹááşá ááťá˝ááşáŻááşáááŻáˇáááş ááááşáĄáááŻááşá¸áĄá
áĄáááŻááşá¸ áááŹá¸áážááşááŻáśáááşááźááşá¸ DDL ááᯠáááşááŽá¸áááşá
áá˝ááşá¸áááşáá
áşááŻááŻáśá¸ áážááşáááşá
á˝áŹ áĄááŻááşááŻááşááŤáá ááźáąáŹááááşá áááŚá¸ áááşáá
áşáᯠáážááá˛áˇááźáŽá¸á áááŹá¸ááᯠHive áá˝ááş áážááşáááşá
á˝áŹ áááşááŽá¸áá˛áˇááŤáá ááťá˝ááşáŻááşáááŻáˇáááş áá˝ááşá¸ááśááŹá¸ááąáŹ áááŹá¸ááŻáśá
áśáá
áşááŻááᯠááážáááŤáááşá
ááąáŹááşááŻáśá¸ááźáżááŹáážáŹ Hive table áá˝ááş partition áá áşááŻááᯠáĄáá˝ááşááá° áááˇáşáááááąáŹááźáąáŹááˇáşá áááşá¸á partition áááşááąáŹááşááŻáśááᯠááźááşáááşáááş Hive ááᯠáá˝ááşá¸áĄáŹá¸ááąá¸áááş áááŻáĄááşáááş-
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
JSON áááŻáááşááźááşá¸áážááˇáş áááşá¸áááŻáĄááźáąááśá á áááŻá¸áááŻááşááťááşáážáŹá áŹáááşááŽá¸ááźááşá¸ááááŻá¸áážááşá¸ááąáŹáĄááŻááşáááş ááŽá¸ááźáŹá¸á áŽáážáŹáá˝áąááááˇáşááźáąáážááşá¸áááşá¸ááťáŹá¸ááźá áşááźáŽá¸ áá˝ááşáááŻááşááąáŹáĄáááşáĄáá˛ááťáŹá¸á á˝áŹááᯠááťáąáŹáşáá˝ážáŹá¸áááŻááşáááşá á¤ááźáąáážááşá¸áááşá¸ááťáŹá¸áááş áááŻá¸áážááşá¸ááąáŹáşáááşá¸ áááşá¸áááŻáˇáááŻáážáŹáá˝áąáááş áĄááťáááşááťáŹá¸á á˝áŹáááŻáĄááşááŤáááşá
áĄááąáŹááşá¸ááźáááşá¸ ááąáŹááşááŻááşáážáŻááᯠáĄááąáŹááşáĄáááşááąáŹáşáááşá
- áááşááąáŹááşáážáŻáááŻááşááťáŹá¸ááᯠáááşáážáŹá¸áááş á áááŻá¸áááŻááşááťááşáážáŹá áŹáááŻáˇ áĄáááşá¸áááˇáşááťáŹá¸áááˇáşááŤá
- Spark áááŻááşáááˇáşáááŻááşááąáŹ áĄáááşá¸áĄááźá áşááąááŹáážá áĄáá˝ááşáĄáá˝ááşááťáŹá¸ááᯠáááŻááşáá˝ááşááźáąáážááşá¸ááŤá
- áááŻá¸áážááşá¸ááąáŹáĄááťááŻá¸áĄá áŹá¸ááťáŹá¸ááᯠstring áááŻáˇ ááŹá áşááŤá
- áĄáá˝ááşáĄáááşááťáŹá¸ááᯠá áŹááŻáśá¸ááąá¸áĄááźá áş ááźáąáŹááşá¸ááŤá
- Hive áá˝ááş ááŽá¸ááźáŹá¸ááąááŹáĄááşááŻááşáážááˇáş áááŹá¸áážááşááŻáśáááşááźááşá¸ (DDL áááşááŽá¸áážáŻ)
- Hive áážááˇáş áá˝á˛áááşáááááąáŹ áĄáá˝ááşáĄáááşááťáŹá¸ááᯠáážáąáŹááşáááş áááąáˇááŤáážááˇáş
- Hive áá˝ááş áááŹá¸áážááşááŻáśáááşááźááşá¸ááᯠáĄááşááááşááŻááşáááş ááąáˇááŹááŤá
áĄááťááşá¸ááťáŻááşááźáąáŹáááťážááş á áááŻá¸áááŻááşááťááşáážáŹá áŹááťáŹá¸ áááşááąáŹááşáááş ááŻáśá¸ááźááşááťááşáááş ááťááŻáˇáá˝ááşá¸ááťááşááťáŹá¸á á˝áŹááźááˇáş ááźááˇáşáážááşááąááŤáááşá áááŻáˇááźáąáŹááˇáş áĄááąáŹááşáĄáááşááąáŹáşááŹáá˝ááş áĄáááşáĄáá˛ááťáŹá¸ ááźáŻáśááŹááŤá áĄáąáŹááşááźááşááąáŹ ááťá˝ááşá¸ááťááşáážáŻáážáááąáŹ áĄáá˝áąáˇáĄááźáŻáśáážáááąáŹ ááŻááşááąáŹáşáááŻááşáááşááś áážááˇáşááźááşá¸áááş áááŻááąáŹááşá¸ááŤáááşá
á¤ááąáŹááşá¸ááŤá¸áááŻáááşáážáŻááźááşá¸áĄáá˝ááşááťáąá¸áá°á¸áááşááŤáááşá ááťá˝ááşáŻááşáááŻáˇáááşáááˇáşáĄáá˝ááşáĄááŻáśá¸áááşááąáŹáĄááťááşáĄáááşááťáŹá¸áááŻáážáŹáá˝áąáá˝áąáˇáážááááşááťážáąáŹáşáááˇáşááŤáááşá
source: www.habr.com