Spark schemaEvolucija u praksi

Dragi čitaoci, dobar dan!

U ovom članku, vodeći konsultant za poslovno područje Big Data Solutions Neoflex-a detaljno opisuje opcije za izgradnju izloga promjenjive strukture koristeći Apache Spark.

Kao dio projekta analize podataka, često se javlja zadatak izgradnje vitrina na osnovu labavo strukturiranih podataka.

Obično su to zapisnici, ili odgovori različitih sistema, sačuvani u obliku JSON ili XML-a. Podaci se učitavaju u Hadoop, a zatim se od njih mora napraviti izlog. Možemo organizirati pristup kreiranom izlogu, na primjer, preko Impale.

U ovom slučaju, izgled ciljnog izloga je unaprijed nepoznat. Štaviše, šema se ne može izraditi unaprijed, jer ovisi o podacima, a mi imamo posla sa ovim vrlo slabo strukturiranim podacima.

Na primjer, danas je zabilježen sljedeći odgovor:

{source: "app1", error_code: ""}

a sutra iz istog sistema stiže sljedeći odgovor:

{source: "app1", error_code: "error", description: "Network error"}

Kao rezultat toga, u izlog bi trebalo dodati još jedno polje - opis, a niko ne zna hoće li doći ili ne.

Zadatak kreiranja tržišta na takvim podacima je prilično standardan, a Spark ima niz alata za to. Za raščlanjivanje izvornih podataka postoji podrška i za JSON i za XML, a za prethodno nepoznatu šemu, obezbeđena je podrška za schemaEvolution.

Na prvi pogled rješenje izgleda jednostavno. Morate uzeti folder sa JSON-om i pročitati ga u okvir podataka. Spark će kreirati šemu i pretvoriti ugniježđene podatke u strukture. Zatim sve treba spremiti u parket, što je podržano i u Impali, registracijom izloga u Hive metastore.

Čini se da je sve jednostavno.

Međutim, iz kratkih primjera u dokumentaciji nije jasno šta učiniti sa nizom problema u praksi.

Dokumentacija opisuje pristup ne za kreiranje izloga, već za čitanje JSON ili XML-a u okvir podataka.

Naime, jednostavno pokazuje kako čitati i raščlaniti JSON:

df = spark.read.json(path...)

Ovo je dovoljno da podaci budu dostupni Sparku.

U praksi, scenario je mnogo složeniji od pukog čitanja JSON fajlova iz fascikle i kreiranja okvira podataka. Situacija izgleda ovako: već postoji određeni izlog, svaki dan stižu novi podaci, potrebno ih je dodati u izlog, ne zaboravljajući da se shema može razlikovati.

Uobičajena shema za izgradnju izloga je sljedeća:

Korak 1. Podaci se učitavaju u Hadoop, nakon čega slijedi svakodnevno dodatno učitavanje i dodaju se na novu particiju. Rezultat je folder sa izvornim podacima, podijeljen po danima.

Korak 2. Tokom početnog učitavanja, ovaj folder se čita i analizira pomoću Spark-a. Rezultirajući okvir podataka pohranjuje se u formatu koji se može analizirati, na primjer, u parketu, koji se zatim može uvesti u Impalu. Ovo stvara ciljni izlog sa svim podacima koji su se akumulirali do ove tačke.

Korak 3. Kreira se preuzimanje koje će ažurirati izlog svaki dan.
Postavlja se pitanje inkrementalnog učitavanja, potrebe za pregradnjom izloga i pitanje podrške generalnom izgledu izloga.

Dajemo primjer. Recimo da je prvi korak izgradnje spremišta implementiran i konfigurisano je postavljanje JSON fajlova u folder.

Nije problem kreirati okvir podataka od njih, a zatim ih sačuvati kao izlog. Ovo je prvi korak koji se lako može pronaći u Spark dokumentaciji:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Čini se da je sve u redu.

Pročitali smo i raščlanili JSON, zatim spremamo okvir podataka kao parket, registrirajući ga u Hive na bilo koji pogodan način:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Dobijamo izlog.

No, sljedeći dan su dodani novi podaci iz izvora. Imamo folder sa JSON-om i izlog kreiran na osnovu ovog foldera. Nakon učitavanja sljedećeg dijela podataka iz izvora, izlog nema dovoljno podataka za jedan dan.

Logično rješenje bi bilo da se izlog podijeli po danima, što će vam omogućiti da dodajete novu particiju svakog sljedećeg dana. Mehanizam za to je također dobro poznat; Spark vam omogućava da zasebno snimate particije.

Prvo, izvršimo početno učitavanje, spremamo podatke kako je gore opisano, dodajući samo particioniranje. Ova radnja se zove inicijalizacija izloga i radi se samo jednom:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Sljedećeg dana preuzimamo samo novu particiju:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Sve što ostaje je da se ponovo registrujete u Hive-u da ažurirate šemu.
Međutim, tu nastaju problemi.

Prvi problem. Prije ili kasnije, nastali parket više neće biti čitljiv. To je zbog načina na koji parket i JSON različito tretiraju prazna polja.

Hajde da razmotrimo tipičnu situaciju. Na primjer, jučer stiže JSON:

День 1: {"a": {"b": 1}},

a danas isti JSON izgleda ovako:

День 2: {"a": null}

Recimo da imamo dvije različite particije, svaka sa jednim redom.
Kada pročitamo kompletne izvorne podatke, Spark će moći da odredi tip i shvatiće da je „a“ polje tipa „struktura“, sa ugnežđenim poljem „b“ tipa INT. Ali, ako je svaka particija sačuvana zasebno, onda je rezultat parket s nekompatibilnim shemama pregrada:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ova situacija je dobro poznata, pa je posebno dodana opcija za uklanjanje praznih polja prilikom raščlanjivanja izvornih podataka:

df = spark.read.json("...", dropFieldIfAllNull=True)

U tom slučaju parket će se sastojati od pregrada koje se mogu čitati zajedno.
Iako će se oni koji su to učinili u praksi gorko nasmiješiti. Zašto? Da, jer će se najvjerovatnije pojaviti još dvije situacije. Ili tri. Ili četiri. Prvi, koji je gotovo siguran, je da će numerički tipovi izgledati drugačije u različitim JSON datotekama. Na primjer, {intField: 1} i {intField: 1.1}. Ako se takva polja pojave u jednoj seriji, onda će spajanje šeme sve ispravno pročitati, što će dovesti do najpreciznijeg tipa. Ali ako su u različitim, onda će jedan imati intField: int, a drugi će imati intField: double.

Za rješavanje ove situacije postoji sljedeća zastavica:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Sada imamo fasciklu u kojoj se nalaze particije, koja se može čitati u jedan dataframe i važeći parket čitavog izloga. Da? br.

Moramo zapamtiti da smo registrovali tabelu u Hive. Hive ne razlikuje velika i mala slova u nazivima polja, ali parket jeste. Stoga su particije sa šemama: field1: int i Field1: int iste za Hive, ali ne i za Spark. Ne zaboravite promijeniti nazive polja u mala slova.

Nakon ovoga, čini se da je sve u redu.

Međutim, nije sve tako jednostavno. Pojavljuje se drugi, takođe dobro poznat problem. Pošto se svaka nova particija sprema zasebno, folder particije će sadržavati Spark servisne datoteke, na primjer, oznaku uspjeha operacije _SUCCESS. To će rezultirati greškom pri pokušaju parketa. Da biste to izbjegli, morate konfigurirati konfiguraciju tako što ćete spriječiti Spark da dodaje servisne datoteke u folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Čini se da se sada svaki dan dodaje nova particija parketa u ciljni folder izloga, gdje se nalaze raščlanjeni podaci za taj dan. Unaprijed smo se pobrinuli da nema particija sa sukobima tipova podataka.

Ali suočeni smo sa trećim problemom. Sada opšta šema nije poznata, štaviše, u Hiveu tabela ima pogrešnu šemu, pošto je svaka nova particija najverovatnije unela izobličenje u šemu.

Tabelu je potrebno ponovo registrovati. To se može učiniti jednostavno: ponovo pročitajte parket izloga, uzmite šemu i na osnovu nje kreirajte DDL, pomoću kojeg možete ponovo registrovati folder u Hiveu kao eksternu tabelu, ažurirajući šemu ciljnog izloga.

Suočavamo se sa četvrtim problemom. Kada smo prvi put registrovali tabelu, oslonili smo se na Spark. Sada to radimo sami, i moramo imati na umu da polja parketa mogu početi sa znakovima koje Hive ne dozvoljava. Na primjer, Spark izbacuje linije koje nije mogao raščlaniti u polje “corrupt_record”. Takvo polje se ne može registrovati u košnici bez bijega.

Znajući ovo, dobijamo dijagram:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kod ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("niz<`", "niz<") radi siguran DDL, odnosno umjesto:

create table tname (_field1 string, 1field string)

Sa imenima polja kao što su "_field1, 1field", pravi se siguran DDL gdje su nazivi polja izbačeni: kreirajte tablicu `tname` (string `_field1`, string `1field`).

Postavlja se pitanje: kako ispravno dobiti dataframe sa kompletnom šemom (u pf kodu)? Kako do ovog pf-a? Ovo je peti problem. Ponovno pročitati dijagram svih particija iz foldera sa fajlovima parketa ciljnog izloga? Ova metoda je najsigurnija, ali teška.

Shema je već u Hiveu. Možete dobiti novu shemu kombiniranjem sheme cijele tablice i nove particije. To znači da trebate uzeti shemu tablice iz Hive-a i kombinirati je sa shemom nove particije. Ovo se može učiniti čitanjem testnih metapodataka iz Hive-a, spremanjem ih u privremenu mapu i čitanjem obje particije odjednom pomoću Spark-a.

U suštini, postoji sve što vam treba: originalna šema tabele u Hive-u i nova particija. Imamo i podatke. Sve što ostaje je nabaviti novu shemu koja kombinira shemu izloga i nova polja iz kreirane particije:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Zatim kreiramo tablicu registracije DDL, kao u prethodnom fragmentu.
Ako cijeli lanac radi ispravno, naime, došlo je do početnog učitavanja, a tablica je ispravno kreirana u Hiveu, tada dobijamo ažuriranu šemu tablice.

Posljednji problem je što ne možete lako dodati particiju u Hive tablicu, jer će se pokvariti. Morate prisiliti Hive da popravi svoju particijsku strukturu:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Jednostavan zadatak čitanja JSON-a i kreiranja izloga zasnovanog na njemu rezultira prevazilaženjem niza implicitnih poteškoća za koje se rješenja moraju pronaći zasebno. I iako su ova rješenja jednostavna, potrebno je dosta vremena da ih se pronađe.

Za realizaciju izgradnje vitrine morali smo:

  • Dodajte particije na izlog, riješite se servisnih datoteka
  • Bavite se praznim poljima u izvornim podacima koje je Spark otkucao
  • Prebacite jednostavne tipove na niz
  • Pretvorite nazive polja u mala slova
  • Odvojeno učitavanje podataka i registracija tablice u Hive (DDL kreiranje)
  • Ne zaboravite pobjeći imena polja koja možda nisu kompatibilna sa Hive
  • Naučite ažurirati registraciju tablice u Hiveu

Da rezimiramo, napominjemo da je odluka o izgradnji izloga prepuna mnogih zamki. Stoga, ako se pojave poteškoće u implementaciji, bolje je obratiti se iskusnom partneru s uspješnom stručnošću.

Hvala vam što ste pročitali ovaj članak, nadamo se da će vam informacije biti korisne.

izvor: www.habr.com

Dodajte komentar