Spark schemaEvolution nan pratik

Chè lektè, bon jounen!

Nan atik sa a, konsiltan dirijan nan zòn biznis Big Data Solutions Neoflex a dekri an detay opsyon pou bati estrikti varyab lè l sèvi avèk Apache Spark.

Kòm yon pati nan yon pwojè analiz done, travay la nan bati devan magazen ki baze sou done ki lach estriktire souvan rive.

Anjeneral sa yo se mòso bwa, oswa repons ki soti nan divès sistèm, sove kòm JSON oswa XML. Done yo telechaje nan Hadoop, Lè sa a, ou bezwen bati yon devan magazen nan men yo. Nou ka òganize aksè nan vitrin ki te kreye a, pou egzanp, atravè Impala.

Nan ka sa a, chema nan devan magazen an pa konnen davans. Anplis, konplo a tou pa ka trase moute davans, kòm li depann sou done yo, epi nou ap fè fas ak done sa yo trè lach estriktire.

Pou egzanp, jodi a repons sa a anrejistre:

{source: "app1", error_code: ""}

epi demen soti nan menm sistèm nan vini repons sa a:

{source: "app1", error_code: "error", description: "Network error"}

Kòm yon rezilta, yon lòt jaden yo ta dwe ajoute nan vitrin nan - deskripsyon, epi pèsonn pa konnen si li pral vini oswa ou pa.

Travay la nan kreye yon devan magazen sou done sa yo se trè estanda, ak Spark gen yon kantite zouti pou sa. Pou analize done sous yo, gen sipò pou tou de JSON ak XML, epi pou yon chema deja enkoni, yo bay sipò pou schemaEvolution.

Nan premye gade, solisyon an sanble senp. Ou bezwen pran yon katab ak JSON epi li li nan yon dataframe. Spark pral kreye yon chema, vire done enbrike nan estrikti. Pli lwen, tout bagay bezwen yo dwe sove nan partez, ki sipòte tou nan Impala, lè w anrejistre devan magazen an nan metastore Hive la.

Tout bagay sanble senp.

Sepandan, li pa klè nan egzanp kout yo nan dokiman an sa yo dwe fè ak yon kantite pwoblèm nan pratik.

Dokimantasyon an dekri yon apwòch pa kreye yon devan magazen, men li JSON oswa XML nan yon dataframe.

Savwa, li tou senpleman montre ki jan yo li ak analize JSON:

df = spark.read.json(path...)

Sa a se ase yo fè done yo disponib nan Spark.

Nan pratik, script la pi konplike pase jis li fichye JSON nan yon katab epi kreye yon dataframe. Sitiyasyon an sanble sa a: gen deja yon vitrina sèten, nouvo done vini nan chak jou, yo bezwen yo dwe ajoute nan devan magazen an, pa bliye ke konplo a ka diferan.

Konplo abityèl pou bati yon vitrin se jan sa a:

Etap 1. Done yo chaje nan Hadoop ak rechaje chak jou ki vin apre epi ajoute nan yon nouvo patisyon. Li sanble yon katab ak done inisyal divize pa jou.

Etap 2. Pandan chaj inisyal la, Spark li epi analize katab sa a. Dataframe ki kapab lakòz la sove nan yon fòma analizable, pou egzanp, nan partez, ki ka Lè sa a, dwe enpòte nan Impala. Sa a kreye yon vitrin sib ak tout done ki te akimile jiska pwen sa a.

Etap 3. Yo kreye yon download ki pral mete ajou devan magazen an chak jou.
Gen yon kesyon de loading incrémentielle, bezwen nan patisyon vitrin nan, ak kesyon an nan kenbe konplo a jeneral nan vitrin la.

Ann pran yon egzanp. Ann di ke premye etap la nan bati yon depo te aplike, ak dosye JSON yo Uploaded nan yon katab.

Kreye yon dataframe nan men yo, Lè sa a, sove li kòm yon vitrin, se pa yon pwoblèm. Sa a se premye etap la ki ka fasil jwenn nan dokiman an Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tout bagay sanble ap byen.

Nou li ak analize JSON, Lè sa a, nou sove dataframe la kòm yon partez, enskri li nan Hive nan nenpòt fason pratik:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Nou jwenn yon fenèt.

Men, jou kap vini an, nouvo done ki soti nan sous la te ajoute. Nou gen yon katab ak JSON, ak yon vitrin ki te kreye nan katab sa a. Apre yo fin chaje pwochen pakèt done ki soti nan sous la, done mart la manke yon sèl jou a nan done.

Solisyon ki lojik ta dwe separe devan magazen an pa jou, sa ki pral pèmèt ajoute yon nouvo patisyon chak jou kap vini an. Mekanis pou sa a se byen li te ye tou, Spark pèmèt ou ekri patisyon separeman.

Premyèman, nou fè yon chaj inisyal, ekonomize done yo jan sa dekri pi wo a, ajoute sèlman patisyon. Yo rele aksyon sa a inisyalizasyon devan magazen epi li fèt yon sèl fwa:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Nan denmen, nou chaje sèlman yon nouvo patisyon:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Tout sa ki rete se re-enskri nan Hive pou mete ajou chema a.
Sepandan, sa a se kote pwoblèm rive.

Premye pwoblèm. Pi bonè oswa pita, partez la ki kapab lakòz yo pral lizib. Sa a se akòz ki jan partez ak JSON trete jaden vid yon fason diferan.

Ann konsidere yon sitiyasyon tipik. Pa egzanp, yè JSON rive:

День 1: {"a": {"b": 1}},

ak jodi a menm JSON a sanble tankou sa a:

День 2: {"a": null}

Ann di nou gen de patisyon diferan, chak ak yon liy.
Lè nou li done yo tout sous, Spark yo pral kapab detèmine kalite a, epi yo pral konprann ke "a" se yon jaden ki nan kalite "estrikti", ak yon jaden enbrike "b" nan kalite INT. Men, si chak patisyon te sove separeman, Lè sa a, nou jwenn yon partez ak rapid patisyon enkonpatib:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Sitiyasyon sa a byen li te ye, kidonk yo te ajoute yon opsyon espesyalman - lè w analize done sous yo, retire jaden vid yo:

df = spark.read.json("...", dropFieldIfAllNull=True)

Nan ka sa a, partez la pral konpoze de patisyon ki ka li ansanm.
Malgre ke moun ki te fè sa nan pratik pral souri anmè kou fièl isit la. Poukisa? Wi, paske gen chans pou gen de lòt sitiyasyon. Oswa twa. Oswa kat. Premye a, ki pral prèske sètènman rive, se ke kalite nimerik ap gade diferan nan diferan dosye JSON. Pa egzanp, {intField: 1} ak {intField: 1.1}. Si yo jwenn jaden sa yo nan yon sèl patisyon, Lè sa a, chema fizyon an pral li tout bagay kòrèkteman, ki mennen nan kalite ki pi egzak. Men, si nan diferan, Lè sa a, youn pral gen intField: int, ak lòt la pral gen intField: doub.

Gen drapo sa a pou jere sitiyasyon sa a:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Koulye a, nou gen yon katab kote gen patisyon ki ka li nan yon sèl dataframe ak yon partez valab nan vitrin nan tout antye. Wi? Non.

Nou dwe sonje ke nou te anrejistre tab la nan Hive. Hive se pa majiskil nan non jaden, pandan y ap partez se sansib. Se poutèt sa, patisyon ak chema: field1: int, ak Field1: int yo se menm bagay pou Hive, men se pa pou Spark. Pa bliye konvèti non jaden yo an miniskil.

Apre sa, tout bagay sanble ap anfòm.

Sepandan, se pa tout konsa senp. Gen yon dezyèm pwoblèm ki byen koni tou. Piske chak nouvo patisyon sove separeman, katab patisyon an ap genyen fichye sèvis Spark, pa egzanp, drapo siksè operasyon _SUCCESS la. Sa a pral lakòz yon erè lè w ap eseye partez. Pou evite sa a, ou bezwen konfigirasyon konfigirasyon an pou anpeche Spark ajoute dosye sèvis nan katab la:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Li sanble ke kounye a chak jou yon nouvo patisyon partez ajoute nan katab la vitrin sib, kote done yo analize pou jounen an sitiye. Nou te pran swen davans ke pa te gen okenn patisyon ak yon konfli kalite done.

Men, nou gen yon twazyèm pwoblèm. Koulye a, chema jeneral la pa konnen, Anplis, tab la nan Hive gen yon chema kòrèk, depi chak nouvo patisyon gen plis chans prezante yon distòsyon nan chema a.

Ou bezwen re-anrejistre tab la. Sa a ka fè tou senpleman: li partez la nan devan boutik la ankò, pran chema a epi kreye yon DDL ki baze sou li, ak ki re-enskri katab la nan Hive kòm yon tab ekstèn, mete ajou chema nan devan boutik la sib.

Nou gen yon katriyèm pwoblèm. Lè nou te anrejistre tab la pou premye fwa, nou te konte sou Spark. Koulye a, nou fè li tèt nou, epi nou bezwen sonje ke jaden partez ka kòmanse ak karaktè ki pa gen dwa pou Hive. Pou egzanp, Spark jete soti liy ke li pa t 'kapab analize nan jaden an "corrupt_record". Yon jaden konsa pa ka anrejistre nan Hive san yo pa chape.

Lè nou konnen sa a, nou jwenn konplo a:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kòd ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace ("<", "<`").replace(",", ",`"). ranplase ("etalaj<`", "etalaj<") fè DDL an sekirite, sa vle di olye pou yo:

create table tname (_field1 string, 1field string)

Avèk non jaden tankou "_field1, 1field", DDL ki an sekirite fèt kote non jaden yo chape: kreye tab `tname` (`_field1` string, `1field` string).

Kesyon an rive: ki jan yo byen jwenn yon dataframe ak yon chema konplè (nan kòd pf)? Ki jan yo jwenn sa a pf? Sa a se senkyèm pwoblèm nan. Reli konplo a nan tout patisyon soti nan katab la ak dosye partez nan vitrin nan sib? Metòd sa a se pi an sekirite, men difisil.

Schema a deja nan Hive. Ou ka jwenn yon nouvo chema lè w konbine chema tout tab la ak nouvo patisyon an. Se konsa, ou bezwen pran chema tab la nan Hive epi konbine li ak chema nouvo patisyon an. Sa a ka fè lè w li metadata tès yo soti nan Hive, sove li nan yon katab tanporè, epi lè l sèvi avèk Spark pou li tou de patisyon yo alafwa.

An reyalite, gen tout bagay ou bezwen: chema tab orijinal la nan Hive ak nouvo patisyon an. Nou gen done tou. Li rete sèlman pou jwenn yon nouvo chema ki konbine chema devan magazen an ak nouvo jaden ki soti nan patisyon kreye a:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Apre sa, nou kreye DDL enskripsyon tab la, tankou nan snippet anvan an.
Si tout chèn lan ap travay kòrèkteman, sètadi, te gen yon chaj inisyalize, epi yo te kreye tab la kòrèkteman nan Hive, Lè sa a, nou jwenn yon chema tab mete ajou.

Ak dènye pwoblèm nan se ke ou pa ka jis ajoute yon patisyon nan yon tab Hive, paske li pral kase. Ou bezwen fòse Hive pou ranje estrikti patisyon li yo:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Travay la senp nan li JSON ak kreye yon devan magazen ki baze sou li rezilta nan simonte yon kantite difikilte implicite, solisyon pou ki ou dwe chèche separeman. E byenke solisyon sa yo senp, li pran anpil tan pou jwenn yo.

Pou aplike konstriksyon vitrin lan, mwen te oblije:

  • Ajoute patisyon nan vitrin la, debarase m de dosye sèvis yo
  • Fè fas ak jaden vid nan done sous ke Spark te tape
  • Mete kalite senp nan yon fisèl
  • Konvèti non jaden an miniskil
  • Téléchargement done separe ak enskripsyon tab nan Hive (jenerasyon DDL)
  • Pa bliye chape non jaden ki ta ka enkonpatib ak Hive
  • Aprann kijan pou mete ajou enskripsyon tab la nan Hive

Rezime, nou remake ke desizyon an pou konstwi fenèt boutik chaje ak anpil enkonvenyans. Se poutèt sa, nan ka ta gen difikilte nan aplikasyon, li pi bon kontakte yon patnè ki gen eksperyans ak ekspètiz siksè.

Mèsi paske w li atik sa a, nou espere ou jwenn enfòmasyon an itil.

Sous: www.habr.com

Add nouvo kòmantè