Spark schemaEvolution i roto i te mahi

E nga kaipānui, tena ra!

I roto i tenei tuhinga, ko te kaitohutohu matua o te rohe pakihi Raraunga Raraunga Nui a Neoflex e whakaatu ana i nga waahanga mo te hanga whakaaturanga hanganga rereke ma te whakamahi i te Apache Spark.

Hei wahanga o te kaupapa tātari raraunga, ka ara ake te mahi ki te hanga i nga papaa i runga i nga raraunga hangai noa.

I te nuinga o te wa he raarangi enei, he whakautu mai i nga punaha rereke, ka tiakina hei JSON, XML ranei. Ka tukuna nga raraunga ki Hadoop, katahi ka hiahia koe ki te hanga i tetahi toa mai i a raatau. Ka taea e taatau te whakarite uru ki te whakaaturanga i hangaia, hei tauira, na roto i a Impala.

I tenei take, kaore i te mohiotia te aronuinga o te toa toa i mua. I tua atu, kaore e taea te tuhi i mua i te kaupapa, na te mea kei runga i nga raraunga, kei te mahi maatau ki enei raraunga tino hangai.

Hei tauira, i tenei ra ka tuhia te whakautu e whai ake nei:

{source: "app1", error_code: ""}

a apopo mai i taua punaha ka puta mai te whakautu e whai ake nei:

{source: "app1", error_code: "error", description: "Network error"}

Ko te mutunga mai, kia kotahi ano te mara ki te whakaaturanga - whakaahuatanga, karekau he tangata e mohio ka tae mai, kaore ranei.

Ko te mahi o te hanga i tetahi toa i runga i aua raraunga he tino paerewa, a he maha nga taputapu a Spark mo tenei. Mo te tarai i nga raraunga puna, he tautoko mo te JSON me te XML, a, mo te kaupapa kaore i mohiotia i mua, ka whakaratohia he tautoko mo te schemaEvolution.

I te titiro tuatahi, he ngawari te ahua o te otinga. Me tango e koe he kōpaki me JSON ka panui ki roto i te anga raraunga. Ka hangaia e Spark he kaupapa, ka huri i nga raraunga kohanga hei hanganga. I tua atu, me tiaki nga mea katoa i roto i te parquet, e tautokohia ana i Impala, ma te rehita i te toa i roto i te Hive metastore.

He ngawari nga mea katoa.

Engari, kaore i te marama mai i nga tauira poto i roto i nga tuhinga me aha te maha o nga raruraru i roto i nga mahi.

Ko nga tuhinga e whakaatu ana i tetahi huarahi kia kaua e hanga he toa, engari ki te panui JSON, XML ranei ki roto i te anga raraunga.

Ara, ka whakaatu noa me pehea te panui me te panui JSON:

df = spark.read.json(path...)

He nui tenei kia waatea nga raraunga ki a Spark.

I roto i te mahi, he uaua ake te tuhi i te panui noa i nga konae JSON mai i te kōpaki me te hanga anga raraunga. Ko te ahua o tenei ahuatanga: kei reira tetahi toa toa, ka tae mai nga raraunga hou i ia ra, me taapiri atu ki te toa, kaua e wareware he rereke te kaupapa.

Ko te kaupapa o mua mo te hanga whakaaturanga ko enei e whai ake nei:

Hipanga 1. Ka utaina nga raraunga ki Hadoop me te whakahou ano i ia ra ka taapiri atu ki tetahi waahanga hou. Ka puta he kōpaki me nga raraunga tuatahi kua wehewehea ma te ra.

Hipanga 2. I te wa o te uta tuatahi, ka panuihia tenei kōpaki ka panuihia e Spark. Ka tiakina te anga raraunga ka puta ki roto i te whakatakotoranga parsable, hei tauira, ki te parquet, ka taea te kawemai ki Impala. Ka waihangahia he whakaaturanga whainga me nga raraunga katoa kua kohia tae noa ki tenei wa.

Hipanga 3. Ka hangaia he tikiake hei whakahou i te toa toa ia ra.
He patai mo te pikinga ake o te utaina, te hiahia ki te wehewehe i te pouaka whakaata, me te patai ki te pupuri i te kaupapa whanui o te pouaka whakaata.

Ka tango tatou i tetahi tauira. Me kii kua whakatinanahia te mahi tuatahi mo te hanga putunga, ka tukuna nga konae JSON ki tetahi kōpaki.

Ko te hanga anga raraunga mai i a raatau, ka tiakina hei whakaaturanga, ehara i te raru. Koinei te taahiraa tuatahi ka kitea ngawari i roto i nga tuhinga Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

He pai nga mea katoa.

Ka panuihia ka panuitia e matou a JSON, katahi ka tiakina e matou te anga raraunga hei parquet, ka rehitatia i roto i te Hive i nga huarahi pai:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Ka whiwhi tatou i te matapihi.

Engari, i te ra i muri mai, ka taapirihia nga raraunga hou mai i te puna. Kei a matou he kōpaki me JSON, me tetahi whakaaturanga i hangaia mai i tenei kōpaki. Whai muri i te utaina o nga raraunga e whai ake nei mai i te puna, kei te ngaro te utu raraunga mo te ra kotahi.

Ko te otinga arorau ko te wehewehe i te toa i ia ra, ka taea te taapiri i tetahi waahanga hou ia ra e whai ake nei. Ko te tikanga mo tenei ka tino mohiotia, ka taea e Spark te tuhi wehe wehe.

Tuatahi, ka mahia e matou he kawenga tuatahi, me te penapena i nga raraunga i whakaahuatia i runga ake nei, me te taapiri i te wehewehenga anake. Ko tenei mahi ka kiia ko te arawhiti i mua i te toa, ka mahia kotahi anake:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

I te ra i muri mai, ka utaina he waahanga hou anake:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Ko nga mea e toe ana ko te rehita ano ki Hive hei whakahou i te kaupapa.
Engari, koinei te waahi ka puta mai nga raruraru.

Te raruraru tuatahi. Kare e roa, ka kore e taea te panui te parquet ka puta. Na te rerekee o te mahi a te parquet me te JSON i nga mara putunga.

Ka whakaarohia e tatou tetahi ahuatanga angamaheni. Hei tauira, inanahi kua tae mai a JSON:

День 1: {"a": {"b": 1}},

a i tenei ra ko te JSON ano te ahua penei:

День 2: {"a": null}

Me kii tatou e rua nga wehewehenga rereke, me te raina kotahi.
Ina panuihia e matou te katoa o nga raraunga puna, ka taea e Spark te whakatau i te momo, ka mohio ko "a" he mara o te momo "hanganga", me te mara kohanga "b" o te momo INT. Engari, ki te mea kua tiakina motuhake ia wehewehenga, ka whiwhi tatou i te parquet me nga kaupapa wehewehe hototahi:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

E mohiotia ana tenei ahuatanga, no reira kua taapirihia he whiringa - i te wa e tohatoha ana i nga raraunga puna, tangohia nga mara putua:

df = spark.read.json("...", dropFieldIfAllNull=True)

I tenei keehi, ka uru te parquet ki nga waahanga ka taea te panui tahi.
Ahakoa ko te hunga i mahi i tenei mahi ka kata kawa ki konei. He aha? Ae, na te mea ka rua ano pea nga ahuatanga. E toru ranei. E wha ranei. Ko te tuatahi, ka tata ka puta, ko nga momo tau ka rereke te ahua ki nga konae JSON rereke. Hei tauira, {intField: 1} me {intField: 1.1}. Mena ka kitea nga mara i roto i te waahanga kotahi, ka panuihia e te hanumi aronuinga nga mea katoa, ka puta ki te momo tino tika. Engari ki te mea i roto i nga mea rereke, ka whai intField: int tetahi, ka whai ano tetahi intField: rua.

Ko te haki e whai ake nei hei hapai i tenei ahuatanga:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Inaianei kei a matou he kōpaki kei reira nga wehewehenga ka taea te panui ki te anga raraunga kotahi me te parakete whaimana o te whakaaturanga katoa. Ae? Kao.

Me mahara tatou i rehitatia e matou te tepu ki Hive. Ko te Hive ehara i te take tairongo ki nga ingoa mara, engari ko te parquet he ahua tairongo. No reira, he rite tonu nga wehewehenga me nga kaupapa: mara1: int, me te Mara1: int he rite mo Hive, engari kaua mo Spark. Kaua e wareware ki te huri i nga ingoa mara ki te puiti iti.

I muri i tera, ka ahua pai nga mea katoa.

Heoi, ehara i te mea ngawari katoa. He raruraru tuarua, rongonui hoki. I te mea ka tiakina motuhake ia wehewehenga hou, kei roto i te kōpaki wehewehe nga konae ratonga Spark, hei tauira, te haki angitu mahi _SUCCESS. Ka puta he hapa i te wa e ngana ana ki te parquet. Hei karo i tenei, me whirihora e koe te whirihoranga hei aukati i a Spark ki te taapiri i nga konae ratonga ki te kōpaki:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Ko te ahua nei i ia ra ka taapirihia he wehenga parquet hou ki te kōpaki whakaaturanga whainga, kei reira nga raraunga porotiti mo te ra. I tiaki matou i mua kaore he wehewehenga me te papā momo raraunga.

Engari, he raru tuatoru kei a matou. Inaianei kaore i te mohiotia te aronuinga whaanui, i tua atu, he aronuinga he to te ripanga i Hive, na te mea ko ia wehewehenga hou i whakaurua he korikori ki roto i te aronuinga.

Me rehita ano koe i te ripanga. Ka taea te mahi noa: panui ano te parquet o te toa toa, tango i te aronuinga me te hanga i tetahi DDL i runga i taua mea, hei rehita ano i te kōpaki ki Hive hei ripanga o waho, me te whakahou i te aronuinga o te toa toa.

He raruraru tuawha. I te rehitatanga o te teepu mo te wa tuatahi, i whakawhirinaki matou ki a Spark. Inaianei ka mahia e matou, me mahara ano ka timata nga mara parquet me nga tohu kaore e whakaaetia mo Hive. Hei tauira, ka whiua e Spark nga rarangi kaore e taea te poroporoaki i te mara "corrupt_record". E kore e taea te rehitatia taua mara ki Hive me te kore e mawhiti.

Ma te mohio ki tenei, ka whiwhi tatou i te kaupapa:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Waehere ("_record_corrupt", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("huānga<`", "huānga<") hanga DDL haumaru, ara hei utu mo:

create table tname (_field1 string, 1field string)

Me nga ingoa mara penei "_field1, 1field", ka hangaia te DDL haumaru ki te waahi ka mawhiti nga ingoa mara: waihanga ripanga `tname` (`_field1` aho, `1field` aho).

Ka puta ake te patai: me pehea e tika ai te tiki i te anga raraunga me te aronui katoa (i roto i te waehere pf)? Me pehea te tiki i tenei pf? Koinei te raruraru tuarima. Panuitia ano te kaupapa o nga wehewehenga katoa mai i te kōpaki me nga konae parquet o te whakaaturanga whaainga? Ko tenei tikanga te mea haumaru, engari he uaua.

Kei Hive kē te aronuinga. Ka taea e koe te tiki aronuinga hou ma te whakakotahi i te aronuinga o te ripanga katoa me te wehewehenga hou. Na me tango e koe te aronuinga ripanga mai i Hive ka whakakotahi ki te aronuinga o te wehewehenga hou. Ka taea tenei ma te panui i nga metadata whakamatautau mai i Hive, ka penapena ki te kōpaki rangitahi, me te whakamahi i te Spark ki te panui i nga waahanga e rua i te wa kotahi.

Inaa, kei reira nga mea katoa e hiahia ana koe: ko te hangahanga ripanga taketake i Hive me te wehewehenga hou. Kei a matou ano nga raraunga. Ka noho noa ki te tiki aronuinga hou e whakakotahi ana i te aronuinga o mua toa me nga mara hou mai i te wehenga i hangaia:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

I muri mai, ka hangaia e matou te DDL rehita ripanga, penei i te waahanga o mua.
Mena ka tika te mahi o te mekameka katoa, ara, he kawenga arawhiti, a he tika te hanga o te ripanga ki Hive, katahi ka whiwhi tatou i te aronuinga ripanga kua whakahoutia.

A ko te raru whakamutunga kaore e taea e koe te taapiri noa i te wehewehenga ki te ripanga Hive, na te mea ka pakaru. Me akiaki koe i a Hive ki te whakatika i tana hanganga wehewehe:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Ko te mahi ngawari ki te panui i a JSON me te hanga i tetahi toa toa i runga i a ia ka puta he maha o nga uauatanga, nga otinga me rapu motuhake koe. A ahakoa he ngawari enei otinga, ka roa te wa ki te kimi.

Hei whakatinana i te hanganga o te whakaaturanga, me:

  • Taapirihia nga waahanga ki te whakaaturanga, te whakakore i nga konae ratonga
  • Me mahi nga mara putua i roto i nga raraunga puna i patohia e Spark
  • Maka momo ngawari ki te aho
  • Tahuri ingoa āpure ki te pūriki
  • Tukuake raraunga wehe me te rehita ripanga ki Hive (whakatupuranga DDL)
  • Kaua e wareware ki te mawhiti i nga ingoa mara kaore pea i te hototahi ki a Hive
  • Akohia me pehea te whakahou i te rehitatanga ripanga ki Hive

Ko te whakarapopototanga, ka kite matou ko te whakatau ki te hanga i nga matapihi toa he maha nga mahanga. No reira, mena he uaua ki te whakatinana, he pai ake te whakapiri atu ki tetahi hoa mohio whai tohungatanga angitu.

Mauruuru koe mo te panui i tenei tuhinga, ko te tumanako ka whai hua nga korero.

Source: will.com

Tāpiri i te kōrero