Għeżież qarrejja, wara nofsinhar it-tajjeb!
F'dan l-artikolu, il-konsulent ewlieni għall-qasam tan-negozju Big Data Solutions ta 'Neoflex jiddeskrivi fid-dettall għażliet għall-bini ta' storefronts ta 'struttura varjabbli bl-użu ta' Apache Spark.
Bħala parti minn proġett ta 'analiżi tad-dejta, ħafna drabi jqum il-kompitu li jinbnew vetrini bbażati fuq data strutturata b'mod laxk.
Tipikament dawn huma zkuk, jew tweġibiet minn diversi sistemi, salvati fil-forma ta 'JSON jew XML. Id-dejta tittella’ fuq Hadoop, imbagħad jeħtieġ li jinbena storefront minnha. Nistgħu norganizzaw aċċess għall-vetrina maħluqa, pereżempju, permezz ta 'Impala.
F'dan il-każ, it-tqassim tal-vetrina fil-mira mhux magħruf minn qabel. Barra minn hekk, l-iskema ma tistax titfassal minn qabel, peress li tiddependi fuq id-dejta, u qed nittrattaw din id-dejta strutturata dgħajjef ħafna.
Pereżempju, illum it-tweġiba li ġejja hija rreġistrata:
{source: "app1", error_code: ""}
u għada t-tweġiba li ġejja ġejja mill-istess sistema:
{source: "app1", error_code: "error", description: "Network error"}
Bħala riżultat, għandu jiżdied qasam ieħor mal-vetrina - deskrizzjoni, u ħadd ma jaf jekk hux se jiġi jew le.
Il-kompitu li jinħoloq mart fuq data bħal din huwa pjuttost standard, u Spark għandu numru ta 'għodod għal dan. Għall-parsing tad-dejta tas-sors, hemm appoġġ kemm għal JSON kif ukoll għall-XML, u għal skema li ma kinitx magħrufa qabel, huwa pprovdut appoġġ ta' schemaEvolution.
L-ewwel daqqa t'għajn, is-soluzzjoni tidher sempliċi. Għandek bżonn tieħu l-folder b'JSON u taqrah fil-qafas tad-data. Spark se toħloq skema u ddawwar id-dejta nested fi strutturi. Sussegwentement, kollox jeħtieġ li jiġi ffrankat fil-parkè, li huwa wkoll appoġġjat f'Impala, billi tirreġistra l-vetrina fil-metastore Hive.
Kollox jidher li huwa sempliċi.
Madankollu, mill-eżempji qosra fid-dokumentazzjoni mhuwiex ċar x'għandek tagħmel b'numru ta 'problemi fil-prattika.
Id-dokumentazzjoni tiddeskrivi approċċ mhux għall-ħolqien ta 'storefront, iżda għall-qari JSON jew XML fi dataframe.
Jiġifieri, sempliċiment turi kif taqra u parse JSON:
df = spark.read.json(path...)
Dan huwa biżżejjed biex id-dejta tkun disponibbli għal Spark.
Fil-prattika, ix-xenarju huwa ħafna aktar kumpless milli sempliċement jaqra fajls JSON minn folder u joħloq dataframe. Is-sitwazzjoni tidher bħal din: diġà hemm ċerta vetrina, dejta ġdida tasal kuljum, jeħtieġ li jiżdiedu mal-vetrina, mingħajr ma ninsew li l-iskema tista 'tvarja.
L-iskema tas-soltu għall-bini ta’ storefront hija kif ġej:
Pass 1. Id-dejta titgħabba f'Hadoop, segwita minn tagħbija addizzjonali ta 'kuljum u miżjuda ma' partizzjoni ġdida. Ir-riżultat huwa folder bid-dejta tas-sors, maqsuma bil-ġurnata.
Pass 2. Matul it-tagħbija inizjali, dan il-folder jinqara u jiġi analizzat bl-użu ta 'Spark. Id-dataframe li jirriżulta jiġi ffrankat f'format li jista 'jiġi analizzat, pereżempju, fil-parkè, li mbagħad jista' jiġi importat f'Impala. Dan joħloq storefront fil-mira bid-dejta kollha li akkumulat sa dan il-punt.
Pass 3. Tinħoloq download li se taġġorna l-vetrina kuljum.
Tqum il-kwistjoni tat-tagħbija inkrementali, il-ħtieġa li tinqasam il-faċċata tal-ħanut, u l-kwistjoni tal-appoġġ tat-tqassim ġenerali tal-faċċata tal-ħanut.
Ejja nagħtu eżempju. Ejja ngħidu li l-ewwel pass tal-bini ta 'repożitorju ġie implimentat, u t-tlugħ ta' fajls JSON f'folder huwa kkonfigurat.
Il-ħolqien ta' dataframe minnhom u mbagħad issalvahom bħala vetrina mhix problema. Dan huwa l-ewwel pass li jista’ jinstab faċilment fid-dokumentazzjoni ta’ Spark:
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
Kollox jidher tajjeb.
Qrajna u analizzajna l-JSON, imbagħad insalvaw il-qafas tad-data bħala parquet, nirreġistrawh f'Hive bi kwalunkwe mod konvenjenti:
df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')
Ikollna vetrina.
Iżda, l-għada ġiet miżjuda dejta ġdida mis-sors. Għandna folder b'JSON, u storefront maħluq ibbażat fuq dan il-folder. Wara li tgħabbi l-porzjon li jmiss tad-dejta mis-sors, il-vetrina m'għandhiex biżżejjed dejta għal ġurnata waħda.
Soluzzjoni loġika tkun li tqassam il-vetrina bil-ġurnata, li tippermettilek li żżid partizzjoni ġdida kull jum li jmiss. Il-mekkaniżmu għal dan huwa magħruf ukoll; Spark jippermettilek tirreġistra partizzjonijiet separatament.
L-ewwel, nagħmlu t-tagħbija inizjali, niffrankaw id-dejta kif deskritt hawn fuq, u nżidu biss il-qsim. Din l-azzjoni tissejjaħ initialization storefront u ssir darba biss:
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
L-għada niżżlu biss il-partizzjoni l-ġdida:
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
Li jibqa 'huwa li terġa' tirreġistra f'Hive biex taġġorna l-iskema.
Madankollu, dan huwa fejn jinqalgħu l-problemi.
L-ewwel problema. Illum jew għada, il-parkè li jirriżulta ma jibqax jinqara. Dan huwa dovut għal kif il-parkè u JSON jittrattaw oqsma vojta b'mod differenti.
Ejja nikkunsidraw sitwazzjoni tipika. Pereżempju, ilbieraħ jasal JSON:
День 1: {"a": {"b": 1}},
u llum l-istess JSON jidher bħal dan:
День 2: {"a": null}
Ejja ngħidu li għandna żewġ diviżorji differenti, kull wieħed b'linja waħda.
Meta naqraw id-dejta tas-sors kollu, Spark se jkun jista 'jiddetermina t-tip, u se jifhem li "a" huwa qasam tat-tip "struttura", b'qasam nested "b" tat-tip INT. Iżda, jekk kull partizzjoni ġiet salvata separatament, allura r-riżultat huwa parkè bi skemi ta 'diviżorji inkompatibbli:
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
Din is-sitwazzjoni hija magħrufa sew, għalhekk ġiet miżjuda għażla apposta biex jitneħħew oqsma vojta meta tiġi analizzata d-dejta tas-sors:
df = spark.read.json("...", dropFieldIfAllNull=True)
F'dan il-każ, il-parkè se jikkonsisti minn diviżorji li jistgħu jinqraw flimkien.
Għalkemm dawk li jkunu għamlu dan fil-prattika se jitbissem bl-imrar. Għaliex? Iva, għax wisq probabbli se jinqalgħu żewġ sitwazzjonijiet oħra. Jew tlieta. Jew erbgħa. L-ewwel, li hija kważi ċerta, hija li t-tipi numeriċi se jidhru differenti f'fajls JSON differenti. Pereżempju, {intField: 1} u {intField: 1.1}. Jekk oqsma bħal dawn jidhru f'lott wieħed, allura l-iskema merge jaqra kollox b'mod korrett, li jwassal għall-aktar tip preċiż. Imma jekk f'oħrajn differenti, allura wieħed ikollu intField: int, u l-ieħor ikollu intField: double.
Biex timmaniġġja din is-sitwazzjoni hemm il-bandiera li ġejja:
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
Issa għandna folder fejn jinsabu l-ħitan, li jista 'jinqara f'dataframe wieħed u parkè validu tal-vetrina kollu. Iva? Nru.
Irridu niftakru li rreġistrajna t-tabella f'Hive. Id-doqqajs mhuwiex sensittiv għall-każi fl-ismijiet tal-oqsma, iżda l-parkè huwa. Għalhekk, diviżorji bi skemi: field1: int, u Field1: int huma l-istess għal Hive, iżda mhux għal Spark. Tinsiex tibdel l-ismijiet tal-kampijiet f'ittri żgħar.
Wara dan, kollox jidher li jkun tajjeb.
Madankollu, mhux kollha daqshekk sempliċi. Tqum it-tieni problema magħrufa wkoll. Peress li kull partizzjoni ġdida tiġi ssejvjata separatament, il-folder tal-partizzjoni jkun fih fajls tas-servizz Spark, pereżempju, il-bandiera tas-suċċess tal-operazzjoni _SUCCESS. Dan se jirriżulta fi żball meta tipprova parkè. Biex tevita dan, trid tikkonfigura l-konfigurazzjoni billi tipprevjeni lil Spark milli żżid fajls tas-servizz mal-folder:
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
Jidher li issa kuljum diviżorju tal-parkè ġdid huwa miżjud mal-folder tal-maħżen fil-mira, fejn tinsab id-dejta parsed għall-ġurnata. Ħadna ħsieb bil-quddiem biex niżguraw li ma kienx hemm diviżorji b'kunflitti tat-tip tad-dejta.
Imma qed niffaċċjaw it-tielet problema. Issa l-iskema ġenerali mhix magħrufa, barra minn hekk, f'Hive it-tabella għandha l-iskema ħażina, peress li kull partizzjoni ġdida x'aktarx introduċiet distorsjoni fl-iskema.
It-tabella trid tiġi reġistrata mill-ġdid. Dan jista 'jsir sempliċiment: aqra l-parkè tal-vetrina mill-ġdid, ħu l-iskema u oħloq DDL ibbażat fuqha, li biha tista' terġa 'tirreġistra l-folder f'Hive bħala tabella esterna, taġġorna l-iskema tal-vetrina fil-mira.
Niffaċċjaw ir-raba’ problema. Meta rreġistrajna t-tabella għall-ewwel darba, konna noqgħodu fuq Spark. Issa nagħmluh aħna stess, u rridu niftakru li l-oqsma tal-parkè jistgħu jibdew b'karattri li mhumiex permessi minn Hive. Pereżempju, Spark jarmi linji li ma setgħetx parse fil-qasam "corrupt_record". Għalqa bħal din ma tistax tiġi rreġistrata f'Hive mingħajr ma jaħrab.
Billi nkunu nafu dan, irridu nġibu d-dijagramma:
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
Kodiċi ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace ("<", "<`").replace(",", ",`"). replace ("array<`", "array<") jagħmel DDL sikur, jiġifieri, minflok:
create table tname (_field1 string, 1field string)
Bl-ismijiet tal-kamp bħal "_field1, 1field", isir DDL sikur fejn l-ismijiet tal-kamp huma maħruba: oħloq tabella `tname` (`_field1` string, `1field` string).
Tqum il-mistoqsija: kif tikseb b'mod korrett dataframe bi skema kompluta (f'kodiċi pf)? Kif tikseb dan il-pf? Din hija l-ħames problema. Aqra mill-ġdid id-dijagramma tal-ħitan kollha mill-folder bil-fajls tal-parkè tal-vetrina fil-mira? Dan il-metodu huwa l-aktar sikur, iżda diffiċli.
L-iskema hija diġà f'Hive. Tista 'tikseb skema ġdida billi tgħaqqad l-iskema tat-tabella kollha u l-partizzjoni l-ġdida. Dan ifisser li għandek bżonn tieħu l-iskema tal-mejda minn Hive u tgħaqqadha mal-iskema tal-partizzjoni l-ġdida. Dan jista 'jsir billi taqra l-metadata tat-test minn Hive, issalvaha f'folder temporanju, u taqra ż-żewġ diviżorji f'daqqa billi tuża Spark.
Essenzjalment, hemm dak kollu li għandek bżonn: l-iskema tal-mejda oriġinali f'Hive u partizzjoni ġdida. Għandna wkoll data. Li jibqa 'huwa li tinkiseb skema ġdida li tgħaqqad l-iskema tal-maħżen u oqsma ġodda mill-partizzjoni maħluqa:
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
Sussegwentement, noħolqu t-tabella ta 'reġistrazzjoni DDL, bħal fil-framment preċedenti.
Jekk il-katina kollha taħdem b'mod korrett, jiġifieri, kien hemm tagħbija inizjali, u t-tabella ġiet maħluqa b'mod korrett f'Hive, allura nġibu skema ta 'tabella aġġornata.
L-aħħar problema hija li ma tistax faċilment iżżid partizzjoni ma 'tabella Hive, peress li tinkiser. Għandek bżonn iġiegħel lil Hive jiffissa l-istruttura tal-partizzjoni tiegħu:
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
Il-kompitu sempliċi ta 'qari JSON u l-ħolqien ta' storefront ibbażat fuq dan jirriżulta li jingħelbu għadd ta 'diffikultajiet impliċiti, li s-soluzzjonijiet għalihom għandhom jinstabu separatament. U għalkemm dawn is-soluzzjonijiet huma sempliċi, tieħu ħafna ħin biex issibhom.
Biex nimplimentaw il-kostruzzjoni ta’ vetrina, kellna:
- Żid diviżorji mal-vetrina, teħles mill-fajls tas-servizz
- Ittratta oqsma vojta fid-dejta tas-sors li Spark ittajpja
- Cast tipi sempliċi għall-istring
- Ikkonverti l-ismijiet tal-kampijiet f'ittri żgħar
- Tlugħ ta' data separata u reġistrazzjoni ta' tabella f'Hive (ħolqien ta' DDL)
- Tinsiex taħrab ismijiet ta' oqsma li jistgħu ma jkunux kompatibbli ma' Hive
- Tgħallem taġġorna r-reġistrazzjoni tat-tabella f'Hive
Fil-qosor, aħna ninnotaw li d-deċiżjoni li jinbnew storefronts hija mimlija b'ħafna nases. Għalhekk, jekk jinqalgħu diffikultajiet fl-implimentazzjoni, huwa aħjar li tirrikorri għal sieħeb b'esperjenza b'kompetenza ta 'suċċess.
Grazzi talli qrajt dan l-artiklu, nittamaw li ssib l-informazzjoni utli.
Sors: www.habr.com