Spark schemaEvolution na omume

Ezigbo ndị na-agụ akwụkwọ, ụbọchị ọma!

N'edemede a, onye isi ndụmọdụ mpaghara azụmahịa Neoflex's Big Data Solutions na-akọwa n'ụzọ zuru ezu nhọrọ maka ịmepụta ihe ngosi agbanwe agbanwe site na iji Apache Spark.

Dị ka akụkụ nke ọrụ nyocha data, ọrụ nke iwu ụlọ ahịa na-adabere na data ahaziri nke ọma na-ebilitekarị.

Ọtụtụ mgbe ndị a bụ ndekọ, ma ọ bụ nzaghachi sitere na sistemụ dị iche iche, echekwara dị ka JSON ma ọ bụ XML. A na-ebugote data ahụ na Hadoop, mgbe ahụ, ị ​​ga-achọ iwu ụlọ ahịa site na ha. Anyị nwere ike ịhazi ohere ịnweta ihe ngosi emepụtara, dịka ọmụmaatụ, site na Impala.

N'okwu a, amabeghị atụmatụ nke ebe a na-echekwa ụlọ ahịa. Ọzọkwa, a pụghị iwepụta atụmatụ ahụ n'ọdịnihu, n'ihi na ọ dabere na data ahụ, anyị na-ejikwa data ndị a ahaziri nke ọma.

Dịka ọmụmaatụ, taa a na-abanye na nzaghachi ndị a:

{source: "app1", error_code: ""}

ma echi sitere n'otu usoro ahụ na-abịa azịza ndị a:

{source: "app1", error_code: "error", description: "Network error"}

N'ihi ya, a ga-agbakwunye otu ubi ọzọ na ihe ngosi - nkọwa, na ọ dịghị onye maara ma ọ ga-abịa ma ọ bụ na ọ gaghị.

Ọrụ nke ịmepụta ụlọ ahịa na data dị otú ahụ bụ ezigbo ọkọlọtọ, na Spark nwere ọtụtụ ngwaọrụ maka nke a. Maka ịkọwapụta data isi mmalite, enwere nkwado maka ma JSON na XML, yana maka atụmatụ amabeghị mbụ, enyere nkwado maka schemaEvolution.

Na nlele mbụ, ngwọta ahụ dị mfe. Ịkwesịrị iji JSON were folda wee gụọ ya n'ime ebe nchekwa data. Spark ga-emepụta atụmatụ, tụgharịa data akwụ ụgwọ ka ọ bụrụ ihe owuwu. Ọzọkwa, a ga-echekwa ihe niile na parquet, nke a na-akwado na Impala, site na ịdenye aha ụlọ ahịa dị na Hive metastore.

Ihe niile yiri ka ọ dị mfe.

Otú ọ dị, ọ bụghị ihe doro anya site na ihe atụ dị mkpirikpi dị na akwụkwọ ahụ ihe a ga-eme na ọtụtụ nsogbu na omume.

Akwụkwọ ahụ na-akọwa ụzọ ọ bụghị ịmepụta ụlọ ahịa, kama ịgụ JSON ma ọ bụ XML n'ime ebe nchekwa data.

Ya bụ, ọ na-egosi naanị otu esi agụ na ịtụgharị JSON:

df = spark.read.json(path...)

Nke a ezuola ime ka data dị na Spark.

Na omume, edemede ahụ dị mgbagwoju anya karịa ịgụ faịlụ JSON site na nchekwa na ịmepụta dataframe. Ọnọdụ ahụ dị ka nke a: enweelarị ụlọ ahịa ụfọdụ, data ọhụrụ na-abịa kwa ụbọchị, ọ dị mkpa ka agbakwunye ha n'ihu ụlọ ahịa, na-echefu na atụmatụ ahụ nwere ike ịdị iche.

Atụmatụ a na-emekarị maka iwu ụlọ ngosi bụ nke a:

Kwụpụ 1. A na-etinye data ahụ n'ime Hadoop yana nbugharị kwa ụbọchị na-esote ma tinye ya na nkebi ọhụrụ. Ọ na-atụgharị folda nwere data mbụ kewara kwa ụbọchị.

Kwụpụ 2. N'oge ibu nke mbụ, Spark na-agụ ma tụgharịa folda a. A na-echekwa nchekwa data nke arụpụtara n'ụdị atụ, dịka ọmụmaatụ, na parquet, nke enwere ike ibubata na Impala. Nke a na-emepụta ihe ngosi ebumnuche na data niile chịkọbara ruo ebe a.

Kwụpụ 3. Emepụtara nbudata nke ga-emelite ihu ụlọ ahịa kwa ụbọchị.
Enwere ajụjụ banyere ntinye ntinye, mkpa ọ dị ịkewa ihe ngosi ahụ, na ajụjụ nke ịnọgide na-enwe atụmatụ izugbe nke ihe ngosi ahụ.

Ka anyị were ihe atụ. Ka anyị kwuo na emejuputala nzọụkwụ mbụ nke iwulite ebe nchekwa, na-ebugote faịlụ JSON na nchekwa.

Ịmepụta dataframe site na ha, wee chekwaa ya dị ka ihe ngosi, abụghị nsogbu. Nke a bụ nzọụkwụ mbụ enwere ike ịchọta ngwa ngwa na akwụkwọ Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Ihe niile dị ka ọ dị mma.

Anyị na-agụ ma tụba JSON, mgbe ahụ, anyị na-echekwa dataframe dị ka parquet, na-edebanye aha ya na Hive n'ụzọ ọ bụla dabara adaba:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Anyị na-enweta windo.

Mana, n'echi ya, agbakwunyere data ọhụrụ sitere na isi iyi. Anyị nwere folda nwere JSON, yana ihe ngosi emepụtara na nchekwa a. Mgbe ebudatara batch data na-esote site na isi mmalite, data mart na-efu efu data otu ụbọchị.

Ihe ngwọta ezi uche dị na ya ga-abụ nkewa n'ihu ụlọ ahịa kwa ụbọchị, nke ga-enye ohere ịgbakwunye nkebi ọhụrụ kwa ụbọchị na-esote. Usoro maka nke a makwaara nke ọma, Spark na-enye gị ohere ide akụkụ dị iche iche.

Nke mbụ, anyị na-eme ibu mbụ, na-echekwa data dị ka akọwara n'elu, na-agbakwunye naanị nkewa. A na-akpọ ihe omume a mmalite ụlọ ahịa na-eme naanị otu ugboro:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

N'echi ya, anyị na-ebu naanị akụkụ ọhụrụ:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Naanị ihe fọdụrụ bụ ịdebanye aha na Hive ka imelite atụmatụ ahụ.
Otú ọ dị, nke a bụ ebe nsogbu na-ebilite.

Nsogbu mbụ. N'oge na-adịghị anya, parquet ga-esi na ya pụta agaghị agụ ya. Nke a bụ n'ihi ka parquet na JSON si emeso ubi efu n'ụzọ dị iche.

Ka anyị tụlee otu ọnọdụ. Dịka ọmụmaatụ, ụnyaahụ JSON bịarutere:

День 1: {"a": {"b": 1}},

ma taa otu JSON dị ka nke a:

День 2: {"a": null}

Ka anyị kwuo na anyị nwere akụkụ abụọ dị iche iche, nke ọ bụla nwere otu ahịrị.
Mgbe anyị gụrụ data isi mmalite niile, Spark ga-enwe ike ịchọpụta ụdị ahụ, ọ ga-aghọta na "a" bụ mpaghara ụdị "ụdị", nke nwere oghere akwụ ụgwọ "b" nke ụdị INT. Mana, ọ bụrụ na echekwara nkebi ọ bụla iche iche, mgbe ahụ anyị ga-enweta parquet na atụmatụ nkebi na-adabaghị adaba:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

A maara ọnọdụ a nke ọma, yabụ agbakwunyere nhọrọ pụrụ iche - mgbe ị na-atụgharị data isi mmalite, wepụ ubi efu:

df = spark.read.json("...", dropFieldIfAllNull=True)

N'okwu a, parquet ga-agụnye akụkụ nke nwere ike ịgụkọ ọnụ.
Ọ bụ ezie na ndị mere nke a na omume ga-amụmụ ọnụ ọchị ebe a. Gịnị kpatara? Ee, n'ihi na o yikarịrị ka a ga-enwe ọnọdụ abụọ ọzọ. Ma ọ bụ atọ. Ma ọ bụ anọ. Nke mbụ, nke ga-emerịrị, bụ na ụdị ọnụọgụ ga-adị iche na faịlụ JSON dị iche iche. Dịka ọmụmaatụ, {intField: 1} na {intField: 1.1}. Ọ bụrụ na achọtara ubi ndị dị otú ahụ n'otu akụkụ, mgbe ahụ, nchịkọta atụmatụ ahụ ga-agụ ihe niile n'ụzọ ziri ezi, na-eduga n'ụdị kachasị mma. Ma ọ bụrụ na ndị dị iche iche, mgbe ahụ otu ga-enwe intField: int, na nke ọzọ ga-enwe intField: okpukpu abụọ.

Enwere ọkọlọtọ na-esonụ iji dozie ọnọdụ a:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Ugbu a, anyị nwere nchekwa ebe enwere akụkụ nke enwere ike ịgụ n'ime otu dataframe na parquet ziri ezi nke ihe ngosi niile. Ee? Mba.

Anyị ga-echeta na anyị debara aha na tebụl na ekwo Ekwo. Ekwo Ekwo adịghị enwe mmetụta na aha ubi, ebe parquet na-enwe mmetụta nke ukwuu. Ya mere, nkebi nwere schemas: field1: int, na Field1: int bụ otu maka Hive, ma ọ bụghị maka Spark. Echefula ịtụgharị aha ubi ka ọ bụrụ obere mkpụrụedemede.

Mgbe nke ahụ gasịrị, ihe niile yiri ka ọ dị mma.

Otú ọ dị, ọ bụghị ihe niile dị mfe. Enwere nsogbu nke abụọ, nke a makwaara nke ọma. Ebe ọ bụ na echekwara nkebi ọhụrụ ọ bụla iche iche, folda nkebi ga-enwe faịlụ ọrụ Spark, dịka ọmụmaatụ, ọkọlọtọ _SUCCESS arụmọrụ. Nke a ga-ebute mperi mgbe ị na-agbalị ịme parquet. Iji zere nke a, ịkwesịrị ịhazi nhazi ahụ iji gbochie Spark ịgbakwunye faịlụ ọrụ na nchekwa:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Ọ dị ka ugbu a kwa ụbọchị, a na-agbakwunye nkebi parquet ọhụrụ na folda ihe ngosi a na-echekwa, ebe data tụgharịrị maka ụbọchị dị. Anyị leziri anya n'ihu na enweghị nkebi nwere esemokwu ụdị data.

Ma, anyị nwere nsogbu nke atọ. Ugbu a n'ozuzu schema na-amaghị, Ọzọkwa, na tebụl na ekwo Ekwo nwere ihe na-ezighị ezi schema, ebe ọ bụla ọhụrụ nkebi yikarịrị ẹkenam a distortion n'ime schema.

Ịkwesịrị ịdebanye aha tebụl ahụ ọzọ. Enwere ike ịme nke a n'ụzọ dị mfe: gụọ parquet nke ụlọ ahịa ahụ ọzọ, were schema wee mepụta DDL dabere na ya, nke iji debanye aha folda dị na Hive dị ka tebụl dị n'èzí, na-emelite atụmatụ nke ihu ụlọ ahịa ahụ.

Anyị nwere nsogbu nke anọ. Mgbe anyị debanyere tebụl ahụ maka oge mbụ, anyị dabere na Spark. Ugbu a, anyị na-eme ya onwe anyị, na anyị kwesịrị icheta na parquet ubi nwere ike na-amalite na odide na-adịghị ekwe ka ekwo Ekwo. Dịka ọmụmaatụ, Spark na-atụpụrụ ahịrị ndị ọ na-enweghị ike ịtụgharị n'ọhịa "corrupt_record". Enweghị ike ịdebanye aha ubi dị otú ahụ na Hive na-enweghị mgbanarị.

N'ịmara nke a, anyị na-enweta atụmatụ:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Usoro ("_corrupt_record", "`_corrupt_record`") + "" + f[1].dochie(":", "`:").dochie("<", """").dochie(",",", ",`").dochie("usoro <`", "n'usoro<") na-eme DDL nchekwa, ya bụ kama:

create table tname (_field1 string, 1field string)

N'iji aha ubi dịka "_field1, 1field", eme DDL nchekwa ebe a na-agbanarị aha ubi: mepụta tebụl `tname` (` _field1` string, `1field` string).

Ajụjụ na-ebilite: otu esi enweta dataframe nke ọma na atụmatụ zuru oke (na koodu PF)? Kedu ka esi enweta pf a? Nke a bụ nsogbu nke ise. Gụgharịa atụmatụ nke akụkụ niile sitere na folda nwere faịlụ parquet nke ihe ngosi ebumnuche? Usoro a bụ nke kachasị nchebe, mana siri ike.

Atụmatụ ahụ adịlarị na Hive. Ị nwere ike nweta atụmatụ ọhụrụ site na ijikọta atụmatụ nke tebụl dum na nkebi ọhụrụ. Ya mere, ị ga-ewere table schema si Hive ma jikọta ya na schema nke ọhụrụ nkebi. Enwere ike ime nke a site n'ịgụ metadata ule sitere na Hive, chekwaa ya na nchekwa nwa oge, yana iji Spark gụọ akụkụ abụọ ahụ ozugbo.

N'ezie, e nwere ihe niile ị chọrọ: mbụ table schema na ekwo Ekwo na ọhụrụ nkebi. Anyị nwekwara data. Ọ na-adị naanị iji nweta atụmatụ ọhụrụ na-ejikọta atụmatụ ụlọ ahịa na mpaghara ọhụrụ sitere na nkebi emepụtara:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ọzọ, anyị na-emepụta ndebanye aha tebụl DDL, dị ka ọ dị na snippet gara aga.
Ọ bụrụ na agbụ niile na-arụ ọrụ n'ụzọ ziri ezi, ya bụ, e nwere mmalite ibu ibu, na tebụl e kere n'ụzọ ziri ezi na ekwo Ekwo, anyị na-enweta ihe emelitere table schema.

Na ikpeazụ nsogbu bụ na ị na-apụghị nnọọ tinye a nkebi na a hive table, n'ihi na ọ ga-agbajikwa. Ịkwesịrị ịmanye Hive ka ọ dozie nhazi nkebi ya:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Ọrụ dị mfe nke ịgụ JSON na ịmepụta ụlọ ahịa na-adabere na ya na-ebute imeri ọtụtụ nsogbu ndị na-enweghị isi, ngwọta nke ị ga-achọ iche iche. Ma ọ bụ ezie na ngwọta ndị a dị mfe, ọ na-ewe oge dị ukwuu iji chọta ha.

Iji mejuputa owuwu nke ihe ngosi ahụ, aghaghị m:

  • Tinye akụkụ na ihe ngosi ahụ, wepụ faịlụ ọrụ
  • Mekọrịta oghere efu na data isi mmalite Spark pịnyere
  • Tụgharịa ụdị dị mfe na eriri
  • Tụgharịa aha ubi ka ọ bụrụ obere mkpụrụedemede
  • Bulite data dị iche na ndebanye aha tebụl na Hive (ọgbọ DDL)
  • Echefula ịgbanarị aha ubi nwere ike ekwekọghị na Hive
  • Mụta otu esi emelite ndebanye aha tebụl na Hive

N'ịchịkọta, anyị na-achọpụta na mkpebi iji wuo windo ụlọ ahịa jupụtara n'ọtụtụ ọnyà. Ya mere, n'ihe banyere ihe isi ike na mmejuputa iwu, ọ ka mma ịkpọtụrụ onye mmekọ nwere ahụmahụ nwere ọkachamara na-aga nke ọma.

Daalụ maka ịgụ akụkọ a, anyị nwere olileanya na ozi ahụ bara uru.

isi: www.habr.com

Tinye a comment