Spark schemaEvolution amin'ny fampiharana

Ry mpamaky, mirary tolakandro!

Ato amin'ity lahatsoratra ity, ny mpanolo-tsaina lehibe amin'ny sehatry ny fandraharahana Big Data Solutions an'ny Neoflex dia mamaritra amin'ny antsipiriany ny safidy amin'ny fananganana tranombarotra miovaova amin'ny alΓ lan'ny Apache Spark.

Ao anatin'ny tetikasa famakafakana angon-drakitra, matetika no mipoitra ny asa fananganana fampirantiana mifototra amin'ny angon-drakitra tsy misy rafitra.

Matetika ireo dia diary, na valinteny avy amin'ny rafitra isan-karazany, voatahiry amin'ny endrika JSON na XML. Ampidirina ao amin'ny Hadoop ny angon-drakitra, avy eo dia mila manangana fivarotana iray avy aminy. Afaka mandamina ny fidirana amin'ny entam-barotra noforonina isika, ohatra, amin'ny alΓ lan'ny Impala.

Amin'ity tranga ity, ny fisehon'ny toeram-pivarotana kendrena dia tsy fantatra mialoha. Ankoatr'izay, tsy azo amboarina mialoha ny drafitra, satria miankina amin'ny angon-drakitra izany, ary miatrika an'io angon-drakitra tena malemy io izahay.

Ohatra, izao no valiny manaraka izao:

{source: "app1", error_code: ""}

ary rahampitso dia avy amin'ny rafitra iray ihany ity valiny manaraka ity:

{source: "app1", error_code: "error", description: "Network error"}

Vokatr'izany dia tokony asiana saha hafa ny entam-barotra - famaritana, ary tsy misy mahalala na ho avy na tsia.

Ny asan'ny famoronana tsena amin'ny angon-drakitra toy izany dia mahazatra, ary ny Spark dia manana fitaovana maromaro ho an'izany. Ho an'ny famakafakana ny angona loharano dia misy fanohanana ho an'ny JSON sy XML, ary ho an'ny tetika tsy fantatra teo aloha dia omena fanohanana ny schemaEvolution.

Raha vao jerena dia tsotra ny vahaolana. Mila maka ny lahatahiry miaraka amin'i JSON ianao ary mamaky azy ao amin'ny dataframe. Hamorona tetika i Spark ary hamadika ny angona misy akany ho rafitra. Manaraka, mila tehirizina amin'ny parquet ny zava-drehetra, izay tohana ihany koa ao amin'ny Impala, amin'ny fisoratana anarana ny fivarotana ao amin'ny metastore Hive.

Toa tsotra ny zava-drehetra.

Na izany aza, avy amin'ireo ohatra fohy ao amin'ny antontan-taratasy dia tsy mazava izay tokony hatao amin'ny olana maromaro amin'ny fampiharana.

Ny antontan-taratasy dia manoritsoritra fomba iray tsy amin'ny famoronana entam-barotra, fa amin'ny famakiana JSON na XML amin'ny angon-drakitra.

Izany hoe, mampiseho fotsiny ny fomba famakiana sy famakiana JSON:

df = spark.read.json(path...)

Ampy izany mba hahatonga ny angon-drakitra ho an'ny Spark.

Amin'ny fampiharana, ny scenario dia sarotra kokoa noho ny mamaky ny rakitra JSON avy amin'ny lahatahiry iray ary mamorona angon-drakitra. Toy izao ny toe-javatra: efa misy ny fampisehoana iray, tonga isan'andro ny angon-drakitra vaovao, mila ampidirina ao amin'ny showcase izy ireo, tsy adino fa mety tsy hitovy ny drafitra.

Ny drafitra mahazatra amin'ny fananganana trano fivarotana dia toy izao manaraka izao:

Dingana 1. Ampidirina ao amin'ny Hadoop ny angon-drakitra, arahin'ny fandefasana fanampiny isan'andro ary ampidirina amin'ny fizarana vaovao. Ny vokatr'izany dia laha-tahiry misy angona loharano, mizara isan'andro.

Dingana 2. Mandritra ny fampandehanana voalohany dia vakiana ity lahatahiry ity amin'ny alalan'ny Spark. Ny angon-drakitra vokarina dia voatahiry amin'ny endrika azo anaovana fanadihadiana, ohatra, amin'ny parquet, izay azo ampidirina ao Impala avy eo. Izany dia mamorona toeram-pivarotana kendrena miaraka amin'ny angona rehetra voaangona hatramin'izao.

Dingana 3. Namboarina ny fampidinana izay hanavao ny trano fivarotana isan'andro.
Mipoitra ny fanontaniana momba ny fampiakarana entana, ny fizarΓ na ny entam-barotra, ary ny fanohanan'ny lamina ankapobeny ny trano fivarotana.

Andeha isika hanome ohatra. Andeha atao hoe efa nampiharina ny dingana voalohany amin'ny fananganana tahiry, ary ny fampiakarana ny rakitra JSON amin'ny lahatahiry iray dia namboarina.

Tsy olana ny mamorona angon-drakitra avy amin'izy ireo ary avy eo mitahiry azy ireo ho fampirantiana. Ity no dingana voalohany azo mora hita ao amin'ny tahirin-kevitra Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Toa milamina daholo ny zava-drehetra.

Novakianay sy novakianay ny JSON, avy eo tehirizinay ho parquet ny dataframe, manoratra azy ao amin'ny Hive amin'ny fomba mety:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Mahazo fampirantiana izahay.

Saingy, ny ampitson'iny dia nisy angona vaovao nampiana avy amin'ny loharano. Manana lahatahiry misy JSON izahay, ary toeram-pivarotana noforonina mifototra amin'ity lahatahiry ity. Aorian'ny fametahana ny ampahany manaraka amin'ny angona avy amin'ny loharano, dia tsy manana angona ampy mandritra ny iray andro ny trano fivarotana.

Ny vahaolana lojika dia ny fisarahana ny trano fivarotana isan'andro, izay ahafahanao manampy partition vaovao isan'andro. Ny mekanika amin'izany dia fantatra ihany koa; Spark dia ahafahanao mirakitra fizarazarana misaraka.

Voalohany, manao ny fampidinana voalohany isika, mitahiry ny angon-drakitra araka ny voalaza etsy ambony, ary ny fizarana fotsiny. Ity hetsika ity dia antsoina hoe fanombohana storefront ary atao indray mandeha ihany:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Ny ampitson'iny ihany no alainay ny partition vaovao:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Ny hany sisa tavela dia ny fisoratana anarana indray ao amin'ny Hive hanavao ny skema.
Eto anefa no misy olana.

Olana voalohany. Na ho ela na ho haingana dia tsy ho azo vakina intsony ny parquet vokarina. Izany dia noho ny fomba fitondrana parquet sy JSON tsy mitovy amin'ny saha foana.

Andeha isika handinika toe-javatra mahazatra. Ohatra, omaly JSON tonga:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

ary amin'izao fotoana izao dia toa izao ilay JSON:

Π”Π΅Π½ΡŒ 2: {"a": null}

Andeha atao hoe manana fizarazarana roa samy hafa isika, samy manana andalana iray.
Rehefa mamaky ny angon-drakitra loharano manontolo isika, dia ho afaka hamantatra ny karazana i Spark, ary hahatakatra fa ny "a" dia sahan'ny karazana "rafitra", misy saha "b" misy karazana INT. Saingy, raha voavonjy misaraka ny fizarazarana tsirairay, dia parquet miaraka amin'ny rafitra fisarahana tsy mifanentana ny vokatra:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ity toe-javatra ity dia fantatra tsara, noho izany dia nasiana safidy manokana hanesorana saha tsy misy na inona na inona rehefa manadihady ny angona loharano:

df = spark.read.json("...", dropFieldIfAllNull=True)

Amin'ity tranga ity, ny parquet dia ahitana partitions izay azo vakiana miaraka.
Na dia mitsiky mangidy aza ireo izay nanao izany tamin'ny fampiharana. Nahoana? Eny, satria mety hisy toe-javatra roa hafa hipoitra. Na telo. Na efatra. Ny voalohany, izay saika azo antoka, dia ny karazana tarehimarika dia tsy mitovy amin'ny rakitra JSON samihafa. Ohatra, {intField: 1} sy {intField: 1.1}. Raha toa ka miseho amin'ny andiany iray ny saha toy izany, dia hamaky tsara ny zava-drehetra ny fitambaran'ny schema, mitondra any amin'ny karazana marina indrindra. Fa raha amin'ny samy hafa, ny iray dia hanana intField: int, ary ny iray hafa intField: double.

Mba hiatrehana ity toe-javatra ity dia misy ny saina manaraka:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Amin'izao fotoana izao dia manana lahatahiry misy ny fisarahana isika, izay azo vakiana ao anaty dataframe tokana sy parquet manan-kery amin'ny trano fivarotana iray manontolo. Eny? Tsia.

Tsy maintsy tsaroantsika fa nanoratra ny latabatra tao Hive isika. Ny hive dia tsy saro-pady amin'ny anaran'ny saha, fa parquet. Noho izany, ny fisarahana amin'ny schemas: field1: int, ary Field1: int dia mitovy amin'ny Hive, fa tsy ho an'ny Spark. Aza adino ny manova ny anaran'ny saha ho litera kely.

Rehefa izany dia toa milamina ny zava-drehetra.

Na izany aza, tsy ny rehetra no tsotra. Mipoitra ny olana faharoa, fantatra ihany koa. Koa satria voatahiry misaraka ny fizarana vaovao tsirairay, ny lahatahiry fisarahana dia ahitana rakitra serivisy Spark, ohatra, ny sainam-pahombiazan'ny asa _SUCCESS. Hiteraka hadisoana izany rehefa manandrana manamboatra parquet. Mba hisorohana an'izany dia mila manitsy ny fanitsiana ianao amin'ny alΓ lan'ny fisorohana ny Spark tsy hampiditra rakitra serivisy amin'ny lahatahiry:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Toa izao isan'andro dia misy partition parquet vaovao ampidirina ao amin'ny lahatahiry ety ivelany, izay misy ny angon-drakitra momba ny andro. Nitandrina mialoha izahay mba hahazoana antoka fa tsy misy fizarazarana misy fifanoherana karazana data.

Saingy miatrika olana fahatelo isika. Ankehitriny dia tsy fantatra ny schema ankapobeny, ankoatra izany, ao amin'ny Hive dia manana schema diso ny latabatra, satria ny fizarana vaovao tsirairay dia mety hampiditra fanodinkodinana ao amin'ny schema.

Mila soratana indray ny latabatra. Azo atao tsotra izao: vakio indray ny parquet an'ny trano fivarotana, alaivo ny skema ary mamorona DDL mifototra amin'izany, izay ahafahanao manoratra indray ny lahatahiry ao amin'ny Hive ho latabatra ivelany, manavao ny schema amin'ny trano fivarotana kendrena.

Miatrika olana fahefatra isika. Rehefa nisoratra anarana voalohany ny latabatra izahay dia niantehitra tamin'ny Spark. Ankehitriny isika dia manao izany, ary mila mitadidy fa ny saha parquet dia mety manomboka amin'ny endri-tsoratra izay tsy avelan'i Hive. Ohatra, ny Spark dia mamoaka andalana izay tsy azony nozaraina ao amin'ny saha "corrupt_record". Ny saha toy izany dia tsy azo soratana ao amin'ny Hive raha tsy miala.

Rehefa mahafantatra izany isika dia mahazo ny kisary:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

fehezan-dalΓ na ("_record_corrupt", "`_corrupt_record`") + " " + f[1]. replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") dia miaro ny DDL, izany hoe:

create table tname (_field1 string, 1field string)

Miaraka amin'ny anaran'ny saha toy ny "_field1, 1field", dia misy DDL azo antoka izay tsy ahafahan'ny anaran'ny saha: mamorona latabatra `tname` (`_field1` string, `1field` string).

Mipoitra ny fanontaniana: ahoana ny fomba hahazoana tsara ny dataframe miaraka amin'ny schema feno (amin'ny code pf)? Ahoana no ahazoana an'io pf io? Io no olana fahadimy. Avereno vakiana ny kisary amin'ny fizarazarana rehetra avy amin'ny lahatahiry misy rakitra parquet ao amin'ny trano fivarotana kendrena? Ity fomba ity no azo antoka indrindra, saingy sarotra.

Efa ao amin'ny Hive ny tetika. Afaka mahazo schema vaovao ianao amin'ny fampifangaroana ny schema amin'ny latabatra manontolo sy ny fizarazarana vaovao. Midika izany fa mila maka ny schema table avy amin'ny Hive ianao ary manambatra azy amin'ny schema amin'ny fizarazarana vaovao. Izany dia azo atao amin'ny famakiana metadata fitsapana avy amin'ny Hive, mitahiry izany ao amin'ny lahatahiry vonjimaika, ary mamaky ny fizarana roa miaraka amin'ny Spark.

Amin'ny ankapobeny dia misy ny zavatra rehetra ilainao: ny schema latabatra tany am-boalohany ao amin'ny Hive ary fisarahana vaovao. Manana data koa izahay. Ny hany sisa tavela dia ny fahazoana schema vaovao izay manambatra ny schema storefront sy ny saha vaovao avy amin'ny fizarana noforonina:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Manaraka, mamorona DDL fisoratana anarana latabatra, toy ny tamin'ny ampahany teo aloha.
Raha miasa tsara ny rojo manontolo, izany hoe, nisy entana voalohany, ary noforonina araka ny tokony ho izy ny latabatra tao amin'ny Hive, dia mahazo schema latabatra nohavaozina.

Ny olana farany dia tsy azonao atao ny manampy fisarahana mora amin'ny latabatra Hive, satria ho tapaka. Mila manery an'i Hive hanitsy ny firafitry ny fisarahana ianao:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Ny asa tsotra amin'ny famakiana JSON sy ny famoronana entam-barotra mifototra amin'izany dia miteraka fahasahiranana an-tsokosoko maromaro, ny vahaolana izay tsy maintsy hita misaraka. Ary na dia tsotra aza ireo vahaolana ireo, dia mila fotoana be ny fitadiavana azy ireo.

Mba hampiharana ny fananganana fampisehoana dia tsy maintsy:

  • Manampia fizarazarana amin'ny fivarotana, manala ny rakitra serivisy
  • Miaraha amin'ny saha tsy misy na inona na inona ao amin'ny angon-drakitra nosoratan'i Spark
  • Alefaso karazana tsotsotra amin'ny tady
  • Avadika ho litera kely ny anaran'ny saha
  • Ampisaraka ny fampiakarana angona sy ny fisoratana anarana latabatra ao amin'ny Hive (famoronana DDL)
  • Aza adino ny mandositra anarana saha mety tsy mifanaraka amin'ny Hive
  • Mianara manavao ny fisoratana anarana latabatra ao amin'ny Hive

Raha fintinina dia marihina fa feno fandrika maro ny fanapahan-kevitra hanorina trano fivarotana. Noho izany, raha misy fahasarotana amin'ny fampiharana, dia tsara kokoa ny mitodika any amin'ny mpiara-miasa efa za-draharaha manana fahaiza-manao mahomby.

Misaotra anao namaky ity lahatsoratra ity, manantena izahay fa mahasoa anao ny fampahalalana.

Source: www.habr.com

Add a comment