Spark schemaEvolution in practice

Dear readers, good day!

In this article, a leading consultant of Neoflex's Big Data Solutions business area describes in detail the options for building data marts with an evolving schema using Apache Spark.

As part of a data analytics project, the task of building data marts on top of loosely structured data often arises.

Usually these are logs, or responses from various systems, stored as JSON or XML. The data is uploaded to Hadoop, and a data mart then needs to be built on it. Access to the resulting mart can be arranged, for example, through Impala.

In this case, the schema of the target data mart is not known in advance. Moreover, it cannot even be drawn up in advance, since it depends on the data, and we are dealing with precisely this loosely structured data.

For example, today the following response is logged:

{source: "app1", error_code: ""}

and tomorrow the same system sends the following response:

{source: "app1", error_code: "error", description: "Network error"}

As a result, one more field has to be added to the mart, description, and nobody knows whether it will arrive or not.
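The effect of such evolution can be sketched without Spark at all. This minimal pure-Python illustration (the two dictionaries below are the hypothetical responses shown above) computes the union of fields seen so far, which is essentially what a schema merge has to produce:

```python
import json

# Hypothetical responses from the same source on two different days
day1 = json.loads('{"source": "app1", "error_code": ""}')
day2 = json.loads('{"source": "app1", "error_code": "error", "description": "Network error"}')

# A poor man's schema merge: the union of all field names seen so far
merged_fields = sorted(set(day1) | set(day2))
print(merged_fields)  # ['description', 'error_code', 'source']
```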

The task of building a data mart on such data is fairly standard, and Spark has a number of tools for it. Both JSON and XML are supported for parsing the source data, and for a previously unknown schema there is schemaEvolution support.

At first glance, the solution looks simple. You take a folder with JSON and read it into a dataframe. Spark will infer the schema and turn the nested data into structs. Next, everything needs to be saved to parquet, which Impala also supports, by registering the mart in the Hive metastore.

Everything seems simple.

However, the short examples in the documentation leave it unclear what to do with a number of problems that come up in practice.

The documentation describes an approach not for creating a data mart, but for reading JSON or XML into a dataframe.

That is, it simply shows how to read and parse JSON:

df = spark.read.json(path...)

This is enough to make the data available to Spark.

In practice, the script is much more complicated than just reading JSON files from a folder and creating a dataframe. The situation looks like this: a data mart already exists, new data arrives every day, it has to be added to the mart, and we must not forget that the schema may differ.

The general scheme for building a mart is as follows:

Step 1. The data is loaded into Hadoop, with subsequent daily reloads, and added to a new partition. The result is a folder with source data partitioned by day.

Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in a queryable format, for example parquet, which can then be imported into Impala. This creates a target mart with all the data accumulated up to this point.

Step 3. A load job is created that will update the mart every day.
This raises the questions of incremental loading, the need to partition the mart, and maintaining the mart's overall schema.
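As a rough sketch of the folder layout these three steps produce (pure Python; the path and date values are only illustrative, borrowed from the snippets later in the article):

```python
# Illustrative values only; in a real job they come from configuration
dbpath, db, destTable = "/user/admin/warehouse", "stage", "jsonevolvtable"
date_load = "12-12-2019"

# Step 2 writes the whole mart here once...
target = dbpath + "/" + db + "/" + destTable
# ...and step 3 appends one such partition folder per daily run
partition = target + "/date_load=" + date_load + "/"
print(partition)  # /user/admin/warehouse/stage/jsonevolvtable/date_load=12-12-2019/
```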

Let's take an example. Say the first step of building the storage has been implemented, and JSON files are being uploaded to a folder.

Creating a dataframe from them and then saving it as a mart is not a problem. This is the very first step, easily found in the Spark documentation:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)

Everything seems fine.

We read and parsed the JSON, then we save the dataframe as parquet, registering it in Hive in any convenient way:

df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')

We get a mart.

But the next day, new data from the source is added. We have a folder with JSON, and a mart built from that folder. After loading the next batch of data from the source, the mart is one day's worth of data short.

The logical solution is to partition the mart by day, which allows a new partition to be added every following day. The mechanism for this is well known; Spark lets you write partitions separately.

First, we perform the initial load, saving the data as described above, only adding partitioning. This step is called mart initialization and is done only once:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

On the following day, we load only the new partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

All that remains is to re-register the table in Hive to update the schema.
However, this is where the problems begin.

The first problem. Sooner or later, the resulting parquet will become unreadable. This is due to parquet and JSON treating empty fields differently.

Consider a typical situation. For example, yesterday this JSON arrives:

Day 1: {"a": {"b": 1}},

and today the same JSON looks like this:

Day 2: {"a": null}

Say we have two different partitions, each with one row.
When we read the entire source data at once, Spark can determine the type and will understand that "a" is a field of type struct, with a nested field "b" of type INT. But if each partition is saved separately, we get a parquet with incompatible partition schemas:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
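The conflict is visible even with plain-Python JSON parsing; a small sketch using the two records above (no Spark required):

```python
import json

# The two daily records from the example above
day1 = json.loads('{"a": {"b": 1}}')
day2 = json.loads('{"a": null}')

# Parsed separately, field "a" comes out as two different types,
# which is exactly why per-partition schema inference diverges
print(type(day1["a"]).__name__)  # dict: Spark would infer a struct here
print(day2["a"])                 # None: on its own, no struct can be inferred
```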

This situation is well known, so an option was added specifically for it: when parsing the source data, drop fields that are entirely empty:

df = spark.read.json("...", dropFieldIfAllNull=True)

In this case, the parquet will consist of partitions that can be read together.
Although anyone who has done this in practice will smile bitterly here. Why? Because there are most likely two more situations. Or three. Or four. The first, which is almost certain to occur, is that numeric types will look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such fields land in the same partition, the schema merge will read everything correctly and lead to the most precise type. But if they land in different partitions, one will have intField: int and the other intField: double.
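The numeric-type half of the problem can also be shown without Spark. In this sketch, plain json parsing yields int on one day and float on the other; the primitivesAsString workaround described next amounts to stringifying every primitive value:

```python
import json

day1 = json.loads('{"intField": 1}')    # parsed as int
day2 = json.loads('{"intField": 1.1}')  # parsed as float

# In one combined read a merge can safely widen int -> double,
# but two separately saved partitions end up with conflicting parquet types.
# Treating primitives as strings sidesteps the conflict entirely:
as_strings = {k: str(v) for k, v in day2.items()}
print(type(day1["intField"]).__name__, type(day2["intField"]).__name__)  # int float
print(as_strings)  # {'intField': '1.1'}
```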

There is the following flag for handling this situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Now we have a folder with partitions that can be read into a single dataframe, and a valid parquet of the entire mart. Yes? No.

We must remember that we registered the table in Hive. Hive is case-insensitive in field names, while parquet is case-sensitive. Therefore, partitions with the schemas field1: int and Field1: int are identical for Hive, but not for Spark. Don't forget to convert the field names to lowercase.
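A quick sketch of the case-sensitivity mismatch (pure Python; the field lists are illustrative):

```python
# Hive compares field names case-insensitively, parquet does not,
# so these two partition schemas collide in Hive but not in Spark.
schema_a = ["field1", "date_load"]
schema_b = ["Field1", "date_load"]

def normalize(fields):
    # Lowercasing up front keeps both systems in agreement
    return [f.lower() for f in fields]

print(schema_a == schema_b)                        # False: parquet/Spark view
print(normalize(schema_a) == normalize(schema_b))  # True: Hive's view
```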

After that, everything seems fine.

However, not everything is so simple. There is a second, also well-known, problem. Since each new partition is saved separately, the partition folder will contain Spark service files, for example the _SUCCESS flag marking a successful operation. This causes an error when trying to parse the parquet. To avoid it, you have to set the configuration so that Spark does not add service files to the folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

It seems that now, every day, a new parquet partition is added to the target mart folder, containing the parsed data for that day. We took care in advance that there are no partitions with conflicting data types.

But we have a third problem. The overall schema is now unknown; moreover, the table in Hive has an incorrect schema, since each new partition has most likely introduced a distortion into the schema.

The table needs to be re-registered. This can be done simply: read the mart's parquet again, take its schema and build a DDL based on it, with which to re-register the folder in Hive as an external table, updating the schema of the target mart.

We have a fourth problem. When we registered the table for the first time, we relied on Spark. Now we do it ourselves, and we must remember that parquet fields can start with characters that are not allowed in Hive. For example, Spark dumps lines it could not parse into the _corrupt_record field. Such a field cannot be registered in Hive without being escaped.

Knowing this, we get the schema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

The code f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") makes the DDL safe, i.e. instead of:

create table tname (_field1 string, 1field string)

with field names like "_field1, 1field", a safe DDL is generated in which the names are escaped: create table `tname` (`_field1` string, `1field` string).
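The same escaping idea can be written as a small standalone helper; `quote_ident` below is a hypothetical function, not the article's code, but it produces the same kind of safe DDL:

```python
import re

def quote_ident(name):
    # Backquote any name that is not a plain lowercase Hive identifier,
    # e.g. _corrupt_record (leading underscore) or 1field (leading digit)
    if re.fullmatch(r"[a-z][a-z0-9_]*", name):
        return name
    return "`" + name + "`"

cols = [("_field1", "string"), ("1field", "string")]
ddl = ("create table `tname` ("
       + ", ".join(quote_ident(n) + " " + t for n, t in cols) + ")")
print(ddl)  # create table `tname` (`_field1` string, `1field` string)
```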

The question arises: how do we properly obtain a dataframe with the complete schema (the pf in the code above)? How do we get this pf? This is the fifth problem. Re-read the schemas of all partitions from the folder with the target mart's parquet files? That method is the safest, but expensive.

The schema is already in Hive. A new schema can be obtained by combining the schema of the whole table with that of the new partition. So you need to take the table schema from Hive and combine it with the schema of the new partition. This can be done by reading a test row from the table in Hive, saving it to a temporary folder, and using Spark to read both partitions at once.

In fact, we have everything we need: the original table schema in Hive and the new partition. We also have the data. It only remains to obtain a new schema that combines the previous mart schema with the new fields from the created partition:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

After that, we build the table-registration DDL, as in the previous snippet.
If the whole chain works correctly, namely, the initializing load was performed and the table was created correctly in Hive, then we get an updated table schema.

And the last problem is that you cannot simply add a partition to the Hive table, because it will be broken. You have to force Hive to repair its partition structure:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

The simple task of reading JSON and building a data mart on it turns into overcoming a number of implicit difficulties, whose solutions you have to look for separately. And although these solutions are simple, finding them takes a lot of time.

To implement the construction of the mart, we had to:

  • Add partitions to the mart, getting rid of service files
  • Deal with empty fields in the source data that Spark has typed
  • Cast simple types to string
  • Convert field names to lowercase
  • Separate the data upload and the table registration in Hive (DDL generation)
  • Remember to escape field names that may be incompatible with Hive
  • Learn how to update the table registration in Hive

To sum up, we note that the decision to build data marts is fraught with many pitfalls. Therefore, if difficulties arise during implementation, it is better to turn to an experienced partner with proven expertise.

Thank you for reading this article; we hope you find the information useful.

source: www.habr.com
