Spark schemaEvolution di pratîkê de

Xwendevanên delal, roj baş!

Di vê gotarê de, şêwirmendê pêşeng ê qada karsaziyê ya Çareseriyên Daneyên Mezin ên Neoflex bi hûrgulî vebijarkên ji bo avakirina pêşangehên strukturên guhêrbar bi karanîna Apache Spark rave dike.

Wekî beşek ji projeyek analîzkirina daneyê, pir caran peywira avakirina pêşangehên li ser bingeha daneyên birêkûpêk çêdibe.

Bi gelemperî ev têketin, an bersivên ji pergalên cihêreng in, ku wekî JSON an XML têne tomar kirin. Daneyên li Hadoop-ê têne barkirin, wê hingê hûn hewce ne ku ji wan pêşangehek çêbikin. Em dikarin gihîştina pêşangeha hatî afirandin, mînakî, bi riya Impala ve organîze bikin.

Di vê rewşê de, şemaya firoşgeha mebest ji berê de nayê zanîn. Digel vê yekê, nexşe jî nikare pêşwext were xêz kirin, ji ber ku ew bi daneyan ve girêdayî ye, û em bi van daneya pir strukturkirî re mijûl dibin.

Mînakî, îro bersiva jêrîn tê tomar kirin:

{source: "app1", error_code: ""}

û sibê ji heman pergalê ev bersiv tê:

{source: "app1", error_code: "error", description: "Network error"}

Wekî encamek, pêdivî ye ku zeviyek din li pêşangehê were zêdekirin - şirove, û kes nizane ka ew ê were an na.

Karê afirandina pêşangehek li ser daneyên wusa pir standard e, û Spark ji bo vê yekê gelek amûr hene. Ji bo parkirina daneya çavkaniyê, hem ji bo JSON û hem jî ji XML re piştgirî heye, û ji bo şemayek berê nenas, piştgirî ji bo schemaEvolution tê peyda kirin.

Di nihêrîna pêşîn de, çareserî hêsan xuya dike. Hûn hewce ne ku peldankek bi JSON re bigirin û wê di çarçoveyek daneyê de bixwînin. Spark dê şemayek biafirîne, daneyên hêlînê veguherîne avahiyan. Wekî din, pêdivî ye ku her tişt di parketê de, ku di Impala de jî tê piştgirî kirin, bi tomarkirina pêşangeha li metastora Hive were hilanîn.

Her tişt hêsan xuya dike.

Lêbelê, ji mînakên kurt ên di belgeyê de ne diyar e ku di pratîkê de bi çend pirsgirêkan re çi bikin.

Belgekirin nêzîkatiyek ne ji bo afirandina pêşangehek, lê ji bo xwendina JSON an XML di çarçoveyek daneyê de vedibêje.

Ango, ew bi hêsanî nîşan dide ka meriv çawa JSON dixwîne û pars dike:

df = spark.read.json(path...)

Ev bes e ku daneyan ji Spark re peyda bike.

Di pratîkê de, skrîpt ji xwendina pelên JSON ji peldankek û afirandina çarçoveyek daneyê pir tevlihevtir e. Rewş bi vî rengî xuya dike: jixwe pêşangehek diyar heye, her roj daneyên nû tê de, pêdivî ye ku ew li pêşangehê werin zêdekirin, ji bîr nekin ku dibe ku nexşe cûda be.

Plana asayî ya avakirina pêşangehek wiha ye:

Step 1. Daneyên bi nûvekirina rojane ya paşîn di Hadoop de têne barkirin û li dabeşek nû tê zêdekirin. Ew peldankek bi daneyên destpêkê bi roj veqetandî derdikeve holê.

Step 2. Di dema barkirina destpêkê de, ev peldank ji hêla Spark ve tê xwendin û parkirin. Çarçoveya daneyê ya ku di encamê de di formek parsable de tê hilanîn, mînakî, di parketê de, ku paşê dikare li Impala were şandin. Ev bi hemî daneyên ku heya vê gavê berhev kirine pêşangehek armancê diafirîne.

Step 3. Dakêşînek tête çêkirin ku dê her roj pêşangeha nûve bike.
Pirsgirêka barkirina zêde, hewcedariya dabeşkirina pêşangehê, û pirsa domandina nexşeya giştî ya pêşangehê heye.

Werin em mînakek bidin. Ka em bibêjin ku gava yekem a avakirina depoyek hate bicîh kirin, û pelên JSON li peldankek têne barkirin.

Afirandina çarçoveyek daneyê ji wan, dûv re hilanîna wê wekî pêşangeh, ne pirsgirêk e. Ev gava yekem e ku bi hêsanî di belgeya Spark de tê dîtin:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Her tişt baş xuya dike.

Me JSON xwend û pars kir, dûv re em dataframe wekî parketek hilînin, wê bi rengek hêsan li Hive tomar bikin:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Em pencereyek digirin.

Lê, roja din, daneyên nû ji çavkaniyê hatin zêdekirin. Peldankek me bi JSON heye, û pêşangehek ji vê peldankê hatî afirandin. Piştî barkirina berhevoka paşîn a daneyê ji çavkaniyê, daneya mart daneya rojekê winda dike.

Çareseriya mentiqî dê dabeşkirina pêşangehê bi roj be, ku dê rê bide ku her roja din dabeşek nû lê zêde bike. Mekanîzmaya vê yekê jî baş tê zanîn, Spark dihêle hûn dabeşan ji hev cuda binivîsin.

Pêşîn, em barkirinek destpêkê dikin, wekî ku li jor hatî destnîşan kirin daneyan hilînin, tenê dabeşkirinê lê zêde dikin. Ji vê kiryarê re destpêkkirina storefront tê gotin û tenê carekê tê kirin:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Roja din, em tenê dabeşek nû bar dikin:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Tiştê ku dimîne ev e ku hûn ji nû ve li Hive qeyd bikin da ku şema nûve bikin.
Lêbelê, li vir pirsgirêk derdikevin.

Pirsgirêka yekem. Zû an dereng, parketa encam dê neyê xwendin. Ev ji ber vê yekê ye ku parket û JSON bi rengek cûda nêzikî zeviyên vala dibin.

Ka em rewşek tîpîk bifikirin. Mînakî, duh JSON tê:

День 1: {"a": {"b": 1}},

û îro heman JSON bi vî rengî xuya dike:

День 2: {"a": null}

Ka em bibêjin du beşên me yên cuda hene, her yek bi yek rêzê.
Dema ku em hemî daneyên çavkaniyê bixwînin, Spark dê bikaribe celebê diyar bike, û dê fêm bike ku "a" qadek ji celebê "avahî" ye, bi qada hêlîn "b" ya celebê INT. Lê, heke her parçeyek ji hev veqetandî hate hilanîn, wê hingê em parketek bi nexşeyên dabeşkirinê yên lihevhatî distînin:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ev rewş baş tê zanîn, ji ber vê yekê vebijarkek bi taybetî hatî zêdekirin - dema ku daneyên çavkaniyê pars dikin, qadên vala derxînin:

df = spark.read.json("...", dropFieldIfAllNull=True)

Di vê rewşê de, parket dê ji dabeşên ku bi hev re têne xwendin pêk tê.
Her çend kesên ku di pratîkê de ev yek kirine jî dê li vir bi ken bi ken. Çima? Erê, ji ber ku dibe ku du rewşên din jî hebin. An sê. An jî çar. Ya yekem, ku dê hema bê guman çêbibe, ev e ku celebên hejmarî dê di pelên cûda yên JSON de cûda xuya bikin. Mînakî, {intField: 1} û {intField: 1.1}. Ger zeviyên weha di yek dabeşkirinê de werin dîtin, wê hingê yekbûna şema dê her tiştî rast bixwîne, û bibe sedema celebê herî rast. Lê heke di yên cûda de, wê hingê yek dê bibe intField: int, û ya din dê bibe xwediyê intField: ducar.

Ji bo birêvebirina vê rewşê ala jêrîn heye:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Naha peldankek me heye ku tê de beş hene ku dikarin di çarçoveyek daneya yekane û parketek derbasdar a tevahiya pêşangehê de werin xwendin. Erê? Na.

Divê em ji bîr mekin ku me tablo li Hîvê tomar kir. Hive di navên zeviyê de ne hesas e, dema ku parket hesas e. Ji ber vê yekê, dabeşên bi şema: field1: int, û Field1: int ji bo Hive yek in, lê ne ji bo Spark. Ji bîr nekin ku navên zeviyan bi tîpên piçûk veguherînin.

Piştî vê yekê, xuya dike ku her tişt baş e.

Lêbelê, hemî ne ewqas hêsan e. Pirsgirêkek duyemîn jî heye ku tê zanîn. Ji ber ku her dabeşek nû ji hev cuda tê hilanîn, peldanka dabeşkirinê dê pelên karûbarê Spark-ê hebe, mînakî, ala serkeftina operasyona _SUCCESS. Ev ê di dema hewldana parketkirinê de bibe sedema xeletiyekê. Ji bo ku hûn ji vê yekê dûr nekevin, hûn hewce ne ku veavakirinê mîheng bikin da ku nehêle Spark pelên karûbarê li peldankê zêde bike:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Wusa dixuye ku naha her roj dabeşek nû ya parketê li peldanka pêşangeha armancê, ku daneyên parkirî yên rojê lê hene, tê zêde kirin. Me di pêş de hişyar kir ku tu partîsiyonên bi pevçûnek celebê daneyê tune.

Lê, pirsgirêka me ya sêyemîn heye. Naha şemaya giştî nayê zanîn, ji bilî vê, tabloya li Hive xwedan şemayek xelet e, ji ber ku her dabeşek nû bi îhtîmalek mezin xeletiyek xistiye nav şemayê.

Pêdivî ye ku hûn tabloyê ji nû ve tomar bikin. Ev dikare bi hêsanî were kirin: dîsa parketa pêşangehê bixwînin, şemayê bigirin û li ser bingeha wê DDL biafirînin, ku pê re peldanka li Hive wekî tabloyek derveyî ji nû ve were tomar kirin, şemaya pêşangeha armancê nûve bike.

Pirsgirêka me ya çaremîn heye. Dema ku me tabloya yekem car tomar kir, me pişta xwe da Spark. Naha em bi xwe wiya dikin, û divê em ji bîr mekin ku zeviyên parketê dikarin bi karakterên ku ji bo Hive ne destûr in dest pê bikin. Mînakî, Spark xêzên ku nekarî di qada "corrupt_record" de parsek bike derdixe. Zeviyek weha bêyî ku jê xilas bibe li Hive nayê tomar kirin.

Bi zanîna vê yekê, em nexşeyê digirin:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

code ("_corrupt_record", "`_corrupt_record") + "" + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") DDL ewledar dike, ango li şûna:

create table tname (_field1 string, 1field string)

Bi navên zeviyê yên mîna "_field1, 1field", DDL-ya ewle li cîhê ku navên zeviyê jê direvin tê çêkirin: tabloya `tname` biafirînin (rêzika `_field1`, rêzika `1field`).

Pirs derdikeve holê: meriv çawa bi rengek bêkêmasî (di koda pf-ê de) çarçoveyek daneyê rast bi dest dixe? Meriv çawa vê pf-ê digire? Pirsgirêka pêncemîn ev e. Pîlana hemî dabeşan ji peldanka bi pelên parket ên pêşangeha armancê ji nû ve bixwînin? Ev rêbaza herî ewle ye, lê zehmet e.

Schema jixwe li Hive ye. Hûn dikarin bi berhevkirina şemaya tevahiya tabloyê û dabeşkirina nû şemayek nû bistînin. Ji ber vê yekê hûn hewce ne ku şema sifrê ji Hive bistînin û wê bi şemaya dabeşkirina nû re bikin yek. Ev dikare bi xwendina metadata testê ji Hive, hilanîna wê li peldankek demkî, û xwendina her du beşan bi yekcarî bi Spark re were kirin.

Di rastiyê de, her tiştê ku hûn hewce ne hene: şema tabloya orjînal li Hive û dabeşkirina nû. Daneyên me jî hene. Tenê dimîne ku meriv şemayek nû ya ku şema pêşangeha dikanê û qadên nû ji dabeşkirina hatî afirandin bi hev re bigire:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Dûv re, em DDL-ya qeydkirina tabloyê diafirînin, wekî di perçeya berê de.
Ger zincîra tevahî rast bixebite, ango, barek destpêkek hebû, û tablo li Hive rast hate afirandin, wê hingê em şemek tabloyek nûvekirî digirin.

Û pirsgirêka dawî ev e ku hûn nekarin tenê dabeşek li tabloyek Hive zêde bikin, ji ber ku ew ê têk bibe. Hûn hewce ne ku Hive zorê bikin ku strukturê dabeşkirina xwe rast bike:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Karê hêsan a xwendina JSON û afirandina pêşangehek ku li ser bingeha wê ye, dibe sedema derbaskirina hejmarek dijwariyên nepenî, çareseriyên ku divê hûn ji hev cuda lê bigerin. Û her çend ev çareserî hêsan in jî, ji bo dîtina wan gelek dem digire.

Ji bo pêkanîna avakirina pêşangehê, ez neçar bûm:

  • Parçeyan li pêşangehê zêde bikin, ji pelên karûbarê xilas bibin
  • Di daneyên çavkaniyê yên ku Spark nivîsandine de bi qadên vala re mijûl bibin
  • Tîpên sade li xêzekê biavêjin
  • Navên zeviyan bi tîpên piçûk veguherînin
  • Veqetandina daneyan û tomarkirina tabloyê li Hive (hilberîna DDL)
  • Ji bîr nekin ku ji navên zeviyê ku dibe ku bi Hive re hevaheng bin birevin
  • Fêr bibin ka meriv çawa qeydkirina tabloyê li Hive nûve dike

Bi kurtasî, em destnîşan dikin ku biryara çêkirina pencereyên dikanan bi gelek xeletiyan re tije ye. Ji ber vê yekê, di rewşek dijwar de di pêkanînê de, çêtir e ku hûn bi hevalbendek bi ezmûn û pisporiya serketî re têkilî daynin.

Spas ji bo xwendina vê gotarê, em hêvî dikin ku hûn agahdariya kêrhatî bibînin.

Source: www.habr.com

Add a comment