Spark schemaEvolution ka ts'ebetso

Babali ba ratehang, letsatsi le letle!

Sehloohong sena, moeletsi ea ka sehloohong oa sebaka sa khoebo sa Neoflex's Big Data Solutions o hlalosa ka ho qaqileng mekhoa ea ho haha ​​​​lipontšo tsa mefuta-futa ea mekhoa e sebelisoang ka Apache Spark.

E le karolo ea morero oa ho hlahloba lintlha, mosebetsi oa ho haha ​​​​libaka tsa polokelo tse thehiloeng ho data e hlophisitsoeng ka mokhoa o hlephileng hangata oa hlaha.

Hangata tsena ke li-log, kapa likarabo tse tsoang ho litsamaiso tse fapaneng, tse bolokiloeng joalo ka JSON kapa XML. Lintlha li fetisetsoa ho Hadoop, joale o hloka ho haha ​​​​setoro ho tloha ho bona. Re ka hlophisa phihlello ea ponts'o e entsoeng, mohlala, ka Impala.

Tabeng ena, schema ea sebaka sa polokelo ea thepa ha e tsejoe esale pele. Ho feta moo, morero le oona o ke ke oa raloa esale pele, kaha o itšetlehile ka boitsebiso, 'me re sebetsana le lintlha tsena tse hlophisitsoeng ka mokhoa o hlephileng.

Ka mohlala, kajeno ho na le karabo e latelang:

{source: "app1", error_code: ""}

'me hosasane ho tsoa tsamaisong e tšoanang ho tla le karabo e latelang:

{source: "app1", error_code: "error", description: "Network error"}

Ka lebaka leo, tšimo e 'ngoe hape e lokela ho ekeletsoa pontšong - tlhaloso,' me ha ho motho ea tsebang hore na e tla tla kapa che.

Mosebetsi oa ho theha sebaka sa polokelo ho data e joalo o maemong a matle, 'me Spark e na le lisebelisoa tse' maloa bakeng sa sena. Bakeng sa ho arola lintlha tsa mohloli, ho na le tšehetso bakeng sa JSON le XML ka bobeli, 'me bakeng sa schema e neng e sa tsejoe pele, tšehetso ea schemaEvolution e fanoa.

Ha u sheba ka lekhetlo la pele, tharollo e shebahala e le bonolo. U hloka ho nka foldara e nang le JSON ebe u e bala ho dataframe. Spark e tla theha schema, e fetole data e behiloeng meahong. Ho feta moo, ntho e 'ngoe le e' ngoe e hloka ho bolokoa ka parquet, e ts'ehetsoeng hape Impala, ka ho ngolisa lebenkele le ka pele ho metastore ea Hive.

Ntho e 'ngoe le e 'ngoe e bonahala e le bonolo.

Leha ho le joalo, ha ho hlake ho tloha mehlala e khutšoanyane ea litokomane seo u lokelang ho se etsa ka mathata a mangata a ts'ebetsong.

Litokomane li hlalosa mokhoa oa ho se thehe sebaka sa polokelo, empa ho bala JSON kapa XML ho dataframe.

Ka mantsoe a mang, e bontša feela mokhoa oa ho bala le ho hlalosa JSON:

df = spark.read.json(path...)

Sena se lekane ho etsa hore data e fumanehe ho Spark.

Ha e le hantle, sengoloa se rarahane ho feta ho bala lifaele tsa JSON ho tsoa foldareng le ho theha dataframe. Boemo bo shebahala tjena: ho se ho ntse ho e-na le sebaka se itseng sa lebenkele, lintlha tse ncha li fihla letsatsi le leng le le leng, li hloka ho ekeletsoa lebenkeleng la mabenkele, li sa lebale hore schema e ka 'na ea fapana.

Morero o tloaelehileng oa ho etsa pontšo ke o latelang:

Mohato 1. Lintlha li kenngoa ka har'a Hadoop ka ho kenya hape letsatsi le leng le le leng ebe li eketsoa karolong e ncha. E hlahisa foldara e nang le data ea pele e arotsoeng ka letsatsi.

Mohato 2. Nakong ea mojaro oa pele, foldara ena e baloa ebe e aroloa ke Spark. Sephetho sa dataframe se bolokiloe ka mokhoa o ka hlalosoang, mohlala, ka parquet, e ka kenngoa ka har'a Impala. Sena se theha pontšo ea sepheo se nang le data eohle e bokelletsoeng ho fihlela ntlheng ena.

Mohato 3. Ho entsoe download e tla nchafatsa sebaka sa polokelo letsatsi le letsatsi.
Ho na le potso ea ho kenya letsoho ka ho eketseha, tlhokahalo ea ho arola pontšo, le potso ea ho boloka moralo o akaretsang oa pontšo.

Ha re nke mohlala. Ha re re mohato oa pele oa ho aha polokelo o se o kentsoe tšebetsong, 'me lifaele tsa JSON li kentsoe foldareng.

Ho theha dataframe ho tsoa ho bona, ebe ho e boloka e le pontšo, ha se bothata. Ona ke mohato oa pele o ka fumanoang habonolo litokomaneng tsa Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tsohle di bonahala di lokile.

Re bala le ho fetisa JSON, ebe re boloka dataframe joalo ka parquet, re e ngolisa ho Hive ka tsela efe kapa efe e bonolo:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Re fumana fensetere.

Empa, letsatsing le hlahlamang, data e ncha e tsoang mohloling e ile ea eketsoa. Re na le sephutheli se nang le JSON, le pontšo e entsoeng ho tsoa ho sephutheli sena. Kamora ho kenya data e latelang ho tsoa mohloling, data mart ha e na data ea letsatsi le le leng.

Tharollo e utloahalang e tla ba ho arola lebenkele ka letsatsi, e leng se tla lumella ho eketsa karohano e ncha letsatsi le leng le le leng le latelang. Mochine oa sena o boetse o tsebahala, Spark e u lumella ho ngola likarolo ka thoko.

Taba ea pele, re etsa mojaro oa pele, re boloka data joalo ka ha ho hlalositsoe ka holimo, re eketsa karohano feela. Ketso ena e bitsoa ho qalisoa ha lebenkele 'me e etsoa hang feela:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Letsatsing le hlahlamang, re kenya karolo e ncha feela:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Ho setseng ke ho ingolisa hape ho Hive ho ntlafatsa schema.
Leha ho le joalo, ke hona moo mathata a hlahang.

Bothata ba pele. Haufinyane, parquet e hlahisoang e ke ke ea baloa. Sena se bakoa ke hore na parquet le JSON li tšoara masimo a se nang letho ka tsela e fapaneng.

A re hlahlobeng boemo bo tloaelehileng. Mohlala, maobane JSON e fihlile:

День 1: {"a": {"b": 1}},

'me kajeno JSON e tšoanang e shebahala tjena:

День 2: {"a": null}

Ha re re re na le likarolo tse peli tse fapaneng, e 'ngoe le e' ngoe e na le mola o le mong.
Ha re bala lintlha tsohle tsa mohloli, Spark o tla khona ho tseba hore na ke mofuta ofe, 'me o tla utloisisa hore "a" ke tšimo ea mofuta oa "sebopeho", se nang le tšimo e entsoeng "b" ea mofuta oa INT. Empa, haeba karohano e 'ngoe le e' ngoe e bolokiloe ka thoko, joale re fumana parquet e nang le merero e sa lumellaneng ea karohano:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Boemo bona bo tsejoa hantle, kahoo khetho e kenyellelitsoe ka ho khetheha - ha u hlophisa lintlha tsa mohloli, tlosa libaka tse se nang letho:

df = spark.read.json("...", dropFieldIfAllNull=True)

Tabeng ena, parquet e tla ba le likaroloana tse ka baloang hammoho.
Le hoja ba entseng sena ka liketso ba tla bososela habohloko mona. Hobaneng? E, hobane ho ka etsahala hore ho be le maemo a mang a mabeli. Kapa tse tharo. Kapa tse nne. Ea pele, e tla batla e etsahala, ke hore mefuta ea linomoro e tla shebahala e fapane lifaeleng tse fapaneng tsa JSON. Ka mohlala, {intField: 1} le {intField: 1.1}. Haeba masimo a joalo a fumanoa karolong e le 'ngoe, joale schema merge e tla bala ntho e' ngoe le e 'ngoe ka nepo, e lebisang ho mofuta o nepahetseng ka ho fetisisa. Empa haeba ho tse fapaneng, joale e mong o tla ba le intField: int, 'me e mong o tla ba le intField: habeli.

Ho na le folakha e latelang ho sebetsana le boemo bona:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Hona joale re na le fensetere moo ho nang le likaroloana tse ka baloang ho dataframe e le 'ngoe le parquet e nepahetseng ea pontšo eohle. Ee? Che.

Re lokela ho hopola hore re ngolisitse tafole ho Hive. Hive ha e utloe bohloko ka mabitso a masimo, ha parquet e na le kutloelo-bohloko. Ka hona, li-partitions tse nang le schemas: field1: int, le Field1: int li tšoana bakeng sa Hive, empa eseng bakeng sa Spark. U se ke ua lebala ho fetolela mabitso a sebaka ho ba litlhaku tse tlaase.

Ka mor'a moo, lintho tsohle li bonahala li lokile.

Leha ho le joalo, hase bohle ba bonolo hakaalo. Ho na le bothata ba bobeli, hape bo tsebahalang. Kaha karohano e 'ngoe le e' ngoe e ncha e bolokiloe ka thoko, foldara ea karohano e tla ba le lifaele tsa ts'ebeletso ea Spark, mohlala, folakha ea katleho ea ts'ebetso ea _SUCCESS. Sena se tla fella ka phoso ha u leka ho parquet. Ho qoba sena, o hloka ho hlophisa tlhophiso ho thibela Spark ho eketsa lifaele tsa lits'ebeletso ho foldareng:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Ho bonahala eka joale letsatsi le leng le le leng karohano e ncha ea parquet e eketsoa foldareng ea ponts'o ea sepheo, moo ho nang le data e hlophisitsoeng ea letsatsi. Re hlokometse esale pele hore ho ne ho se na likarolo tse nang le khohlano ea mofuta oa data.

Empa, re na le bothata ba boraro. Hona joale schema e akaretsang ha e tsejoe, ho feta moo, tafole ea Hive e na le schema e fosahetseng, kaha karohano e 'ngoe le e' ngoe e ncha e ka 'na ea hlahisa ho sotha ho schema.

U hloka ho ngolisa tafole hape. Sena se ka etsoa habonolo feela: bala parquet ea sebaka se ka pele sa lebenkele hape, nka schema 'me u thehe DDL e thehiloeng ho eona, eo ka eona u ka ngolisang foldara ho Hive e le tafole ea kantle, ho nchafatsa schema sa sebaka se ka pele sa lebenkele.

Re na le bothata ba bone. Ha re ngolisa tafole lekhetlo la pele, re ile ra itšetleha ka Spark. Hona joale re iketsetsa rona, 'me re lokela ho hopola hore masimo a parquet a ka qala ka litlhaku tse sa lumelloeng bakeng sa Hive. Mohlala, Spark e lahlela mela eo e neng e sa khone ho e hlalosa sebakeng sa "corrupt_record". Tšimo e joalo e ke ke ea ngolisoa ho Hive ntle le ho phonyoha.

Ho tseba sena, re fumana moralo:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

khoutu ("_corrupt_record", "`_corrupt_record`") + " " + f[1].fetola(":", "`:").fetola("<", "<`").fetola(",", ",`").fetola("lehlopha<`", "lehlopha<"). e etsa hore DDL e bolokehe, ke hore sebakeng sa:

create table tname (_field1 string, 1field string)

Ka mabitso a tšimo joalo ka "_field1, 1field", DDL e bolokehileng e etsoa moo mabitso a masimo a phonyohileng: theha tafole `tname` (`_field1` khoele, `1field` khoele).

Ho hlaha potso: mokhoa oa ho fumana dataframe hantle ka schema e felletseng (ka khoutu ea pf)? Joang ho fumana pf ee? Bona ke bothata ba bohlano. Bala hape leano la likarolo tsohle tse tsoang foldareng ka lifaele tsa parquet tsa showcase e shebiloeng? Mokhoa ona ke o sireletsehileng ka ho fetisisa, empa o thata.

Moralo o se o ntse o le Hive. U ka fumana schema e ncha ka ho kopanya schema ea tafole eohle le karohano e ncha. Kahoo o hloka ho nka schema ea tafole ho Hive ebe o e kopanya le schema ea karohano e ncha. Sena se ka etsoa ka ho bala metadata ea liteko ho tsoa ho Hive, ho e boloka foldareng ea nakoana, le ho sebelisa Spark ho bala likarolo tse peli hang.

Ebile, ho na le ntho e 'ngoe le e' ngoe eo u e hlokang: schema ea tafole ea mantlha ho Hive le karohano e ncha. Re boetse re na le data. E sala feela ho fumana schema e ncha e kopanyang schema ea lebenkele le masimo a macha ho tsoa karohanong e entsoeng:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ka mor'a moo, re theha DDL ea ngoliso ea tafole, joalo ka snippet e fetileng.
Haeba ketane eohle e sebetsa ka nepo, e leng, ho ne ho e-na le mojaro o qalang, mme tafole e entsoe ka nepo ho Hive, joale re fumana schema e ntlafalitsoeng ea tafole.

Mme bothata ba ho qetela ke hore o ke ke oa eketsa karohano tafoleng ea Hive, hobane e tla robeha. U hloka ho qobella Hive ho lokisa sebopeho sa eona sa karohano:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Mosebetsi o bonolo oa ho bala JSON le ho theha sebaka sa polokelo ho ipapisitse le eona ho fella ka ho hlola mathata a mangata a hlakileng, tharollo eo u tlamehang ho e batla ka thoko. 'Me le hoja litharollo tsena li le bonolo, ho nka nako e ngata ho li fumana.

Ho kenya ts'ebetsong kaho ea showcase, ke ile ka tlameha ho:

  • Kenya li-partitions ho showcase, ho tlosa lifaele tsa lits'ebeletso
  • Sebetsana le libaka tse se nang letho ho data ea mohloli eo Spark a e ngotseng
  • Lahlela mefuta e bonolo ho khoele
  • Fetolela mabitso a libaka ho ba litlhaku tse nyane
  • Arola ho kenya data le ho ngolisoa ha tafole ho Hive (moloko oa DDL)
  • Se ke oa lebala ho baleha mabitso a masimong a kanna a hanana le Hive
  • Ithute ho ntlafatsa ngoliso ea tafole ho Hive

Ha re akaretsa, re hlokomela hore qeto ea ho haha ​​lifensetere tsa mabenkele e na le maraba a mangata. Ka hona, haeba u kopana le mathata ts'ebetsong, ho molemo ho ikopanya le molekane ea nang le phihlelo ea nang le tsebo e atlehileng.

Kea leboha ha u bala sehlooho sena, re tšepa hore u tla fumana boitsebiso bo le molemo.

Source: www.habr.com

Eketsa ka tlhaloso