Spark sémaEvolúció a gyakorlatban

Kedves olvasók, szép napot!

Ebben a cikkben a Neoflex Big Data Solutions üzletágának vezető tanácsadója részletesen ismerteti a változó szerkezetű kirakatok Apache Spark segítségével történő felépítésének lehetőségeit.

Egy adatelemzési projekt keretében gyakran felmerül a kirakatok laza szerkezetű adatok alapján történő építése.

Általában ezek naplók vagy különféle rendszerek válaszai, JSON vagy XML formátumban mentve. Az adatok feltöltődnek a Hadoopba, majd kirakatot kell építeni belőlük. A létrehozott kirakathoz való hozzáférést megszervezhetjük például az Impalán keresztül.

Ebben az esetben a cél kirakat sémája nem ismert előre. Ráadásul a sémát sem lehet előre elkészíteni, hiszen az adatoktól függ, és ezekkel a nagyon lazán strukturált adatokkal van dolgunk.

Például ma a következő válasz kerül naplózásra:

{source: "app1", error_code: ""}

és holnap ugyanebből a rendszerből a következő válasz érkezik:

{source: "app1", error_code: "error", description: "Network error"}

Ennek eredményeként még egy mezőt kell hozzáadni a vitrinhez - leírás, és senki sem tudja, hogy jön-e vagy sem.

A kirakat létrehozásának feladata az ilyen adatokon meglehetősen szabványos, és a Spark számos eszközzel rendelkezik erre. A forrásadatok elemzéséhez a JSON és az XML is támogatott, egy korábban ismeretlen séma esetén pedig a schemaEvolution támogatása biztosított.

Első pillantásra egyszerűnek tűnik a megoldás. Vegyünk egy mappát JSON-val, és be kell olvasni egy adatkeretbe. A Spark létrehoz egy sémát, és a beágyazott adatokat struktúrákká alakítja. Továbbá mindent parkettába kell menteni, ami az Impalában is támogatott, a Hive metastore-ban a kirakat regisztrálásával.

Minden egyszerűnek tűnik.

A dokumentációban található rövid példákból azonban nem derül ki, hogy a gyakorlatban mihez kezdjünk számos problémával.

A dokumentáció egy olyan megközelítést ír le, amellyel nem lehet kirakatot létrehozni, hanem JSON-t vagy XML-t kell beolvasni egy adatkeretbe.

Nevezetesen, egyszerűen megmutatja, hogyan kell olvasni és elemezni a JSON-t:

df = spark.read.json(path...)

Ez elég ahhoz, hogy az adatokat a Spark rendelkezésére bocsássa.

A gyakorlatban a szkript sokkal bonyolultabb, mint a JSON-fájlok beolvasása egy mappából és egy adatkeret létrehozása. A helyzet így néz ki: már van egy bizonyos kirakat, minden nap érkeznek új adatok, ezeket fel kell venni a kirakatba, nem szabad elfelejteni, hogy a séma eltérhet.

A kirakat felépítésének szokásos sémája a következő:

Lépés 1. Az adatokat a rendszer betölti a Hadoopba, ezt követően napi újratöltéssel, és hozzáadja egy új partícióhoz. Kiderül, hogy egy mappa a kezdeti adatokkal, napok szerint felosztva.

Lépés 2. A kezdeti betöltés során ezt a mappát a Spark beolvassa és elemzi. Az így kapott adatkeret elemezhető formátumban, például parkettában kerül mentésre, amelyet azután importálhatunk az Impalába. Ezzel létrehoz egy cél kirakatot az eddig felhalmozott összes adattal.

Lépés 3. Létrejön egy letöltés, amely minden nap frissíti a kirakatot.
Felmerül a növekményes terhelés kérdése, a vitrin felosztásának szükségessége, valamint a vitrin általános sémája fenntartása.

Vegyünk egy példát. Tegyük fel, hogy a tároló létrehozásának első lépése megtörtént, és a JSON-fájlok feltöltődnek egy mappába.

Az adatkeret létrehozása belőlük, majd kirakatként mentése nem jelent problémát. Ez a legelső lépés, amely könnyen megtalálható a Spark dokumentációjában:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Úgy tűnik, minden rendben van.

Beolvastuk és elemeztük a JSON-t, majd az adatkeretet parkettaként mentjük, és tetszőleges módon regisztráljuk a Hive-ban:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Kapunk egy ablakot.

Másnap azonban új adatokat adtak hozzá a forrásból. Van egy JSON-mappánk, és ebből a mappából egy kirakat. A következő adatköteg forrásból való betöltése után az adattárból egy napnyi adat hiányzik.

A logikus megoldás az lenne, ha a kirakatot naponta particionálnánk, ami lehetővé teszi, hogy minden következő napon új partíciót adjunk hozzá. Ennek a mechanizmusa is jól ismert, a Spark lehetővé teszi a partíciók külön írását.

Először egy kezdeti betöltést végzünk, az adatokat a fent leírtak szerint mentjük, és csak particionálást adunk hozzá. Ezt a műveletet kirakat inicializálásnak nevezik, és csak egyszer hajtják végre:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Másnap csak egy új partíciót töltünk be:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

A séma frissítéséhez csak újra kell regisztrálnia a Hive-ban.
Azonban itt adódnak problémák.

Első probléma. Előbb-utóbb a keletkező parketta olvashatatlan lesz. Ez annak köszönhető, hogy a parketta és a JSON eltérően kezeli az üres mezőket.

Nézzünk egy tipikus helyzetet. Például tegnap megérkezik a JSON:

День 1: {"a": {"b": 1}},

és ma ugyanaz a JSON így néz ki:

День 2: {"a": null}

Tegyük fel, hogy két különböző partíciónk van, mindegyik egy sorral.
Amikor elolvassuk a teljes forrásadatot, a Spark meg tudja határozni a típust, és megérti, hogy az "a" egy "struktúra" típusú mező, egy INT típusú "b" beágyazott mezővel. De ha minden partíciót külön-külön mentünk, akkor kapunk egy parkettát inkompatibilis partíciós sémákkal:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ez a helyzet jól ismert, ezért speciálisan hozzáadtunk egy opciót - a forrásadatok elemzésekor távolítsa el az üres mezőket:

df = spark.read.json("...", dropFieldIfAllNull=True)

Ebben az esetben a parketta összeolvasható válaszfalakból áll.
Bár akik ezt a gyakorlatban is megtették, az itt keserűen mosolyog. Miért? Igen, mert valószínűleg még két szituáció lesz. Vagy három. Vagy négy. Az első, ami szinte biztosan előfordul, az, hogy a numerikus típusok eltérően fognak kinézni a különböző JSON-fájlokban. Például {intField: 1} és {intField: 1.1}. Ha ilyen mezők találhatók egy partícióban, akkor a séma egyesítése mindent helyesen olvas, ami a legpontosabb típushoz vezet. De ha különböző, akkor az egyik intField: int, a másik pedig intField: double lesz.

A helyzet kezelésére a következő jelző található:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Most van egy mappánk, ahol egyetlen adatkeretbe beolvasható partíciók és a teljes vitrin érvényes parkettája található. Igen? Nem.

Emlékeznünk kell arra, hogy a Hive-ban regisztráltuk az asztalt. A hive nem érzékeny a mezőnevekben, a parketta viszont. Ezért a következő sémákkal rendelkező partíciók: field1: int és Field1: int ugyanazok a Hive esetében, de nem a Spark esetében. Ne felejtse el kisbetűssé alakítani a mezőneveket.

Ezek után úgy tűnik, minden rendben van.

Azonban nem minden olyan egyszerű. Van egy második, szintén jól ismert probléma. Mivel minden új partíció külön kerül mentésre, a partíciómappa tartalmazni fogja a Spark szolgáltatásfájlokat, például a _SUCCESS művelet sikerjelzőjét. Ez hibát eredményez a parkettázás során. Ennek elkerülése érdekében úgy kell konfigurálnia a konfigurációt, hogy a Spark ne adjon szolgáltatásfájlokat a mappához:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Úgy tűnik, hogy most minden nap egy új parketta partíció kerül a vitrin célmappájába, ahol az adott nap elemzett adatai találhatók. Előzetesen ügyeltünk arra, hogy ne legyenek adattípus-ütközéses partíciók.

Van azonban egy harmadik problémánk. Most az általános séma nem ismert, ráadásul a Hive táblája hibás sémát tartalmaz, mivel minden új partíció valószínűleg torzítást vitt a sémába.

Újra kell regisztrálni az asztalt. Ezt egyszerűen megtehetjük: olvassuk el újra a kirakat parkettáját, vegyük elő a sémát, és készítsünk az alapján egy DDL-t, amivel a Hive-ben lévő mappát külső táblaként újraregisztrálhatjuk, frissítve a cél kirakat sémáját.

Van egy negyedik problémánk. Amikor először regisztráltuk az asztalt, a Sparkra hagyatkoztunk. Most mi magunk csináljuk, és emlékeznünk kell arra, hogy a parkettás mezők kezdődhetnek olyan karakterekkel, amelyek nem engedélyezettek a Hive számára. Például a Spark a "corrupt_record" mezőben olyan sorokat dob ​​ki, amelyeket nem tudott elemezni. Egy ilyen mező nem regisztrálható a Hive-ban anélkül, hogy kikerülne.

Ennek ismeretében a következő sémát kapjuk:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kód ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("tömb<`", "tömb<") biztonságossá teszi a DDL-t, azaz ahelyett, hogy:

create table tname (_field1 string, 1field string)

Az olyan mezőneveknél, mint a "_field1, 1field", biztonságos DDL jön létre, ahol a mezőnevek kihagyásra kerülnek: Create table `tname` (`_field1` string, `1field` string).

Felmerül a kérdés: hogyan lehet helyesen kapni egy adatkeretet teljes sémával (pf kódban)? Hogyan lehet megszerezni ezt a pf-t? Ez az ötödik probléma. Újraolvassa az összes partíció sémáját a cél kirakat parketta fájljait tartalmazó mappából? Ez a módszer a legbiztonságosabb, de nehéz.

A séma már a Hive-ben van. Új sémát kaphat a teljes tábla és az új partíció sémájának kombinálásával. Tehát át kell vennie a Hive-ból a táblázatsémát, és kombinálnia kell az új partíció sémájával. Ezt úgy teheti meg, hogy beolvassa a teszt metaadatokat a Hive-ből, elmenti egy ideiglenes mappába, és a Spark segítségével egyszerre olvassa be mindkét partíciót.

Valójában minden megtalálható, amire szüksége van: az eredeti táblaséma a Hive-ben és az új partíció. Adataink is vannak. Már csak egy új séma beszerzése van hátra, amely egyesíti a kirakat sémáját és a létrehozott partíció új mezőit:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ezután létrehozzuk a táblázat regisztrációs DDL-jét, mint az előző részletben.
Ha a teljes lánc megfelelően működik, azaz inicializálási terhelés történt, és a tábla megfelelően lett létrehozva a Hive-ban, akkor egy frissített táblasémát kapunk.

És az utolsó probléma az, hogy nem lehet csak partíciót hozzáadni egy Hive-táblához, mert az elromlik. Kényszeríteni kell a Hive-t a partíció szerkezetének javítására:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

A JSON olvasása és az arra épülő kirakat létrehozása egyszerű feladat számos implicit nehézség leküzdését eredményezi, amelyek megoldását külön kell keresni. És bár ezek a megoldások egyszerűek, sok időbe telik megtalálni őket.

A kirakat építésének megvalósításához a következőket kellett teljesítenem:

  • Adjon hozzá partíciókat a kirakathoz, így megszabadulhat a szolgáltatási fájloktól
  • Kezelje a Spark által beírt forrásadatok üres mezőit
  • Egyszerű típusok öntése karakterláncba
  • A mezőnevek átalakítása kisbetűsre
  • Külön adatfeltöltés és táblázat regisztráció a Hive-ban (DDL generáció)
  • Ne felejtse el kihagyni azokat a mezőneveket, amelyek esetleg nem kompatibilisek a Hive-vel
  • Ismerje meg, hogyan frissítheti az asztal regisztrációját a Hive-ban

Összefoglalva megjegyezzük, hogy a kirakatépítési döntés számos buktatóval jár. Ezért a megvalósítás nehézségei esetén jobb, ha egy tapasztalt, sikeres szakértelemmel rendelkező partnert keres fel.

Köszönjük, hogy elolvasta ezt a cikket, reméljük hasznosnak találja az információt.

Forrás: will.com

Hozzászólás