Spark schemaEvolution ing laku

Para pamaca sing ditresnani, dina sing apik!

Ing artikel iki, konsultan utama area bisnis Solusi Data Besar Neoflex nerangake kanthi rinci babagan opsi kanggo mbangun pameran struktur variabel nggunakake Apache Spark.

Minangka bagΓ©an saka proyek analisis data, tugas mbangun storefronts adhedhasar data longgar kabentuk asring muncul.

Biasane iki log, utawa respon saka macem-macem sistem, disimpen minangka JSON utawa XML. Data kasebut diunggah menyang Hadoop, mula sampeyan kudu nggawe etalase. Kita bisa ngatur akses menyang showcase digawe, contone, liwat Impala.

Ing kasus iki, skema toko target ora dingerteni sadurunge. Kajaba iku, skema kasebut uga ora bisa digambar luwih dhisik, amarga gumantung saka data, lan kita lagi ngurusi data sing wis kabentuk banget.

Contone, dina iki respon ing ngisor iki dicathet:

{source: "app1", error_code: ""}

lan sesuk saka sistem sing padha teka jawaban ing ngisor iki:

{source: "app1", error_code: "error", description: "Network error"}

AkibatΓ©, siji lapangan liyane kudu ditambahake menyang showcase - gambaran, lan ora ana sing ngerti apa bakal teka utawa ora.

Tugas nggawe storefront ing data kasebut cukup standar, lan Spark duwe sawetara alat kanggo iki. Kanggo parsing data sumber, ana dhukungan kanggo JSON lan XML, lan kanggo skema sing durung dingerteni, dhukungan kanggo schemaEvolution diwenehake.

Ing kawitan marketing, solusi katon prasaja. Sampeyan kudu njupuk folder karo JSON lan maca menyang dataframe. Spark bakal nggawe skema, ngowahi data bersarang dadi struktur. Luwih, kabeh kudu disimpen ing parket, sing uga didhukung ing Impala, kanthi ndhaptar storefront ing metastore Hive.

Kabeh katon prasaja.

Nanging, iku ora cetha saka conto singkat ing dokumentasi apa apa karo sawetara masalah ing laku.

Dokumentasi nggambarake pendekatan ora kanggo nggawe storefront, nanging maca JSON utawa XML menyang dataframe.

Yaiku, mung nuduhake carane maca lan ngurai JSON:

df = spark.read.json(path...)

Iki cukup kanggo nggawe data kasedhiya kanggo Spark.

Ing laku, skrip luwih rumit tinimbang mung maca file JSON saka folder lan nggawe dataframe. Kahanan kaya mangkene: wis ana storefront tartamtu, data anyar teka saben dina, kudu ditambahake ing storefront, ora lali yen skema bisa beda-beda.

Skema biasa kanggo mbangun showcase yaiku:

Langkah 1. Data kasebut dimuat menyang Hadoop kanthi reload saben dina lan ditambahake menyang partisi anyar. Pranyata folder karo data awal partisi dening dina.

Langkah 2. Sajrone mbukak wiwitan, folder iki diwaca lan diurai dening Spark. Dataframe asil disimpen ing format parsable, contone, ing parket, kang banjur bisa diimpor menyang Impala. Iki nggawe showcase target karo kabeh data sing wis akumulasi nganti titik iki.

Langkah 3. A download digawe sing bakal nganyari storefront saben dina.
Ana pitakonan babagan loading incremental, perlu kanggo partisi showcase, lan pitakonan kanggo njaga skema umum showcase.

Ayo dadi conto. Ayo ngomong yen langkah pisanan mbangun repositori wis dileksanakake, lan file JSON diunggah menyang folder.

Nggawe dataframe saka wong-wong mau, banjur nyimpen minangka showcase, ora masalah. Iki minangka langkah pisanan sing bisa ditemokake kanthi gampang ing dokumentasi Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Kabeh katon apik.

Kita maca lan ngurai JSON, banjur nyimpen dataframe minangka parket, ndhaptar ing Hive kanthi cara sing trep:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

We njaluk jendhela.

Nanging, dina sabanjure, data anyar saka sumber kasebut ditambahake. Kita duwe folder karo JSON, lan pertunjukan digawe saka folder iki. Sawise mbukak kumpulan data sabanjure saka sumber, data mart ilang data sedina.

Solusi logis yaiku partisi storefront saben dina, sing bakal ngidini nambah partisi anyar saben dina sabanjure. Mekanisme iki uga dikenal, Spark ngidini sampeyan nulis partisi kanthi kapisah.

Kaping pisanan, kita nindakake beban awal, nyimpen data kaya sing kasebut ing ndhuwur, mung nambah partisi. Tumindak iki diarani initialization storefront lan mung rampung sapisan:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Dina sabanjure, kita mung mbukak partisi anyar:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Kabeh sing isih ana yaiku ndhaptar maneh ing Hive kanggo nganyari skema.
Nanging, ing kene ana masalah.

Masalah pisanan. Cepet utawa mengko, parket sing diasilake bakal ora bisa diwaca. Iki amarga cara parket lan JSON nambani lapangan kosong kanthi beda.

Ayo nimbang kahanan sing khas. Contone, wingi JSON teka:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

lan dina iki JSON padha katon kaya iki:

Π”Π΅Π½ΡŒ 2: {"a": null}

Ayo kita duwe rong partisi sing beda-beda, saben siji baris.
Nalika kita maca kabeh data sumber, Spark bakal bisa kanggo nemtokake jinis, lan bakal ngerti sing "a" lapangan jinis "struktur", karo lapangan nested "b" jinis INT. Nanging, yen saben partisi disimpen kanthi kapisah, banjur entuk parket kanthi skema partisi sing ora kompatibel:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Kahanan iki wis dikenal, mula opsi wis ditambahake khusus - nalika ngurai data sumber, mbusak kolom kosong:

df = spark.read.json("...", dropFieldIfAllNull=True)

Ing kasus iki, parket bakal kalebu partisi sing bisa diwaca bebarengan.
Senajan wong-wong sing wis nindakake iki ing laku bakal mesem pait kene. Kenging punapa? Ya, amarga ana rong kahanan liyane. Utawa telu. Utawa papat. Pisanan, sing meh mesthi kedadeyan, yaiku jinis numerik bakal katon beda ing file JSON sing beda. Contone, {intField: 1} lan {intField: 1.1}. Yen kolom kasebut ditemokake ing siji partisi, banjur skema gabungan bakal maca kabeh kanthi bener, anjog menyang jinis sing paling akurat. Nanging yen beda-beda, siji bakal duwe intField: int, lan liyane bakal duwe intField: pindho.

Ana gendera ing ngisor iki kanggo nangani kahanan iki:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Saiki kita duwe folder ing ngendi ana partisi sing bisa diwaca dadi dataframe siji lan parket sing bener saka kabeh pertunjukan. iya ta? Ora.

Kita kudu elinga yen kita ndhaptar meja ing Hive. Hive ora sensitif huruf cilik ing jeneng lapangan, dene parket sensitif huruf cilik. Mulane, partisi karo skema: field1: int, lan Field1: int padha kanggo Hive, nanging ora kanggo Spark. Aja lali ngowahi jeneng lapangan dadi huruf cilik.

Sawise iku, kabeh katon apik.

Nanging, ora kabeh supaya prasaja. Ana masalah kapindho, uga kondhang. Wiwit saben partisi anyar disimpen kanthi kapisah, folder partisi bakal ngemot file layanan Spark, contone, bendera sukses operasi _SUCCESS. Iki bakal nyebabake kesalahan nalika nyoba parket. Kanggo ngindhari iki, sampeyan kudu ngatur konfigurasi kanggo nyegah Spark saka nambah file layanan menyang folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Kayane saiki saben dina partisi parket anyar ditambahake menyang folder showcase target, ing ngendi data sing diurai kanggo dina kasebut. We njupuk care ing advance sing ora ana partisi karo konflik jinis data.

Nanging, kita duwe masalah katelu. Saiki skema umum ora dikawruhi, luwih-luwih, tabel ing Hive duwe skema sing salah, amarga saben partisi anyar bisa uga ngenalake distorsi menyang skema kasebut.

Sampeyan kudu ndhaftar maneh meja. Iki bisa rampung mung: maca parket saka storefront maneh, njupuk skema lan nggawe DDL adhedhasar iku, kanggo maneh ndhaftar folder ing Hive minangka meja external, nganyari skema saka storefront target.

Kita duwe masalah kaping papat. Nalika kita ndhaftar meja kanggo pisanan, kita gumantung ing Spark. Saiki kita nindakake dhewe, lan kita kudu elinga yen kothak parket bisa diwiwiti karo karakter sing ora diijini kanggo Hive. Contone, Spark mbuwang garis sing ora bisa diurai ing kolom "corrupt_record". Kolom kasebut ora bisa didaftar ing Hive tanpa bisa lolos.

Ngerti iki, kita entuk skema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kode ("_record_corrupt", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") nggawe DDL aman, yaiku:

create table tname (_field1 string, 1field string)

Kanthi jeneng lapangan kaya "_field1, 1field", DDL aman digawe ing ngendi jeneng lapangan bisa lolos: gawe tabel `tname` (`_field1` string, `1field` string).

Pitakonan muncul: carane entuk dataframe kanthi skema lengkap (ing kode pf)? Carane njaluk pf iki? Iki masalah kaping lima. Waca maneh skema kabeh partisi saka folder kanthi file parket saka pertunjukan target? Cara iki paling aman, nanging angel.

Skema kasebut wis ana ing Hive. Sampeyan bisa entuk skema anyar kanthi nggabungake skema kabeh tabel lan partisi anyar. Dadi sampeyan kudu njupuk skema tabel saka Hive lan gabungke karo skema partisi anyar. Iki bisa ditindakake kanthi maca metadata tes saka Hive, simpen menyang folder sementara, lan nggunakake Spark kanggo maca loro partisi bebarengan.

Nyatane, ana kabeh sing dibutuhake: skema tabel asli ing Hive lan partisi anyar. Kita uga duwe data. Iku tetep mung kanggo entuk skema anyar sing nggabungake skema storefront lan lapangan anyar saka partisi sing digawe:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Sabanjure, kita nggawe DDL registrasi tabel, kaya ing cuplikan sadurunge.
Yen kabeh chain bisa digunakake kanthi bener, yaiku, ana beban wiwitan, lan tabel digawe kanthi bener ing Hive, mula kita entuk skema tabel sing dianyari.

Lan masalah pungkasan iku sampeyan ora bisa mung nambah partisi menyang Tabel Hive, amarga bakal bejat. Sampeyan kudu meksa Hive kanggo ndandani struktur partisi:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Tugas prasaja maca JSON lan nggawe storefront adhedhasar asil ngatasi sawetara kesulitan implisit, solusi sing kudu sampeyan goleki kanthi kapisah. Lan sanajan solusi kasebut prasaja, butuh wektu akeh kanggo nemokake.

Kanggo ngleksanakake pambangunan showcase, aku kudu:

  • Tambah partisi menyang showcase, nyingkirake file layanan
  • Ngatasi kolom kosong ing data sumber sing diketik Spark
  • Cast jinis prasaja kanggo senar
  • Ngonversi jeneng lapangan dadi huruf cilik
  • Unggahan data sing kapisah lan registrasi tabel ing Hive (generasi DDL)
  • Aja lali uwal jeneng lapangan sing bisa uga ora kompatibel karo Hive
  • Sinau carane nganyari registrasi tabel ing Hive

Summing up, kita nyathet yen keputusan kanggo mbangun jendhela toko kebak karo akeh pitfalls. Mulane, yen ana kesulitan ing implementasine, luwih becik ngubungi mitra sing berpengalaman kanthi keahlian sing sukses.

Matur nuwun kanggo maca artikel iki, muga-muga sampeyan bisa nemokake informasi sing migunani.

Source: www.habr.com

Add a comment