Spark schemaEvolution in practice

Hello, readers!

In this article, an expert from Neoflex's Big Data Solutions business line describes in detail the options for building variable-structure data marts with Apache Spark.

Data analysis projects often involve building data marts on top of loosely structured data.

Usually this data consists of logs or responses from various systems, saved as JSON or XML. The data is uploaded to Hadoop, and a data mart then has to be built on top of it. Access to the resulting data mart can be provided, for example, through Impala.

In this case, the schema of the target data mart is not known in advance. Moreover, it cannot even be designed in advance, because it depends on the data, and the data we are dealing with is very loosely structured.

For example, today the following response is logged:

{"source": "app1", "error_code": ""}

and tomorrow the same system sends this response:

{"source": "app1", "error_code": "error", "description": "Network error"}

As a result, one more field, description, has to be added to the data mart, and nobody knows whether it will arrive or not.
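To make the effect concrete, here is a minimal pure-Python sketch (no Spark involved, and not how Spark implements inference) that collects the top-level fields seen across JSON records. The helper name `merged_fields` is hypothetical. A record that lacks `description` contributes nothing for that field, so the merged field set only grows once the new field actually shows up.

```python
import json

def merged_fields(lines):
    """Return the sorted union of top-level keys over JSON records (one record per line)."""
    fields = set()
    for line in lines:
        fields.update(json.loads(line).keys())
    return sorted(fields)

day1 = ['{"source": "app1", "error_code": ""}']
day2 = ['{"source": "app1", "error_code": "error", "description": "Network error"}']

print(merged_fields(day1))         # ['error_code', 'source']
print(merged_fields(day1 + day2))  # ['description', 'error_code', 'source']
```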

Building a data mart on such data is a fairly standard task, and Spark has a number of tools for it. For parsing the source data, both JSON and XML are supported, and for a schema that is not known in advance, schemaEvolution is supported.

At first glance, the solution looks simple. You take a folder with JSON and read it into a dataframe. Spark infers the schema and turns nested data into structs. Then everything is saved as parquet, which Impala also supports, and the data mart is registered in the Hive metastore.

Everything seems simple.

However, the short examples in the documentation do not make clear how to deal with a number of problems that come up in practice.

The documentation describes how to read JSON or XML into a dataframe, not how to build a data mart.

Namely, it simply shows how to read and parse JSON:

df = spark.read.json(path...)

This is enough to make the data available to Spark.

In practice, the scenario is much more complicated than reading JSON files from a folder and creating a dataframe. The situation looks like this: a data mart already exists, new data arrives every day, and it has to be added to the data mart, keeping in mind that the schema may differ.

The usual scheme for building a data mart is as follows:

Step 1. The data is loaded into Hadoop, then reloaded daily, with each day added as a new partition. The result is a folder with the source data partitioned by day.

Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in a format suitable for analysis, for example parquet, which can then be imported into Impala. This creates the target data mart with all the data accumulated so far.

Step 3. A load job is created that updates the data mart every day.
This raises the questions of incremental loading, of partitioning the data mart, and of maintaining the data mart's overall schema.

Let's take an example. Suppose the first step of building the storage has been implemented, and JSON files are being uploaded to a folder.

Creating a dataframe from them and then saving it as a data mart is not a problem. This is the very first step, and it is easy to find in the Spark documentation:

# read every JSON file in the source folder into one dataframe with a single inferred schema
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)

Everything looks fine.

We read and parse the JSON, then save the dataframe as parquet and register it in Hive in any convenient way:

df.write.format("parquet").option('path', '<External Table Path>').saveAsTable('<Table Name>')

We have a data mart.

But the next day, new data from the source arrives. We have a folder with JSON and a data mart built from that folder. After the next batch of data is loaded from the source, the data mart is missing one day's worth of data.

The logical solution is to partition the data mart by day, which makes it possible to add a new partition each following day. The mechanism for this is also well known: Spark lets you write partitions separately.

First, we do the initial load, saving the data as described above, but adding partitioning. This step is called data mart initialization and is done only once:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

On the following days, we load only the new partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
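Both writes rely on Hive-style partition directories named `column=value`: `partitionBy("date_load")` creates them during initialization, and the daily job writes straight into one of them, so `mode("overwrite")` replaces only that day's folder. The pure-Python sketch below just illustrates the naming convention; the helper names and the sample `dbpath`, `db` and table values are hypothetical.

```python
def partition_path(dbpath, db, dest_table, date_load):
    """Build the folder of one daily partition: <table folder>/date_load=<value>/."""
    return dbpath + "/" + db + "/" + dest_table + "/date_load=" + date_load + "/"

def partition_value(path, column="date_load"):
    """Recover a partition value from a Hive-style path segment such as 'date_load=12-12-2019'."""
    for segment in path.strip("/").split("/"):
        key, sep, value = segment.partition("=")
        if sep and key == column:
            return value
    return None

p = partition_path("/warehouse", "mydb", "showcase", "12-12-2019")
print(p)                   # /warehouse/mydb/showcase/date_load=12-12-2019/
print(partition_value(p))  # 12-12-2019
```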

All that remains is to re-register the table in Hive to update the schema.
However, this is where the problems begin.

The first problem. Sooner or later, the resulting parquet becomes unreadable. This happens because parquet and JSON treat empty fields differently.

Consider a typical situation. For example, yesterday this JSON arrived:

Day 1: {"a": {"b": 1}}

and today the same JSON looks like this:

Day 2: {"a": null}

Suppose we have two different partitions, each containing one line.
When we read all the source data together, Spark can determine the type and understands that "a" is a field of type "struct" with a nested field "b" of type INT. But if each partition was saved separately, we end up with parquet whose partition schemas are incompatible:

df1 (a: struct<b: INT>)
df2 (a: STRING NULLABLE)
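The clash can be reproduced with a rough pure-Python imitation of per-partition inference. It is a simplification, not Spark's code: integers are typed as `bigint` (the JSON reader infers `long`, as in the printSchema output earlier), a field that is null in every record of a partition falls back to `string`, and the hypothetical `drop_field_if_all_null` flag mimics what the `dropFieldIfAllNull` reader option is meant to do.

```python
import json

def infer_type(value):
    """Rough type for one JSON value; 'null' marks a value with no type information."""
    if value is None:
        return "null"
    if isinstance(value, dict):
        return "struct<" + ",".join(f"{k}:{infer_type(v)}" for k, v in value.items()) + ">"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    return "string"

def partition_schema(lines, drop_field_if_all_null=False):
    """Infer {field: type} for one partition; all-null fields become string unless dropped."""
    schema = {}
    for line in lines:
        for key, value in json.loads(line).items():
            t = infer_type(value)
            if t != "null" or key not in schema:  # a real type wins over null
                schema[key] = t
    if drop_field_if_all_null:
        return {k: t for k, t in schema.items() if t != "null"}
    return {k: ("string" if t == "null" else t) for k, t in schema.items()}

day1 = ['{"a": {"b": 1}}']
day2 = ['{"a": null}']
print(partition_schema(day1))  # {'a': 'struct<b:bigint>'}
print(partition_schema(day2))  # {'a': 'string'}  clashes with day 1
print(partition_schema(day2, drop_field_if_all_null=True))  # {}
```

With the all-null field dropped, day 2 simply has no column "a", which a schema merge can reconcile with day 1.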

This situation is well known, so a special option was added for it: drop empty fields when parsing the source data:

df = spark.read.json("...", dropFieldIfAllNull=True)

In this case, the parquet will consist of partitions that can be read together.
Although anyone who has done this in practice will smile bitterly here. Why? Because there are bound to be two more situations. Or three. Or four. The first, which is almost certain to occur, is that numeric values look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such values land in the same partition, schema merging reads everything correctly and picks the most precise type. But if they land in different partitions, one will have intField: int and the other intField: double.

The following flag handles this situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
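A simplified pure-Python sketch of why the numeric case only breaks across partitions: inside one partition, inference sees both values and widens `bigint` and `double` to `double`; with one value per partition, each partition gets its own type. Treating every primitive as a string, which is the idea behind `primitivesAsString`, makes the partitions agree. The function names are hypothetical and this is not Spark's inference code.

```python
def infer_primitive(value, primitives_as_string=False):
    """Type of one JSON primitive (simplified); optionally treat every primitive as string."""
    if primitives_as_string:
        return "string"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    return "string"

def infer_field(values, primitives_as_string=False):
    """Type of one field across the records of a single partition; mixed numerics widen to double."""
    types = {infer_primitive(v, primitives_as_string) for v in values}
    if len(types) == 1:
        return types.pop()
    if types <= {"bigint", "double"}:
        return "double"
    return "string"

print(infer_field([1, 1.1]))                             # double (same partition)
print(infer_field([1]), infer_field([1.1]))              # bigint double (separate partitions)
print(infer_field([1], True), infer_field([1.1], True))  # string string
```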

Now we have a folder with partitions that can be read into a single dataframe, and valid parquet for the whole data mart. Right? No.

Remember that we registered the table in Hive. Hive is case-insensitive in field names, while parquet is case-sensitive. So partitions with the schemas field1: int and Field1: int are the same for Hive, but not for Spark. Don't forget to convert field names to lowercase.
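In PySpark, the top-level rename itself can be done with `df.toDF(*[c.lower() for c in df.columns])`; that does not touch field names nested inside structs. Before renaming, it is worth checking whether two names collapse into one, since Hive would see them as the same column. A minimal pure-Python check, with a hypothetical helper name:

```python
def lowercase_fields(fields):
    """Lowercase field names and report pairs that differ only in case (same column for Hive)."""
    seen = {}
    collisions = []
    for name in fields:
        key = name.lower()
        if key in seen and seen[key] != name:
            collisions.append((seen[key], name))
        seen.setdefault(key, name)
    return [f.lower() for f in fields], collisions

print(lowercase_fields(["Field1", "source"]))  # (['field1', 'source'], [])
print(lowercase_fields(["field1", "Field1"]))  # (['field1', 'field1'], [('field1', 'Field1')])
```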

After that, everything looks fine.

However, it is not that simple. There is a second, also well-known problem. Since each new partition is saved separately, the partition folder will contain Spark service files, for example the _SUCCESS job-completion flag. This leads to errors when the parquet is read. To avoid this, configure Spark so that it does not add service files to the folder:

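sc = spark.sparkContext
hadoopConf = sc._jsc.hadoopConfiguration()
# do not write _metadata / _common_metadata summary files next to the parquet data
hadoopConf.set("parquet.enable.summary-metadata", "false")
# do not write the _SUCCESS marker file when a job completes
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")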

It seems that now, every day, a new parquet partition with that day's parsed data is added to the target data mart folder. We made sure in advance that there are no partitions with conflicting data types.

But we have a third problem. The overall schema is now unknown, and moreover the table in Hive has an incorrect schema, since each new partition has most likely introduced changes into the schema.

The table has to be re-registered. This is easy to do: read the data mart's parquet again, take its schema and generate a DDL from it, then use that DDL to re-register the folder in Hive as an external table, updating the schema of the target data mart.

Here we hit the fourth problem. When we registered the table the first time, we relied on Spark. Now we do it ourselves, and we have to remember that parquet field names can begin with characters that Hive does not allow. For example, Spark puts lines it could not parse into the "_corrupt_record" field. Such a field cannot be registered in Hive without escaping.

With this in mind, we arrive at the following code:

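# pf: dataframe with the merged schema of the data mart (how to obtain it is discussed below)
# spark: a SparkSession created with .enableHiveSupport()
f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":  # the partition column is declared in PARTITIONED BY
    # escape _corrupt_record; backtick struct field names inside the type (the last replace undoes it right after array<)
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
spark.sql("drop table if exists jsonevolvtable")
spark.sql(table_define)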

The replace() calls in this code make the DDL safe, that is, instead of:

create table tname (_field1 string, 1field string)

with field names such as "_field1" and "1field", the generated DDL is safe, with the field names escaped: create table `tname` (`_field1` string, `1field` string).
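The replace() chain edits Spark's type strings purely textually, so it also inserts backticks where they do not belong, for example `decimal(10,2)` becomes `decimal(10,`2)` and `map<string,bigint>` becomes ``map<`string,`bigint>``. A slightly more structural alternative is sketched below. It is our own pure-Python helper (all names hypothetical), not part of Spark; it takes `(name, type)` pairs in the shape of `DataFrame.dtypes`, backticks every top-level column name and every struct field name, and assumes field names contain no `:`, `<`, `>` or `,`.

```python
def split_top_level(s):
    """Split s on commas that are not nested inside <...> or (...)."""
    parts, depth, start = [], 0, 0
    for i, ch in enumerate(s):
        if ch in "<(":
            depth += 1
        elif ch in ">)":
            depth -= 1
        elif ch == "," and depth == 0:
            parts.append(s[start:i])
            start = i + 1
    parts.append(s[start:])
    return parts

def quote_type(t):
    """Backtick struct field names inside a Spark type string such as struct<a:bigint>."""
    if t.startswith("struct<") and t.endswith(">"):
        fields = []
        for part in split_top_level(t[len("struct<"):-1]):
            name, _, ftype = part.partition(":")
            fields.append(f"`{name}`:{quote_type(ftype)}")
        return "struct<" + ",".join(fields) + ">"
    if t.startswith("array<") and t.endswith(">"):
        return "array<" + quote_type(t[len("array<"):-1]) + ">"
    if t.startswith("map<") and t.endswith(">"):
        key, value = split_top_level(t[len("map<"):-1])
        return "map<" + quote_type(key) + "," + quote_type(value) + ">"
    return t  # primitive types such as string, bigint, decimal(10,2)

def build_ddl(table, dtypes, location, partition_col="date_load"):
    """CREATE EXTERNAL TABLE statement from (name, type) pairs; the partition column is declared separately."""
    cols = ",".join(f"`{name}` {quote_type(t)}" for name, t in dtypes if name != partition_col)
    return (f"CREATE EXTERNAL TABLE `{table}` ({cols}) "
            f"PARTITIONED BY (`{partition_col}` string) STORED AS PARQUET LOCATION '{location}'")

dtypes = [("_field1", "string"), ("1field", "string"),
          ("a", "struct<b:bigint,c:array<struct<d:double>>>"), ("date_load", "string")]
print(build_ddl("tname", dtypes, "/tmp/tname"))
```

In PySpark, the result would be passed as `build_ddl("jsonevolvtable", pf.dtypes, ...)` to `spark.sql`, after dropping the old table as in the code above.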

The question arises: how do we correctly get a dataframe with the complete schema (pf in the code)? How do we obtain this pf? This is the fifth problem. Re-read the schema of all partitions from the folder with the target data mart's parquet files? That is the safest method, but a heavy one.

The schema is already in Hive. The new schema can be obtained by merging the schema of the whole table with that of the new partition. So we take the table schema from Hive and combine it with the schema of the new partition. This can be done by reading a test row, which carries the table metadata, from Hive, saving it to a temporary folder, and reading both folders at once with Spark.

In fact, we have everything we need: the original table schema in Hive and the new partition. We also have the data. It only remains to get a new schema that combines the data mart schema with the new fields from the newly created partition:

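# spark: a SparkSession created with .enableHiveSupport()
# 1. parse the new day's JSON and write it as a new partition
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
# 2. write one row of the registered table to a temporary folder: it carries the current table schema
pe = spark.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartition/")
# 3. read both with schema merging: pf gets the union of the table schema and the new partition's schema
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartition/*")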
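Conceptually, the merge performed here takes the columns of the registered table and appends the columns that only the new partition has. The toy pure-Python version below (hypothetical name, `{column: type}` dictionaries instead of Spark schemas) keeps the table columns first and raises on a type mismatch, roughly as Spark refuses to merge incompatible types; the column order Spark actually produces is not something this sketch claims to reproduce.

```python
def merge_schemas(table_schema, partition_schema):
    """Union two {column: type} schemas: table columns first, new columns appended."""
    merged = dict(table_schema)
    for name, t in partition_schema.items():
        if name in merged and merged[name] != t:
            raise ValueError(f"conflicting types for {name}: {merged[name]} vs {t}")
        merged.setdefault(name, t)
    return merged

table = {"source": "string", "error_code": "string"}
new_partition = {"source": "string", "error_code": "string", "description": "string"}
print(list(merge_schemas(table, new_partition)))  # ['source', 'error_code', 'description']
```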

Next, we build the table registration DDL as in the previous section.
If the whole chain works correctly, that is, the initialization load was done and the table was correctly created in Hive, we get an updated table schema.

And the last problem is that you cannot simply add a partition to a Hive table, because the table will break. You have to force Hive to repair its partition structure:

# spark: a SparkSession created with .enableHiveSupport()
# make Hive rescan the table location and register partition folders it does not know yet
spark.sql("MSCK REPAIR TABLE " + db + "." + destTable)
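MSCK REPAIR TABLE scans the whole table location. When exactly one known partition arrives per day, Hive (and Spark SQL with Hive support) also accepts registering it explicitly with `ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ...`. The sketch below only builds that statement as a string, to be passed to `spark.sql`; the helper name and sample values are hypothetical, and this is an alternative we suggest rather than the approach described above. It registers the partition only; the column schema still has to be updated through the DDL re-registration.

```python
def add_partition_sql(db, table, date_load, location):
    """Hive statement registering a single date_load partition instead of a full MSCK REPAIR scan."""
    return (f"ALTER TABLE {db}.{table} ADD IF NOT EXISTS "
            f"PARTITION (date_load='{date_load}') LOCATION '{location}'")

print(add_partition_sql("mydb", "jsonevolvtable", "12-12-2019",
                        "/warehouse/jsonevolvtable/date_load=12-12-2019"))
```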

The simple task of reading JSON and building a data mart on it turns into overcoming a number of implicit difficulties whose solutions you have to look for separately. And although the solutions are simple, finding them takes a lot of time.

To implement the data mart build, I had to:

  • Add partitions to the data mart, getting rid of service files
  • Deal with empty fields in the source data that Spark had typed
  • Cast simple types to string
  • Convert field names to lowercase
  • Separate the data upload from the table registration in Hive (DDL generation)
  • Remember to escape field names that may be incompatible with Hive
  • Learn how to update the table registration in Hive

To sum up, the decision to build data marts is fraught with many pitfalls. So if you run into difficulties during implementation, it is better to turn to an experienced partner with proven expertise.

Thank you for reading this article; we hope you find the information useful.

Source: www.hab.com
