Spark schemaEvolution käytännössä

Hyvät lukijat, hyvää iltapäivää!

Tässä artikkelissa Neoflexin Big Data Solutions -liiketoiminta-alueen johtava konsultti kuvaa yksityiskohtaisesti vaihtoehdot muuttuvan rakenteellisen julkisivujen rakentamiseen Apache Sparkilla.

Osana data-analyysiprojektia herää usein tehtävänä rakentaa vitriinejä löyhästi strukturoidun datan pohjalta.

Tyypillisesti nämä ovat lokeja tai eri järjestelmien vastauksia, jotka on tallennettu JSON- tai XML-muodossa. Tiedot ladataan Hadoopille, jonka jälkeen niistä on rakennettava julkisivu. Voimme järjestää luodun julkisivun pääsyn esimerkiksi Impalan kautta.

Tässä tapauksessa kohdeliikkeen ulkoasua ei tiedetä etukäteen. Lisäksi kaaviota ei voida laatia etukäteen, koska se riippuu tiedoista, ja käsittelemme tätä erittäin heikosti jäsenneltyä dataa.

Esimerkiksi tänään seuraava vastaus kirjataan:

{source: "app1", error_code: ""}

ja huomenna seuraava vastaus tulee samasta järjestelmästä:

{source: "app1", error_code: "error", description: "Network error"}

Tämän seurauksena myymälään pitäisi lisätä toinen kenttä - kuvaus, eikä kukaan tiedä, tuleeko se vai ei.

Tehtävä luoda mart tällaisille tiedoille on melko tavallinen, ja Sparkilla on useita työkaluja tähän. Lähdetietojen jäsentämiseen on sekä JSON- että XML-tuki, ja aiemmin tuntemattomalle skeemalle tarjotaan schemaEvolution-tuki.

Ensi silmäyksellä ratkaisu näyttää yksinkertaiselta. Sinun on otettava JSON-kansio ja luettava se tietokehykseen. Spark luo skeeman ja muuttaa sisäkkäiset tiedot rakenteiksi. Seuraavaksi kaikki on tallennettava parketti, jota myös Impala tukee, rekisteröimällä myymälä Hiven metakauppaan.

Kaikki näyttää olevan yksinkertaista.

Dokumentaation lyhyistä esimerkeistä ei kuitenkaan ole selvää, mitä tehdä useille ongelmille käytännössä.

Dokumentaatio kuvaa lähestymistapaa, jolla ei luoda julkisivua, vaan JSON- tai XML-tiedosto luetaan tietokehykseen.

Nimittäin se näyttää yksinkertaisesti kuinka lukea ja jäsentää JSON:

df = spark.read.json(path...)

Tämä riittää antamaan tiedot Sparkille.

Käytännössä skenaario on paljon monimutkaisempi kuin vain JSON-tiedostojen lukeminen kansiosta ja tietokehyksen luominen. Tilanne näyttää tältä: tietty showcase on jo olemassa, uusia tietoja saapuu joka päivä, ne on lisättävä näyteikkunaan, unohtamatta, että kaavio voi vaihdella.

Tavallinen kaavio liikkeen rakentamiseksi on seuraava:

Vaihe 1. Tiedot ladataan Hadoopiin, minkä jälkeen niitä ladataan päivittäin ja ne lisätään uuteen osioon. Tuloksena on kansio, jossa on lähdetiedot, jaettuna päiväkohtaisesti.

Vaihe 2. Alkulatauksen aikana tämä kansio luetaan ja jäsennetään Sparkilla. Tuloksena oleva datakehys tallennetaan analysoitavaan muotoon, esimerkiksi parketti, joka voidaan sitten tuoda Impalaan. Tämä luo kohdemyymälän, jossa on kaikki tähän mennessä kertyneet tiedot.

Vaihe 3. Luodaan lataus, joka päivittää julkisivun joka päivä.
Herää kysymys asteittaisesta lataamisesta, tarve osioida myymälä ja kysymys julkisivujen yleisen asettelun tukemisesta.

Otetaan esimerkki. Oletetaan, että arkiston rakentamisen ensimmäinen vaihe on toteutettu ja JSON-tiedostojen lataaminen kansioon on määritetty.

Ei ole ongelma luoda niistä datakehys ja sitten tallentaa ne esittelyyn. Tämä on aivan ensimmäinen askel, joka löytyy helposti Spark-dokumentaatiosta:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Kaikki näyttää olevan kunnossa.

Olemme lukeneet ja jäsentäneet JSON:n, sitten tallennamme datakehyksen parketiksi ja rekisteröimme sen Hiveen millä tahansa sopivalla tavalla:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Saamme näyteikkunan.

Mutta seuraavana päivänä uusia tietoja lisättiin lähteestä. Meillä on JSON-kansio ja tämän kansion perusteella luotu julkisivu. Kun seuraava dataosuus on ladattu lähteestä, julkisivulla ei ole tarpeeksi dataa yhdeksi päiväksi.

Looginen ratkaisu olisi osioida myymälä päivittäin, jolloin voit lisätä uuden osion joka seuraava päivä. Myös tämän mekanismi tunnetaan hyvin; Spark mahdollistaa osioiden tallentamisen erikseen.

Ensin suoritamme alkulatauksen, tallennamme tiedot yllä kuvatulla tavalla ja lisäämme vain osioinnin. Tätä toimintoa kutsutaan julkisivun alustukseksi ja se tehdään vain kerran:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Seuraavana päivänä lataamme vain uuden osion:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Jäljelle jää vain rekisteröityminen uudelleen Hiveen skeeman päivittämistä varten.
Tästä kuitenkin syntyy ongelmia.

Ensimmäinen ongelma. Ennemmin tai myöhemmin syntynyt parketti ei ole enää luettavissa. Tämä johtuu siitä, kuinka parketti ja JSON käsittelevät tyhjiä kenttiä eri tavalla.

Ajatellaanpa tyypillistä tilannetta. Esimerkiksi eilen JSON saapuu:

День 1: {"a": {"b": 1}},

ja tänään sama JSON näyttää tältä:

День 2: {"a": null}

Oletetaan, että meillä on kaksi eri osiota, joista jokaisessa on yksi rivi.
Kun luemme kaikki lähdetiedot, Spark pystyy määrittämään tyypin ja ymmärtämään, että "a" on "rakenne"-tyyppinen kenttä, jossa on sisäkkäinen kenttä "b" tyyppiä INT. Mutta jos jokainen osio tallennettiin erikseen, tuloksena on parketti, jossa on yhteensopimattomia osiokaavioita:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Tämä tilanne on hyvin tiedossa, joten siihen on erityisesti lisätty vaihtoehto tyhjien kenttien poistamiseksi lähdetietoja jäsennettäessä:

df = spark.read.json("...", dropFieldIfAllNull=True)

Tässä tapauksessa parketti koostuu väliseinistä, jotka voidaan lukea yhteen.
Vaikka ne, jotka ovat tehneet tämän käytännössä, hymyilevät katkerasti. Miksi? Kyllä, koska todennäköisesti syntyy kaksi muuta tilannetta. Tai kolme. Tai neljä. Ensimmäinen, mikä on lähes varmaa, on, että numeeriset tyypit näyttävät erilaisilta eri JSON-tiedostoissa. Esimerkiksi {intField: 1} ja {intField: 1.1}. Jos tällaiset kentät näkyvät yhdessä erässä, skeeman yhdistäminen lukee kaiken oikein, mikä johtaa tarkimpaan tyyppiin. Mutta jos eri, niin toisessa on intField: int ja toisessa intField: double.

Tämän tilanteen käsittelemiseksi on seuraava lippu:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nyt meillä on kansio, jossa osiot sijaitsevat, joka voidaan lukea yhdeksi datakehykseksi ja koko myymälän päteväksi parketiksi. Joo? Ei.

Meidän on muistettava, että rekisteröimme pöydän Hiveen. Pesä ei erota kirjainkoolla peltojen nimissä, mutta parketti on. Siksi osiot, joissa on skeemat: field1: int ja Field1: int, ovat samat Hivelle, mutta eivät Sparkille. Älä unohda muuttaa kenttien nimet pieniksi kirjaimiksi.

Tämän jälkeen kaikki näyttää olevan hyvin.

Kaikki eivät kuitenkaan ole niin yksinkertaisia. Toinen, myös hyvin tunnettu ongelma ilmenee. Koska jokainen uusi osio tallennetaan erikseen, osiokansio sisältää Spark-palvelutiedostoja, esimerkiksi _SUCCESS-toiminnon onnistumislipun. Tämä johtaa virheeseen yritettäessä parkettia. Tämän välttämiseksi sinun on määritettävä asetukset estämällä Sparkia lisäämästä palvelutiedostoja kansioon:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Näyttää siltä, ​​että nyt joka päivä uusi parkettiosio lisätään kohdemyymäläkansioon, jossa sijaitsevat päivän jäsennetyt tiedot. Huolehdimme etukäteen siitä, ettei osioissa ole tietotyyppiristiriitoja.

Mutta edessämme on kolmas ongelma. Nyt yleistä skeemaa ei tunneta, lisäksi Hiven taulukossa on väärä skeema, koska jokainen uusi osio toi todennäköisesti vääristymän skeemaan.

Pöytä on rekisteröitävä uudelleen. Tämä voidaan tehdä yksinkertaisesti: lue julkisivun parketti uudelleen, ota skeema ja luo sen perusteella DDL, jolla voit rekisteröidä kansion uudelleen Hiven ulkopuoliseksi taulukoksi päivittäen kohdejulkiston skeeman.

Meillä on neljäs ongelma. Kun rekisteröimme pöydän ensimmäistä kertaa, luotimme Sparkiin. Nyt teemme sen itse, ja meidän on muistettava, että parkettikentät voivat alkaa hahmoilla, joita Hive ei salli. Esimerkiksi Spark heittää pois rivejä, joita se ei voinut jäsentää "corrupt_record" -kentässä. Tällaista kenttää ei voi rekisteröidä Hiveen pakenematta.

Kun tiedämme tämän, saamme kaavion:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Koodi ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("taulukko<`", "taulukko<") tekee turvallisen DDL:n, eli sen sijaan:

create table tname (_field1 string, 1field string)

Kenttien nimillä, kuten “_field1, 1field”, luodaan turvallinen DDL, jossa kenttien nimet on erotettu: luo taulukko `tname` (`_field1` merkkijono, `1field` merkkijono).

Herää kysymys: kuinka saada oikein datakehys täydellisellä skeemalla (pf-koodissa)? Kuinka saada tämä pf? Tämä on viides ongelma. Luetko uudelleen kaikkien osioiden kaavion kohdeliikkeen parkettitiedostoja sisältävästä kansiosta? Tämä menetelmä on turvallisin, mutta vaikea.

Kaava on jo Hivessä. Voit saada uuden skeeman yhdistämällä koko taulukon ja uuden osion skeeman. Tämä tarkoittaa, että sinun on otettava taulukkokaavio Hivesta ja yhdistettävä se uuden osion skeemaan. Tämä voidaan tehdä lukemalla testimetatiedot Hivesta, tallentamalla ne väliaikaiseen kansioon ja lukemalla molemmat osiot kerralla Sparkilla.

Pohjimmiltaan siellä on kaikki mitä tarvitset: alkuperäinen Hiven taulukkoskeema ja uusi osio. Meillä on myös dataa. Jäljelle jää vain uusi skeema, joka yhdistää myymäläkaavion ja uudet kentät luodusta osiosta:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Seuraavaksi luomme taulukon rekisteröinnin DDL, kuten edellisessä fragmentissa.
Jos koko ketju toimii oikein, eli oli alkulataus ja taulukko luotiin oikein Hiveen, niin saamme päivitetyn taulukkoskeeman.

Viimeinen ongelma on, että et voi helposti lisätä osiota Hive-taulukkoon, koska se rikkoutuu. Sinun on pakotettava Hive korjaamaan osiorakenne:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Yksinkertainen tehtävä lukea JSON ja luoda sen pohjalta myymälä johtaa useiden implisiittisten vaikeuksien voittamiseksi, joihin on löydettävä ratkaisut erikseen. Ja vaikka nämä ratkaisut ovat yksinkertaisia, niiden löytäminen vie paljon aikaa.

Vitriinin rakentamisen toteuttamiseksi meidän piti:

  • Lisää osioita myymälään päästäksesi eroon palvelutiedostoista
  • Käsittele Sparkin kirjoittamien lähdetietojen tyhjiä kenttiä
  • Suoratoista yksinkertaiset tyypit merkkijonoon
  • Muunna kenttien nimet pienillä kirjaimilla
  • Erillinen tietojen lataus ja taulukon rekisteröinti Hivessä (DDL:n luominen)
  • Älä unohda ohittaa kenttien nimiä, jotka eivät ehkä ole yhteensopivia Hiven kanssa
  • Opi päivittämään pöytärekisteröinti Hivessa

Yhteenvetona toteamme, että päätös rakentaa julkisivuja on täynnä monia sudenkuoppia. Siksi, jos käyttöönotossa ilmenee vaikeuksia, on parempi kääntyä kokeneen kumppanin puoleen, jolla on onnistunut asiantuntemus.

Kiitos, että luit tämän artikkelin, toivomme, että tiedoista on hyötyä.

Lähde: will.com

Lisää kommentti