SädeskeemEvolutsioon praktikas

Kallid lugejad, tere pärastlõunal!

Selles artiklis kirjeldab Neoflexi ärivaldkonna Big Data Solutions juhtiv konsultant üksikasjalikult võimalusi Apache Sparki abil muutuva struktuuriga vitriinide ehitamiseks.

Andmeanalüüsi projekti raames tekib sageli ülesanne ehitada vitriine, mis põhinevad lõdvalt struktureeritud andmetel.

Tavaliselt on need logid või vastused erinevatest süsteemidest, mis on salvestatud JSON- või XML-vormingus. Andmed laaditakse üles Hadoopi, seejärel tuleb nendest luua poefassaad. Saame korraldada ligipääsu loodud poesaalile näiteks Impala kaudu.

Sel juhul pole sihtpoe paigutus ette teada. Pealegi ei saa skeemi ette koostada, kuna see sõltub andmetest ja me tegeleme nende väga nõrgalt struktureeritud andmetega.

Näiteks täna logitakse järgmine vastus:

{source: "app1", error_code: ""}

ja homme tuleb samast süsteemist järgmine vastus:

{source: "app1", error_code: "error", description: "Network error"}

Sellest tulenevalt tuleks poe esiküljele lisada veel üks väli - kirjeldus ja keegi ei tea, kas see tuleb või mitte.

Selliste andmete põhjal marti loomise ülesanne on üsna tavaline ja Sparkil on selleks mitmeid tööriistu. Lähteandmete sõelumiseks on olemas nii JSON-i kui ka XML-i tugi ning varem tundmatu skeemi puhul pakutakse schemaEvolutioni tuge.

Esmapilgul tundub lahendus lihtne. Peate võtma JSON-iga kausta ja lugema selle andmeraami. Spark loob skeemi ja muudab pesastatud andmed struktuurideks. Järgmiseks on vaja kõik parkett salvestada, mida toetab ka Impala, registreerides Taru metapoes poe.

Kõik näib olevat lihtne.

Dokumentatsioonis toodud lühinäidetest ei saa aga aru, mida praktikas mitmete probleemidega peale hakata.

Dokumentatsioonis kirjeldatakse lähenemisviisi, mis ei ole mõeldud poefassaadi loomiseks, vaid JSON-i või XML-i lugemiseks andmeraami.

Nimelt näitab see lihtsalt, kuidas JSON-i lugeda ja sõeluda:

df = spark.read.json(path...)

Sellest piisab, et andmed Sparkile kättesaadavaks teha.

Praktikas on stsenaarium palju keerulisem kui lihtsalt JSON-failide lugemine kaustast ja andmeraami loomine. Olukord näeb välja selline: teatud vitriin on juba olemas, iga päev saabub uusi andmeid, need tuleb vitriinile lisada, unustamata, et skeem võib erineda.

Poe esikülje ehitamise tavaline skeem on järgmine:

Samm 1. Andmed laaditakse Hadoopi, millele järgneb igapäevane täiendav laadimine ja lisatakse uude partitsiooni. Tulemuseks on kaust lähteandmetega, mis on jagatud päevade kaupa.

Samm 2. Esialgse laadimise ajal loetakse ja sõelutakse seda kausta Sparki abil. Saadud andmeraam salvestatakse analüüsitavas vormingus, näiteks parkett, mida saab seejärel Impalasse importida. See loob sihtpoe kõigi seni kogunenud andmetega.

Samm 3. Luuakse allalaaditav fail, mis värskendab poepinda iga päev.
Tekib järkjärgulise laadimise küsimus, vajadus poe osadeks eraldada ja poe üldise paigutuse toetamise küsimus.

Toome näite. Oletame, et hoidla loomise esimene samm on teostatud ja JSON-failide kausta üleslaadimine on konfigureeritud.

Pole probleemi luua nendest andmeraam ja seejärel salvestada need esitlusena. See on kõige esimene samm, mille leiate hõlpsalt Sparki dokumentatsioonist:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tundub, et kõik on korras.

Oleme JSON-i lugenud ja sõelunud, seejärel salvestame andmeraami parkettina, registreerides selle tarus mis tahes mugaval viisil:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Saame vitriin.

Kuid järgmisel päeval lisati allikast uued andmed. Meil on JSON-iga kaust ja selle kausta põhjal loodud kauplus. Pärast allikast järgmise andmeosa laadimist pole poefassaadil üheks päevaks piisavalt andmeid.

Loogiline lahendus oleks poe fassaadide jaotamine päevade kaupa, mis võimaldab igal järgmisel päeval lisada uue partitsiooni. Selle mehhanism on samuti hästi teada; Spark võimaldab teil partitsioonid eraldi salvestada.

Esiteks teeme esmase laadimise, salvestades andmed ülalkirjeldatud viisil, lisades ainult partitsiooni. Seda toimingut nimetatakse poe esikülje lähtestamiseks ja seda tehakse ainult üks kord.

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Järgmisel päeval laadime alla ainult uue partitsiooni:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Jääb vaid skeemi värskendamiseks Hive'is uuesti registreeruda.
Siin aga tekivad probleemid.

Esimene probleem. Varem või hiljem pole tekkiv parkett enam loetav. See on tingitud sellest, kuidas parkett ja JSON kohtlevad tühje välju erinevalt.

Vaatleme tüüpilist olukorda. Näiteks eile saabus JSON:

День 1: {"a": {"b": 1}},

ja täna näeb sama JSON välja selline:

День 2: {"a": null}

Oletame, et meil on kaks erinevat sektsiooni, millest igaühel on üks rida.
Kui loeme kogu lähteandmeid, saab Spark määrata tüübi ja mõistab, et "a" on väli tüüpi "struktuur", mille pesastatud väli "b" on tüüpi INT. Kuid kui iga vahesein salvestati eraldi, on tulemuseks sobimatute vaheseinte skeemidega parkett:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

See olukord on hästi teada, seega on spetsiaalselt lisatud valik tühjade väljade eemaldamiseks lähteandmete sõelumisel:

df = spark.read.json("...", dropFieldIfAllNull=True)

Sel juhul koosneb parkett vaheseintest, mida saab kokku lugeda.
Kuigi need, kes on seda praktikas teinud, naeratavad kibedalt. Miks? Jah, sest suure tõenäosusega tekib veel kaks olukorda. Või kolm. Või neli. Esimene, mis on peaaegu kindel, on see, et numbritüübid näevad erinevates JSON-failides välja erinevad. Näiteks {intField: 1} ja {intField: 1.1}. Kui sellised väljad kuvatakse ühes partiis, loeb skeemi liitmine kõik õigesti, mis viib kõige täpsema tüübini. Kui aga erinevates, siis ühel on intField: int ja teisel intField: double.

Selle olukorra lahendamiseks on järgmine lipp:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nüüd on meil kaust, kus asuvad vaheseinad, mida saab lugeda ühtseks andmeraamiks ja kogu poe esiküljele kehtivaks parketiks. Jah? Ei.

Peame meeles pidama, et registreerisime tabeli Tarus. Taru ei ole põldude nimedes tõstutundlik, kuid parkett küll. Seetõttu on skeemidega partitsioonid: field1: int ja Field1: int Hive jaoks samad, kuid mitte Sparki jaoks. Ärge unustage väljade nimesid väiketähtedeks muuta.

Pärast seda tundub kõik korras olevat.

Siiski pole kõik nii lihtne. Tekib teine, samuti hästi tuntud probleem. Kuna iga uus partitsioon salvestatakse eraldi, sisaldab partitsioonikaust Sparki teenusefaile, näiteks toimingu õnnestumise lippu _SUCCESS. See põhjustab parketi proovimisel vea. Selle vältimiseks peate konfigureerima konfiguratsiooni, takistades Sparkil teenusefaile kausta lisada:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Tundub, et nüüd lisatakse sihtpoe kausta iga päev uus parketi partitsioon, kus asuvad päeva sõelutud andmed. Hoolitsesime eelnevalt selle eest, et poleks andmetüüpide konfliktidega sektsioone.

Kuid oleme silmitsi kolmanda probleemiga. Nüüd pole üldist skeemi teada, pealegi on Hive'is tabelis vale skeem, kuna iga uus partitsioon tõi tõenäoliselt skeemi moonutuse.

Laud tuleb uuesti registreerida. Seda saab teha lihtsalt: lugege uuesti poe esikülje parkett, võtke skeem ja looge selle põhjal DDL, millega saate Taru kausta ümber registreerida välise tabelina, värskendades sihtpoe skeemi.

Oleme silmitsi neljanda probleemiga. Kui tabelit esimest korda registreerisime, toetusime Sparkile. Nüüd teeme seda ise ja peame meeles pidama, et parketiväljad võivad alata tegelastega, mida Taru ei luba. Näiteks viskab Spark väljale „corrupt_record” välja read, mida ta ei saanud sõeluda. Sellist välja ei saa ilma põgenemiseta Tarus registreerida.

Seda teades saame diagrammi:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kood ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("massiivi<`", "massiivi<") teeb ohutut DDL-i, st selle asemel:

create table tname (_field1 string, 1field string)

Väljanimedega, nagu „_field1, 1field”, luuakse turvaline DDL, kus väljade nimed on paokoodiga: looge tabel „tname” (string „_field1”, string „1field”).

Tekib küsimus: kuidas õigesti hankida andmeraami koos täieliku skeemiga (pf-koodis)? Kuidas seda pf-i saada? See on viies probleem. Lugege uuesti kõigi vaheseinte diagrammi sihtpoe parkettfailidega kaustast? See meetod on kõige ohutum, kuid raske.

Skeem on juba tarus. Uue skeemi saate, kui kombineerite kogu tabeli ja uue partitsiooni skeemi. See tähendab, et peate võtma Hive'ist tabeliskeemi ja ühendama selle uue partitsiooni skeemiga. Seda saab teha, lugedes Hive'ist testi metaandmeid, salvestades need ajutisse kausta ja lugedes Sparki abil korraga mõlemat partitsiooni.

Põhimõtteliselt on kõik, mida vajate: taru algne tabeliskeem ja uus partitsioon. Meil on ka andmed. Jääb üle vaid hankida uus skeem, mis ühendab poe skeemi ja loodud partitsiooni uued väljad:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Järgmisena loome tabeli registreerimise DDL, nagu eelmises fragmendis.
Kui kogu kett töötab õigesti, nimelt oli esialgne koormus ja tabel oli Hive'is õigesti loodud, saame värskendatud tabeliskeemi.

Viimane probleem on see, et te ei saa taru tabelisse partitsiooni lihtsalt lisada, kuna see läheb katki. Peate sundima Hive'i partitsioonistruktuuri parandama:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Lihtsa ülesandega lugeda JSON-i ja luua selle põhjal poe esindus, saadakse üle mitmetest kaudsetest raskustest, millele lahendused tuleb leida eraldi. Ja kuigi need lahendused on lihtsad, kulub nende leidmiseks palju aega.

Vitriini ehitamiseks pidime:

  • Lisage poele partitsioonid, vabastades teenindusfailidest
  • Tegelege Sparki sisestatud lähteandmete tühjade väljadega
  • Kandke stringi lihtsad tüübid
  • Teisendage väljade nimed väiketähtedeks
  • Eraldi andmete üleslaadimine ja tabeli registreerimine tarus (DDL-i loomine)
  • Ärge unustage väljade nimesid, mis ei pruugi Hive'iga ühilduda
  • Siit saate teada, kuidas Hive'is tabeli registreerimist värskendada

Kokkuvõtteks märgime, et vitriinide ehitamise otsus on täis palju lõkse. Seetõttu on rakendamisel raskuste ilmnemisel parem pöörduda edukate teadmistega kogenud partneri poole.

Täname, et lugesite seda artiklit, loodame, et see teave on teile kasulik.

Allikas: www.habr.com

Lisa kommentaar