CienÄ«jamie lasÄ«tÄji, laba diena!
Å ajÄ rakstÄ Neoflex Big Data Solutions biznesa jomas vadoÅ”ais konsultants detalizÄti apraksta iespÄjas veidot mainÄ«gas struktÅ«ras vitrÄ«nas, izmantojot Apache Spark.
Datu analÄ«zes projekta ietvaros bieži rodas uzdevums veidot veikalu skatlogus, pamatojoties uz brÄ«vi strukturÄtiem datiem.
Parasti tie ir žurnÄli vai atbildes no dažÄdÄm sistÄmÄm, kas saglabÄtas kÄ JSON vai XML. Dati tiek augÅ”upielÄdÄti pakalpojumÄ Hadoop, un pÄc tam jums no tiem jÄizveido veikala mÄjaslapa. Piekļuvi izveidotajai vitrÄ«nai varam organizÄt, piemÄram, caur Impala.
Å ajÄ gadÄ«jumÄ mÄrÄ·a veikala mÄjaslapas shÄma iepriekÅ” nav zinÄma. TurklÄt shÄmu arÄ« nevar sastÄdÄ«t iepriekÅ”, jo tÄ ir atkarÄ«ga no datiem, un mums ir darÄ«Å”ana ar Å”iem ļoti vÄji strukturÄtajiem datiem.
PiemÄram, Å”odien tiek reÄ£istrÄta Å”Äda atbilde:
{source: "app1", error_code: ""}
un rÄ«t no tÄs paÅ”as sistÄmas nÄk Å”Äda atbilde:
{source: "app1", error_code: "error", description: "Network error"}
RezultÄtÄ vitrÄ«nai vajadzÄtu pievienot vÄl vienu lauku - aprakstu, un neviens nezina, vai tas nÄks vai nÄ.
Uzdevums izveidot veikalu, izmantojot Å”Ädus datus, ir diezgan standarta, un Spark tam ir vairÄki rÄ«ki. Avota datu parsÄÅ”anai tiek atbalstÄ«ts gan JSON, gan XML, savukÄrt iepriekÅ” nezinÄmai shÄmai tiek nodroÅ”inÄts shÄmas Evolution atbalsts.
No pirmÄ acu uzmetiena risinÄjums Ŕķiet vienkÄrÅ”s. Jums ir jÄÅem mape ar JSON un jÄlasa tÄ datu rÄmÄ«. Spark izveidos shÄmu, pÄrvÄrtÄ«s ligzdotos datus struktÅ«rÄs. TÄlÄk viss jÄsaglabÄ parketÄ, kas tiek atbalstÄ«ts arÄ« ImpalÄ, reÄ£istrÄjot veikala vitrÄ«nu Hive metaveikalÄ.
Å Ä·iet, ka viss ir vienkÄrÅ”i.
TomÄr no Ä«sajiem piemÄriem dokumentÄcijÄ nav skaidrs, ko darÄ«t ar vairÄkÄm problÄmÄm praksÄ.
DokumentÄcijÄ ir aprakstÄ«ta pieeja nevis izveidot veikala mÄjaslapu, bet gan nolasÄ«t JSON vai XML datu ietvarÄ.
Proti, tas vienkÄrÅ”i parÄda, kÄ lasÄ«t un parsÄt JSON:
df = spark.read.json(path...)
Tas ir pietiekami, lai dati būtu pieejami Spark.
PraksÄ skripts ir daudz sarežģītÄks nekÄ tikai JSON failu lasÄ«Å”ana no mapes un datu rÄmja izveide. SituÄcija izskatÄs Å”Ädi: jau ir noteikta vitrÄ«na, katru dienu ienÄk jauni dati, tie jÄpievieno vitrÄ«nai, neaizmirstot, ka shÄma var atŔķirties.
ParastÄ vitrÄ«nas bÅ«vniecÄ«bas shÄma ir Å”Äda:
Solis 1. Dati tiek ielÄdÄti Hadoop ar turpmÄku ikdienas atkÄrtotu ielÄdi un pievienoti jaunam nodalÄ«jumam. IzrÄdÄs mape ar sÄkotnÄjiem datiem, kas sadalÄ«ti pa dienÄm.
Solis 2. SÄkotnÄjÄs ielÄdes laikÄ Å”o mapi lasa un parsÄ Spark. IegÅ«tais datu rÄmis tiek saglabÄts parsÄjamÄ formÄtÄ, piemÄram, parketÄ, ko pÄc tam var importÄt Impala. TÄdÄjÄdi tiek izveidota mÄrÄ·a vitrÄ«na ar visiem datiem, kas ir uzkrÄti lÄ«dz Å”im brÄ«dim.
Solis 3. Tiek izveidota lejupielÄde, kas katru dienu atjauninÄs veikala mÄjaslapu.
Ir jautÄjums par pakÄpenisku ielÄdi, nepiecieÅ”amÄ«bu sadalÄ«t vitrÄ«nas nodalÄ«jumu, kÄ arÄ« jautÄjums par vitrÄ«nas vispÄrÄjÄs shÄmas saglabÄÅ”anu.
Å emsim piemÄru. PieÅemsim, ka ir veikts pirmais repozitorija izveides solis, un JSON faili tiek augÅ”upielÄdÄti mapÄ.
Izveidot no tiem datu rÄmi un pÄc tam to saglabÄt kÄ vitrÄ«nu nav problÄma. Å is ir pats pirmais solis, ko var viegli atrast Spark dokumentÄcijÄ:
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
Å Ä·iet, ka viss ir kÄrtÄ«bÄ.
MÄs nolasÄm un parsÄjÄm JSON, pÄc tam saglabÄjam datu rÄmi kÄ parketu, reÄ£istrÄjot to Hive jebkurÄ ÄrtÄ veidÄ:
df.write.format(āparquetā).option('path','<External Table Path>').saveAsTable('<Table Name>')
MÄs saÅemam logu.
Bet nÄkamajÄ dienÄ tika pievienoti jauni dati no avota. Mums ir mape ar JSON un no Ŕīs mapes izveidota vitrÄ«na. PÄc nÄkamÄs datu paketes ielÄdes no avota datu tirgum trÅ«kst vienas dienas datu.
LoÄ£isks risinÄjums bÅ«tu veikala telpas sadalÄ«Å”ana pa dienÄm, kas ļaus katru nÄkamo dienu pievienot jaunu nodalÄ«jumu. ArÄ« tÄ mehÄnisms ir labi zinÄms, Spark ļauj rakstÄ«t nodalÄ«jumus atseviŔķi.
PirmkÄrt, mÄs veicam sÄkotnÄjo ielÄdi, saglabÄjot datus, kÄ aprakstÄ«ts iepriekÅ”, pievienojot tikai sadalÄ«Å”anu. Å o darbÄ«bu sauc par veikala inicializÄÅ”anu, un tÄ tiek veikta tikai vienu reizi:
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
NÄkamajÄ dienÄ mÄs ielÄdÄjam tikai jaunu nodalÄ«jumu:
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
Atliek tikai atkÄrtoti reÄ£istrÄties Hive, lai atjauninÄtu shÄmu.
TomÄr Å”eit rodas problÄmas.
PirmÄ problÄma. AgrÄk vai vÄlÄk iegÅ«tais parkets bÅ«s nesalasÄms. Tas ir saistÄ«ts ar to, ka parkets un JSON atŔķirÄ«gi pieiet tukÅ”iem laukiem.
ApskatÄ«sim tipisku situÄciju. PiemÄram, vakar JSON ierodas:
ŠŠµŠ½Ń 1: {"a": {"b": 1}},
un Å”odien tas pats JSON izskatÄs Å”Ädi:
ŠŠµŠ½Ń 2: {"a": null}
PieÅemsim, ka mums ir divi dažÄdi nodalÄ«jumi, katrs ar vienu lÄ«niju.
Kad mÄs nolasÄ«sim visus avota datus, Spark varÄs noteikt veidu un sapratÄ«s, ka "a" ir "struktÅ«ras" tipa lauks ar ligzdotu lauku "b", kura tips ir INT. Bet, ja katrs nodalÄ«jums tika saglabÄts atseviŔķi, mÄs iegÅ«stam parketu ar nesaderÄ«gÄm starpsienu shÄmÄm:
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
Å Ä« situÄcija ir labi zinÄma, tÄpÄc ir Ä«paÅ”i pievienota opcija - analizÄjot avota datus, noÅemiet tukÅ”os laukus:
df = spark.read.json("...", dropFieldIfAllNull=True)
Å ajÄ gadÄ«jumÄ parkets sastÄvÄs no starpsienÄm, kuras var salasÄ«t kopÄ.
Lai gan tie, kas to ir darÄ«juÅ”i praksÄ, te rÅ«gti smaidÄ«s. KÄpÄc? JÄ, jo, visticamÄk, bÅ«s vÄl divas situÄcijas. Vai trÄ«s. Vai Äetras. Pirmais, kas gandrÄ«z noteikti notiks, ir tas, ka ciparu veidi dažÄdos JSON failos izskatÄ«sies atŔķirÄ«gi. PiemÄram, {intField: 1} un {intField: 1.1}. Ja Å”Ädi lauki tiek atrasti vienÄ nodalÄ«jumÄ, tad shÄmas sapludinÄÅ”ana visu nolasÄ«s pareizi, novedot pie visprecÄ«zÄkÄ veida. Bet, ja dažÄdÄs, tad vienÄ bÅ«s intField: int, bet otrÄ bÅ«s intField: double.
Lai risinÄtu Å”o situÄciju, ir Å”Äds karogs:
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
Tagad mums ir mape, kurÄ ir nodalÄ«jumi, kurus var nolasÄ«t vienÄ datu rÄmÄ«, un derÄ«gs visas vitrÄ«nas parkets. JÄ? NÄ.
JÄatceras, ka tabulu reÄ£istrÄjÄm StropÄ. Strops lauku nosaukumos nav reÄ£istrjutÄ«gs, savukÄrt parkets ir reÄ£istrjutÄ«gs. TÄpÄc nodalÄ«jumi ar shÄmÄm: lauks1: int un Field1: int ir vienÄdi Hive, bet ne Spark. Neaizmirstiet pÄrveidot lauku nosaukumus uz mazajiem burtiem.
PÄc tam Ŕķiet, ka viss ir kÄrtÄ«bÄ.
TomÄr ne viss ir tik vienkÄrÅ”i. Ir otra, arÄ« labi zinÄma problÄma. TÄ kÄ katrs jaunais nodalÄ«jums tiek saglabÄts atseviŔķi, nodalÄ«juma mapÄ bÅ«s Spark pakalpojuma faili, piemÄram, _SUCCESS operÄcijas veiksmes karodziÅÅ”. Tas radÄ«s kļūdu, mÄÄ£inot ieklÄt parketu. Lai no tÄ izvairÄ«tos, jums ir jÄkonfigurÄ konfigurÄcija, lai neļautu Spark mapei pievienot pakalpojuma failus:
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
Å Ä·iet, ka tagad katru dienu mÄrÄ·a vitrÄ«nas mapei tiek pievienots jauns parketa nodalÄ«jums, kurÄ atrodas dienas parsÄtie dati. MÄs jau iepriekÅ” parÅ«pÄjÄmies, lai nebÅ«tu nodalÄ«jumu ar datu tipu konfliktu.
Bet mums ir treÅ”Ä problÄma. Tagad vispÄrÄjÄ shÄma nav zinÄma, turklÄt Hive tabulai ir nepareiza shÄma, jo katrs jauns nodalÄ«jums, visticamÄk, shÄmÄ ieviesa traucÄjumus.
Jums ir jÄpÄrreÄ£istrÄ tabula. To var izdarÄ«t vienkÄrÅ”i: vÄlreiz izlasiet veikala fasÄdes parketu, paÅemiet shÄmu un, pamatojoties uz to, izveidojiet DDL, ar kuru atkÄrtoti reÄ£istrÄt mapi Hive kÄ ÄrÄjo tabulu, atjauninot mÄrÄ·a veikala shÄmu.
Mums ir ceturtÄ problÄma. Pirmo reizi reÄ£istrÄjot galdu, paļÄvÄmies uz Spark. Tagad mÄs to darÄm paÅ”i, un mums jÄatceras, ka parketa lauki var sÄkties ar rakstzÄ«mÄm, kuras nav atļautas Hive. PiemÄram, Spark laukÄ ācorrupt_recordā izmet rindas, kuras nevarÄja parsÄt. Å Ädu lauku nevar reÄ£istrÄt Hive, ja tas netiek izlaists.
Zinot to, mÄs iegÅ«stam shÄmu:
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
Kods ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("masÄ«vs<`", "masÄ«vs<") padara droÅ”u DDL, t.i., vietÄ:
create table tname (_field1 string, 1field string)
Izmantojot lauku nosaukumus, piemÄram, "_field1, 1field", tiek izveidots droÅ”s DDL, kur lauku nosaukumi ir atsoļoti: izveidojiet tabulu `tname` (virkne_lauks1, virkne 1lauks).
Rodas jautÄjums: kÄ pareizi iegÅ«t datu rÄmi ar pilnu shÄmu (pf kodÄ)? KÄ iegÅ«t Å”o pf? Å Ä« ir piektÄ problÄma. Vai atkÄrtoti izlasÄ«t visu nodalÄ«jumu shÄmu no mapes ar mÄrÄ·a vitrÄ«nas parketa failiem? Å Ä« metode ir visdroÅ”ÄkÄ, bet sarežģītÄkÄ.
ShÄma jau ir Hive. JÅ«s varat iegÅ«t jaunu shÄmu, apvienojot visas tabulas un jaunÄ nodalÄ«juma shÄmu. TÄtad jums ir jÄÅem tabulas shÄma no Hive un jÄapvieno ar jaunÄ nodalÄ«juma shÄmu. To var izdarÄ«t, nolasot testa metadatus no Hive, saglabÄjot tos pagaidu mapÄ un izmantojot Spark, lai vienlaikus lasÄ«tu abus nodalÄ«jumus.
Faktiski ir viss, kas jums nepiecieÅ”ams: sÄkotnÄjÄ tabulas shÄma Hive un jaunais nodalÄ«jums. Mums ir arÄ« dati. Atliek tikai iegÅ«t jaunu shÄmu, kas apvieno veikala shÄmu un jaunus laukus no izveidotÄ nodalÄ«juma:
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
TÄlÄk mÄs izveidojam tabulas reÄ£istrÄcijas DDL, tÄpat kÄ iepriekÅ”ÄjÄ fragmentÄ.
Ja visa Ä·Äde darbojas pareizi, proti, notikusi inicializÄcijas slodze un tabula Hive tika izveidota pareizi, tad mÄs iegÅ«stam atjauninÄtu tabulas shÄmu.
Un pÄdÄjÄ problÄma ir tÄda, ka Hive tabulai nevar vienkÄrÅ”i pievienot nodalÄ«jumu, jo tas tiks bojÄts. Jums ir jÄpiespiež Hive salabot nodalÄ«juma struktÅ«ru:
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
VienkÄrÅ”ais uzdevums nolasÄ«t JSON un uz tÄ pamata izveidot veikalu, ļauj pÄrvarÄt vairÄkas netieÅ”as grÅ«tÄ«bas, kuru risinÄjumi jums ir jÄmeklÄ atseviŔķi. Un, lai gan Å”ie risinÄjumi ir vienkÄrÅ”i, to atraÅ”ana prasa daudz laika.
Lai īstenotu vitrīnas uzbūvi, man bija:
- Pievienojiet vitrīnai nodalījumus, atbrīvojoties no pakalpojumu failiem
- Rīkojieties ar tukŔiem laukiem Spark ievadītajos avota datos
- Apraidiet virknÄ vienkÄrÅ”us veidus
- KonvertÄjiet lauku nosaukumus uz mazajiem burtiem
- AtseviŔķa datu augÅ”upielÄde un tabulu reÄ£istrÄcija Hive (DDL paaudze)
- Neaizmirstiet atslÄgt lauku nosaukumus, kas varÄtu bÅ«t nesaderÄ«gi ar Hive
- Uzziniet, kÄ atjauninÄt tabulas reÄ£istrÄciju programmÄ Hive
Apkopojot, mÄs atzÄ«mÄjam, ka lÄmums bÅ«vÄt skatlogus ir pilns ar daudzÄm nepilnÄ«bÄm. TÄpÄc Ä«stenoÅ”anas grÅ«tÄ«bu gadÄ«jumÄ labÄk ir sazinÄties ar pieredzÄjuÅ”u partneri ar veiksmÄ«gu pieredzi.
Paldies, ka izlasÄ«jÄt Å”o rakstu, ceram, ka informÄcija jums noderÄs.
Avots: www.habr.com