Spark schemaEvolution v praksi

Dragi bralci, dober dan!

V tem članku vodilni svetovalec za poslovno področje Big Data Solutions podjetja Neoflex podrobno opisuje možnosti za gradnjo izložb s spremenljivo strukturo s pomočjo Apache Spark.

Kot del projekta analize podatkov se pogosto pojavi naloga gradnje predstavitev na podlagi ohlapno strukturiranih podatkov.

Običajno so to dnevniki ali odgovori iz različnih sistemov, shranjeni v obliki JSON ali XML. Podatki se naložijo v Hadoop, nato pa je treba iz njih zgraditi trgovino. Dostop do izdelane izložbe lahko organiziramo na primer preko Impale.

V tem primeru je postavitev ciljne izložbe vnaprej neznana. Še več, sheme ni mogoče sestaviti vnaprej, saj je odvisna od podatkov, mi pa imamo opravka s temi zelo šibko strukturiranimi podatki.

Na primer, danes je zabeležen naslednji odgovor:

{source: "app1", error_code: ""}

in jutri iz istega sistema pride naslednji odgovor:

{source: "app1", error_code: "error", description: "Network error"}

Posledično je treba v izložbo dodati še eno polje - opis, in nihče ne ve, ali bo prišel ali ne.

Naloga ustvarjanja tržnice na takih podatkih je dokaj standardna in Spark ima za to številna orodja. Za razčlenjevanje izvornih podatkov je na voljo podpora za JSON in XML, za predhodno neznano shemo pa je na voljo podpora za schemaEvolution.

Na prvi pogled je rešitev preprosta. Vzeti morate mapo z JSON in jo prebrati v podatkovni okvir. Spark bo ustvaril shemo in ugnezdene podatke spremenil v strukture. Nato je treba vse shraniti v parket, kar je podprto tudi v Impali, z registracijo izložbe v Hive metastore.

Zdi se, da je vse preprosto.

Vendar iz kratkih primerov v dokumentaciji ni jasno, kaj storiti s številnimi težavami v praksi.

Dokumentacija opisuje pristop ne za ustvarjanje izložbe, ampak za branje JSON ali XML v podatkovni okvir.

Namreč preprosto pokaže, kako brati in razčleniti JSON:

df = spark.read.json(path...)

To je dovolj, da so podatki na voljo Sparku.

V praksi je scenarij veliko bolj zapleten kot le branje datotek JSON iz mape in ustvarjanje podatkovnega okvira. Situacija je videti takole: določena vitrina že obstaja, vsak dan prihajajo novi podatki, ki jih je treba dodati v vitrino, ne da bi pozabili, da se shema lahko razlikuje.

Običajna shema za gradnjo izložbe je naslednja:

Korak 1. Podatki se naložijo v Hadoop, sledi dnevno dodatno nalaganje in dodajo v novo particijo. Rezultat je mapa z izvornimi podatki, razdeljena po dnevih.

Korak 2. Med začetnim nalaganjem se ta mapa prebere in razčleni s pomočjo Spark. Nastali podatkovni okvir se shrani v formatu, ki ga je mogoče analizirati, na primer v parketu, ki ga je nato mogoče uvoziti v Impalo. To ustvari ciljno trgovino z vsemi podatki, ki so se zbrali do te točke.

Korak 3. Ustvari se prenos, ki bo vsak dan posodabljal trgovino.
Pojavi se vprašanje postopnega nalaganja, potrebe po razdelitvi izložbe in vprašanje podpore splošni postavitvi izložbe.

Dajmo primer. Recimo, da je prvi korak gradnje repozitorija implementiran in da je nalaganje datotek JSON v mapo konfigurirano.

Iz njih ni problema ustvariti podatkovnega okvira in jih nato shraniti kot predstavitev. To je prvi korak, ki ga zlahka najdete v dokumentaciji Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Zdi se, da je vse v redu.

Prebrali smo in razčlenili JSON, nato pa podatkovni okvir shranimo kot parket in ga registriramo v Hive na poljuben primeren način:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Dobimo izložbo.

Toda naslednji dan so bili iz vira dodani novi podatki. Imamo mapo z JSON in na podlagi te mape ustvarjeno trgovino. Po nalaganju naslednjega dela podatkov iz vira trgovina nima dovolj podatkov za en dan.

Logična rešitev bi bila razdelitev izložbe po dnevih, kar vam bo omogočilo, da vsak naslednji dan dodate novo particijo. Tudi mehanizem za to je dobro znan; Spark omogoča ločeno snemanje particij.

Najprej izvedemo začetno nalaganje, shranimo podatke, kot je opisano zgoraj, in dodamo samo particioniranje. To dejanje se imenuje inicializacija izložbe in se izvede samo enkrat:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Naslednji dan prenesemo samo novo particijo:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Vse kar ostane je, da se ponovno registrirate v Hive, da posodobite shemo.
Vendar se tu pojavijo težave.

Prva težava. Prej ali slej nastali parket ne bo več berljiv. To je posledica tega, kako parquet in JSON različno obravnavata prazna polja.

Poglejmo tipično situacijo. Na primer, včeraj je prispel JSON:

День 1: {"a": {"b": 1}},

in danes je isti JSON videti takole:

День 2: {"a": null}

Recimo, da imamo dve različni particiji, vsaka z eno črto.
Ko preberemo celotne izvorne podatke, bo Spark lahko določil vrsto in razumel, da je »a« polje tipa »struktura« z ugnezdenim poljem »b« tipa INT. Če pa je bila vsaka particija shranjena posebej, je rezultat parket z nezdružljivimi particijskimi shemami:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ta situacija je dobro znana, zato je bila posebej dodana možnost za odstranitev praznih polj pri razčlenjevanju izvornih podatkov:

df = spark.read.json("...", dropFieldIfAllNull=True)

V tem primeru bo parket sestavljen iz predelnih sten, ki se lahko berejo skupaj.
Čeprav se bodo tisti, ki so to storili v praksi, grenko nasmehnili. Zakaj? Da, ker se bosta najverjetneje pojavili še dve situaciji. Ali tri. Ali štiri. Prvi, ki je skoraj gotov, je, da bodo številske vrste videti drugače v različnih datotekah JSON. Na primer {intField: 1} in {intField: 1.1}. Če se takšna polja pojavijo v enem paketu, bo spajanje sheme pravilno prebralo vse, kar vodi do najbolj natančnega tipa. Če pa v različnih, bo eden imel intField: int, drugi pa bo imel intField: double.

Za obravnavo te situacije obstaja naslednja zastavica:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Zdaj imamo mapo, kjer se nahajajo particije, ki jih lahko preberemo v en sam podatkovni okvir in veljaven parket celotne izložbe. da? št.

Ne smemo pozabiti, da smo tabelo registrirali v Hive. Panj pri imenih polj ne razlikuje med velikimi in malimi črkami, parket pa razlikuje. Zato so particije s shemama: field1: int in Field1: int enake za Hive, ne pa tudi za Spark. Ne pozabite spremeniti imen polj v male črke.

Po tem se zdi, da je vse v redu.

Vendar pa ni vse tako preprosto. Pojavi se drugi, prav tako dobro znani problem. Ker je vsaka nova particija shranjena ločeno, bo mapa particije vsebovala storitvene datoteke Spark, na primer zastavico uspeha operacije _SUCCESS. To bo povzročilo napako pri poskusu parketa. Da bi se temu izognili, morate konfigurirati konfiguracijo tako, da Sparku preprečite dodajanje servisnih datotek v mapo:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Zdi se, da se sedaj vsak dan v mapo ciljne izložbe, kjer se nahajajo razčlenjeni podatki za dan, doda nova parketna particija. Vnaprej smo poskrbeli, da ni bilo particij s konflikti tipov podatkov.

Soočeni pa smo s tretjim problemom. Zdaj splošna shema ni znana, poleg tega ima tabela v Hive napačno shemo, saj je vsaka nova particija najverjetneje vnesla popačenje v shemo.

Tabelo je treba ponovno registrirati. To lahko storite preprosto: še enkrat preberite parket izložbe, vzemite shemo in na njeni podlagi ustvarite DDL, s katerim lahko mapo v Hive ponovno registrirate kot zunanjo tabelo in tako posodobite shemo ciljne izložbe.

Srečujemo se s četrto težavo. Ko smo tabelo registrirali prvič, smo se zanašali na Spark. Zdaj to počnemo sami in ne smemo pozabiti, da se polja parketa lahko začnejo z znaki, ki jih Hive ne dovoljuje. Na primer, Spark vrže vrstice, ki jih ni mogel razčleniti v polju "corrupt_record". Takšnega polja ni mogoče registrirati v Hive brez uhajanja.

Če to vemo, dobimo diagram:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Koda: ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("matrika<`", "matrika<") naredi varen DDL, to je namesto:

create table tname (_field1 string, 1field string)

Z imeni polj, kot je »_field1, 1field«, se naredi varen DDL, kjer so imena polj ubežna: ustvarite tabelo `tname` (niz `_field1`, niz `1field`).

Postavlja se vprašanje: kako pravilno pridobiti podatkovni okvir s celotno shemo (v kodi pf)? Kako do tega pf? To je peti problem. Ponovno prebrati diagram vseh particij iz mape s parketnimi datotekami ciljne izložbe? Ta metoda je najvarnejša, vendar težka.

Shema je že v Hive. Novo shemo lahko dobite tako, da združite shemo celotne tabele in nove particije. To pomeni, da morate vzeti shemo tabele iz Hive in jo združiti s shemo nove particije. To lahko storite tako, da preberete testne metapodatke iz Hive, jih shranite v začasno mapo in preberete obe particiji hkrati s pomočjo Spark.

V bistvu obstaja vse, kar potrebujete: originalna shema tabele v Hive in nova particija. Imamo tudi podatke. Vse, kar ostane, je pridobiti novo shemo, ki združuje shemo izložbe in nova polja iz ustvarjene particije:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Nato ustvarimo registracijo tabele DDL, kot v prejšnjem fragmentu.
Če celotna veriga deluje pravilno, in sicer je prišlo do začetnega nalaganja in je bila tabela pravilno ustvarjena v Hive, potem dobimo posodobljeno shemo tabele.

Zadnja težava je, da particije ne morete preprosto dodati v tabelo Hive, saj se bo pokvarila. Hive morate prisiliti, da popravi strukturo particije:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Enostavna naloga branja JSON in ustvarjanja izložbe na njegovi podlagi povzroči premagovanje številnih implicitnih težav, za katere je treba rešitve poiskati ločeno. In čeprav so te rešitve preproste, je za njihovo iskanje potrebno veliko časa.

Za izvedbo konstrukcije vitrine smo morali:

  • Dodajte particije v trgovino in se znebite servisnih datotek
  • Obravnavajte prazna polja v izvornih podatkih, ki jih je vnesel Spark
  • Pretvori preproste vrste v niz
  • Pretvori imena polj v male črke
  • Ločeno nalaganje podatkov in registracija tabele v Hive (ustvarjanje DDL)
  • Ne pozabite ubežati imen polj, ki morda niso združljiva s Hive
  • Naučite se posodobiti registracijo tabele v Hive

Če povzamemo, ugotavljamo, da je odločitev za izgradnjo trgovin polna številnih pasti. Zato se je v primeru težav pri izvajanju bolje obrniti na izkušenega partnerja z uspešnim strokovnim znanjem.

Hvala, ker ste prebrali ta članek, upamo, da vam bodo informacije koristile.

Vir: www.habr.com

Dodaj komentar