Spark schemaEvolution fil-prattika

Għeżież qarrejja, wara nofsinhar it-tajjeb!

F'dan l-artikolu, il-konsulent ewlieni għall-qasam tan-negozju Big Data Solutions ta 'Neoflex jiddeskrivi fid-dettall għażliet għall-bini ta' storefronts ta 'struttura varjabbli bl-użu ta' Apache Spark.

Bħala parti minn proġett ta 'analiżi tad-dejta, ħafna drabi jqum il-kompitu li jinbnew vetrini bbażati fuq data strutturata b'mod laxk.

Tipikament dawn huma zkuk, jew tweġibiet minn diversi sistemi, salvati fil-forma ta 'JSON jew XML. Id-dejta tittella’ fuq Hadoop, imbagħad jeħtieġ li jinbena storefront minnha. Nistgħu norganizzaw aċċess għall-vetrina maħluqa, pereżempju, permezz ta 'Impala.

F'dan il-każ, it-tqassim tal-vetrina fil-mira mhux magħruf minn qabel. Barra minn hekk, l-iskema ma tistax titfassal minn qabel, peress li tiddependi fuq id-dejta, u qed nittrattaw din id-dejta strutturata dgħajjef ħafna.

Pereżempju, illum it-tweġiba li ġejja hija rreġistrata:

{source: "app1", error_code: ""}

u għada t-tweġiba li ġejja ġejja mill-istess sistema:

{source: "app1", error_code: "error", description: "Network error"}

Bħala riżultat, għandu jiżdied qasam ieħor mal-vetrina - deskrizzjoni, u ħadd ma jaf jekk hux se jiġi jew le.

Il-kompitu li jinħoloq mart fuq data bħal din huwa pjuttost standard, u Spark għandu numru ta 'għodod għal dan. Għall-parsing tad-dejta tas-sors, hemm appoġġ kemm għal JSON kif ukoll għall-XML, u għal skema li ma kinitx magħrufa qabel, huwa pprovdut appoġġ ta' schemaEvolution.

L-ewwel daqqa t'għajn, is-soluzzjoni tidher sempliċi. Għandek bżonn tieħu l-folder b'JSON u taqrah fil-qafas tad-data. Spark se toħloq skema u ddawwar id-dejta nested fi strutturi. Sussegwentement, kollox jeħtieġ li jiġi ffrankat fil-parkè, li huwa wkoll appoġġjat f'Impala, billi tirreġistra l-vetrina fil-metastore Hive.

Kollox jidher li huwa sempliċi.

Madankollu, mill-eżempji qosra fid-dokumentazzjoni mhuwiex ċar x'għandek tagħmel b'numru ta 'problemi fil-prattika.

Id-dokumentazzjoni tiddeskrivi approċċ mhux għall-ħolqien ta 'storefront, iżda għall-qari JSON jew XML fi dataframe.

Jiġifieri, sempliċiment turi kif taqra u parse JSON:

df = spark.read.json(path...)

Dan huwa biżżejjed biex id-dejta tkun disponibbli għal Spark.

Fil-prattika, ix-xenarju huwa ħafna aktar kumpless milli sempliċement jaqra fajls JSON minn folder u joħloq dataframe. Is-sitwazzjoni tidher bħal din: diġà hemm ċerta vetrina, dejta ġdida tasal kuljum, jeħtieġ li jiżdiedu mal-vetrina, mingħajr ma ninsew li l-iskema tista 'tvarja.

L-iskema tas-soltu għall-bini ta’ storefront hija kif ġej:

Pass 1. Id-dejta titgħabba f'Hadoop, segwita minn tagħbija addizzjonali ta 'kuljum u miżjuda ma' partizzjoni ġdida. Ir-riżultat huwa folder bid-dejta tas-sors, maqsuma bil-ġurnata.

Pass 2. Matul it-tagħbija inizjali, dan il-folder jinqara u jiġi analizzat bl-użu ta 'Spark. Id-dataframe li jirriżulta jiġi ffrankat f'format li jista 'jiġi analizzat, pereżempju, fil-parkè, li mbagħad jista' jiġi importat f'Impala. Dan joħloq storefront fil-mira bid-dejta kollha li akkumulat sa dan il-punt.

Pass 3. Tinħoloq download li se taġġorna l-vetrina kuljum.
Tqum il-kwistjoni tat-tagħbija inkrementali, il-ħtieġa li tinqasam il-faċċata tal-ħanut, u l-kwistjoni tal-appoġġ tat-tqassim ġenerali tal-faċċata tal-ħanut.

Ejja nagħtu eżempju. Ejja ngħidu li l-ewwel pass tal-bini ta 'repożitorju ġie implimentat, u t-tlugħ ta' fajls JSON f'folder huwa kkonfigurat.

Il-ħolqien ta' dataframe minnhom u mbagħad issalvahom bħala vetrina mhix problema. Dan huwa l-ewwel pass li jista’ jinstab faċilment fid-dokumentazzjoni ta’ Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Kollox jidher tajjeb.

Qrajna u analizzajna l-JSON, imbagħad insalvaw il-qafas tad-data bħala parquet, nirreġistrawh f'Hive bi kwalunkwe mod konvenjenti:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Ikollna vetrina.

Iżda, l-għada ġiet miżjuda dejta ġdida mis-sors. Għandna folder b'JSON, u storefront maħluq ibbażat fuq dan il-folder. Wara li tgħabbi l-porzjon li jmiss tad-dejta mis-sors, il-vetrina m'għandhiex biżżejjed dejta għal ġurnata waħda.

Soluzzjoni loġika tkun li tqassam il-vetrina bil-ġurnata, li tippermettilek li żżid partizzjoni ġdida kull jum li jmiss. Il-mekkaniżmu għal dan huwa magħruf ukoll; Spark jippermettilek tirreġistra partizzjonijiet separatament.

L-ewwel, nagħmlu t-tagħbija inizjali, niffrankaw id-dejta kif deskritt hawn fuq, u nżidu biss il-qsim. Din l-azzjoni tissejjaħ initialization storefront u ssir darba biss:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

L-għada niżżlu biss il-partizzjoni l-ġdida:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Li jibqa 'huwa li terġa' tirreġistra f'Hive biex taġġorna l-iskema.
Madankollu, dan huwa fejn jinqalgħu l-problemi.

L-ewwel problema. Illum jew għada, il-parkè li jirriżulta ma jibqax jinqara. Dan huwa dovut għal kif il-parkè u JSON jittrattaw oqsma vojta b'mod differenti.

Ejja nikkunsidraw sitwazzjoni tipika. Pereżempju, ilbieraħ jasal JSON:

День 1: {"a": {"b": 1}},

u llum l-istess JSON jidher bħal dan:

День 2: {"a": null}

Ejja ngħidu li għandna żewġ diviżorji differenti, kull wieħed b'linja waħda.
Meta naqraw id-dejta tas-sors kollu, Spark se jkun jista 'jiddetermina t-tip, u se jifhem li "a" huwa qasam tat-tip "struttura", b'qasam nested "b" tat-tip INT. Iżda, jekk kull partizzjoni ġiet salvata separatament, allura r-riżultat huwa parkè bi skemi ta 'diviżorji inkompatibbli:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Din is-sitwazzjoni hija magħrufa sew, għalhekk ġiet miżjuda għażla apposta biex jitneħħew oqsma vojta meta tiġi analizzata d-dejta tas-sors:

df = spark.read.json("...", dropFieldIfAllNull=True)

F'dan il-każ, il-parkè se jikkonsisti minn diviżorji li jistgħu jinqraw flimkien.
Għalkemm dawk li jkunu għamlu dan fil-prattika se jitbissem bl-imrar. Għaliex? Iva, għax wisq probabbli se jinqalgħu żewġ sitwazzjonijiet oħra. Jew tlieta. Jew erbgħa. L-ewwel, li hija kważi ċerta, hija li t-tipi numeriċi se jidhru differenti f'fajls JSON differenti. Pereżempju, {intField: 1} u {intField: 1.1}. Jekk oqsma bħal dawn jidhru f'lott wieħed, allura l-iskema merge jaqra kollox b'mod korrett, li jwassal għall-aktar tip preċiż. Imma jekk f'oħrajn differenti, allura wieħed ikollu intField: int, u l-ieħor ikollu intField: double.

Biex timmaniġġja din is-sitwazzjoni hemm il-bandiera li ġejja:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Issa għandna folder fejn jinsabu l-ħitan, li jista 'jinqara f'dataframe wieħed u parkè validu tal-vetrina kollu. Iva? Nru.

Irridu niftakru li rreġistrajna t-tabella f'Hive. Id-doqqajs mhuwiex sensittiv għall-każi fl-ismijiet tal-oqsma, iżda l-parkè huwa. Għalhekk, diviżorji bi skemi: field1: int, u Field1: int huma l-istess għal Hive, iżda mhux għal Spark. Tinsiex tibdel l-ismijiet tal-kampijiet f'ittri żgħar.

Wara dan, kollox jidher li jkun tajjeb.

Madankollu, mhux kollha daqshekk sempliċi. Tqum it-tieni problema magħrufa wkoll. Peress li kull partizzjoni ġdida tiġi ssejvjata separatament, il-folder tal-partizzjoni jkun fih fajls tas-servizz Spark, pereżempju, il-bandiera tas-suċċess tal-operazzjoni _SUCCESS. Dan se jirriżulta fi żball meta tipprova parkè. Biex tevita dan, trid tikkonfigura l-konfigurazzjoni billi tipprevjeni lil Spark milli żżid fajls tas-servizz mal-folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Jidher li issa kuljum diviżorju tal-parkè ġdid huwa miżjud mal-folder tal-maħżen fil-mira, fejn tinsab id-dejta parsed għall-ġurnata. Ħadna ħsieb bil-quddiem biex niżguraw li ma kienx hemm diviżorji b'kunflitti tat-tip tad-dejta.

Imma qed niffaċċjaw it-tielet problema. Issa l-iskema ġenerali mhix magħrufa, barra minn hekk, f'Hive it-tabella għandha l-iskema ħażina, peress li kull partizzjoni ġdida x'aktarx introduċiet distorsjoni fl-iskema.

It-tabella trid tiġi reġistrata mill-ġdid. Dan jista 'jsir sempliċiment: aqra l-parkè tal-vetrina mill-ġdid, ħu l-iskema u oħloq DDL ibbażat fuqha, li biha tista' terġa 'tirreġistra l-folder f'Hive bħala tabella esterna, taġġorna l-iskema tal-vetrina fil-mira.

Niffaċċjaw ir-raba’ problema. Meta rreġistrajna t-tabella għall-ewwel darba, konna noqgħodu fuq Spark. Issa nagħmluh aħna stess, u rridu niftakru li l-oqsma tal-parkè jistgħu jibdew b'karattri li mhumiex permessi minn Hive. Pereżempju, Spark jarmi linji li ma setgħetx parse fil-qasam "corrupt_record". Għalqa bħal din ma tistax tiġi rreġistrata f'Hive mingħajr ma jaħrab.

Billi nkunu nafu dan, irridu nġibu d-dijagramma:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kodiċi ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace ("<", "<`").replace(",", ",`"). replace ("array<`", "array<") jagħmel DDL sikur, jiġifieri, minflok:

create table tname (_field1 string, 1field string)

Bl-ismijiet tal-kamp bħal "_field1, 1field", isir DDL sikur fejn l-ismijiet tal-kamp huma maħruba: oħloq tabella `tname` (`_field1` string, `1field` string).

Tqum il-mistoqsija: kif tikseb b'mod korrett dataframe bi skema kompluta (f'kodiċi pf)? Kif tikseb dan il-pf? Din hija l-ħames problema. Aqra mill-ġdid id-dijagramma tal-ħitan kollha mill-folder bil-fajls tal-parkè tal-vetrina fil-mira? Dan il-metodu huwa l-aktar sikur, iżda diffiċli.

L-iskema hija diġà f'Hive. Tista 'tikseb skema ġdida billi tgħaqqad l-iskema tat-tabella kollha u l-partizzjoni l-ġdida. Dan ifisser li għandek bżonn tieħu l-iskema tal-mejda minn Hive u tgħaqqadha mal-iskema tal-partizzjoni l-ġdida. Dan jista 'jsir billi taqra l-metadata tat-test minn Hive, issalvaha f'folder temporanju, u taqra ż-żewġ diviżorji f'daqqa billi tuża Spark.

Essenzjalment, hemm dak kollu li għandek bżonn: l-iskema tal-mejda oriġinali f'Hive u partizzjoni ġdida. Għandna wkoll data. Li jibqa 'huwa li tinkiseb skema ġdida li tgħaqqad l-iskema tal-maħżen u oqsma ġodda mill-partizzjoni maħluqa:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Sussegwentement, noħolqu t-tabella ta 'reġistrazzjoni DDL, bħal fil-framment preċedenti.
Jekk il-katina kollha taħdem b'mod korrett, jiġifieri, kien hemm tagħbija inizjali, u t-tabella ġiet maħluqa b'mod korrett f'Hive, allura nġibu skema ta 'tabella aġġornata.

L-aħħar problema hija li ma tistax faċilment iżżid partizzjoni ma 'tabella Hive, peress li tinkiser. Għandek bżonn iġiegħel lil Hive jiffissa l-istruttura tal-partizzjoni tiegħu:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Il-kompitu sempliċi ta 'qari JSON u l-ħolqien ta' storefront ibbażat fuq dan jirriżulta li jingħelbu għadd ta 'diffikultajiet impliċiti, li s-soluzzjonijiet għalihom għandhom jinstabu separatament. U għalkemm dawn is-soluzzjonijiet huma sempliċi, tieħu ħafna ħin biex issibhom.

Biex nimplimentaw il-kostruzzjoni ta’ vetrina, kellna:

  • Żid diviżorji mal-vetrina, teħles mill-fajls tas-servizz
  • Ittratta oqsma vojta fid-dejta tas-sors li Spark ittajpja
  • Cast tipi sempliċi għall-istring
  • Ikkonverti l-ismijiet tal-kampijiet f'ittri żgħar
  • Tlugħ ta' data separata u reġistrazzjoni ta' tabella f'Hive (ħolqien ta' DDL)
  • Tinsiex taħrab ismijiet ta' oqsma li jistgħu ma jkunux kompatibbli ma' Hive
  • Tgħallem taġġorna r-reġistrazzjoni tat-tabella f'Hive

Fil-qosor, aħna ninnotaw li d-deċiżjoni li jinbnew storefronts hija mimlija b'ħafna nases. Għalhekk, jekk jinqalgħu diffikultajiet fl-implimentazzjoni, huwa aħjar li tirrikorri għal sieħeb b'esperjenza b'kompetenza ta 'suċċess.

Grazzi talli qrajt dan l-artiklu, nittamaw li ssib l-informazzjoni utli.

Sors: www.habr.com

Żid kumment