Spark scéimre Éabhlóid go praiticiúil

A léitheoirí, lá maith!

San Airteagal seo, déanann príomhchomhairleoir réimse gnó Neoflex's Big Data Solutions cur síos go mion ar na roghanna chun taispeántais struchtúr athraitheach a thógáil ag baint úsáide as Apache Spark.

Mar chuid de thionscadal anailíse sonraí, is minic a thagann an tasc chun aghaidheanna stórais a thógáil bunaithe ar shonraí struchtúrtha scaoilte.

Go hiondúil is logaí iad seo, nó freagraí ó chórais éagsúla, a shábháiltear mar JSON nó XML. Déantar na sonraí a uaslódáil chuig Hadoop, ansin caithfidh tú aghaidh stórais a thógáil uathu. Is féidir linn rochtain ar an taispeántas cruthaithe a eagrú, mar shampla, trí Impala.

Sa chás seo, ní fios roimh ré scéimre an sprioc-éadan siopa. Ina theannta sin, ní féidir an scéim a tharraingt suas roimh ré freisin, toisc go mbraitheann sé ar na sonraí, agus táimid ag déileáil leis na sonraí struchtúrtha an-scaoilte seo.

Mar shampla, tá an freagra seo a leanas logáilte inniu:

{source: "app1", error_code: ""}

agus amárach ón gcóras céanna tagann an freagra seo a leanas:

{source: "app1", error_code: "error", description: "Network error"}

Mar thoradh air sin, ba chóir réimse amháin níos mó a chur leis an taispeántas - cur síos, agus níl a fhios ag aon duine an dtiocfaidh sé nó nach dtiocfaidh.

Tá an tasc chun aghaidh stórais a chruthú ar shonraí den sórt sin sách caighdeánach, agus tá roinnt uirlisí ag Spark chuige seo. Chun na sonraí foinseacha a pharsáil, tá tacaíocht ar fáil do JSON agus XML, agus do scéimre nach raibh aithne uirthi roimhe seo, soláthraítear tacaíocht do schemaEvolution.

Ar an gcéad amharc, breathnaíonn an réiteach simplí. Ní mór duit fillteán a thógáil le JSON agus é a léamh i bhfráma sonraí. Cruthóidh Spark scéimre, casfaidh sonraí neadaithe ina struchtúir. Ina theannta sin, ní mór gach rud a shábháil i iontlaise, a fhaigheann tacaíocht freisin in Impala, trí aghaidh an tstórais a chlárú i meiteastóir Hive.

Is cosúil go bhfuil gach rud simplí.

Mar sin féin, ní léir ó na samplaí gearra sa doiciméadú cad ba cheart a dhéanamh le roinnt fadhbanna go praiticiúil.

Déanann an doiciméadú cur síos ar chur chuige nach gcruthaítear aghaidh stórais, ach chun JSON nó XML a léamh i bhfráma sonraí.

Eadhon, léiríonn sé go simplí conas JSON a léamh agus a pharsáil:

df = spark.read.json(path...)

Is leor é seo chun na sonraí a chur ar fáil do Spark.

Go praiticiúil, tá an script i bhfad níos casta ná comhaid JSON a léamh ó fhillteán agus fráma sonraí a chruthú. Breathnaíonn an cás mar seo: tá aghaidh áirithe stórais ann cheana féin, tagann sonraí nua isteach gach lá, ní mór iad a chur leis an siopa, gan dearmad a dhéanamh go bhféadfadh an scéim a bheith difriúil.

Seo a leanas an gnáthscéim chun taispeántas a thógáil:

Céim 1. Lódáiltear na sonraí isteach i Hadoop le hathlódáil laethúil ina dhiaidh sin agus cuirtear le críochdheighilt nua iad. Casadh sé amach fillteán le sonraí tosaigh a dheighilt de réir an lae.

Céim 2. Le linn an ualaigh tosaigh, léann Spark an fillteán seo agus é a pharsáil. Sábháltar an fráma sonraí mar thoradh air i bhformáid parsable, mar shampla, i iontlaise, ar féidir a allmhairiú ansin isteach i Impala. Cruthaíonn sé seo spriocthaispeántas leis na sonraí go léir atá carntha suas go dtí an pointe seo.

Céim 3. Cruthaítear íoslódáil a nuashonróidh aghaidh an tsiopa gach lá.
Tá ceist ann maidir le luchtú incriminteach, an gá atá leis an taispeántas a dheighilt, agus an cheist maidir le scéim ghinearálta an taispeántais a chothabháil.

Glacaimis sampla. Ligean le rá go bhfuil an chéad chéim de thógáil stór curtha i bhfeidhm, agus comhaid JSON uaslódáilte chuig fillteán.

Ní fadhb é fráma sonraí a chruthú uathu, ansin é a shábháil mar thaispeántas. Is é seo an chéad chéim is féidir a fháil go héasca i gcáipéisíocht Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Is cosúil go bhfuil gach rud go breá.

Léigheamar agus pharsáileamar JSON, ansin sábhálaimid an fráma sonraí mar iontlaise, agus é á chlárú in Hive ar aon bhealach áisiúil:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Faighimid fuinneog.

Ach, an lá dár gcionn, cuireadh sonraí nua ón bhfoinse leis. Tá fillteán againn le JSON, agus taispeántas cruthaithe ón bhfillteán seo. Tar éis an chéad bhaisc eile sonraí a luchtú ón bhfoinse, tá luach lae amháin sonraí in easnamh ar an margadh sonraí.

Is é an réiteach loighciúil a bheadh ​​ann ná aghaidh an stórais a dheighilt go lá, rud a cheadóidh críochdheighilt nua a chur leis gach lá eile. Tá an mheicníocht seo ar eolas go maith freisin, ligeann Spark duit landairí a scríobh ar leithligh.

Ar dtús, déanaimid ualach tosaigh, ag sábháil na sonraí mar a thuairiscítear thuas, ag cur deighilt amháin leis. Tugtar túsú aghaidh siopa ar an ngníomh seo agus ní dhéantar é ach uair amháin:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

An lá dár gcionn, ní lódálann muid ach críochdheighilt nua:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Níl fágtha ach athchlárú in Hive chun an scéimre a nuashonrú.
Mar sin féin, is anseo a thagann fadhbanna chun cinn.

An chéad fhadhb. Luath nó mall, beidh an iontlaise mar thoradh air a bheith neamh-inléite. Tá sé seo mar gheall ar an gcaoi a ndéileálann iontlaise agus JSON réimsí folmha ar bhealach difriúil.

Déanaimis machnamh ar chás tipiciúil. Mar shampla, tháinig JSON inné:

День 1: {"a": {"b": 1}},

agus inniu tá an chuma ar an JSON céanna mar seo:

День 2: {"a": null}

Ligean le rá go bhfuil dhá dheighiltí éagsúla againn, gach ceann acu le líne amháin.
Nuair a bheidh na sonraí foinse iomlána á léamh againn, beidh Spark in ann an cineál a chinneadh, agus tuigfidh sé gur réimse de chineál "struchtúr" é "a", le réimse neadaithe "b" de chineál INT. Ach, má shábháiltear gach críochdheighilt ar leithligh, ansin faigheann muid iontlaise le scéimeanna deighilte neamh-chomhoiriúnacha:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Tá an scéal seo ar eolas go maith, mar sin tá rogha curtha leis go speisialta - agus na sonraí foinseacha á pharsáil, bain na réimsí folamh:

df = spark.read.json("...", dropFieldIfAllNull=True)

Sa chás seo, beidh an iontlaise comhdhéanta de landairí is féidir a léamh le chéile.
Cé go mbeidh aoibh gháire searbh anseo iad siúd a rinne é seo go praiticiúil. Cén fáth? Sea, mar is dócha go mbeidh dhá chás eile ann. Nó trí. Nó ceithre. Is é an chéad cheann, a tharlóidh beagnach cinnte, ná go mbeidh cuma difriúil ar chineálacha uimhriúla i gcomhaid JSON éagsúla. Mar shampla, {intField: 1} agus {intField: 1.1}. Má aimsítear réimsí den sórt sin i ndeighilt amháin, ansin léifidh an cumasc scéimre gach rud i gceart, as a dtiocfaidh an cineál is cruinne. Ach más rud é i gcinn éagsúla, ansin beidh intField: int ag duine, agus beidh intField: dúbailte ag an gceann eile.

Tá an bhratach seo a leanas ann chun an cás seo a láimhseáil:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Anois tá fillteán againn ina bhfuil landairí ar féidir iad a léamh isteach i bhfráma sonraí amháin agus iontlaise bailí den taispeántas iomlán. Tá? Níl.

Ní mór dúinn cuimhneamh gur chláraigh muid an tábla i Hive. Níl an choirceog cásíogair in ainmneacha páirce, agus tá iontlaise cásíogair. Mar sin, tá landairí le scéimre: field1: int, agus Field1: int mar an gcéanna do Hive, ach ní do Spark. Ná déan dearmad na hainmneacha páirce a thiontú go cás íochtair.

Tar éis sin, is cosúil go bhfuil gach rud go breá.

Mar sin féin, ní léir chomh simplí. Tá an dara fadhb a bhfuil aithne mhaith uirthi freisin. Ós rud é go ndéantar gach deighilt nua a shábháil ar leithligh, beidh comhaid seirbhíse Spark san fhillteán deighilte, mar shampla, bratach rathúlachta na hoibríochta _SUCCESS. Beidh sé seo mar thoradh ar earráid nuair a iarraidh a iontlaise. Chun é seo a sheachaint, ní mór duit an chumraíocht a chumrú chun Spark a chosc ó chomhaid seirbhíse a chur leis an bhfillteán:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Dealraíonn sé go gcuirtear críochdheighilt iontlaise nua gach lá leis an bhfillteán taispeántais sprice, áit a bhfuil na sonraí parsáilte don lá suite. Ghlacamar cúram roimh ré nach raibh aon deighiltí ann le coinbhleacht cineál sonraí.

Ach, tá an tríú fadhb againn. Anois ní fios cad é an scéimre ghinearálta, ina theannta sin, tá scéimre mícheart ag an tábla i Hive, mar is dócha gur thug gach críochdheighilt nua saobhadh isteach sa scéimre.

Ní mór duit an tábla a athchlárú. Is féidir é seo a dhéanamh go simplí: léigh iontlaise aghaidh an tsiopa arís, tóg an scéimre agus cruthaigh DDL bunaithe air, chun an fillteán in Hive a athchlárú mar thábla seachtrach, ag nuashonrú scéimre an sprioc-éadanais stórais.

Tá ceathrú fadhb againn. Nuair a chláraigh muid an tábla don chéad uair, bhíomar ag brath ar Spark. Anois déanaimid é féin, agus ní mór dúinn cuimhneamh gur féidir tús a chur le réimsí iontlaise le carachtair nach bhfuil ceadaithe do Hive. Mar shampla, caitheann Spark línte amach nach bhféadfadh sé a pharsáil sa réimse "corrupt_record". Ní féidir páirc den sórt sin a chlárú in Hive gan éalú.

Agus é seo ar eolas againn, faigheann muid an scéim:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Cód ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`"). ionad ("eagar <`", "eagar<") a dhéanann DDL sábháilte, i.e. in ionad:

create table tname (_field1 string, 1field string)

Le hainmneacha páirce mar "_field1, 1field", déantar DDL sábháilte nuair a éalaítear na hainmneacha páirce: cruthaigh tábla `tname` (teaghrán `_field1`, teaghrán `1field`).

Éiríonn an cheist: conas fráma sonraí a fháil i gceart le scéimre iomlán (i gcód pf)? Conas an pf seo a fháil? Is é seo an cúigiú fadhb. Athléamh scéim na ndeighiltí go léir ón bhfillteán le comhaid iontlaise den spriocthaispeántas? Is é an modh seo an ceann is sábháilte, ach is deacra.

Tá an scéimre i Hive cheana féin. Is féidir leat scéimre nua a fháil trí scéimre an tábla ar fad agus an deighilt nua a chomhcheangal. Mar sin ní mór duit scéimre an bhoird a thógáil ó Hive agus é a chomhcheangal le scéimre na críochdheighilte nua. Is féidir é seo a dhéanamh trí na meiteashonraí tástála ó Hive a léamh, é a shábháil i bhfillteán sealadach, agus Spark a úsáid chun an dá dheighilt a léamh láithreach.

Go deimhin, tá gach rud atá uait: an scéimre tábla bunaidh i Hive agus an críochdheighilte nua. Tá sonraí againn freisin. Níl le déanamh ach scéimre nua a fháil a chomhcheanglaíonn scéimre aghaidh an tsiopa agus réimsí nua ón deighilt chruthaithe:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ina dhiaidh sin, cruthaímid an clárú tábla DDL, mar a tharla sa bhlúire roimhe seo.
Má oibríonn an slabhra iomlán i gceart, eadhon, bhí ualach tosaigh ann, agus cruthaíodh an tábla i gceart i Hive, ansin faigheann muid scéimre tábla nuashonraithe.

Agus is é an fhadhb dheireanach nach féidir leat a chur díreach críochdheighilte le tábla Hive, mar go mbeidh sé a bheith briste. Ní mór duit iallach a chur ar Hive a struchtúr deighilte a shocrú:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Mar thoradh ar an tasc simplí JSON a léamh agus aghaidh siopa a chruthú bunaithe air sáraítear roinnt deacrachtaí intuigthe, réitigh a gcaithfidh tú a lorg ar leithligh. Agus cé go bhfuil na réitigh seo simplí, tógann sé go leor ama iad a aimsiú.

Chun tógáil an taispeántais a chur i bhfeidhm, bhí orm:

  • Cuir landairí leis an taispeántas, ag fáil réidh le comhaid seirbhíse
  • Déileáil le réimsí folmha sna sonraí foinse atá clóscríofa ag Spark
  • Cineálacha simplí a chaitheamh le sreang
  • Tiontaigh ainmneacha páirce go cás íochtair
  • Uaslódáil sonraí ar leith agus clárú táblaí in Hive (giniúint DDL)
  • Ná déan dearmad éalú ó ainmneacha páirce a d’fhéadfadh a bheith ar neamhréir le Hive
  • Foghlaim conas clárú táblaí in Hive a nuashonrú

Ag achoimre, tugaimid faoi deara go bhfuil an cinneadh chun fuinneoga siopa a thógáil lán le go leor gaistí. Dá bhrí sin, i gcás deacrachtaí maidir le cur i bhfeidhm, is fearr teagmháil a dhéanamh le comhpháirtí taithí a bhfuil saineolas rathúil aige.

Go raibh maith agat as an alt seo a léamh, tá súil againn go mbeidh an fhaisnéis úsáideach duit.

Foinse: will.com

Add a comment