Spark schemaEvolution in practice

Dear readers, good day!

In this article, a leading expert from Neoflex's Big Data Solutions business area describes in detail the options for building variable-structure data marts using Apache Spark.

As part of data analytics projects, the task of building data marts from loosely structured data comes up frequently.

Usually these are logs, or responses from various systems, stored as JSON or XML. The data is uploaded to Hadoop, and then a data mart has to be built from it. Access to the resulting mart can be organized, for example, through Impala.

In this case, the schema of the target data mart is not known in advance. Moreover, it cannot be drawn up ahead of time either, since it depends on the data, and we are dealing with precisely this loosely structured data.

For example, today the following response is logged:

{source: "app1", error_code: ""}

and tomorrow the same system sends this response:

{source: "app1", error_code: "error", description: "Network error"}

As a result, one more field, description, has to be added to the mart, and nobody knows whether it will arrive again or not.
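The effect can be illustrated with a small plain-Python sketch (no Spark needed): for flat records, the set of mart columns is simply the union of all keys seen so far, which is essentially what Spark's schema merging computes. The helper name merged_columns is ours, for illustration only:

```python
import json

# The two responses from the example above, logged on different days.
day1 = '{"source": "app1", "error_code": ""}'
day2 = '{"source": "app1", "error_code": "error", "description": "Network error"}'

def merged_columns(*json_lines):
    """Union of all keys seen across the records, in first-seen order."""
    columns = []
    for line in json_lines:
        for key in json.loads(line):
            if key not in columns:
                columns.append(key)
    return columns

print(merged_columns(day1, day2))  # ['source', 'error_code', 'description']
```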

The task of building a data mart on such data is fairly standard, and Spark has a number of tools for it. Parsing of both JSON and XML source data is supported, and for a previously unknown schema there is schemaEvolution support.

At first glance, the solution looks simple. Take the folder with the JSON and read it into a dataframe. Spark will infer a schema and turn the nested data into structs. Then everything is saved to parquet, which Impala also supports, and the data mart is registered in the Hive metastore.

Everything seems simple.

However, the short examples in the documentation do not make clear what to do about a number of problems that come up in practice.

The documentation describes an approach not for creating a data mart, but for reading JSON or XML into a dataframe.

Namely, it simply shows how to read and parse JSON:

df = spark.read.json(path...)

This is enough to make the data available in Spark.

In practice, the scenario is much more complicated than simply reading JSON files from a folder and creating a dataframe. It looks like this: a data mart already exists, new data arrives every day and has to be added to it, keeping in mind that the schema may differ.

The usual scheme for building a mart is as follows:

Step 1. The data is loaded into Hadoop, with subsequent daily reloads, and appended as a new partition each time. The result is a folder with the raw data, partitioned by day.

Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in a queryable format, for example parquet, which can then be imported into Impala. This creates the target mart with all the data accumulated so far.

Step 3. A load job is created that updates the data mart every day.
This raises the questions of incremental loading, of partitioning the mart, and of maintaining its overall schema.

Let's take an example. Say the first step of building the repository has been implemented: JSON files are uploaded to a folder.

Creating a dataframe from them and saving it as a mart is not a problem. This is the very first step, easily found in the Spark documentation:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)

Everything seems to be going well.

We read and parse the JSON, then save the dataframe as parquet, registering it in Hive in whichever way is convenient:

df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')

We get a mart.

But the next day new data arrives from the source. We have a folder with JSON and a mart created from that folder. After the next batch of data is loaded from the source, the data mart is one day's worth of data short.

The logical solution is to partition the mart by day, which allows a new partition to be added each following day. The mechanism for this is well known: Spark can write partitions separately.

First we do the initial load, saving the data as described above but adding partitioning. This step is called mart initialization and is done only once:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

On the following days we load only the new partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

All that remains is to re-register the table in Hive to update the schema.
However, this is where the problems begin.

The first problem. Sooner or later the resulting parquet will become unreadable. The reason is that parquet and JSON treat empty fields differently.

Consider a typical situation. For example, yesterday this JSON arrives:

Day 1: {"a": {"b": 1}},

and today the same JSON looks like this:

Day 2: {"a": null}

Say we have two different partitions, each containing one record.
If we read all the source data at once, Spark can determine the type: it will see that "a" is a field of type struct with a nested field "b" of type INT. But if each partition is saved separately, we get parquet files with incompatible partition schemas:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
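The root cause is visible even without Spark. On day 1 the value of "a" is an object, while on day 2 it is null, so schema inference over the second partition alone has no type information to work with, which is why an all-null column ends up as a nullable string. A quick sketch of what the parser sees:

```python
import json

day1 = json.loads('{"a": {"b": 1}}')
day2 = json.loads('{"a": null}')

# Day 1 alone lets Spark infer struct<b: INT> for "a" ...
print(type(day1["a"]).__name__)  # dict
# ... while day 2 alone carries no type information at all.
print(day2["a"])  # None
```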

This situation is well known, so an option was added specifically for it: when parsing the source data, drop fields that contain only nulls:

df = spark.read.json("...", dropFieldIfAllNull=True)

In this case, the parquet will consist of partitions that can be read together.
Although anyone who has done this in practice will smile wryly here. Why? Because there are likely to be two more situations. Or three. Or four. The first, which is almost certain to happen, is that numeric types will look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such fields land in the same partition, schema merging will read everything correctly and settle on the most precise type. But if they land in different partitions, one will end up with intField: int and the other with intField: double.
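The divergence is easy to reproduce outside Spark: the JSON parser itself already yields different types for the two inputs, and per-partition inference simply inherits them. A minimal sketch:

```python
import json

# The same logical field, as it appears in two different days' files.
v1 = json.loads('{"intField": 1}')["intField"]
v2 = json.loads('{"intField": 1.1}')["intField"]

# Parsed in one pass, Spark would widen both to double; parsed into
# separate partitions, one column is int and the other is double.
print(type(v1).__name__, type(v2).__name__)  # int float
```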

There is a flag for handling this situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Now we have a folder whose partitions can be read into a single dataframe, and valid parquet for the whole mart. Right? No.

We must remember that we registered the table in Hive. Hive is case-insensitive in field names, while parquet is case-sensitive. Therefore partitions with the schemas field1: int and Field1: int are identical for Hive, but not for Spark. Don't forget to convert the field names to lowercase.
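Since Hive compares names case-insensitively while Spark does not, a useful guard is to lowercase every column before writing and fail fast if that produces duplicates. A minimal sketch in plain Python (the helper name normalize_names is ours); on a real dataframe the renaming itself could be done with df.toDF(*normalize_names(df.columns)):

```python
def normalize_names(columns):
    """Lowercase column names the way Hive sees them; fail on collisions."""
    lowered = [c.lower() for c in columns]
    seen = set()
    for name in lowered:
        if name in seen:
            # Distinct for Spark/parquet, identical for Hive.
            raise ValueError("Hive-level name collision: " + name)
        seen.add(name)
    return lowered

print(normalize_names(["field1", "OtherField"]))  # ['field1', 'otherfield']
```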

After that, everything seems to be fine.

However, not everything is that simple. There is a second, also well-known, problem. Since each new partition is saved separately, the partition folder will contain Spark service files, such as the _SUCCESS flag. They cause an error when the parquet is parsed. To avoid this, configure Spark not to add service files to the folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

It seems that now a new parquet partition with the day's parsed data is added to the target mart folder every day, and we have made sure in advance that there are no partitions with conflicting data types.

But we have a third problem. The overall schema is now unknown; worse, the table in Hive has a wrong schema, since each new partition has most likely introduced a distortion into it.

The table needs to be re-registered. This is straightforward: read the mart's parquet again, take its schema, generate a DDL from it, and use that DDL to re-register the folder in Hive as an external table, updating the schema of the target mart.

We have a fourth problem. When we registered the table the first time, we relied on Spark. Now we do it ourselves, and we must remember that parquet field names may start with characters that Hive does not allow. For example, Spark puts the lines it failed to parse into the _corrupt_record field. Such a field cannot be registered in Hive without escaping.

Knowing this, we arrive at the following DDL generator:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":  # the partition column is declared separately below
    # Escape the reserved field name, then backtick the nested field names
    # inside complex type strings (struct<...>); arrays have no field names.
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

The replace chain .replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") produces safe DDL, i.e. instead of:

create table tname (_field1 string, 1field string)

with field names like "_field1, 1field", it generates DDL in which the field names are escaped: create table `tname` (`_field1` string, `1field` string).
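The same escaping rule can be factored into a small standalone helper for testing. The function name quote_field is ours; it backticks the top-level name unconditionally (a slight generalization of the snippet above, which escapes only _corrupt_record) and reuses the exact replace chain for nested names inside complex types:

```python
def quote_field(name, hive_type):
    """Backtick-escape a field name and the nested names in its type string."""
    safe_type = (hive_type.replace(":", "`:")
                          .replace("<", "<`")
                          .replace(",", ",`")
                          .replace("array<`", "array<"))  # arrays have no field names
    return "`" + name + "` " + safe_type

print(quote_field("_field1", "string"))      # `_field1` string
print(quote_field("c", "struct<d:bigint>"))  # `c` struct<`d`:bigint>
```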

A question arises: how do we properly obtain a dataframe with the complete schema (pf in the code above)? How do we get this pf? This is the fifth problem. Re-read the schemas of all partitions from the target mart's parquet folder? That method is the safest, but heavy.

The schema is already in Hive. A new schema can be obtained by merging the schema of the whole table with that of the new partition. So we need to take the table schema from Hive and merge it with the new partition's schema. This can be done by fetching a probe row of the table through Hive, saving it to a temporary folder, and using Spark to read both partitions at once.

In fact, we have everything we need: the original table schema in Hive and the new partition. We also have the data. It only remains to obtain a new schema combining the mart schema with the new fields from the freshly created partition:

from pyspark.sql import HiveContext
hc = HiveContext(spark)
# Parse the new day's data and save it as a new partition.
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
# Take a one-row probe of the existing table as a "fake partition"
# carrying the current mart schema.
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
# Merge the mart schema with the new partition's schema.
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Next we build the table-registration DDL, as in the previous snippet.
If the whole chain works correctly (that is, the initializing load ran and the table was created correctly in Hive), we obtain an up-to-date table schema.

And the last problem: you cannot simply add a partition to the Hive table, because the table will break. You have to force Hive to repair its partition structure:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

The simple task of reading JSON and building a data mart on top of it turns into overcoming a number of implicit difficulties, whose solutions have to be hunted down one by one. And although these solutions are simple, finding them takes a lot of time.

To implement the building of the mart, I had to:

  • Add partitions to the mart, getting rid of the service files
  • Deal with empty fields in the source data that Spark had typed
  • Cast simple types to string
  • Convert field names to lowercase
  • Separate the data load from the table registration in Hive (DDL generation)
  • Remember to escape field names that may be incompatible with Hive
  • Learn how to refresh the table registration in Hive

Summing up: the decision to build data marts this way hides many pitfalls. So if difficulties arise during implementation, it is better to turn to an experienced partner with proven expertise.

Thank you for reading this article; we hope you find the information useful.

Source: www.habr.com
