Spark schemaEvolution u praksi

Dragi čitatelji, dobar dan!

U ovom članku vodeći konzultant Neoflexovog poslovnog područja Big Data Solutions detaljno opisuje opcije za izgradnju vitrina s promjenjivom strukturom pomoću Apache Spark.

Kao dio projekta analize podataka često se pojavljuje zadatak izgradnje izloga na temelju slabo strukturiranih podataka.

Obično su to zapisnici ili odgovori iz različitih sustava, spremljeni kao JSON ili XML. Podaci se učitavaju u Hadoop, a zatim od njih trebate izgraditi izlog. Pristup kreiranoj izlozi možemo organizirati npr. putem Impale.

U ovom slučaju shema ciljnog izloga nije unaprijed poznata. Štoviše, shema se također ne može sastaviti unaprijed, jer ovisi o podacima, a mi imamo posla s tim vrlo labavo strukturiranim podacima.

Na primjer, danas je zabilježen sljedeći odgovor:

{source: "app1", error_code: ""}

a sutra iz istog sustava dolazi sljedeći odgovor:

{source: "app1", error_code: "error", description: "Network error"}

Zbog toga u izlog treba dodati još jedno polje - opis, a nitko ne zna hoće li doći ili ne.

Zadatak stvaranja prodajnog izloga na takvim podacima prilično je standardan, a Spark za to ima brojne alate. Za raščlanjivanje izvornih podataka postoji podrška za JSON i XML, a za prethodno nepoznatu shemu dostupna je podrška za schemaEvolution.

Na prvi pogled rješenje izgleda jednostavno. Morate uzeti mapu s JSON-om i pročitati je u podatkovnom okviru. Spark će stvoriti shemu, pretvoriti ugniježđene podatke u strukture. Dalje, sve je potrebno spremiti u parket, što je također podržano u Impali, registracijom izloga u Hive metastore.

Čini se da je sve jednostavno.

No, iz kratkih primjera u dokumentaciji nije jasno što s nizom problema u praksi.

Dokumentacija opisuje pristup ne stvaranju prodajnog izloga, već čitanju JSON-a ili XML-a u okviru podataka.

Naime, jednostavno pokazuje kako čitati i analizirati JSON:

df = spark.read.json(path...)

Ovo je dovoljno da podaci budu dostupni Sparku.

U praksi, skripta je puno kompliciranija nego samo čitanje JSON datoteka iz mape i stvaranje podatkovnog okvira. Situacija izgleda ovako: već postoji određeni izlog, svaki dan dolaze novi podaci, potrebno ih je dodati u izlog, ne zaboravljajući da se shema može razlikovati.

Uobičajena shema za izgradnju vitrine je sljedeća:

Korak 1. Podaci se učitavaju u Hadoop uz naknadno dnevno ponovno učitavanje i dodaju u novu particiju. Ispada mapa s početnim podacima podijeljena po danima.

Korak 2. Tijekom početnog učitavanja ovu mapu čita i analizira Spark. Rezultirajući podatkovni okvir sprema se u formatu koji se može analizirati, na primjer, u parketu, koji se zatim može uvesti u Impalu. Ovo stvara ciljani izlog sa svim podacima koji su se nakupili do ove točke.

Korak 3. Stvoreno je preuzimanje koje će ažurirati izlog svaki dan.
Tu je pitanje inkrementalnog opterećenja, potrebe pregrađivanja vitrine i pitanje održavanja opće sheme vitrine.

Uzmimo primjer. Recimo da je implementiran prvi korak izgradnje repozitorija i da su JSON datoteke učitane u mapu.

Stvaranje okvira podataka iz njih, a zatim ga spremiti kao izlog, nije problem. Ovo je prvi korak koji se lako može pronaći u Spark dokumentaciji:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Čini se da je sve u redu.

Čitamo i analiziramo JSON, zatim spremamo podatkovni okvir kao parket, registrirajući ga u Hive na bilo koji prikladan način:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Dobivamo prozor.

No, sutradan su dodani novi podaci iz izvora. Imamo mapu s JSON-om i izlog stvoren iz te mape. Nakon učitavanja sljedeće serije podataka iz izvora, prodajnom mjestu nedostaju podaci za jedan dan.

Logično rješenje bi bilo pregrađivanje izloga po danima, što će omogućiti dodavanje nove pregrade svaki sljedeći dan. Mehanizam za to je također dobro poznat, Spark vam omogućuje odvojeno pisanje particija.

Prvo, radimo početno učitavanje, spremajući podatke kao što je gore opisano, dodajući samo particioniranje. Ta se radnja naziva inicijalizacija izloga i izvodi se samo jednom:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Sljedeći dan učitavamo samo novu particiju:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Sve što preostaje je ponovno se registrirati u Hive kako bi se ažurirala shema.
Međutim, tu nastaju problemi.

Prvi problem. Prije ili kasnije, dobiveni parket bit će nečitljiv. To je zbog načina na koji parquet i JSON različito tretiraju prazna polja.

Razmotrimo tipičnu situaciju. Na primjer, jučer je stigao JSON:

День 1: {"a": {"b": 1}},

a danas isti JSON izgleda ovako:

День 2: {"a": null}

Recimo da imamo dvije različite particije, svaka s jednom linijom.
Kada pročitamo sve izvorne podatke, Spark će moći odrediti tip, te će razumjeti da je "a" polje tipa "struktura", s ugniježđenim poljem "b" tipa INT. Ali, ako je svaka particija spremljena zasebno, tada dobivamo parket s nekompatibilnim shemama particija:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ova situacija je dobro poznata, pa je posebno dodana opcija - prilikom parsiranja izvornih podataka ukloniti prazna polja:

df = spark.read.json("...", dropFieldIfAllNull=True)

U ovom slučaju parket će se sastojati od pregrada koje se mogu čitati zajedno.
Iako će se oni koji su to učinili u praksi ovdje gorko nasmiješiti. Zašto? Da, jer će vjerojatno biti još dvije situacije. Ili tri. Ili četiri. Prvi, koji će se gotovo sigurno dogoditi, jest da će numerički tipovi izgledati drugačije u različitim JSON datotekama. Na primjer, {intField: 1} i {intField: 1.1}. Ako se takva polja nađu u jednoj particiji, tada će spajanje sheme sve ispravno pročitati, što dovodi do najtočnijeg tipa. Ali ako su u različitim, tada će jedan imati intField: int, a drugi će imati intField: double.

Postoji sljedeća oznaka za rješavanje ove situacije:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Sada imamo mapu u kojoj se nalaze particije koje se mogu čitati u jedan dataframe i valjani parket cijele vitrine. Da? Ne.

Moramo zapamtiti da smo registrirali tablicu u Hive. Košnica ne razlikuje velika i mala slova u nazivima polja, dok parket razlikuje velika i mala slova. Stoga su particije sa shemama: field1: int i Field1: int iste za Hive, ali ne i za Spark. Ne zaboravite nazive polja pretvoriti u mala slova.

Nakon toga se čini da je sve u redu.

Međutim, nije sve tako jednostavno. Postoji i drugi, također dobro poznati problem. Budući da se svaka nova particija sprema zasebno, mapa particije sadržavat će Spark servisne datoteke, na primjer, oznaku uspjeha operacije _SUCCESS. To će rezultirati greškom pri pokušaju postavljanja parketa. Kako biste to izbjegli, trebate konfigurirati konfiguraciju kako biste spriječili Spark da doda servisne datoteke u mapu:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Čini se da se sada svaki dan nova particija parketa dodaje u ciljnu mapu vitrine, gdje se nalaze analizirani podaci za taj dan. Unaprijed smo se pobrinuli da nema particija s sukobom tipa podataka.

No, imamo i treći problem. Sada opća shema nije poznata, štoviše, tablica u Hiveu ima netočnu shemu, budući da je svaka nova particija najvjerojatnije unijela izobličenje u shemu.

Trebate ponovno registrirati tablicu. To se može učiniti jednostavno: ponovno pročitajte parket izloga, uzmite shemu i na temelju nje izradite DDL s kojim ćete ponovno registrirati mapu u Hiveu kao vanjsku tablicu, ažurirajući shemu ciljanog izloga.

Imamo četvrti problem. Kad smo prvi put registrirali tablicu, oslonili smo se na Spark. Sada to radimo sami i moramo zapamtiti da polja parketa mogu započeti znakovima koji nisu dopušteni za Hive. Na primjer, Spark izbacuje retke koje nije mogao analizirati u polju "corrupt_record". Takvo polje ne može se registrirati u Hiveu a da se ne izbjegne.

Znajući ovo, dobivamo shemu:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Šifra ("_oštećen_zapis", "`_oštećen_zapis`") + " " + f[1].zamijeni(":", "`:").zamijeni("<", "<`").zamijeni(",", ",`").zamijeni("niz<`", "niz<") čini siguran DDL, tj. umjesto:

create table tname (_field1 string, 1field string)

S nazivima polja kao što je "_field1, 1field", siguran DDL se pravi tamo gdje su nazivi polja escapedirani: kreirajte tablicu `tname` (`_field1` niz, `1field` niz).

Postavlja se pitanje: kako ispravno dobiti podatkovni okvir s potpunom shemom (u pf kodu)? Kako doći do ovog pf-a? Ovo je peti problem. Ponovno pročitajte shemu svih particija iz mape s parketnim datotekama ciljne vitrine? Ova metoda je najsigurnija, ali teška.

Shema je već u Hiveu. Novu shemu možete dobiti kombiniranjem sheme cijele tablice i nove particije. Dakle, trebate uzeti shemu tablice iz Hivea i kombinirati je sa shemom nove particije. To se može učiniti čitanjem testnih metapodataka iz Hivea, spremanjem u privremenu mapu i korištenjem Spark za čitanje obje particije odjednom.

Zapravo, postoji sve što vam treba: izvorna shema tablice u Hiveu i nova particija. Imamo i podatke. Ostaje samo dobiti novu shemu koja kombinira shemu izloga i nova polja iz stvorene particije:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Zatim stvaramo DDL registracije tablice, kao u prethodnom isječku.
Ako cijeli lanac radi ispravno, naime, došlo je do inicijalizirajućeg učitavanja, a tablica je ispravno kreirana u Hiveu, tada dobivamo ažuriranu shemu tablice.

I posljednji problem je taj što ne možete samo dodati particiju u Hive tablicu, jer će ona biti pokvarena. Morate prisiliti Hive da popravi strukturu particije:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Jednostavan zadatak čitanja JSON-a i stvaranja izloga na temelju njega rezultira prevladavanjem niza implicitnih poteškoća, rješenja za koja morate tražiti zasebno. I iako su ta rješenja jednostavna, potrebno je dosta vremena da se pronađu.

Za realizaciju konstrukcije vitrine, morao sam:

  • Dodajte particije u izlog, riješite se servisnih datoteka
  • Postupajte s praznim poljima u izvornim podacima koje je Spark upisao
  • Prebacivanje jednostavnih tipova u niz
  • Pretvorite nazive polja u mala slova
  • Odvojeni prijenos podataka i registracija tablice u Hive (DDL generacija)
  • Ne zaboravite izbjeći imena polja koja bi mogla biti nekompatibilna s Hiveom
  • Naučite kako ažurirati registraciju tablice u Hiveu

Ukratko, napominjemo da je odluka o izgradnji izloga prepuna mnogih zamki. Stoga je u slučaju poteškoća u provedbi bolje kontaktirati iskusnog partnera s uspješnom ekspertizom.

Hvala vam što ste pročitali ovaj članak, nadamo se da će vam informacije biti korisne.

Izvor: www.habr.com

Dodajte komentar