Dzirksteles shēmaEvolūcija praksē

Cienījamie lasītāji, laba diena!

Å ajā rakstā Neoflex Big Data Solutions biznesa jomas vadoÅ”ais konsultants detalizēti apraksta iespējas veidot mainÄ«gas struktÅ«ras vitrÄ«nas, izmantojot Apache Spark.

Datu analīzes projekta ietvaros bieži rodas uzdevums veidot veikalu skatlogus, pamatojoties uz brīvi strukturētiem datiem.

Parasti tie ir žurnāli vai atbildes no dažādām sistēmām, kas saglabātas kā JSON vai XML. Dati tiek augÅ”upielādēti pakalpojumā Hadoop, un pēc tam jums no tiem jāizveido veikala mājaslapa. Piekļuvi izveidotajai vitrÄ«nai varam organizēt, piemēram, caur Impala.

Å ajā gadÄ«jumā mērÄ·a veikala mājaslapas shēma iepriekÅ” nav zināma. Turklāt shēmu arÄ« nevar sastādÄ«t iepriekÅ”, jo tā ir atkarÄ«ga no datiem, un mums ir darÄ«Å”ana ar Å”iem ļoti vāji strukturētajiem datiem.

Piemēram, Å”odien tiek reÄ£istrēta Ŕāda atbilde:

{source: "app1", error_code: ""}

un rÄ«t no tās paÅ”as sistēmas nāk Ŕāda atbilde:

{source: "app1", error_code: "error", description: "Network error"}

Rezultātā vitrīnai vajadzētu pievienot vēl vienu lauku - aprakstu, un neviens nezina, vai tas nāks vai nē.

Uzdevums izveidot veikalu, izmantojot Ŕādus datus, ir diezgan standarta, un Spark tam ir vairāki rÄ«ki. Avota datu parsÄ“Å”anai tiek atbalstÄ«ts gan JSON, gan XML, savukārt iepriekÅ” nezināmai shēmai tiek nodroÅ”ināts shēmas Evolution atbalsts.

No pirmā acu uzmetiena risinājums Ŕķiet vienkārÅ”s. Jums ir jāņem mape ar JSON un jālasa tā datu rāmÄ«. Spark izveidos shēmu, pārvērtÄ«s ligzdotos datus struktÅ«rās. Tālāk viss jāsaglabā parketā, kas tiek atbalstÄ«ts arÄ« Impalā, reÄ£istrējot veikala vitrÄ«nu Hive metaveikalā.

Šķiet, ka viss ir vienkārŔi.

Tomēr no īsajiem piemēriem dokumentācijā nav skaidrs, ko darīt ar vairākām problēmām praksē.

Dokumentācijā ir aprakstīta pieeja nevis izveidot veikala mājaslapu, bet gan nolasīt JSON vai XML datu ietvarā.

Proti, tas vienkārÅ”i parāda, kā lasÄ«t un parsēt JSON:

df = spark.read.json(path...)

Tas ir pietiekami, lai dati būtu pieejami Spark.

Praksē skripts ir daudz sarežģītāks nekā tikai JSON failu lasÄ«Å”ana no mapes un datu rāmja izveide. Situācija izskatās Ŕādi: jau ir noteikta vitrÄ«na, katru dienu ienāk jauni dati, tie jāpievieno vitrÄ«nai, neaizmirstot, ka shēma var atŔķirties.

Parastā vitrÄ«nas bÅ«vniecÄ«bas shēma ir Ŕāda:

Solis 1. Dati tiek ielādēti Hadoop ar turpmāku ikdienas atkārtotu ielādi un pievienoti jaunam nodalījumam. Izrādās mape ar sākotnējiem datiem, kas sadalīti pa dienām.

Solis 2. Sākotnējās ielādes laikā Å”o mapi lasa un parsē Spark. IegÅ«tais datu rāmis tiek saglabāts parsējamā formātā, piemēram, parketā, ko pēc tam var importēt Impala. Tādējādi tiek izveidota mērÄ·a vitrÄ«na ar visiem datiem, kas ir uzkrāti lÄ«dz Å”im brÄ«dim.

Solis 3. Tiek izveidota lejupielāde, kas katru dienu atjauninās veikala mājaslapu.
Ir jautājums par pakāpenisku ielādi, nepiecieÅ”amÄ«bu sadalÄ«t vitrÄ«nas nodalÄ«jumu, kā arÄ« jautājums par vitrÄ«nas vispārējās shēmas saglabāŔanu.

Ņemsim piemēru. Pieņemsim, ka ir veikts pirmais repozitorija izveides solis, un JSON faili tiek augÅ”upielādēti mapē.

Izveidot no tiem datu rāmi un pēc tam to saglabāt kā vitrīnu nav problēma. Šis ir pats pirmais solis, ko var viegli atrast Spark dokumentācijā:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Šķiet, ka viss ir kārtībā.

Mēs nolasām un parsējām JSON, pēc tam saglabājam datu rāmi kā parketu, reģistrējot to Hive jebkurā ērtā veidā:

df.write.format(ā€œparquetā€).option('path','<External Table Path>').saveAsTable('<Table Name>')

Mēs saņemam logu.

Bet nākamajā dienā tika pievienoti jauni dati no avota. Mums ir mape ar JSON un no Ŕīs mapes izveidota vitrÄ«na. Pēc nākamās datu paketes ielādes no avota datu tirgum trÅ«kst vienas dienas datu.

Loģisks risinājums būtu veikala telpas sadalīŔana pa dienām, kas ļaus katru nākamo dienu pievienot jaunu nodalījumu. Arī tā mehānisms ir labi zināms, Spark ļauj rakstīt nodalījumus atseviŔķi.

Pirmkārt, mēs veicam sākotnējo ielādi, saglabājot datus, kā aprakstÄ«ts iepriekÅ”, pievienojot tikai sadalÄ«Å”anu. Å o darbÄ«bu sauc par veikala inicializÄ“Å”anu, un tā tiek veikta tikai vienu reizi:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Nākamajā dienā mēs ielādējam tikai jaunu nodalījumu:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Atliek tikai atkārtoti reģistrēties Hive, lai atjauninātu shēmu.
Tomēr Å”eit rodas problēmas.

Pirmā problēma. Agrāk vai vēlāk iegÅ«tais parkets bÅ«s nesalasāms. Tas ir saistÄ«ts ar to, ka parkets un JSON atŔķirÄ«gi pieiet tukÅ”iem laukiem.

Apskatīsim tipisku situāciju. Piemēram, vakar JSON ierodas:

Š”ŠµŠ½ŃŒ 1: {"a": {"b": 1}},

un Ŕodien tas pats JSON izskatās Ŕādi:

Š”ŠµŠ½ŃŒ 2: {"a": null}

Pieņemsim, ka mums ir divi dažādi nodalījumi, katrs ar vienu līniju.
Kad mēs nolasÄ«sim visus avota datus, Spark varēs noteikt veidu un sapratÄ«s, ka "a" ir "struktÅ«ras" tipa lauks ar ligzdotu lauku "b", kura tips ir INT. Bet, ja katrs nodalÄ«jums tika saglabāts atseviŔķi, mēs iegÅ«stam parketu ar nesaderÄ«gām starpsienu shēmām:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Å Ä« situācija ir labi zināma, tāpēc ir Ä«paÅ”i pievienota opcija - analizējot avota datus, noņemiet tukÅ”os laukus:

df = spark.read.json("...", dropFieldIfAllNull=True)

Šajā gadījumā parkets sastāvēs no starpsienām, kuras var salasīt kopā.
Lai gan tie, kas to ir darÄ«juÅ”i praksē, te rÅ«gti smaidÄ«s. Kāpēc? Jā, jo, visticamāk, bÅ«s vēl divas situācijas. Vai trÄ«s. Vai četras. Pirmais, kas gandrÄ«z noteikti notiks, ir tas, ka ciparu veidi dažādos JSON failos izskatÄ«sies atŔķirÄ«gi. Piemēram, {intField: 1} un {intField: 1.1}. Ja Ŕādi lauki tiek atrasti vienā nodalÄ«jumā, tad shēmas sapludināŔana visu nolasÄ«s pareizi, novedot pie visprecÄ«zākā veida. Bet, ja dažādās, tad vienā bÅ«s intField: int, bet otrā bÅ«s intField: double.

Lai risinātu Ŕo situāciju, ir Ŕāds karogs:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Tagad mums ir mape, kurā ir nodalījumi, kurus var nolasīt vienā datu rāmī, un derīgs visas vitrīnas parkets. Jā? Nē.

Jāatceras, ka tabulu reģistrējām Stropā. Strops lauku nosaukumos nav reģistrjutīgs, savukārt parkets ir reģistrjutīgs. Tāpēc nodalījumi ar shēmām: lauks1: int un Field1: int ir vienādi Hive, bet ne Spark. Neaizmirstiet pārveidot lauku nosaukumus uz mazajiem burtiem.

Pēc tam Ŕķiet, ka viss ir kārtÄ«bā.

Tomēr ne viss ir tik vienkārÅ”i. Ir otra, arÄ« labi zināma problēma. Tā kā katrs jaunais nodalÄ«jums tiek saglabāts atseviŔķi, nodalÄ«juma mapē bÅ«s Spark pakalpojuma faili, piemēram, _SUCCESS operācijas veiksmes karodziņŔ. Tas radÄ«s kļūdu, mēģinot ieklāt parketu. Lai no tā izvairÄ«tos, jums ir jākonfigurē konfigurācija, lai neļautu Spark mapei pievienot pakalpojuma failus:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Å Ä·iet, ka tagad katru dienu mērÄ·a vitrÄ«nas mapei tiek pievienots jauns parketa nodalÄ«jums, kurā atrodas dienas parsētie dati. Mēs jau iepriekÅ” parÅ«pējāmies, lai nebÅ«tu nodalÄ«jumu ar datu tipu konfliktu.

Bet mums ir treŔā problēma. Tagad vispārējā shēma nav zināma, turklāt Hive tabulai ir nepareiza shēma, jo katrs jauns nodalÄ«jums, visticamāk, shēmā ieviesa traucējumus.

Jums ir jāpārreÄ£istrē tabula. To var izdarÄ«t vienkārÅ”i: vēlreiz izlasiet veikala fasādes parketu, paņemiet shēmu un, pamatojoties uz to, izveidojiet DDL, ar kuru atkārtoti reÄ£istrēt mapi Hive kā ārējo tabulu, atjauninot mērÄ·a veikala shēmu.

Mums ir ceturtā problēma. Pirmo reizi reÄ£istrējot galdu, paļāvāmies uz Spark. Tagad mēs to darām paÅ”i, un mums jāatceras, ka parketa lauki var sākties ar rakstzÄ«mēm, kuras nav atļautas Hive. Piemēram, Spark laukā ā€œcorrupt_recordā€ izmet rindas, kuras nevarēja parsēt. Šādu lauku nevar reÄ£istrēt Hive, ja tas netiek izlaists.

Zinot to, mēs iegūstam shēmu:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kods ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("masīvs<`", "masīvs<") padara droŔu DDL, t.i., vietā:

create table tname (_field1 string, 1field string)

Izmantojot lauku nosaukumus, piemēram, "_field1, 1field", tiek izveidots droÅ”s DDL, kur lauku nosaukumi ir atsoļoti: izveidojiet tabulu `tname` (virkne_lauks1, virkne 1lauks).

Rodas jautājums: kā pareizi iegÅ«t datu rāmi ar pilnu shēmu (pf kodā)? Kā iegÅ«t Å”o pf? Å Ä« ir piektā problēma. Vai atkārtoti izlasÄ«t visu nodalÄ«jumu shēmu no mapes ar mērÄ·a vitrÄ«nas parketa failiem? Å Ä« metode ir visdroŔākā, bet sarežģītākā.

Shēma jau ir Hive. Jūs varat iegūt jaunu shēmu, apvienojot visas tabulas un jaunā nodalījuma shēmu. Tātad jums ir jāņem tabulas shēma no Hive un jāapvieno ar jaunā nodalījuma shēmu. To var izdarīt, nolasot testa metadatus no Hive, saglabājot tos pagaidu mapē un izmantojot Spark, lai vienlaikus lasītu abus nodalījumus.

Faktiski ir viss, kas jums nepiecieÅ”ams: sākotnējā tabulas shēma Hive un jaunais nodalÄ«jums. Mums ir arÄ« dati. Atliek tikai iegÅ«t jaunu shēmu, kas apvieno veikala shēmu un jaunus laukus no izveidotā nodalÄ«juma:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Tālāk mēs izveidojam tabulas reÄ£istrācijas DDL, tāpat kā iepriekŔējā fragmentā.
Ja visa ķēde darbojas pareizi, proti, notikusi inicializācijas slodze un tabula Hive tika izveidota pareizi, tad mēs iegūstam atjauninātu tabulas shēmu.

Un pēdējā problēma ir tāda, ka Hive tabulai nevar vienkārÅ”i pievienot nodalÄ«jumu, jo tas tiks bojāts. Jums ir jāpiespiež Hive salabot nodalÄ«juma struktÅ«ru:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

VienkārÅ”ais uzdevums nolasÄ«t JSON un uz tā pamata izveidot veikalu, ļauj pārvarēt vairākas netieÅ”as grÅ«tÄ«bas, kuru risinājumi jums ir jāmeklē atseviŔķi. Un, lai gan Å”ie risinājumi ir vienkārÅ”i, to atraÅ”ana prasa daudz laika.

Lai īstenotu vitrīnas uzbūvi, man bija:

  • Pievienojiet vitrÄ«nai nodalÄ«jumus, atbrÄ«vojoties no pakalpojumu failiem
  • RÄ«kojieties ar tukÅ”iem laukiem Spark ievadÄ«tajos avota datos
  • Apraidiet virknē vienkārÅ”us veidus
  • Konvertējiet lauku nosaukumus uz mazajiem burtiem
  • AtseviŔķa datu augÅ”upielāde un tabulu reÄ£istrācija Hive (DDL paaudze)
  • Neaizmirstiet atslēgt lauku nosaukumus, kas varētu bÅ«t nesaderÄ«gi ar Hive
  • Uzziniet, kā atjaunināt tabulas reÄ£istrāciju programmā Hive

Apkopojot, mēs atzÄ«mējam, ka lēmums bÅ«vēt skatlogus ir pilns ar daudzām nepilnÄ«bām. Tāpēc Ä«stenoÅ”anas grÅ«tÄ«bu gadÄ«jumā labāk ir sazināties ar pieredzējuÅ”u partneri ar veiksmÄ«gu pieredzi.

Paldies, ka izlasÄ«jāt Å”o rakstu, ceram, ka informācija jums noderēs.

Avots: www.habr.com

Pievieno komentāru