Spark schemaEvolution sa pagsasanay

Minamahal na mga mambabasa, magandang araw!

Sa artikulong ito, ang nangungunang consultant ng Neoflex's Big Data Solutions business area ay detalyadong naglalarawan sa mga opsyon para sa pagbuo ng variable structure showcases gamit ang Apache Spark.

Bilang bahagi ng isang proyekto sa pagsusuri ng data, madalas na umuusbong ang gawain ng pagbuo ng mga storefront batay sa maluwag na structured na data.

Kadalasan ito ay mga log, o mga tugon mula sa iba't ibang mga system, na naka-save bilang JSON o XML. Ang data ay na-upload sa Hadoop, pagkatapos ay kailangan mong bumuo ng isang storefront mula sa kanila. Maaari naming ayusin ang access sa ginawang showcase, halimbawa, sa pamamagitan ng Impala.

Sa kasong ito, hindi alam muna ang schema ng target na storefront. Bukod dito, ang scheme ay hindi rin maaaring iguhit nang maaga, dahil ito ay nakasalalay sa data, at kami ay nakikitungo sa mga napakaluwag na structured na data na ito.

Halimbawa, ngayon ang sumusunod na tugon ay naka-log:

{source: "app1", error_code: ""}

at bukas mula sa parehong sistema ay darating ang sumusunod na sagot:

{source: "app1", error_code: "error", description: "Network error"}

Bilang resulta, dapat na magdagdag ng isa pang field sa showcase - paglalarawan, at walang nakakaalam kung darating ito o hindi.

Ang gawain ng paglikha ng storefront sa naturang data ay medyo pamantayan, at ang Spark ay may ilang mga tool para dito. Para sa pag-parse ng source data, mayroong suporta para sa parehong JSON at XML, at para sa dati nang hindi kilalang schema, ibinibigay ang suporta para sa schemaEvolution.

Sa unang sulyap, ang solusyon ay mukhang simple. Kailangan mong kumuha ng folder na may JSON at basahin ito sa isang dataframe. Gagawa ng schema si Spark, gagawing mga istruktura ang nested data. Dagdag pa, ang lahat ay kailangang i-save sa parquet, na sinusuportahan din sa Impala, sa pamamagitan ng pagrehistro sa storefront sa Hive metastore.

Parang simple lang ang lahat.

Gayunpaman, hindi malinaw mula sa mga maikling halimbawa sa dokumentasyon kung ano ang gagawin sa isang bilang ng mga problema sa pagsasanay.

Ang dokumentasyon ay naglalarawan ng isang diskarte hindi upang lumikha ng isang storefront, ngunit upang basahin ang JSON o XML sa isang dataframe.

Ibig sabihin, ipinapakita lamang nito kung paano basahin at i-parse ang JSON:

df = spark.read.json(path...)

Ito ay sapat na upang gawing available ang data sa Spark.

Sa pagsasagawa, ang script ay mas kumplikado kaysa sa pagbabasa lamang ng mga JSON file mula sa isang folder at paggawa ng dataframe. Ang sitwasyon ay ganito: mayroon nang isang tiyak na storefront, bagong data ang pumapasok araw-araw, kailangan nilang idagdag sa storefront, hindi nakakalimutan na ang scheme ay maaaring mag-iba.

Ang karaniwang pamamaraan para sa pagbuo ng isang showcase ay ang mga sumusunod:

Hakbang 1. Ang data ay na-load sa Hadoop na may kasunod na araw-araw na pag-reload at idinagdag sa isang bagong partition. Ito ay lumiliko ang isang folder na may paunang data na nahahati sa araw.

Hakbang 2. Sa panahon ng paunang pag-load, ang folder na ito ay binabasa at na-parse ng Spark. Ang resultang dataframe ay nai-save sa isang parsable na format, halimbawa, sa parquet, na maaaring ma-import sa Impala. Lumilikha ito ng target na showcase kasama ang lahat ng data na naipon hanggang sa puntong ito.

Hakbang 3. May ginawang pag-download na mag-a-update sa storefront araw-araw.
May tanong tungkol sa incremental loading, ang pangangailangan na hatiin ang showcase, at ang tanong ng pagpapanatili ng pangkalahatang pamamaraan ng showcase.

Kumuha tayo ng isang halimbawa. Sabihin nating naipatupad na ang unang hakbang ng pagbuo ng repository, at ang mga JSON file ay na-upload sa isang folder.

Ang paglikha ng dataframe mula sa kanila, pagkatapos ay i-save ito bilang isang showcase, ay hindi isang problema. Ito ang pinakaunang hakbang na madaling mahanap sa dokumentasyon ng Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Mukhang maayos ang lahat.

Binasa at na-parse namin ang JSON, pagkatapos ay nai-save namin ang dataframe bilang parquet, nirerehistro ito sa Hive sa anumang maginhawang paraan:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Kumuha kami ng bintana.

Ngunit, sa susunod na araw, bagong data mula sa pinagmulan ay idinagdag. Mayroon kaming isang folder na may JSON, at isang showcase na ginawa mula sa folder na ito. Pagkatapos i-load ang susunod na batch ng data mula sa pinagmulan, nawawala ang data mart ng isang araw na halaga ng data.

Ang lohikal na solusyon ay ang paghahati sa storefront ayon sa araw, na magbibigay-daan sa pagdaragdag ng bagong partition bawat susunod na araw. Ang mekanismo para dito ay kilala rin, pinapayagan ka ng Spark na magsulat ng mga partisyon nang hiwalay.

Una, gumawa kami ng paunang pag-load, pag-save ng data tulad ng inilarawan sa itaas, pagdaragdag lamang ng partitioning. Ang pagkilos na ito ay tinatawag na storefront initialization at ginagawa nang isang beses lang:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Kinabukasan, naglo-load lang kami ng bagong partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Ang natitira na lang ay muling magparehistro sa Hive para i-update ang schema.
Gayunpaman, ito ay kung saan lumitaw ang mga problema.

Unang problema. Maaga o huli, ang resultang parquet ay hindi mababasa. Ito ay dahil sa kung paano naiiba ang pagtrato ng parquet at JSON sa mga bakanteng field.

Isaalang-alang natin ang isang tipikal na sitwasyon. Halimbawa, kahapon dumating si JSON:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

at ngayon ang parehong JSON ay ganito ang hitsura:

Π”Π΅Π½ΡŒ 2: {"a": null}

Sabihin nating mayroon tayong dalawang magkaibang partisyon, bawat isa ay may isang linya.
Kapag nabasa namin ang buong source data, matutukoy ng Spark ang uri, at mauunawaan niya na ang "a" ay isang field ng uri ng "structure", na may nested field na "b" ng uri ng INT. Ngunit, kung ang bawat partisyon ay nai-save nang hiwalay, pagkatapos ay makakakuha kami ng isang parquet na may hindi tugmang mga scheme ng partisyon:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Kilalang-kilala ang sitwasyong ito, kaya espesyal na idinagdag ang isang opsyon - kapag nag-parse ng source data, alisin ang mga walang laman na field:

df = spark.read.json("...", dropFieldIfAllNull=True)

Sa kasong ito, ang parquet ay binubuo ng mga partisyon na maaaring basahin nang magkasama.
Bagama't ang mga nakagawa nito sa pagsasanay ay mapangiti ng mapait dito. Bakit? Oo, dahil malamang na may dalawa pang sitwasyon. O tatlo. O apat. Ang una, na halos tiyak na mangyayari, ay ang mga numeric na uri ay magmumukhang iba sa iba't ibang JSON file. Halimbawa, {intField: 1} at {intField: 1.1}. Kung ang nasabing mga patlang ay matatagpuan sa isang partisyon, pagkatapos ay babasahin ng schema merge ang lahat nang tama, na humahantong sa pinakatumpak na uri. Ngunit kung magkaiba, ang isa ay magkakaroon ng intField: int, at ang isa ay magkakaroon ng intField: double.

Mayroong sumusunod na bandila upang mahawakan ang sitwasyong ito:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Ngayon ay mayroon kaming isang folder kung saan may mga partisyon na maaaring basahin sa isang solong dataframe at isang wastong parquet ng buong showcase. Oo? Hindi.

Dapat nating tandaan na inirehistro natin ang talahanayan sa Hive. Ang pugad ay hindi case sensitive sa mga pangalan ng field, habang ang parquet ay case sensitive. Samakatuwid, ang mga partisyon na may mga schema: field1: int, at Field1: int ay pareho para sa Hive, ngunit hindi para sa Spark. Huwag kalimutang i-convert ang mga pangalan ng field sa lower case.

Pagkatapos nito, tila maayos na ang lahat.

Gayunpaman, hindi lahat ay napakasimple. May pangalawa, kilalang problema rin. Dahil ang bawat bagong partition ay naka-save nang hiwalay, ang partition folder ay maglalaman ng Spark service file, halimbawa, ang _SUCCESS operation success flag. Magreresulta ito sa isang error kapag sinusubukang i-parquet. Upang maiwasan ito, kailangan mong i-configure ang configuration upang maiwasan ang Spark na magdagdag ng mga file ng serbisyo sa folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Tila na ngayon araw-araw isang bagong parquet partition ay idinagdag sa target na showcase folder, kung saan matatagpuan ang na-parse na data para sa araw. Inaalagaan namin nang maaga na walang mga partisyon na may conflict sa uri ng data.

Ngunit, mayroon tayong pangatlong problema. Ngayon ang pangkalahatang schema ay hindi alam, bukod pa rito, ang talahanayan sa Hive ay may maling schema, dahil ang bawat bagong partition ay malamang na nagpakilala ng pagbaluktot sa schema.

Kailangan mong muling irehistro ang talahanayan. Magagawa ito nang simple: basahin muli ang parquet ng storefront, kunin ang schema at lumikha ng isang DDL batay dito, kung saan irerehistro muli ang folder sa Hive bilang panlabas na talahanayan, ina-update ang schema ng target na storefront.

Mayroon kaming pang-apat na problema. Noong unang beses naming irehistro ang talahanayan, umasa kami sa Spark. Ngayon ginagawa namin ito sa aming sarili, at kailangan naming tandaan na ang mga parquet field ay maaaring magsimula sa mga character na hindi pinapayagan para sa Hive. Halimbawa, naglalabas ang Spark ng mga linyang hindi nito ma-parse sa field na "corrupt_record". Ang nasabing field ay hindi maaaring irehistro sa Hive nang hindi na-escape.

Alam ito, nakukuha namin ang scheme:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kodigo ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") gumagawa ng ligtas na DDL, ibig sabihin, sa halip na:

create table tname (_field1 string, 1field string)

Gamit ang mga pangalan ng field tulad ng "_field1, 1field", ang ligtas na DDL ay ginawa kung saan ang mga pangalan ng field ay naka-escape: gumawa ng table `tname` (`_field1` string, `1field` string).

Ang tanong ay lumitaw: kung paano maayos na makakuha ng isang dataframe na may kumpletong schema (sa pf code)? Paano makukuha ang pf na ito? Ito ang ikalimang problema. Basahin muli ang scheme ng lahat ng mga partisyon mula sa folder na may mga parquet file ng target na showcase? Ang pamamaraang ito ay ang pinakaligtas, ngunit mahirap.

Ang schema ay nasa Hive na. Maaari kang makakuha ng bagong schema sa pamamagitan ng pagsasama-sama ng schema ng buong talahanayan at ang bagong partition. Kaya kailangan mong kunin ang table schema mula sa Hive at pagsamahin ito sa schema ng bagong partition. Magagawa ito sa pamamagitan ng pagbabasa ng metadata ng pagsubok mula sa Hive, pag-save nito sa isang pansamantalang folder, at paggamit ng Spark upang basahin ang parehong mga partisyon nang sabay-sabay.

Sa katunayan, mayroong lahat ng kailangan mo: ang orihinal na schema ng talahanayan sa Hive at ang bagong partition. May data din kami. Ito ay nananatiling lamang upang makakuha ng bagong schema na pinagsasama ang storefront schema at mga bagong field mula sa ginawang partition:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Susunod, gagawin namin ang DDL sa pagpaparehistro ng talahanayan, tulad ng sa nakaraang snippet.
Kung gumagana nang tama ang buong chain, ibig sabihin, nagkaroon ng initializing load, at ang talahanayan ay ginawa nang tama sa Hive, pagkatapos ay makakakuha tayo ng na-update na table schema.

At ang huling problema ay hindi ka maaaring magdagdag ng partisyon sa isang talahanayan ng Hive, dahil ito ay masisira. Kailangan mong pilitin ang Hive na ayusin ang istraktura ng partition nito:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Ang simpleng gawain ng pagbabasa ng JSON at paggawa ng storefront batay dito ay nagreresulta sa pagdaig sa ilang implicit na paghihirap, mga solusyon na kailangan mong hanapin nang hiwalay. At kahit na ang mga solusyon na ito ay simple, ito ay nangangailangan ng maraming oras upang mahanap ang mga ito.

Upang maipatupad ang pagtatayo ng showcase, kailangan kong:

  • Magdagdag ng mga partisyon sa showcase, inaalis ang mga file ng serbisyo
  • Harapin ang mga walang laman na field sa source data na na-type ni Spark
  • Mag-cast ng mga simpleng uri sa isang string
  • I-convert ang mga pangalan ng field sa lowercase
  • Paghiwalayin ang pag-upload ng data at pagpaparehistro ng talahanayan sa Hive (pagbuo ng DDL)
  • Huwag kalimutang takasan ang mga pangalan ng field na maaaring hindi tugma sa Hive
  • Matutunan kung paano i-update ang pagpaparehistro ng talahanayan sa Hive

Summing up, tandaan namin na ang desisyon na magtayo ng mga bintana ng tindahan ay puno ng maraming mga pitfalls. Samakatuwid, sa kaso ng mga kahirapan sa pagpapatupad, mas mahusay na makipag-ugnay sa isang nakaranasang kasosyo na may matagumpay na kadalubhasaan.

Salamat sa pagbabasa ng artikulong ito, inaasahan naming mahanap mo ang impormasyon na kapaki-pakinabang.

Pinagmulan: www.habr.com

Magdagdag ng komento