How Spark schemaEvolution works in practice

Dear readers, good day!

In this article, a lead consultant in Neoflex's Big Data Solutions practice describes in detail the options for building variable-schema data marts with Apache Spark.

In a data analytics project, the task of building data marts on top of loosely structured data comes up frequently.

Usually these are logs, or responses from various systems, stored as JSON or XML. The data is loaded into Hadoop, and then a mart has to be built from it. Access to the resulting mart can then be provided, for example, through Impala.

In this case the schema of the target mart is not known in advance. Moreover, it cannot be drawn up beforehand either, since it depends on the data, and we are dealing with this very loosely structured data.

For example, suppose today the following response is logged:

{source: "app1", error_code: ""}

and tomorrow the same system sends a response like this:

{source: "app1", error_code: "error", description: "Network error"}

As a result, one more field has to be added to the mart, description, and nobody knows whether it will arrive again or not.
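The effect on the schema can be illustrated with plain Python and the json module (a minimal sketch using the two example payloads above; Spark performs the analogous merge when inferring a dataframe schema):

```python
import json

# The two example payloads: day one without, day two with a description.
day1 = json.loads('{"source": "app1", "error_code": ""}')
day2 = json.loads('{"source": "app1", "error_code": "error", "description": "Network error"}')

# Each payload's "schema" is simply its set of fields.
fields_day1 = set(day1)
fields_day2 = set(day2)

# The evolved mart schema is the union: a new "description" column appears.
evolved = sorted(fields_day1 | fields_day2)
print(evolved)  # ['description', 'error_code', 'source']
```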

The task of building a mart on such data is fairly standard, and Spark has a number of tools for it. For parsing the source data there is support for both JSON and XML, and for a schema that is not known in advance there is support for schemaEvolution.

At first glance, the solution looks simple. Take a folder with JSON and read it into a dataframe. Spark will infer a schema and turn the nested data into structs. Then everything has to be saved as parquet, which Impala also supports, registering the mart in the Hive metastore.

It all seems simple.

However, the short examples in the documentation do not make it clear what to do with a number of problems that arise in practice.

The documentation describes an approach not for building a mart, but for reading JSON or XML into a dataframe.

Namely, it simply shows how to read and parse JSON:

df = spark.read.json(path...)

That is enough to make the data available to Spark.

In practice, the scenario is much more complex than just reading JSON files from a folder and creating a dataframe. The situation looks like this: there is already a mart, new data arrives every day, it has to be added to the mart, and we must not forget that the schema may differ.

The usual scheme for building a mart is as follows:

Step 1. The data is loaded into Hadoop, with daily incremental loads appended as new partitions. The result is a folder with the raw data, partitioned by day.

Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in a parsable format, for example parquet, which can then be imported into Impala. This produces a target mart with all the data accumulated so far.

Step 3. A load job is created that updates the mart every day.
This raises the question of the incremental load, the need to partition the mart, and the question of maintaining the mart's overall schema.

Let's take an example. Say the first step, building the repository, is done, and the JSON files have been uploaded to a folder.

Creating a dataframe from them and then saving it as a mart is not a problem. This is the very first step, which can easily be found in the Spark documentation:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)

Everything seems fine.

We read and parse the JSON, then save the dataframe as parquet, registering it in Hive in the most convenient way:

df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')

We have our mart.

But the next day, new data arrives from the source. We have a folder with JSON, and a mart built from that folder. After loading the next batch of data from the source, the mart is missing one day's worth of data.

The logical solution is to partition the mart by day, which makes it possible to add a new partition each following day. The mechanism for this is well known: Spark lets you write partitions separately.

First, we do an initial load, saving the data as described above and adding only the partitioning. This step is called the mart initialization and is performed only once:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

The next day we load only the new partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
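The write above lands in a Hive-style partition layout on disk. A small sketch of the resulting path (the dbpath, db, destTable and date_load values are illustrative):

```python
# Illustrative values; in the real job they come from the load configuration.
dbpath, db, destTable = "/data", "shop", "events"
date_load = "12-12-2019"

# Each daily write lands in its own date_load=... subdirectory.
partition_dir = dbpath + "/" + db + "/" + destTable + "/date_load=" + date_load + "/"
print(partition_dir)  # /data/shop/events/date_load=12-12-2019/
```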

All that remains is to re-register the table in Hive to update the schema.
However, this is where the problems begin.

First problem. Sooner or later, the resulting parquet becomes unreadable. This is due to how parquet and JSON treat empty fields differently.

Consider a typical situation. For example, yesterday this JSON arrived:

Day 1: {"a": {"b": 1}},

and today the same JSON looks like this:

Day 2: {"a": null}

Say we have two different partitions, each with one row.
When we read the entire source data at once, Spark can infer the type and will conclude that "a" is a field of type "struct", with a nested field "b" of type INT. But if each partition is saved separately, the result is a parquet with incompatible partition schemas:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
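The conflict can be simulated without Spark (a minimal sketch; the filtering at the end is only an emulation of what the dropFieldIfAllNull option does inside Spark):

```python
import json

# The two partitions, parsed in isolation.
day1 = json.loads('{"a": {"b": 1}}')
day2 = json.loads('{"a": null}')

# Seen separately, field "a" gets incompatible types: a nested object on
# day 1 and a bare null on day 2 (which Spark types as string).
assert isinstance(day1["a"], dict)
assert day2["a"] is None

# Dropping all-null fields removes "a" from day 2 entirely, so no
# conflicting column is ever written for that partition.
day2_cleaned = {k: v for k, v in day2.items() if v is not None}
print(day2_cleaned)  # {}
```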

This situation is well known, so a special option was added: when parsing the source data, drop fields that contain only nulls:

df = spark.read.json("...", dropFieldIfAllNull=True)

In this case, the parquet will consist of partitions that can be read together.
Although anyone who has done this in practice will smile bitterly here. Why? Because there are probably two more situations. Or three. Or four. The first, which is almost certain to occur, is that numeric types look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such fields end up in the same partition, the schema merge will read everything correctly, converging on the most precise type. But if they end up in different partitions, one will have intField: int and the other intField: double.
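The numeric conflict is just as easy to reproduce with plain Python (a sketch; the string cast at the end only imitates what primitivesAsString does inside Spark):

```python
import json

# The same logical field arrives as an integer one day and a float the next.
rec1 = json.loads('{"intField": 1}')
rec2 = json.loads('{"intField": 1.1}')

# Parsed in separate partitions, the inferred types disagree.
assert type(rec1["intField"]) is int
assert type(rec2["intField"]) is float

# Treating every primitive as a string removes the disagreement.
as_str1 = {k: str(v) for k, v in rec1.items()}
as_str2 = {k: str(v) for k, v in rec2.items()}
print(as_str1, as_str2)  # {'intField': '1'} {'intField': '1.1'}
```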

There is a flag that handles this situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Now we have a folder that can be read into a single dataframe, and a valid parquet for the whole mart. Right? No.

We must remember that we registered the table in Hive. Hive is not case-sensitive in field names, whereas parquet is. So partitions with the schemas field1: int and Field1: int are the same to Hive, but not to Spark. Don't forget to convert the field names to lowercase.
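A sketch of that normalization (the column lists are hypothetical; on a real dataframe the same thing is typically done with df.toDF(*[c.lower() for c in df.columns])):

```python
# Hypothetical column lists from two partitions: Hive treats field1 and
# Field1 as the same column, while parquet and Spark treat them as different.
cols_day1 = ["field1", "source"]
cols_day2 = ["Field1", "source"]

def normalize(cols):
    """Lowercase column names so Hive and Spark agree on column identity."""
    return [c.lower() for c in cols]

# After normalization, both partitions expose identical column names.
print(normalize(cols_day2))  # ['field1', 'source']
```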

After that, everything seems fine.

However, not everything is so simple. There is a second, also well-known, problem. Because each new partition is saved separately, the partition folder will contain Spark service files, for example the _SUCCESS job-completion flag. This causes an error when parsing the parquet. To avoid it, you have to set the configuration that prevents Spark from adding service files to the folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

It seems that now a new parquet partition with the day's parsed data is added to the target mart folder every day. We took care in advance that there are no partitions with conflicting data types.

But we have a third problem. Now the overall schema is no longer known; worse, the table in Hive has a wrong schema, since each new partition most likely introduced a distortion into the schema.

The table has to be re-registered. This can be done simply: read the mart's parquet again, take its schema, generate a DDL based on it, and use that DDL to re-register the folder in Hive as an external table, updating the schema of the target mart.

We have a fourth problem. When we registered the table for the first time, we relied on Spark. Now we do it ourselves, and we have to remember that parquet fields may start with characters that are not allowed in Hive. For example, Spark dumps the rows it failed to parse into the "_corrupt_record" field. Such a field cannot be registered in Hive without being escaped.

Knowing this, we arrive at the following script:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

The chain of replace calls, ("_corrupt_record", "`_corrupt_record`") on the field name plus (":", "`:"), ("<", "<`"), (",", ",`") and ("array<`", "array<") on the field type, generates a safe DDL. That is, instead of:

create table tname (_field1 string, 1field string)

with field names like "_field1, 1field", a safe DDL is generated in which the field names are escaped: create table `tname` (`_field1` string, `1field` string).
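The escaping is easier to inspect in isolation. A minimal sketch (safe_ddl_field is a hypothetical helper that mirrors the replace chain from the script above):

```python
def safe_ddl_field(name, dtype):
    """Escape one parquet field for a Hive DDL, mirroring the replace chain."""
    name = name.replace("_corrupt_record", "`_corrupt_record`")
    dtype = (dtype.replace(":", "`:")               # close a backquote before each ":"
                  .replace("<", "<`")               # open a backquote after each "<"
                  .replace(",", ",`")               # open a backquote after each ","
                  .replace("array<`", "array<"))    # array elements have no names to quote
    return name + " " + dtype

print(safe_ddl_field("_corrupt_record", "string"))
# `_corrupt_record` string
print(safe_ddl_field("c", "struct<d:bigint,e:string>"))
# c struct<`d`:bigint,`e`:string>
```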

This raises the question: how do we correctly obtain a dataframe with the complete schema (the pf in the code above)? How do we get this pf? This is the fifth problem. Re-read the schemas of all partitions from the folder with the target mart's parquet files? That approach is the safest, but also the most expensive.

The schema is already in Hive. A new schema can be obtained by merging the schema of the whole table with that of the new partition. So we need to take the table schema from Hive and merge it with the schema of the new partition. This can be done by reading the test metadata from Hive, saving it to a temporary folder, and using Spark to read both partitions at once.
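What the merge amounts to can be sketched with plain dictionaries (the flattened {column: type} maps are hypothetical; in the job the same union is produced by Spark's mergeSchema read):

```python
# Hypothetical flattened schemas as {column: type} maps: the table
# registered in Hive vs. the freshly written partition.
table_schema = {"source": "string", "error_code": "string"}
partition_schema = {"source": "string", "description": "string"}

# A schema merge is a union of columns: existing columns keep their
# type, new columns from the partition are appended.
merged = dict(table_schema)
for col, typ in partition_schema.items():
    merged.setdefault(col, typ)

print(sorted(merged))  # ['description', 'error_code', 'source']
```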

In fact, there is everything you need: the original table schema in Hive and the new partition. We have the data as well. All that remains is to obtain a new schema that combines the mart schema with the new fields from the freshly created partition:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Next, we generate the table registration DDL, as in the previous snippet.
If the whole chain works correctly, namely the initializing load was performed and the table was created correctly in Hive, then we get an updated table schema.

And the last problem is that you cannot simply add a partition to a Hive table, because the table will be broken. You have to force Hive to repair its partition structure:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

The simple task of reading JSON and building a mart from it turns into overcoming a number of implicit difficulties, whose solutions you have to hunt for separately. And although these solutions are simple, finding them takes a lot of time.

To implement the construction of the mart, we had to:

  • Add partitions to the mart, getting rid of the service files
  • Deal with empty fields in the source data that Spark had typed
  • Cast primitive types to string
  • Convert field names to lowercase
  • Separate the data load from the table registration in Hive (DDL generation)
  • Remember to escape field names that are incompatible with Hive
  • Learn how to update the table registration in Hive

Summing up, we note that the decision to build data marts is fraught with many pitfalls. Therefore, if difficulties arise during implementation, it is better to bring in an experienced partner with proven expertise.

Thank you for reading this article; we hope you find the information useful.

Source: www.habr.com
