Esblygiad sgema gwreichionen yn ymarferol

Annwyl ddarllenwyr, prynhawn da!

Yn yr erthygl hon, mae'r ymgynghorydd blaenllaw ar gyfer maes busnes Big Data Solutions Neoflex yn disgrifio'n fanwl yr opsiynau ar gyfer adeiladu blaenau siopau strwythur amrywiol gan ddefnyddio Apache Spark.

Fel rhan o brosiect dadansoddi data, mae'r dasg o adeiladu arddangosfeydd yn seiliedig ar ddata â strwythur llac yn codi'n aml.

Yn nodweddiadol mae'r rhain yn logiau, neu ymatebion o systemau amrywiol, wedi'u cadw ar ffurf JSON neu XML. Mae'r data'n cael ei lanlwytho i Hadoop, yna mae angen adeiladu blaen siop ohono. Gallwn drefnu mynediad i'r blaen siop a grëwyd, er enghraifft, trwy Impala.

Yn yr achos hwn, nid yw cynllun blaen y siop darged yn hysbys ymlaen llaw. At hynny, ni ellir llunio'r cynllun ymlaen llaw, gan ei fod yn dibynnu ar y data, ac rydym yn ymdrin â'r data hwn sydd â strwythur gwan iawn.

Er enghraifft, heddiw mae'r ymateb canlynol wedi'i gofnodi:

{source: "app1", error_code: ""}

ac yfory daw'r ymateb canlynol o'r un system:

{source: "app1", error_code: "error", description: "Network error"}

O ganlyniad, dylid ychwanegu maes arall at flaen y siop - disgrifiad, a does neb yn gwybod a ddaw ai peidio.

Mae'r dasg o greu mart ar ddata o'r fath yn weddol safonol, ac mae gan Spark nifer o offer ar gyfer hyn. Ar gyfer dosrannu data ffynhonnell, mae cefnogaeth ar gyfer JSON ac XML, ac ar gyfer sgema anhysbys o'r blaen, darperir cefnogaeth schemaEvolution.

Ar yr olwg gyntaf, mae'r ateb yn edrych yn syml. Mae angen i chi gymryd y ffolder gyda JSON a'i ddarllen yn y ffrâm ddata. Bydd Spark yn creu sgema ac yn troi'r data nythu yn strwythurau. Nesaf, mae angen arbed popeth mewn parquet, sydd hefyd yn cael ei gefnogi yn Impala, trwy gofrestru blaen y siop yn metastore Hive.

Mae'n ymddangos bod popeth yn syml.

Fodd bynnag, o'r enghreifftiau byr yn y ddogfennaeth nid yw'n glir beth i'w wneud â nifer o broblemau yn ymarferol.

Mae'r ddogfennaeth yn disgrifio dull nid ar gyfer creu blaen siop, ond ar gyfer darllen JSON neu XML i ffrâm ddata.

Sef, mae'n dangos yn syml sut i ddarllen a dosrannu JSON:

df = spark.read.json(path...)

Mae hyn yn ddigon i sicrhau bod y data ar gael i Spark.

Yn ymarferol, mae'r senario yn llawer mwy cymhleth na dim ond darllen ffeiliau JSON o ffolder a chreu ffrâm ddata. Mae'r sefyllfa'n edrych fel hyn: mae yna arddangosfa benodol eisoes, mae data newydd yn cyrraedd bob dydd, mae angen eu hychwanegu at yr arddangosfa, heb anghofio y gallai'r cynllun fod yn wahanol.

Mae'r cynllun arferol ar gyfer adeiladu blaen siop fel a ganlyn:

Cam 1. Mae'r data'n cael ei lwytho i Hadoop, ac yna llwytho ychwanegol dyddiol a'i ychwanegu at raniad newydd. Y canlyniad yw ffolder gyda data ffynhonnell, wedi'i rannu fesul dydd.

Cam 2. Yn ystod y llwytho cychwynnol, mae'r ffolder hon yn cael ei darllen a'i dosrannu gan ddefnyddio Spark. Mae'r ffrâm data canlyniadol yn cael ei gadw mewn fformat y gellir ei ddadansoddi, er enghraifft, mewn parquet, y gellir ei fewnforio wedyn i Impala. Mae hyn yn creu blaen siop darged gyda'r holl ddata sydd wedi cronni hyd at y pwynt hwn.

Cam 3. Mae lawrlwythiad yn cael ei greu a fydd yn diweddaru blaen y siop bob dydd.
Mae'r cwestiwn o lwytho cynyddrannol yn codi, yr angen i rannu blaen y siop, a'r cwestiwn o gefnogi gosodiad cyffredinol blaen y siop.

Gadewch i ni roi enghraifft. Gadewch i ni ddweud bod y cam cyntaf o adeiladu ystorfa wedi'i weithredu, ac mae uwchlwytho ffeiliau JSON i ffolder wedi'i ffurfweddu.

Nid yw'n broblem creu ffrâm ddata oddi wrthynt ac yna eu cadw fel arddangosfa. Dyma'r cam cyntaf un y gellir ei ddarganfod yn hawdd yn nogfennaeth Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Mae popeth yn ymddangos yn iawn.

Rydym wedi darllen a dosrannu'r JSON, yna rydym yn cadw'r ffrâm ddata fel parquet, gan ei gofrestru yn Hive mewn unrhyw ffordd gyfleus:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Rydyn ni'n cael arddangosfa.

Ond, y diwrnod wedyn ychwanegwyd data newydd o'r ffynhonnell. Mae gennym ni ffolder gyda JSON, a blaen siop wedi'i chreu yn seiliedig ar y ffolder hwn. Ar ôl llwytho'r rhan nesaf o ddata o'r ffynhonnell, nid oes gan flaen y siop ddigon o ddata am un diwrnod.

Ateb rhesymegol fyddai rhannu blaen y siop yn ystod y dydd, a fydd yn caniatáu ichi ychwanegu rhaniad newydd bob diwrnod nesaf. Mae'r mecanwaith ar gyfer hyn hefyd yn adnabyddus; mae Spark yn caniatáu ichi gofnodi rhaniadau ar wahân.

Yn gyntaf, rydym yn gwneud y llwytho cychwynnol, gan arbed y data fel y disgrifir uchod, gan ychwanegu rhaniad yn unig. Gelwir y weithred hon yn gychwyniad blaen siop ac fe'i gwneir unwaith yn unig:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Y diwrnod wedyn rydyn ni'n lawrlwytho'r rhaniad newydd yn unig:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Y cyfan sydd ar ôl yw ailgofrestru yn Hive i ddiweddaru'r sgema.
Fodd bynnag, dyma lle mae problemau'n codi.

Problem gyntaf. Yn hwyr neu'n hwyrach, ni fydd y parquet canlyniadol yn ddarllenadwy mwyach. Mae hyn oherwydd sut mae parquet a JSON yn trin caeau gwag yn wahanol.

Gadewch i ni ystyried sefyllfa nodweddiadol. Er enghraifft, ddoe mae JSON yn cyrraedd:

День 1: {"a": {"b": 1}},

a heddiw mae'r un JSON yn edrych fel hyn:

День 2: {"a": null}

Gadewch i ni ddweud bod gennym ddau raniad gwahanol, pob un ag un llinell.
Pan fyddwn yn darllen y data ffynhonnell gyfan, bydd Spark yn gallu pennu'r math, a bydd yn deall bod "a" yn faes o fath "strwythur", gyda maes nythu "b" o fath INT. Ond, pe bai pob rhaniad yn cael ei arbed ar wahân, yna'r canlyniad yw parquet gyda chynlluniau rhaniad anghydnaws:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Mae'r sefyllfa hon yn hysbys iawn, felly mae opsiwn wedi'i ychwanegu'n arbennig i ddileu meysydd gwag wrth ddosrannu data ffynhonnell:

df = spark.read.json("...", dropFieldIfAllNull=True)

Yn yr achos hwn, bydd parquet yn cynnwys rhaniadau y gellir eu darllen gyda'i gilydd.
Er y bydd y rhai sydd wedi gwneud hyn yn ymarferol yn gwenu'n chwerw. Pam? Bydd, oherwydd mae'n debyg y bydd dwy sefyllfa arall yn codi. Neu dri. Neu bedwar. Y cyntaf, sydd bron yn sicr, yw y bydd mathau rhifol yn edrych yn wahanol mewn gwahanol ffeiliau JSON. Er enghraifft, {intField: 1} a {intField: 1.1}. Os bydd meysydd o'r fath yn ymddangos mewn un swp, yna bydd yr uno sgema yn darllen popeth yn gywir, gan arwain at y math mwyaf cywir. Ond os mewn rhai gwahanol, yna bydd gan un inField: int, a bydd gan y llall inField: dwbl.

Er mwyn delio â'r sefyllfa hon mae'r faner ganlynol:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nawr mae gennym ni ffolder lle mae'r rhaniadau wedi'u lleoli, y gellir eu darllen yn un ffrâm ddata a pharquet dilys o flaen y siop gyfan. Oes? Nac ydw.

Rhaid cofio inni gofrestru'r bwrdd yn Hive. Nid yw cwch gwenyn yn hynod sensitif mewn enwau caeau, ond mae parquet yn sensitif. Felly, mae rhaniadau gyda sgemâu: field1: int, a Field1:int yr un peth ar gyfer Hive, ond nid ar gyfer Spark. Peidiwch ag anghofio newid enwau caeau i lythrennau bach.

Ar ôl hyn, mae'n ymddangos bod popeth yn iawn.

Fodd bynnag, nid yw pob un mor syml. Mae ail broblem, sydd hefyd yn adnabyddus, yn codi. Gan fod pob rhaniad newydd yn cael ei gadw ar wahân, bydd y ffolder rhaniad yn cynnwys ffeiliau gwasanaeth Spark, er enghraifft, baner llwyddiant gweithrediad _SUCCESS. Bydd hyn yn arwain at gamgymeriad wrth geisio parquet. Er mwyn osgoi hyn, mae angen i chi ffurfweddu'r ffurfweddiad trwy atal Spark rhag ychwanegu ffeiliau gwasanaeth i'r ffolder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Mae'n ymddangos bod rhaniad parquet newydd yn cael ei ychwanegu bob dydd at y ffolder blaen siop targed, lle mae'r data wedi'i ddosrannu ar gyfer y diwrnod wedi'i leoli. Cymerasom ofal ymlaen llaw i sicrhau nad oedd unrhyw raniadau gyda gwrthdaro math o ddata.

Ond rydym yn wynebu trydydd problem. Nawr nid yw'r sgema cyffredinol yn hysbys, ar ben hynny, yn Hive mae gan y bwrdd y sgema anghywir, gan fod pob rhaniad newydd yn fwyaf tebygol o gyflwyno ystumiad i'r sgema.

Mae angen ailgofrestru'r tabl. Gellir gwneud hyn yn syml: darllenwch barquet blaen y siop eto, cymerwch y sgema a chreu DDL yn seiliedig arno, y gallwch ei ddefnyddio i ailgofrestru'r ffolder yn Hive fel bwrdd allanol, gan ddiweddaru sgema blaen y siop darged.

Rydym yn wynebu pedwerydd problem. Pan wnaethom gofrestru'r bwrdd am y tro cyntaf, roeddem yn dibynnu ar Spark. Nawr rydyn ni'n ei wneud ein hunain, ac mae angen i ni gofio y gall caeau parquet ddechrau gyda chymeriadau na chaniateir gan Hive. Er enghraifft, mae Spark yn taflu llinellau na allai eu dosrannu yn y maes “corrupt_record”. Ni ellir cofrestru cae o'r fath yn Hive heb ddianc.

O wybod hyn, rydym yn cael y diagram:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Cod ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`"). yn lle ("arae<`", "array<") yn gwneud DDL yn ddiogel, hynny yw, yn lle:

create table tname (_field1 string, 1field string)

Gydag enwau caeau fel “_field1, 1field”, mae DDL diogel yn cael ei wneud lle mae enwau caeau yn cael eu dianc: creu tabl `tname` (llinyn `_field1`, llinyn `1field`).

Mae'r cwestiwn yn codi: sut i gael ffrâm ddata yn gywir gyda sgema cyflawn (mewn cod PF)? Sut i gael y pf hwn? Dyma'r pumed broblem. Ailddarllen y diagram o'r holl raniad o'r ffolder gyda ffeiliau parquet o flaen y storfa darged? Y dull hwn yw'r mwyaf diogel, ond anodd.

Mae'r sgema eisoes yn Hive. Gallwch gael sgema newydd trwy gyfuno sgema'r bwrdd cyfan a'r rhaniad newydd. Mae hyn yn golygu bod angen i chi gymryd y sgema bwrdd o Hive a'i gyfuno â sgema'r rhaniad newydd. Gellir gwneud hyn trwy ddarllen metadata prawf o Hive, ei gadw i ffolder dros dro, a darllen y ddau raniad ar unwaith gan ddefnyddio Spark.

Yn y bôn, mae yna bopeth sydd ei angen arnoch chi: y sgema bwrdd gwreiddiol yn Hive a rhaniad newydd. Mae gennym ni ddata hefyd. Y cyfan sydd ar ôl yw cael sgema newydd sy'n cyfuno'r sgema blaen siop a meysydd newydd o'r rhaniad a grëwyd:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Nesaf, rydyn ni'n creu'r cofrestriad tabl DDL, fel yn y darn blaenorol.
Os yw'r gadwyn gyfan yn gweithio'n gywir, sef, roedd llwyth cychwynnol, a bod y bwrdd wedi'i greu'n gywir yn Hive, yna rydyn ni'n cael sgema tabl wedi'i ddiweddaru.

Y broblem olaf yw na allwch chi ychwanegu rhaniad at fwrdd Hive yn hawdd, gan y bydd yn torri. Mae angen i chi orfodi Hive i drwsio ei strwythur rhaniad:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Mae'r dasg syml o ddarllen JSON a chreu blaen siop yn seiliedig arno yn arwain at oresgyn nifer o anawsterau ymhlyg, y mae'n rhaid dod o hyd i'r atebion ar eu cyfer ar wahân. Ac er bod yr atebion hyn yn syml, mae'n cymryd llawer o amser i ddod o hyd iddynt.

Er mwyn gweithredu'r gwaith o adeiladu arddangosfa, roedd yn rhaid i ni:

  • Ychwanegu rhaniadau i flaen y siop, cael gwared ar ffeiliau gwasanaeth
  • Delio â meysydd gwag yn y data ffynhonnell y mae Spark wedi'i deipio
  • Castiwch fathau syml i linyn
  • Trosi enwau caeau i lythrennau bach
  • Llwytho data ar wahân a chofrestru tablau yn Hive (creu DDL)
  • Peidiwch ag anghofio dianc rhag enwau caeau nad ydynt efallai'n gydnaws â Hive
  • Dysgwch i ddiweddaru cofrestriad tabl yn Hive

I grynhoi, nodwn fod y penderfyniad i adeiladu blaenau siopau yn llawn llawer o beryglon. Felly, os cyfyd anawsterau wrth weithredu, mae'n well troi at bartner profiadol sydd ag arbenigedd llwyddiannus.

Diolch am ddarllen yr erthygl hon, gobeithiwn y bydd y wybodaeth yn ddefnyddiol i chi.

Ffynhonnell: hab.com

Ychwanegu sylw