Spark schemaEvoluția în practică

Dragi cititori, bună seara!

În acest articol, consultantul principal pentru zona de afaceri Big Data Solutions a Neoflex descrie în detaliu opțiunile de construire a vitrinelor cu structură variabilă folosind Apache Spark.

Ca parte a unui proiect de analiză a datelor, apare adesea sarcina de a construi vitrine bazate pe date slab structurate.

De obicei, acestea sunt jurnalele sau răspunsurile de la diferite sisteme, salvate sub formă de JSON sau XML. Datele sunt încărcate în Hadoop, apoi trebuie construită o vitrină din ele. Putem organiza accesul la vitrina creată, de exemplu, prin Impala.

În acest caz, aspectul vitrinei țintă este necunoscut în prealabil. Mai mult decât atât, schema nu poate fi întocmită în prealabil, deoarece depinde de date, iar noi avem de-a face cu aceste date foarte slab structurate.

De exemplu, astăzi este înregistrat următorul răspuns:

{source: "app1", error_code: ""}

iar mâine următorul răspuns vine din același sistem:

{source: "app1", error_code: "error", description: "Network error"}

Drept urmare, în vitrina ar trebui adăugat un alt câmp - descriere și nimeni nu știe dacă va veni sau nu.

Sarcina de a crea un magazin pe astfel de date este destul de standard, iar Spark are o serie de instrumente pentru aceasta. Pentru analizarea datelor sursă, există suport atât pentru JSON, cât și pentru XML, iar pentru o schemă necunoscută anterior, este oferit suport schemaEvolution.

La prima vedere, soluția pare simplă. Trebuie să luați folderul cu JSON și să îl citiți în cadrul de date. Spark va crea o schemă și va transforma datele imbricate în structuri. În continuare, totul trebuie salvat în parchet, care este suportat și în Impala, prin înregistrarea vitrinei în metamagazinul Hive.

Totul pare a fi simplu.

Cu toate acestea, din exemplele scurte din documentație nu este clar ce să faci cu o serie de probleme în practică.

Documentația descrie o abordare nu pentru crearea unei vitrine, ci pentru citirea JSON sau XML într-un cadru de date.

Și anume, arată pur și simplu cum să citești și să analizezi JSON:

df = spark.read.json(path...)

Acest lucru este suficient pentru a face datele disponibile pentru Spark.

În practică, scenariul este mult mai complex decât simpla citire a fișierelor JSON dintr-un folder și crearea unui cadru de date. Situația arată astfel: există deja o anumită vitrină, date noi sosesc în fiecare zi, acestea trebuie adăugate în vitrină, fără a uita că schema poate diferi.

Schema obișnuită pentru construirea unei vitrine este următoarea:

Pasul 1. Datele sunt încărcate în Hadoop, urmate de încărcare suplimentară zilnică și adăugate la o nouă partiție. Rezultatul este un folder cu date sursă, partiționat pe zi.

Pasul 2. În timpul încărcării inițiale, acest folder este citit și analizat folosind Spark. Cadrul de date rezultat este salvat într-un format care poate fi analizat, de exemplu, în parchet, care poate fi apoi importat în Impala. Acest lucru creează o vitrină țintă cu toate datele acumulate până în acest moment.

Pasul 3. Este creată o descărcare care va actualiza vitrina în fiecare zi.
Apare problema încărcării incrementale, nevoia de a partiționa vitrina și problema susținerii aspectului general al vitrinei.

Să dăm un exemplu. Să presupunem că primul pas de construire a unui depozit a fost implementat și încărcarea fișierelor JSON într-un folder este configurată.

Nu este nicio problemă să creați un cadru de date din ele și apoi să le salvați ca o vitrină. Acesta este primul pas care poate fi găsit cu ușurință în documentația Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Totul pare să fie în regulă.

Am citit și analizat JSON, apoi salvăm cadrul de date ca parchet, înregistrându-l în Hive în orice mod convenabil:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Primim o vitrină.

Dar, a doua zi au fost adăugate date noi de la sursă. Avem un folder cu JSON și o vitrină creată pe baza acestui folder. După încărcarea următoarei porțiuni de date din sursă, vitrina nu are suficiente date pentru o zi.

O soluție logică ar fi să partiționați vitrina pe zi, ceea ce vă va permite să adăugați o nouă partiție în fiecare zi. Mecanismul pentru aceasta este de asemenea bine cunoscut; Spark vă permite să înregistrați partițiile separat.

Mai întâi, facem încărcarea inițială, salvând datele așa cum este descris mai sus, adăugând doar partiționare. Această acțiune se numește inițializare vitrine și se face o singură dată:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

A doua zi descărcam doar noua partiție:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Tot ce rămâne este să vă reînregistrați în Hive pentru a actualiza schema.
Totuși, aici apar problemele.

Prima problema. Mai devreme sau mai târziu, parchetul rezultat nu va mai fi lizibil. Acest lucru se datorează modului în care parchetul și JSON tratează câmpurile goale în mod diferit.

Să luăm în considerare o situație tipică. De exemplu, ieri sosește JSON:

День 1: {"a": {"b": 1}},

și astăzi același JSON arată astfel:

День 2: {"a": null}

Să presupunem că avem două partiții diferite, fiecare cu o linie.
Când citim toate datele sursă, Spark va putea determina tipul și va înțelege că „a” este un câmp de tip „structură”, cu un câmp imbricat „b” de tip INT. Dar, dacă fiecare partiție a fost salvată separat, atunci rezultatul este un parchet cu scheme de partiții incompatibile:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Această situație este bine cunoscută, așa că a fost adăugată o opțiune special pentru a elimina câmpurile goale la analizarea datelor sursă:

df = spark.read.json("...", dropFieldIfAllNull=True)

În acest caz, parchetul va fi format din pereți despărțitori care pot fi citite împreună.
Deși cei care au făcut asta în practică vor zâmbi amar. De ce? Da, pentru că cel mai probabil vor apărea încă două situații. Sau trei. Sau patru. Primul, care este aproape sigur, este că tipurile numerice vor arăta diferit în diferite fișiere JSON. De exemplu, {intField: 1} și {intField: 1.1}. Dacă astfel de câmpuri apar într-un singur lot, atunci îmbinarea schemei va citi totul corect, conducând la tipul cel mai precis. Dar dacă sunt diferite, atunci unul va avea intField: int, iar celălalt va avea intField: double.

Pentru a gestiona această situație există următorul steag:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Acum avem un folder în care se află partițiile, care poate fi citit într-un singur cadru de date și un parchet valabil al întregii vitrine. Da? Nu.

Trebuie să ne amintim că am înregistrat masa în Hive. Stupul nu face distincție între majuscule și minuscule în numele câmpurilor, dar parchetul este. Prin urmare, partițiile cu scheme: field1: int și Field1: int sunt aceleași pentru Hive, dar nu pentru Spark. Nu uitați să schimbați numele câmpurilor în minuscule.

După aceasta, totul pare să fie bine.

Cu toate acestea, nu toate sunt atât de simple. Apare o a doua problemă, de asemenea bine-cunoscută. Deoarece fiecare partiție nouă este salvată separat, folderul partiției va conține fișiere de serviciu Spark, de exemplu, indicatorul de succes al operațiunii _SUCCESS. Acest lucru va duce la o eroare la încercarea de parchet. Pentru a evita acest lucru, trebuie să configurați configurația împiedicând Spark să adauge fișiere de serviciu în folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Se pare că acum în fiecare zi se adaugă o nouă partiție de parchet în folderul vitrinei țintă, unde se află datele analizate pentru ziua respectivă. Am avut grijă în prealabil să ne asigurăm că nu există partiții cu conflicte de tip de date.

Dar ne confruntăm cu o a treia problemă. Acum schema generală nu este cunoscută, în plus, în Hive tabelul are schema greșită, deoarece fiecare nouă partiție a introdus cel mai probabil o distorsiune în schemă.

Tabelul trebuie reînregistrat. Acest lucru se poate face simplu: citiți din nou parchetul vitrinei, luați schema și creați un DDL pe baza acesteia, cu care puteți reînregistra folderul în Hive ca tabel extern, actualizând schema vitrinei țintă.

Ne confruntăm cu o a patra problemă. Când am înregistrat masa pentru prima dată, ne-am bazat pe Spark. Acum o facem singuri și trebuie să ne amintim că câmpurile de parchet pot începe cu caractere care nu sunt permise de Hive. De exemplu, Spark aruncă linii pe care nu le-a putut analiza în câmpul „corrupt_record”. Un astfel de câmp nu poate fi înregistrat în Hive fără a scăpa.

Știind acest lucru, obținem diagrama:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Cod ("_corupt_record", "`_corupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("matrice<`", "matrice<") face DDL sigur, adică în loc de:

create table tname (_field1 string, 1field string)

Cu nume de câmpuri precum „_field1, 1field”, se realizează un DDL sigur în care numele câmpurilor sunt escape: creați tabelul `tname` (șir `_field1`, șir `1field`).

Apare întrebarea: cum să obțineți corect un cadru de date cu o schemă completă (în codul pf)? Cum să obțineți acest pf? Aceasta este a cincea problemă. Recitiți diagrama tuturor partițiilor din folderul cu fișiere parchet a vitrinei țintă? Această metodă este cea mai sigură, dar dificilă.

Schema este deja în Hive. Puteți obține o nouă schemă combinând schema întregului tabel și noua partiție. Aceasta înseamnă că trebuie să luați schema tabelului din Hive și să o combinați cu schema noii partiții. Acest lucru se poate face citind metadatele de testare din Hive, salvându-le într-un folder temporar și citind ambele partiții simultan folosind Spark.

În esență, există tot ce aveți nevoie: schema originală a tabelului în Hive și o nouă partiție. Avem și date. Tot ce rămâne este să obțineți o nouă schemă care combină schema vitrinei și noi câmpuri din partiția creată:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

În continuare, creăm DDL de înregistrare a tabelului, ca în fragmentul anterior.
Dacă întregul lanț funcționează corect, și anume, a existat o încărcare inițială, iar tabelul a fost creat corect în Hive, atunci obținem o schemă de tabel actualizată.

Ultima problemă este că nu puteți adăuga cu ușurință o partiție la un tabel Hive, deoarece se va rupe. Trebuie să forțați Hive să-și repare structura partiției:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Sarcina simplă de a citi JSON și de a crea o vitrină bazată pe acesta are ca rezultat depășirea unui număr de dificultăți implicite, ale căror soluții trebuie găsite separat. Și deși aceste soluții sunt simple, este nevoie de mult timp pentru a le găsi.

Pentru a implementa construcția unei vitrine, a trebuit să:

  • Adăugați partiții în vitrina, scăpând de fișierele de serviciu
  • Gestionați câmpurile goale din datele sursă pe care Spark le-a tastat
  • Transmite tipuri simple pe șir
  • Convertiți numele câmpurilor în minuscule
  • Încărcarea separată a datelor și înregistrarea tabelului în Hive (crearea DDL)
  • Nu uitați să scăpați de numele câmpurilor care ar putea să nu fie compatibile cu Hive
  • Aflați cum să actualizați înregistrarea mesei în Hive

Pentru a rezuma, observăm că decizia de a construi vitrine este plină de multe capcane. Prin urmare, dacă apar dificultăți în implementare, este mai bine să apelați la un partener cu experiență și expertiză de succes.

Vă mulțumim că ați citit acest articol, sperăm că informațiile sunt utile.

Sursa: www.habr.com

Adauga un comentariu