Spark schemaEvolution dina prakték

Pembaca anu hormat, wilujeng siang!

Dina artikel ieu, konsultan ngarah pikeun wewengkon bisnis Big Data Solutions of Neoflex ngajelaskeun sacara rinci pilihan pikeun ngawangun storefronts struktur variabel ngagunakeun Apache Spark.

Salaku bagian tina proyék analisis data, tugas ngawangun showcases dumasar kana data terstruktur longgar mindeng timbul.

Biasana ieu log, atanapi réspon tina sababaraha sistem, disimpen dina bentuk JSON atanapi XML. Data diunggah ka Hadoop, teras toko kedah diwangun ti dinya. Urang tiasa ngatur aksés ka storefront dijieun, contona, ngaliwatan Impala.

Dina hal ieu, tata perenah toko udagan henteu dipikanyaho sateuacanna. Leuwih ti éta, skéma nu teu bisa digambar up sateuacanna, sabab gumantung kana data, sarta kami kaayaan ieu data terstruktur pisan lemah.

Contona, dinten ieu réspon di handap ieu asup:

{source: "app1", error_code: ""}

sareng énjing réspon di handap ieu asalna tina sistem anu sami:

{source: "app1", error_code: "error", description: "Network error"}

Hasilna, widang sejen kudu ditambahkeun kana storefront - pedaran, sarta teu saurang ogé weruh naha éta bakal datang atanapi henteu.

Tugas nyiptakeun pasar dina data sapertos kitu cukup standar, sareng Spark ngagaduhan sababaraha alat pikeun ieu. Pikeun ngungkabkeun data sumber, aya dukungan pikeun JSON sareng XML, sareng pikeun skéma anu teu dipikanyaho, dukungan schemaEvolution disayogikeun.

Dina glance kahiji, leyuran Sigana basajan. Anjeun kedah nyandak folder sareng JSON sareng baca kana pigura data. Spark bakal nyieun skéma jeung ngarobah data nested kana struktur. Salajengna, sadayana kedah disimpen dina parquet, anu ogé dirojong di Impala, ku ngadaptarkeun storefront di metastore Hive.

Sagalana sigana basajan.

Nanging, tina conto pondok dina dokuméntasi henteu écés naon anu kudu dilakukeun ku sababaraha masalah dina prakna.

Dokuméntasi ngajelaskeun pendekatan sanés pikeun nyiptakeun toko, tapi pikeun maca JSON atanapi XML kana pigura data.

Nyaéta, éta ngan ukur nunjukkeun kumaha maca sareng nga-parse JSON:

df = spark.read.json(path...)

Ieu cukup pikeun nyieun data sadia pikeun Spark.

Dina prakna, skénario éta langkung rumit tibatan ngan ukur maca file JSON tina polder sareng nyiptakeun dataframe. Kaayaan sapertos kieu: tos aya paméran anu tangtu, data énggal sumping unggal dinten, aranjeunna kedah diasupkeun kana showcase, teu hilap yén skéma tiasa bénten.

Skéma biasa pikeun ngawangun etalase nyaéta kieu:

Lengkah 1. Data ieu dimuat kana Hadoop, dituturkeun ku loading tambahan poean sarta ditambahkeun kana partisi anyar. Hasilna nyaéta polder sareng data sumber, dipisahkeun ku dinten.

Lengkah 2. Salila loading awal, folder ieu dibaca tur parsed maké Spark. Dataframe hasilna disimpen dina format nu bisa dianalisis, contona, dina parquet, nu lajeng bisa diimpor kana Impala. Ieu nyiptakeun toko target sareng sadaya data anu akumulasi dugi ka titik ieu.

Lengkah 3. A download dijieun anu bakal ngamutahirkeun storefront unggal poé.
Patarosan ngeunaan loading incremental timbul, perlu pikeun partisi storefront, sarta sual ngarojong tata perenah umum storefront nu.

Hayu urang masihan conto. Hayu urang nyarios léngkah munggaran pikeun ngawangun gudang parantos dilaksanakeun, sareng unggah file JSON kana polder dikonpigurasi.

Henteu aya masalah pikeun nyiptakeun dataframe ti aranjeunna teras simpen salaku paméran. Ieu mangrupikeun léngkah munggaran anu gampang dipendakan dina dokuméntasi Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Sagalana sigana rupa.

Kami parantos maca sareng ngémutan JSON, teras urang simpen dataframe salaku parquet, ngadaptarkeunana di Hive ku cara anu gampang:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Urang meunang showcase a.

Tapi, poé saterusna data anyar ditambahkeun ti sumberna. Simkuring boga folder kalawan JSON, sarta storefront dijieun dumasar kana folder ieu. Saatos ngamuat bagian salajengna data tina sumberna, storefront teu gaduh data anu cekap pikeun sadinten.

Solusi anu logis nyaéta pikeun ngabagi etalase unggal dinten, anu bakal ngamungkinkeun anjeun nambihan partisi énggal unggal dinten salajengna. Mékanisme pikeun ieu ogé dipikanyaho; Spark ngamungkinkeun anjeun pikeun ngarékam partisi nyalira.

Kahiji, urang ngalakukeun loading awal, nyimpen data sakumaha ditétélakeun di luhur, nambahan ukur partitioning. Aksi ieu disebut initialization storefront sarta dipigawé ngan sakali:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Poé saterusna urang ngan ngundeur partisi anyar:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Sadaya anu tetep nyaéta ngadaptar deui di Hive pikeun ngapdet skéma.
Sanajan kitu, ieu téh dimana masalah timbul.

Masalah kahiji. Moal lami deui atanapi engké, parquet anu hasilna moal tiasa dibaca deui. Ieu disababkeun ku cara parquet sareng JSON ngubaran sawah kosong sacara béda.

Hayu urang nganggap kaayaan has. Salaku conto, kamari JSON sumping:

День 1: {"a": {"b": 1}},

sareng ayeuna JSON sami sapertos kieu:

День 2: {"a": null}

Anggap urang gaduh dua partisi anu béda, masing-masing gaduh hiji garis.
Lamun urang maca sakabéh data sumber, Spark bakal bisa nangtukeun jenis, sarta bakal ngarti yén "a" mangrupakeun widang tipe "struktur", kalawan widang nested "b" tipe INT. Tapi, upami unggal partisi disimpen nyalira, hasilna mangrupikeun parquet kalayan skéma partisi anu teu cocog:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Situasi ieu dipikanyaho, janten pilihan parantos tambihan khusus pikeun ngahapus kolom kosong nalika nga-parsing data sumber:

df = spark.read.json("...", dropFieldIfAllNull=True)

Dina hal ieu, parquet bakal diwangun ku partisi anu tiasa dibaca babarengan.
Sanajan jalma anu geus ngalakukeun ieu dina prakna bakal seuri pait. Naha? Sumuhun, sabab paling dipikaresep dua kaayaan deui bakal timbul. Atawa tilu. Atawa opat. Anu kahiji, anu ampir pasti, nyaéta yén jinis angka bakal béda dina file JSON anu béda. Contona, {intField: 1} jeung {intField: 1.1}. Upami widang sapertos kitu muncul dina hiji angkatan, maka skéma ngahiji bakal maca sadayana leres, ngarah kana jinis anu paling akurat. Tapi lamun dina leuwih béda, mangka hiji bakal boga intField: int, jeung lianna bakal intField: ganda.

Pikeun nanganan kaayaan ieu aya bandéra handap:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Ayeuna urang gaduh folder dimana partitions lokasina, nu bisa dibaca kana dataframe tunggal sarta parquet valid sakabéh storefront. Sumuhun? No.

Urang kudu inget yen urang didaptarkeun tabel di Hive. Hive henteu sénsitip kana ngaran lapangan, tapi parquet. Ku alatan éta, partisi kalawan schemas: field1: int, sarta Field1: int sarua keur Hive, tapi teu keur Spark. Tong hilap gentos nami lapangan janten hurup leutik.

Sanggeus ieu, sagalana sigana rupa.

Sanajan kitu, teu kabeh jadi basajan. Masalah kadua, ogé dipikawanoh timbul. Kusabab unggal partisi anyar disimpen nyalira, folder partisi bakal ngandung file jasa Spark, contona, bendera kasuksésan operasi _SUCCESS. Ieu bakal ngakibatkeun kasalahan nalika nyobian parket. Pikeun ngahindarkeun ieu, anjeun kedah ngonpigurasikeun konfigurasi ku nyegah Spark tina nambihan file jasa kana polder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Sigana ayeuna unggal dintenna partisi parquet anyar ditambahkeun kana folder storefront target, dimana data parsed pikeun poé ieu lokasina. Urang jaga sateuacanna pikeun mastikeun yén teu aya partisi sareng konflik tipe data.

Tapi urang nyanghareupan masalah katilu. Ayeuna schema umum teu dipikawanoh, komo deui, dina Hive tabel boga schema salah, saprak unggal partisi anyar paling dipikaresep ngawanohkeun distorsi kana schema nu.

Méja kudu didaptarkeun deui. Ieu bisa dilakukeun ngan saukur: baca parquet of storefront deui, nyandak skéma jeung nyieun DDL dumasar kana eta, kalawan nu bisa ulang ngadaptar folder di Hive salaku tabel éksternal, ngamutahirkeun skéma of storefront target.

Urang nyanghareupan masalah kaopat. Nalika urang ngadaptar tabel pikeun kahiji kalina, urang ngandelkeun Spark. Ayeuna urang ngalakukeun eta sorangan, sarta kami kudu inget yen widang parquet bisa dimimitian ku karakter nu teu diwenangkeun ku Hive. Salaku conto, Spark ngaluarkeun garis anu teu tiasa diparse dina kolom "corrupt_record". Widang sapertos kitu teu tiasa didaptarkeun di Hive tanpa kabur.

Nyaho ieu, urang meunang diagram:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

kode ("_record_corrupt", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") teu aman DDL, nyaeta, tinimbang:

create table tname (_field1 string, 1field string)

Kalawan ngaran widang kawas "_field1, 1field", a DDL aman dijieun dimana ngaran widang lolos: nyieun tabel `tname` (`_field1` string, `1field` string).

Patarosan timbul: kumaha leres kéngingkeun dataframe kalayan skéma lengkep (dina kode pf)? Kumaha carana kéngingkeun pf ieu? Ieu masalah kalima. Baca deui diagram sadaya partisi tina folder kalayan file parquet tina toko target? Metoda ieu nu safest, tapi hésé.

Skéma geus aya di Hive. Anjeun tiasa kéngingkeun skéma énggal ku ngagabungkeun skéma sadaya méja sareng partisi énggal. Ieu ngandung harti yén anjeun kedah nyandak schema tabel ti Hive tur ngagabungkeun deui jeung schema tina partisi anyar. Ieu tiasa dilakukeun ku maca metadata tés tina Hive, simpen kana polder samentawis, sareng maca duanana partisi sakaligus nganggo Spark.

Intina, aya sagalana nu peryogi: schema tabel aslina di Hive sarta partisi anyar. Urang ogé boga data. Sadaya anu tetep nyaéta kéngingkeun skéma énggal anu ngagabungkeun skéma storefront sareng widang énggal tina partisi anu diciptakeun:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Salajengna, urang nyieun DDL pendaptaran tabel, sakumaha dina sempalan saméméhna.
Lamun sakabéh ranté jalan neuleu, nyaéta, aya beban awal, sarta tabél ieu leres dijieun di Hive, lajeng urang meunang hiji schema tabel diropéa.

Masalah panungtungan nyaéta nu teu bisa gampang nambahkeun partisi ka méja Hive, sabab bakal megatkeun. Anjeun kedah maksa Hive pikeun ngalereskeun struktur partisi na:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Tugas saderhana pikeun maca JSON sareng nyiptakeun etalase dumasar kana éta nyababkeun sababaraha kasusah implisit, solusi anu kedah dipendakan nyalira. Sareng sanaos solusi ieu saderhana, peryogi seueur waktos pikeun mendakanana.

Pikeun ngalaksanakeun pangwangunan showcase, urang kedah:

  • Tambahkeun partisi ka etalase, ngaleungitkeun file jasa
  • Nungkulan widang kosong dina data sumber nu Spark geus diketik
  • Tuang jenis basajan pikeun string
  • Ngarobah ngaran widang ka hurup leutik
  • Unggah data misah sareng pendaptaran tabel di Hive (nyieun DDL)
  • Tong hilap ngémutan nami lapangan anu henteu cocog sareng Hive
  • Diajar ngapdet pendaptaran tabel di Hive

Pikeun nyimpulkeun, urang dicatet yén kaputusan ngawangun storefronts nyaeta fraught kalawan loba pitfalls. Ku alatan éta, lamun kasusah timbul dina palaksanaan, eta leuwih hade mun balik ka pasangan ngalaman kalawan kaahlian suksés.

Hatur nuhun pikeun maca tulisan ieu, kami ngarepkeun anjeun mendakan inpormasi anu mangpaat.

sumber: www.habr.com

Tambahkeun komentar