Spark schemaEvolution in practice

Dear readers, good day!

In this article, a leading consultant from Neoflex's Big Data Solutions business area describes in detail the options for building data marts with a variable structure using Apache Spark.

In data analysis projects, the task of building data marts on top of loosely structured data comes up often.

Usually these are logs, or responses from various systems, saved as JSON or XML. The data is uploaded to Hadoop, and then a data mart has to be built from it. Access to the resulting mart can be provided, for example, through Impala.

In this case, the schema of the target data mart is not known in advance. Moreover, the schema cannot be drawn up beforehand either, because it depends on the data, and the data is exactly this loosely structured material.

For example, today the following response is logged:

{source: "app1", error_code: ""}

and tomorrow the same system returns this response:

{source: "app1", error_code: "error", description: "Network error"}

As a result, one more field, description, has to be added to the mart, and nobody knows whether it will arrive or not.
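
To make the drift concrete, here is a minimal plain-Python sketch (not Spark code) that collects the union of field names seen across the two responses above. The helper name union_of_fields is hypothetical and exists only for illustration; the responses are rewritten as valid JSON with quoted keys.

```python
import json

def union_of_fields(records):
    """Return the sorted list of top-level field names seen in any record."""
    fields = set()
    for record in records:
        fields.update(record.keys())
    return sorted(fields)

# The two responses from the text, written as valid JSON.
day1 = json.loads('{"source": "app1", "error_code": ""}')
day2 = json.loads('{"source": "app1", "error_code": "error", "description": "Network error"}')

fields_day1 = union_of_fields([day1])
fields_both = union_of_fields([day1, day2])
print(fields_day1)
print(fields_both)
```

The mart built on day one has two columns; once the second response arrives, the same mart needs three.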

Building a data mart on such data is a fairly standard task, and Spark has a number of tools for it. For parsing the source data there is support for both JSON and XML, and for a schema that is not known in advance there is schemaEvolution support.

At first glance, the solution looks simple. Take the folder with JSON and read it into a dataframe. Spark will build the schema and turn nested data into structures. Then everything is saved to parquet, which Impala also supports, and the mart is registered in the Hive metastore.

Everything seems simple.

However, the short examples in the documentation do not make it clear what to do about a number of problems that arise in practice.

The documentation describes an approach not to building a data mart, but to reading JSON or XML into a dataframe.

Namely, it simply shows how to read and parse JSON:

df = spark.read.json(path...)

This is enough to make the data available to Spark.

In practice, the scenario is much more complicated than reading JSON files from a folder and creating a dataframe. The situation looks like this: a data mart already exists, new data arrives every day, and it has to be added to the mart, keeping in mind that the schema may differ.

The usual scheme for building a mart is as follows:

Step 1. The data is loaded into Hadoop, then reloaded daily, with each load added to a new partition. The result is a folder of source data partitioned by day.

Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in a format suitable for analysis, for example parquet, which can then be imported into Impala. This creates the target mart with all the data accumulated up to that point.

Step 3. A load is set up that will update the mart every day.

This raises the question of incremental loading, the need to partition the mart, and the question of maintaining the mart's overall schema.

Let's take an example. Suppose the first step of building the storage has been implemented, and JSON files are being uploaded to a folder.

Creating a dataframe from them and saving it as a mart is not a problem. This is the very first step, and it is easy to find in the Spark documentation:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)

Everything seems fine.

We have read and parsed the JSON, so next we save the dataframe as parquet and register it in Hive in any convenient way:

df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')

We get a data mart.

But the next day, new data from the source was added. We have a folder with JSON and a mart built from that folder. After the next batch of data is loaded from the source, the mart is missing one day's worth of data.

The logical solution is to partition the mart by day, which makes it possible to add a new partition each following day. The mechanism for this is also well known: Spark lets you write partitions separately.

First, we perform the initial load, saving the data as described above and adding only the partitioning. This step is called mart initialization and is done only once:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

The next day, we load only the new partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
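
The daily write above relies on the Hive-style folder layout, in which each partition lives in a subfolder named column=value. A minimal sketch of that path construction follows; the helper name partition_path and the sample values are hypothetical and only mirror the string concatenation used in the line above.

```python
def partition_path(dbpath, db, dest_table, date_load):
    """Build the folder of one daily partition, e.g. <root>/<db>/<table>/date_load=<day>/."""
    return dbpath + "/" + db + "/" + dest_table + "/date_load=" + date_load + "/"

# Hypothetical values, chosen only to show the resulting layout.
example = partition_path("/data", "mydb", "mytable", "2019-12-12")
print(example)
```

Because the partition value is encoded only in the folder name, the daily dataframe written there does not need to contain a date_load column itself.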

All that remains is to re-register the table in Hive to update the schema.
However, this is where the problems begin.

The first problem. Sooner or later, the resulting parquet will become unreadable. The reason is that parquet and JSON treat empty fields differently.

Let's consider a typical situation. For example, yesterday this JSON arrived:

Day 1: {"a": {"b": 1}},

and today the same JSON looks like this:

Day 2: {"a": null}

Suppose we have two different partitions, each with one row.
When we read the whole source data at once, Spark can determine the type and understands that "a" is a field of type "struct" with a nested field "b" of type INT. But if each partition was saved separately, we get a parquet with incompatible partition schemas:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

This situation is well known, so an option was added specifically for it: drop empty fields when parsing the source data:

df = spark.read.json("...", dropFieldIfAllNull=True)

In this case, the parquet will consist of partitions that can be read together.
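
The conflict and its fix can be illustrated without Spark. The sketch below is a rough plain-Python stand-in for JSON schema inference, written under the assumption that an all-null field falls back to string; it is not Spark's actual algorithm, it does not model arrays, and the names infer_type and infer_schema are hypothetical.

```python
def infer_type(value):
    """Very rough type inference for one parsed JSON value."""
    if value is None or isinstance(value, str):
        return "string"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, dict):
        inner = ",".join(k + ":" + infer_type(v) for k, v in sorted(value.items()))
        return "struct<" + inner + ">"
    return "string"  # arrays and anything else are not modelled here

def infer_schema(rows, drop_if_all_null=False):
    """Infer {field: type} for one partition from its first non-null value."""
    schema = {}
    for name in sorted({key for row in rows for key in row}):
        values = [row.get(name) for row in rows if row.get(name) is not None]
        if not values:
            if not drop_if_all_null:
                schema[name] = "string"  # all-null field kept as string
            continue
        schema[name] = infer_type(values[0])
    return schema

day1 = [{"a": {"b": 1}}]
day2 = [{"a": None}]

separate = (infer_schema(day1), infer_schema(day2))
with_drop = (infer_schema(day1, True), infer_schema(day2, True))
print(separate)
print(with_drop)
```

Without dropping, the two partitions disagree on the type of "a"; with dropping, the second partition simply has no "a" column, so the schemas can be merged.
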
Those who have done this in practice will smile bitterly here, though. Why? Because two more situations are likely to turn up. Or three. Or four. The first, which will almost certainly occur, is that numeric types look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such fields occur within one partition, the schema merge reads everything correctly and settles on the most precise type. But if they land in different partitions, one will have intField: int and the other intField: double.

There is the following flag to handle this situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
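
A small plain-Python sketch of the numeric mismatch, and of why reading primitives as strings sidesteps it. This only mimics the idea; numeric_type is a hypothetical helper, and it reports bigint for whole numbers on the assumption that JSON integers are inferred as long.

```python
def numeric_type(values):
    """Pick the narrowest type covering all values: bigint unless a float appears."""
    return "double" if any(isinstance(v, float) for v in values) else "bigint"

# Both values in one partition: inference sees them together and widens.
same_partition = numeric_type([1, 1.1])

# One value per partition: each partition is inferred on its own.
part1, part2 = numeric_type([1]), numeric_type([1.1])

# With every primitive kept as a string, both partitions agree on the type.
as_string = [str(v) for v in (1, 1.1)]
print(same_partition, part1, part2, as_string)
```

The price of this approach is that numeric columns have to be cast back to numbers by whoever queries the mart.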

Now we have a folder of partitions that can be read into a single dataframe, and a valid parquet for the whole mart. Yes? No.

We must remember that we registered the table in Hive. Hive is not case-sensitive in field names, while parquet is. Therefore, partitions with the schemas field1: int and Field1: int are the same for Hive, but not for Spark. Don't forget to convert field names to lowercase.
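
One way to do this, sketched here in plain Python under the assumption that the records are normalized before they are handed to Spark, is to lowercase every key recursively, including keys in nested objects and arrays. The helper name lowercase_keys is hypothetical.

```python
def lowercase_keys(value):
    """Recursively lowercase every object key in parsed JSON."""
    if isinstance(value, dict):
        return {key.lower(): lowercase_keys(inner) for key, inner in value.items()}
    if isinstance(value, list):
        return [lowercase_keys(item) for item in value]
    return value

row = {"Field1": 1, "Nested": {"InnerField": [{"X": 2}]}}
normalized = lowercase_keys(row)
print(normalized)
```

Note that if one record contains both "field1" and "Field1", the two keys collapse into one and the later value wins, so such records need separate handling.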

After that, everything seems to be fine.

However, it is not that simple. A second, also well-known problem arises. Since each new partition is saved separately, the partition folder will contain Spark service files, for example the _SUCCESS flag that marks a successful job. This leads to an error when the parquet is read. To avoid this, configure Spark so that it does not add service files to the folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

It seems that now a new parquet partition with the parsed data for the day is added to the target mart folder every day. We took care in advance that no partitions have conflicting data types.

But we have a third problem. The overall schema is now unknown, and on top of that the table in Hive has an incorrect schema, since each new partition has most likely introduced a distortion into it.

The table has to be re-registered. This can be done simply: read the mart's parquet again, take the schema and generate a DDL from it, then use that DDL to re-register the folder in Hive as an external table, updating the schema of the target mart.

We have a fourth problem. When we registered the table for the first time, we relied on Spark. Now we do it ourselves, and we need to remember that parquet field names can begin with characters that Hive does not allow. For example, Spark puts rows it could not parse into the "corrupt_record" field. Such a field cannot be registered in Hive without escaping.

Knowing this, we arrive at the following scheme:

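# Build the column list from the merged dataframe pf, skipping the partition column.
f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    # Escape _corrupt_record and the names of nested struct fields with backticks.
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
# Assemble the external table DDL; f_def[1:] drops the leading comma.
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
# Recreate the table so that Hive picks up the new schema.
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)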

The code f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") produces a safe DDL, i.e. instead of:

create table tname (_field1 string, 1field string)

for field names such as "_field1, 1field", a safe DDL is generated in which the field names are escaped: create table `tname` (`_field1` string, `1field` string).
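
The same idea can be packaged as a small function that works on a list of (name, type) pairs of the kind returned by dataframe.dtypes. This is a sketch under assumptions, not the author's code: the names escape_type and build_ddl and the sample location are hypothetical, and it backtick-quotes every top-level column rather than only _corrupt_record.

```python
def escape_type(hive_type):
    """Backtick-quote nested struct field names, mirroring the replace chain above."""
    return (hive_type.replace(":", "`:")
                     .replace("<", "<`")
                     .replace(",", ",`")
                     .replace("array<`", "array<"))

def build_ddl(table, dtypes, location, partition_col="date_load"):
    """Build a CREATE EXTERNAL TABLE statement with escaped column names."""
    columns = ", ".join(
        "`" + name + "` " + escape_type(col_type)
        for name, col_type in dtypes
        if name != partition_col  # the partition column is declared separately
    )
    return ("CREATE EXTERNAL TABLE `" + table + "` (" + columns + ") "
            "PARTITIONED BY (" + partition_col + " string) "
            "STORED AS PARQUET LOCATION '" + location + "'")

dtypes = [
    ("_field1", "string"),
    ("1field", "string"),
    ("c", "struct<d:bigint,e:string>"),
    ("date_load", "string"),
]
ddl = build_ddl("tname", dtypes, "/tmp/example/pq/")
print(ddl)
```

The string-replace approach is deliberately simple and has limits: types that contain commas for other reasons, such as map<k,v> or decimal(p,s), would be mangled and need dedicated handling.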

The question arises: how do you correctly obtain a dataframe with the complete schema (pf in the code)? How do you get this pf? This is the fifth problem. Re-read the schema of all partitions from the folder with the parquet files of the target mart? This method is the safest, but it is expensive.

The schema is already in Hive. A new schema can be obtained by combining the schema of the whole table with that of the new partition. So you need to take the table schema from Hive and merge it with the schema of the new partition. This can be done by reading a sample row with the table's metadata from Hive, saving it to a temporary folder, and using Spark to read both partitions at once.

In fact, everything needed is there: the original table schema in Hive and the new partition. We also have the data. All that remains is to obtain a new schema that combines the mart schema with the new fields from the created partition:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Next, we generate the table registration DDL, as in the previous fragment.
If the whole chain works correctly, that is, there was an initial load and the table was created correctly in Hive, we get an updated table schema.

And the last problem is that you cannot simply add a partition to a Hive table, because the table will be broken. You need to force Hive to repair its partition structure:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

The simple task of reading JSON and building a data mart from it turns into overcoming a number of implicit difficulties, and the solution to each has to be found separately. And although these solutions are simple, finding them takes a lot of time.

To implement the construction of the mart, we had to:

  • Add partitions to the mart while getting rid of service files
  • Deal with empty fields in the source data that Spark has typed
  • Cast simple types to string
  • Convert field names to lowercase
  • Separate data loading from table registration in Hive (DDL generation)
  • Remember to escape field names that may be incompatible with Hive
  • Learn how to update the table registration in Hive

To sum up, the decision to build data marts is fraught with many pitfalls. Therefore, if difficulties arise during implementation, it is better to turn to an experienced partner with proven expertise.

Thank you for reading this article. We hope you find the information useful.

Source: www.habr.com
