Spark sgeama Evolution ann an cleachdadh

A leughadairean gràdhach, feasgar math!

San artaigil seo, tha am prìomh chomhairliche airson raon gnìomhachais Big Data Solutions de Neoflex a ’toirt cunntas mionaideach air roghainnean airson a bhith a’ togail aghaidhean stòr structar caochlaideach a ’cleachdadh Apache Spark.

Mar phàirt de phròiseact mion-sgrùdadh dàta, bidh an obair gu tric a’ togail thaisbeanaidhean stèidhichte air dàta le structar sgaoilte.

Mar as trice is iad sin logaichean, no freagairtean bho dhiofar shiostaman, air an sàbhaladh ann an cruth JSON no XML. Tha an dàta air a luchdachadh suas gu Hadoop, agus an uairsin feumar aghaidh stòr a thogail bhuaithe. Is urrainn dhuinn ruigsinneachd a chuir air dòigh don aghaidh stòr a chaidh a chruthachadh, mar eisimpleir, tro Impala.

Anns a 'chùis seo, chan eil fios ro-làimh air cruth an targaid stòrais. A bharrachd air an sin, chan urrainnear an sgeama a dhealbhadh ro-làimh, leis gu bheil e an urra ris an dàta, agus tha sinn a’ dèiligeadh ris an dàta seo le structar lag.

Mar eisimpleir, an-diugh tha am freagairt a leanas air a chlàradh:

{source: "app1", error_code: ""}

agus a-màireach thig am freagairt a leanas bhon aon shiostam:

{source: "app1", error_code: "error", description: "Network error"}

Mar thoradh air an sin, bu chòir raon eile a chur ris an aghaidh stòr - tuairisgeul, agus chan eil fios aig duine an tig e no nach tig.

Tha an obair airson mart a chruthachadh air an dàta sin gu math àbhaisteach, agus tha grunn innealan aig Spark airson seo. Airson dàta stòr a pharsadh, tha taic ann airson an dà chuid JSON agus XML, agus airson sgeama neo-aithnichte roimhe, tha taic schemaEvolution air a thoirt seachad.

Aig a 'chiad sealladh, tha am fuasgladh a' coimhead sìmplidh. Feumaidh tu am pasgan a thoirt le JSON agus a leughadh a-steach don dataframe. Cruthaichidh Spark sgeama agus tionndaidhidh e an dàta neadachaidh gu structaran. An ath rud, feumar a h-uile càil a shàbhaladh ann am parquet, a tha cuideachd a ’faighinn taic ann an Impala, le bhith a’ clàradh aghaidh an stòrais ann am metastore Hive.

Tha e coltach gu bheil a h-uile dad sìmplidh.

Ach, bho na h-eisimpleirean goirid anns na sgrìobhainnean chan eil e soilleir dè a bu chòir a dhèanamh le grunn dhuilgheadasan ann an cleachdadh.

Tha na sgrìobhainnean a’ toirt cunntas air dòigh-obrach chan ann airson aghaidh stòr a chruthachadh, ach airson JSON no XML a leughadh a-steach do fhrèam dàta.

Is e sin, tha e dìreach a’ sealltainn mar a leughas tu agus a nì thu sgrùdadh air JSON:

df = spark.read.json(path...)

Tha seo gu leòr gus an dàta a thoirt do Spark.

Ann an cleachdadh, tha an suidheachadh tòrr nas iom-fhillte na dìreach a bhith a 'leughadh faidhlichean JSON bho phasgan agus a' cruthachadh frèam dàta. Tha an suidheachadh a ’coimhead mar seo: tha taisbeanadh sònraichte ann mu thràth, bidh dàta ùr a’ ruighinn a h-uile latha, feumar an cur ris an taisbeanadh, gun a bhith a ’dìochuimhneachadh gum faodadh an sgeama a bhith eadar-dhealaichte.

Tha an sgeama àbhaisteach airson aghaidh-stòr a thogail mar a leanas:

Step 1. Tha an dàta air a luchdachadh a-steach do Hadoop, air a leantainn le luchdachadh a bharrachd làitheil agus air a chur ri sgaradh ùr. Is e an toradh pasgan le dàta stòr, air a sgaradh tron ​​​​latha.

Step 2. Rè an luchdachadh tùsail, thèid am pasgan seo a leughadh agus a pharsadh le bhith a’ cleachdadh Spark. Tha an clàr-dàta a thig às air a shàbhaladh ann an cruth a ghabhas sgrùdadh, mar eisimpleir, ann am parquet, a dh’ fhaodar an uairsin a thoirt a-steach gu Impala. Bidh seo a’ cruthachadh aghaidh stòr targaid leis an dàta gu lèir a tha air cruinneachadh suas chun na h-ìre seo.

Step 3. Thèid luchdachadh sìos a chruthachadh a bheir ùrachadh air aghaidh an stòr gach latha.
Tha a’ cheist mu luchdachadh mean air mhean ag èirigh, an fheum air aghaidh a’ bhùth a sgaradh, agus a’ cheist mu bhith a’ toirt taic do chruth coitcheann aghaidh na bùtha.

Bheir sinn eisimpleir. Canaidh sinn gun deach a’ chiad cheum ann a bhith a’ togail stòr-tasgaidh a chur an gnìomh, agus tha luchdachadh suas faidhlichean JSON gu pasgan air a rèiteachadh.

Chan eil duilgheadas sam bith ann a bhith a’ cruthachadh frèam dàta bhuapa agus an uairsin gan sàbhaladh mar thaisbeanadh. Is e seo a’ chiad cheum a tha furasta a lorg ann an sgrìobhainnean Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tha coltas gu bheil a h-uile dad gu math.

Tha sinn air an JSON a leughadh agus a pharsadh, agus an uairsin sàbhailidh sinn an dataframe mar parquet, ga chlàradh ann an Hive ann an dòigh iomchaidh sam bith:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Bidh sinn a’ faighinn taisbeanadh.

Ach, an ath latha chaidh dàta ùr a chuir ris bhon stòr. Tha pasgan againn le JSON, agus aghaidh stòr air a chruthachadh stèidhichte air a’ phasgan seo. Às deidh an ath chuibhreann de dhàta a luchdachadh bhon stòr, chan eil dàta gu leòr aig an stòr airson aon latha.

Is e fuasgladh loidsigeach a bhith a’ sgaradh an aghaidh stòr tron ​​​​latha, a leigeas leat sgaradh ùr a chuir ris a h-uile ath latha. Tha an uidheamachd airson seo ainmeil cuideachd; Leigidh Spark leat sgaraidhean a chlàradh air leth.

An toiseach, bidh sinn a 'dèanamh a' chiad luchdachadh, a 'sàbhaladh an dàta mar a chaidh a mhìneachadh gu h-àrd, a' cur a-steach sgaradh a-mhàin. Canar tùsachadh aghaidh stòr ris a’ ghnìomh seo agus cha tèid a dhèanamh ach aon turas:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

An ath latha bidh sinn a 'luchdachadh sìos ach an earrann ùr:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Chan eil air fhàgail ach ath-chlàradh ann an Hive gus an sgeama ùrachadh.
Ach, seo far a bheil duilgheadasan ag èirigh.

A 'chiad duilgheadas. Nas luaithe no nas fhaide air adhart, cha bhith am parquet mar thoradh air sin ri leughadh tuilleadh. Tha seo mar thoradh air mar a bhios parquet agus JSON a’ làimhseachadh raointean falamh ann an dòigh eadar-dhealaichte.

Beachdaichidh sinn air suidheachadh àbhaisteach. Mar eisimpleir, an-dè thàinig JSON:

День 1: {"a": {"b": 1}},

agus an-diugh tha an aon JSON a’ coimhead mar seo:

День 2: {"a": null}

Canaidh sinn gu bheil dà sgaradh eadar-dhealaichte againn, gach fear le aon loidhne.
Nuair a leughas sinn an dàta stòr gu lèir, bidh e comasach dha Spark an seòrsa a dhearbhadh, agus tuigidh sinn gur e raon de sheòrsa “structar” a th’ ann an “a”, le raon neadachaidh “b” de sheòrsa INT. Ach, ma chaidh gach sgaradh a shàbhaladh air leth, is e an toradh parquet le sgeamaichean sgaradh neo-fhreagarrach:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Tha an suidheachadh seo aithnichte, agus mar sin chaidh roghainn a chuir ris gu sònraichte gus raointean falamh a thoirt air falbh nuair a bhios tu a’ parsadh dàta stòr:

df = spark.read.json("...", dropFieldIfAllNull=True)

Anns a 'chùis seo, bidh parquet air a dhèanamh suas de phàirtean a ghabhas leughadh còmhla.
Ged nì iadsan a rinn seo ann an cleachdadh gàire goirt. Carson? Tha, oir is dòcha gun èirich dà shuidheachadh eile. No trì. No ceithir. Is e a’ chiad fhear, a tha cha mhòr cinnteach, gum bi seòrsaichean àireamhach a’ coimhead eadar-dhealaichte ann an diofar fhaidhlichean JSON. Mar eisimpleir, {intField: 1} agus {intField: 1.1}. Ma tha na raointean sin a 'nochdadh ann an aon bhaidse, an uairsin leughaidh an sgeama còmhla a h-uile dad gu ceart, a' leantainn gu an seòrsa as ceart. Ach ma bhios ann an fheadhainn eadar-dhealaichte, an uairsin bidh inField: int aig aon, agus inField aig an fhear eile: dùbailte.

Gus dèiligeadh ris an t-suidheachadh seo tha am bratach a leanas:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

A-nis tha pasgan againn far a bheil na sgaraidhean suidhichte, a ghabhas leughadh ann an aon fhrèam dàta agus parquet dligheach den aghaidh stòr gu lèir. Tha? Chan eil.

Feumaidh sinn cuimhneachadh gun do chlàraich sinn am bòrd ann an Hive. Chan eil Hive mothachail air cùisean ann an ainmean achaidhean, ach tha parquet. Mar sin, tha roinnean le sgeamaichean: field1: int, agus Field1: int co-ionann airson Hive, ach chan ann airson Spark. Na dì-chuimhnich ainmean achaidhean atharrachadh gu litrichean beaga.

Às deidh seo, tha coltas gu bheil a h-uile dad gu math.

Ach, chan eil a h-uile cho sìmplidh. Tha dàrna duilgheadas, a tha aithnichte cuideachd, ag èirigh. Leis gu bheil gach sgaradh ùr air a shàbhaladh air leth, bidh faidhlichean seirbheis Spark anns a’ phasgan sgaradh, mar eisimpleir, bratach soirbheachas gnìomhachd _SUCCESS. Bidh seo mar thoradh air mearachd nuair a thathar a’ feuchainn ri parquet. Gus seo a sheachnadh, feumaidh tu an rèiteachadh a rèiteachadh le bhith a 'cur casg air Spark bho bhith a' cur faidhlichean seirbheis ris a 'phasgan:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Tha e coltach gu bheil a-nis a h-uile latha de sgaradh parquet ùr air a chur ris a 'phasgan targaid stòrais, far a bheil an dàta parsaichte airson an latha suidhichte. Ghabh sinn cùram ro-làimh gus dèanamh cinnteach nach robh sgaradh ann le còmhstri seòrsa dàta.

Ach tha an treas trioblaid romhainn. A-nis chan eil fios air an sgeama coitcheann, a bharrachd air an sin, ann an Hive tha an sgeama ceàrr air a’ bhòrd, leis gu bheil gach sgaradh ùr dualtach saobhadh a thoirt a-steach don sgeama.

Feumaidh an clàr a bhith air ath-chlàradh. Faodar seo a dhèanamh gu sìmplidh: leugh parquet aghaidh an stòr a-rithist, gabh an sgeama agus cruthaich DDL stèidhichte air, leis an urrainn dhut am pasgan ath-chlàradh ann an Hive mar bhòrd a-muigh, ag ùrachadh sgeama an aghaidh stòr targaid.

Tha an ceathramh duilgheadas againn. Nuair a chlàraich sinn am bòrd airson a’ chiad uair, bha sinn an urra ri Spark. A-nis bidh sinn ga dhèanamh sinn fhìn, agus feumaidh sinn cuimhneachadh gum faodadh raointean parquet tòiseachadh le caractaran nach eil ceadaichte le Hive. Mar eisimpleir, bidh Spark a’ tilgeil a-mach loidhnichean nach b’ urrainn dha a pharsadh san raon “corrupt_record”. Chan urrainnear a leithid de raon a chlàradh ann an Hive gun teicheadh.

Le fios seo, gheibh sinn an diagram:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

còd a ' ("_corrupt_record", "`_corrupt_record`") + " " + f [1].replace(":", "`:").replace ("<", "<`").replace(",", ",`"). cuir an àite ("eagar <`", "array<") a bheil DDL sàbhailte, is e sin, an àite:

create table tname (_field1 string, 1field string)

Le ainmean achaidhean mar “_field1, 1field”, thèid DDL sàbhailte a dhèanamh far an tèid na h-ainmean achaidh a theicheadh: cruthaich clàr `tname` (sreang `_field1`, sreang `1field`).

Tha a 'cheist ag èirigh: ciamar a gheibh thu frèam dàta gu ceart le sgeama iomlan (ann an còd pf)? Ciamar a gheibh thu am pf seo? Is e seo an còigeamh duilgheadas. Ath-leughadh an diagram de gach sgaradh bhon phasgan le faidhlichean parquet den aghaidh stòr targaid? Is e an dòigh seo an dòigh as sàbhailte, ach as duilghe.

Tha an sgeama ann an Hive mu thràth. Gheibh thu sgeama ùr le bhith a’ cothlamadh sgeama a’ bhùird gu lèir agus an sgaradh ùr. Tha seo a 'ciallachadh gum feum thu an sgeama bùird a thoirt bho Hive agus a chur còmhla ri sgeama an sgaradh ùr. Faodar seo a dhèanamh le bhith a 'leughadh meata-dàta deuchainn bho Hive, ga shàbhaladh gu pasgan sealach, agus a' leughadh an dà phàirt aig an aon àm a 'cleachdadh Spark.

Gu bunaiteach, tha a h-uile dad a dh ’fheumas tu: an sgeama bùird tùsail ann an Hive agus sgaradh ùr. Tha dàta againn cuideachd. Chan eil air fhàgail ach sgeama ùr fhaighinn a bhios a’ cothlamadh sgeama an aghaidh stòr agus raointean ùra bhon sgaradh cruthaichte:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

An uairsin, bidh sinn a 'cruthachadh clàr clàradh DDL, mar a bha anns a' chriomag roimhe.
Ma tha an t-sreath gu lèir ag obair gu ceart, is e sin, bha luchd tùsail ann, agus chaidh am bòrd a chruthachadh gu ceart ann an Hive, gheibh sinn sgeama clàr ùraichte.

Is e an duilgheadas mu dheireadh nach urrainn dhut sgaradh a chuir ri bòrd Hive gu furasta, oir brisidh e. Feumaidh tu toirt air Hive an structar sgaradh aige a chàradh:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Tha an obair shìmplidh a bhith a 'leughadh JSON agus a' cruthachadh aghaidh stòr stèidhichte air a 'ciallachadh gu bheilear a' faighinn thairis air grunn dhuilgheadasan a tha follaiseach, agus feumar na fuasglaidhean airson a lorg air leth. Agus ged a tha na fuasglaidhean sin sìmplidh, bheir e tòrr ùine airson an lorg.

Gus togail taisbeanaidh a chuir an gnìomh, bha againn ri:

  • Cuir sgaraidhean ris an aghaidh stòr, a’ faighinn cuidhteas faidhlichean seirbheis
  • Dèilig ri raointean falamh ann an dàta stòr a tha Spark air a thaipeadh
  • Tilg seòrsaichean sìmplidh gu sreang
  • Tionndaidh ainmean achaidh gu litrichean beaga
  • Luchdaich suas dàta air leth agus clàradh bùird ann an Hive (cruthachadh DDL)
  • Cuimhnich gun teich ainmean achaidhean a dh’ fhaodadh nach eil co-chosmhail ri Hive
  • Ionnsaich mar a dh'ùraicheas tu clàradh clàr ann an Hive

Gus geàrr-chunntas a dhèanamh, tha sinn a 'toirt fa-near gu bheil an co-dhùnadh a bhith a' togail aghaidhean-stòrais làn de dhuilgheadasan. Mar sin, ma dh'èireas duilgheadasan ann an cur an gnìomh, tha e nas fheàrr tionndadh gu com-pàirtiche eòlach le eòlas soirbheachail.

Tapadh leibh airson an artaigil seo a leughadh, tha sinn an dòchas gum bi am fiosrachadh feumail dhut.

Source: www.habr.com

Cuir beachd ann