Kibirkšties schemaEvoliucija praktikoje

Mieli skaitytojai, laba diena!

Šiame straipsnyje pagrindinis „Neoflex Big Data Solutions“ verslo srities konsultantas išsamiai aprašo kintamos struktūros vitrinų kūrimo naudojant „Apache Spark“ parinktis.

Vykdant duomenų analizės projektą dažnai iškyla užduotis kurti vitrinas remiantis laisvai struktūriniais duomenimis.

Paprastai tai yra žurnalai arba atsakymai iš įvairių sistemų, išsaugoti kaip JSON arba XML. Duomenys įkeliami į „Hadoop“, tada iš jų reikia sukurti parduotuvę. Galime organizuoti prieigą prie sukurtos vitrinos, pavyzdžiui, per Impala.

Šiuo atveju tikslinės parduotuvės filialo schema iš anksto nežinoma. Be to, schema taip pat negali būti sudaryta iš anksto, nes ji priklauso nuo duomenų, ir mes susiduriame su šiais labai laisvai struktūriniais duomenimis.

Pavyzdžiui, šiandien registruojamas toks atsakymas:

{source: "app1", error_code: ""}

o rytoj iš tos pačios sistemos ateina toks atsakymas:

{source: "app1", error_code: "error", description: "Network error"}

Dėl to prie vitrinos reikėtų pridėti dar vieną lauką – aprašymą, ir niekas nežino, ateis ar ne.

Užduotis sukurti tokių duomenų rinkinį yra gana standartinė, o „Spark“ tam turi daugybę įrankių. Šaltinio duomenų analizei palaikomas ir JSON, ir XML, o anksčiau nežinomai schemai – schemaEvolution palaikymas.

Iš pirmo žvilgsnio sprendimas atrodo paprastas. Turite paimti aplanką su JSON ir perskaityti jį į duomenų rėmelį. „Spark“ sukurs schemą, įdėtus duomenis pavers struktūromis. Toliau viską reikia išsaugoti parkete, kuris palaikomas ir Impaloje, registruojant vitriną Avilio metaparduotuvėje.

Atrodo, kad viskas paprasta.

Tačiau iš trumpų pavyzdžių dokumentuose neaišku, ką daryti su daugeliu problemų praktiškai.

Dokumentacijoje aprašomas būdas ne sukurti parduotuvės filialą, o skaityti JSON arba XML į duomenų rėmelį.

Būtent, tai tiesiog parodo, kaip skaityti ir analizuoti JSON:

df = spark.read.json(path...)

To pakanka, kad duomenys būtų prieinami „Spark“.

Praktiškai scenarijus yra daug sudėtingesnis nei tiesiog nuskaityti JSON failus iš aplanko ir sukurti duomenų rėmelį. Situacija atrodo taip: jau yra tam tikra vitrininė, kasdien ateina nauji duomenys, juos reikia įtraukti į vitriną, nepamirštant, kad schema gali skirtis.

Įprasta vitrinos pastatymo schema yra tokia:

Žingsnis 1. Duomenys įkeliami į „Hadoop“, vėliau kasdien įkeliami iš naujo ir pridedami prie naujo skaidinio. Pasirodo, aplankas su pradiniais duomenimis, suskirstytas pagal dieną.

Žingsnis 2. Pradinio įkėlimo metu šis aplankas nuskaitomas ir analizuojamas naudojant „Spark“. Gautas duomenų rėmelis išsaugomas formatu, kurį galima analizuoti, pavyzdžiui, parkete, kurį vėliau galima importuoti į Impala. Taip sukuriama tikslinė parduotuvė su visais iki šiol sukauptais duomenimis.

Žingsnis 3. Sukuriamas atsisiuntimas, kuris kasdien atnaujins parduotuvės filialą.
Kyla klausimas dėl laipsniško apkrovimo, poreikio skaidyti vitriną ir bendros vitrinos schemos išlaikymo klausimas.

Paimkime pavyzdį. Tarkime, kad atliktas pirmasis saugyklos kūrimo žingsnis, o JSON failai įkeliami į aplanką.

Iš jų sukurti duomenų rėmelį ir išsaugoti jį kaip demonstraciją nėra problema. Tai pats pirmasis žingsnis, kurį galima lengvai rasti „Spark“ dokumentacijoje:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Panašu, kad viskas gerai.

Perskaitome ir analizuojame JSON, tada duomenų rėmelį išsaugome kaip parketą, bet kokiu patogiu būdu užregistruodami jį „Hive“:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Gauname langą.

Tačiau kitą dieną buvo pridėti nauji šaltinio duomenys. Turime aplanką su JSON ir iš šio aplanko sukurtą vitriną. Įkėlus kitą duomenų paketą iš šaltinio, duomenų rinkai trūksta vienos dienos duomenų.

Logiškas sprendimas būtų padalinti vitriną per dieną, o tai leis kiekvieną kitą dieną pridėti naują skaidinį. To mechanizmas taip pat gerai žinomas, „Spark“ leidžia rašyti skaidinius atskirai.

Pirmiausia atliekame pradinį įkėlimą, išsaugome duomenis, kaip aprašyta aukščiau, pridedant tik skaidymą. Šis veiksmas vadinamas parduotuvės filialo inicijavimu ir atliekamas tik vieną kartą:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Kitą dieną įkeliame tik naują skaidinį:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Belieka iš naujo užsiregistruoti „Hive“, kad būtų atnaujinta schema.
Tačiau čia ir kyla problemų.

Pirma problema. Anksčiau ar vėliau susidaręs parketas bus neįskaitomas. Taip yra dėl to, kad parketas ir JSON skirtingai žiūri į tuščius laukus.

Panagrinėkime tipišką situaciją. Pavyzdžiui, vakar JSON atvyksta:

День 1: {"a": {"b": 1}},

ir šiandien tas pats JSON atrodo taip:

День 2: {"a": null}

Tarkime, kad turime du skirtingus skaidinius, kurių kiekvienas turi vieną eilutę.
Kai perskaitysime visus šaltinio duomenis, „Spark“ galės nustatyti tipą ir supras, kad „a“ yra „struktūros“ tipo laukas, kurio įdėtas laukas „b“ yra INT tipo. Bet jei kiekviena pertvara buvo išsaugota atskirai, tada gauname parketą su nesuderinamomis pertvarų schemomis:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ši situacija yra gerai žinoma, todėl buvo specialiai pridėta parinktis - analizuodami šaltinio duomenis pašalinkite tuščius laukus:

df = spark.read.json("...", dropFieldIfAllNull=True)

Tokiu atveju parketas bus sudarytas iš pertvarų, kurias bus galima skaityti kartu.
Nors tie, kurie tai padarė praktiškai, čia karčiai šypsosis. Kodėl? Taip, nes greičiausiai bus dar dvi situacijos. Arba trys. Arba keturi. Pirmasis, kuris beveik neabejotinai įvyks, yra tas, kad skaitiniai tipai skirtinguose JSON failuose atrodys skirtingai. Pavyzdžiui, {intField: 1} ir {intField: 1.1}. Jei tokie laukai randami viename skaidinyje, schemos sujungimas viską nuskaitys teisingai, todėl bus nustatytas tiksliausias tipas. Bet jei skirtinguose, tai vienas turės intField: int, o kitas turės intField: double.

Šiai situacijai spręsti yra tokia vėliavėlė:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Dabar turime aplanką, kuriame yra skaidiniai, kuriuos galima nuskaityti į vieną duomenų rėmelį, ir galiojantis visos vitrinos parketas. Taip? Nr.

Turime prisiminti, kad lentelę užregistravome Avilyje. Avilys neskiria didžiųjų ir mažųjų raidžių laukų pavadinimuose, o parketas – didžiosios ir mažosios raidės. Todėl skaidiniai su schemomis: field1: int ir Field1: int yra vienodi „Hive“, bet ne „Spark“. Nepamirškite konvertuoti laukų pavadinimų į mažąsias raides.

Po to atrodo, kad viskas gerai.

Tačiau ne viskas taip paprasta. Yra antra, taip pat gerai žinoma problema. Kadangi kiekvienas naujas skaidinys išsaugomas atskirai, skaidinio aplanke bus Spark paslaugos failai, pavyzdžiui, sėkmingos operacijos vėliavėlė _SUCCESS. Dėl to bandant dengti parketą bus padaryta klaida. Norėdami to išvengti, turite sukonfigūruoti konfigūraciją, kad „Spark“ nepridėtų paslaugų failų į aplanką:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Atrodo, kad dabar kiekvieną dieną į tikslinį vitrinų aplanką pridedama nauja parketo pertvara, kurioje yra išanalizuoti tos dienos duomenys. Iš anksto pasirūpinome, kad nebūtų skaidinių su duomenų tipo konfliktu.

Tačiau turime trečią problemą. Dabar bendroji schema nežinoma, be to, Hive lentelė turi neteisingą schemą, nes kiekvienas naujas skaidinys greičiausiai įvedė schemos iškraipymus.

Reikia iš naujo registruoti lentelę. Tai galima padaryti paprastai: dar kartą perskaitykite vitrinos parketą, paimkite schemą ir pagal ją sukurkite DDL, su kuria perregistruosite aplanką „Hive“ kaip išorinę lentelę, atnaujindami tikslinės vitrinos schemą.

Turime ketvirtą problemą. Kai pirmą kartą registravome lentelę, pasikliovėme „Spark“. Dabar tai darome patys ir turime atsiminti, kad parketo laukai gali prasidėti simboliais, kurių Aviliui neleidžiama. Pavyzdžiui, „Spark“ lauke „corrupt_record“ išmeta eilutes, kurių negalėjo išanalizuoti. Tokio lauko negalima užregistruoti Avilyje jo nepabėgus.

Žinodami tai, gauname schemą:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kodas ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("masyvas<`", "masyvas<") padaro saugų DDL, t. y. vietoj:

create table tname (_field1 string, 1field string)

Naudojant laukų pavadinimus, pvz., „_laukas1, 1 laukas“, sukuriamas saugus DDL, kuriame laukų pavadinimai yra pabėgę: sukurkite lentelę „tname“ (eilutė „_laukas1“, eilutė „1 laukas“).

Kyla klausimas: kaip tinkamai gauti duomenų rėmelį su visa schema (pf kodu)? Kaip gauti šį pf? Tai jau penktoji problema. Iš naujo perskaityti visų skaidinių schemą iš aplanko su tikslinės vitrinos parketo failais? Šis metodas yra saugiausias, bet sudėtingas.

Schema jau yra „Hive“. Naują schemą galite gauti sujungę visos lentelės ir naujo skaidinio schemą. Taigi jums reikia paimti lentelės schemą iš „Hive“ ir sujungti ją su naujojo skaidinio schema. Tai galima padaryti nuskaitant bandomuosius metaduomenis iš „Hive“, išsaugant juos laikinajame aplanke ir naudojant „Spark“, kad iš karto nuskaitytumėte abu skaidinius.

Tiesą sakant, yra viskas, ko jums reikia: originali lentelės schema „Hive“ ir naujas skaidinys. Taip pat turime duomenų. Belieka tik gauti naują schemą, kuri sujungia parduotuvės filialo schemą ir naujus laukus iš sukurto skaidinio:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Toliau sukuriame lentelės registracijos DDL, kaip ir ankstesniame fragmente.
Jei visa grandinė veikia teisingai, ty buvo inicijavimo apkrova, o lentelė Hive buvo sukurta teisingai, tada gauname atnaujintą lentelės schemą.

Ir paskutinė problema yra ta, kad jūs negalite tiesiog pridėti skaidinio prie „Hive“ lentelės, nes jis bus sugadintas. Turite priversti Hive pataisyti skaidinio struktūrą:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Paprasta užduotis nuskaityti JSON ir pagal jį sukurti vitriną leidžia įveikti daugybę numanomų sunkumų, kurių sprendimų reikia ieškoti atskirai. Ir nors šie sprendimai yra paprasti, jų paieška užima daug laiko.

Norėdami įgyvendinti vitrinos konstrukciją, turėjau:

  • Pridėkite skaidinius prie vitrinos, atsikratykite paslaugų failų
  • Apdorokite tuščius šaltinio duomenų laukus, kuriuos įvedė Spark
  • Perduokite paprastus tipus į eilutę
  • Konvertuoti laukų pavadinimus į mažąsias raides
  • Atskiras duomenų įkėlimas ir lentelių registracija „Hive“ (DDL karta)
  • Nepamirškite palikti laukų pavadinimų, kurie gali būti nesuderinami su „Hive“.
  • Sužinokite, kaip atnaujinti lentelės registraciją „Hive“.

Apibendrinant, pastebime, kad sprendimas statyti vitrinas yra kupinas daugybės spąstų. Todėl, iškilus sunkumų įgyvendinant, geriau kreiptis į patyrusį, sėkmingą patirtį turintį partnerį.

Dėkojame, kad perskaitėte šį straipsnį, tikimės, kad informacija jums bus naudinga.

Šaltinis: www.habr.com

Добавить комментарий