Spark-skemoEvoluo en praktiko

Karaj legantoj, bonan posttagmezon!

En ĉi tiu artikolo, la ĉefa konsultisto por la komerca areo de Big Data Solutions de Neoflex detale priskribas eblojn por konstrui variajn strukturajn butikfasadojn per Apache Spark.

Kiel parto de datuma analizprojekto, ofte aperas la tasko konstrui montrofenestrojn bazitajn sur loze strukturitaj datumoj.

Tipe ĉi tiuj estas protokoloj, aŭ respondoj de diversaj sistemoj, konservitaj en la formo de JSON aŭ XML. La datumoj estas alŝutitaj al Hadoop, tiam oni devas konstrui vendejon el ĝi. Ni povas organizi aliron al la kreita vendejo, ekzemple, per Impala.

En ĉi tiu kazo, la aranĝo de la cela vendejo estas nekonata anticipe. Krome, la skemo ne povas esti ellaborita anticipe, ĉar ĝi dependas de la datumoj, kaj ni traktas ĉi tiujn tre malforte strukturitajn datumojn.

Ekzemple, hodiaŭ la sekva respondo estas registrita:

{source: "app1", error_code: ""}

kaj morgaŭ la sekva respondo venas de la sama sistemo:

{source: "app1", error_code: "error", description: "Network error"}

Kiel rezulto, alia kampo devus esti aldonita al la vendejo - priskribo, kaj neniu scias ĉu ĝi venos aŭ ne.

La tasko krei vendejon pri tiaj datumoj estas sufiĉe norma, kaj Spark havas kelkajn ilojn por tio. Por analizado de fontaj datumoj, ekzistas subteno por kaj JSON kaj XML, kaj por antaŭe nekonata skemo, schemaEvolution-subteno estas disponigita.

Unuavide, la solvo aspektas simpla. Vi devas preni la dosierujon kun JSON kaj legi ĝin en la datumkadron. Spark kreos skemon kaj transformos la nestitajn datumojn en strukturojn. Poste, ĉio devas esti konservita en pargeto, kiu ankaŭ estas subtenata en Impala, registrante la vendejon en la metavendejo Hive.

Ĉio ŝajnas esti simpla.

Tamen el la mallongaj ekzemploj en la dokumentado ne estas klare, kion fari kun kelkaj problemoj en la praktiko.

La dokumentaro priskribas aliron ne por krei butikfasadon, sed por legi JSON aŭ XML en datumkadron.

Nome, ĝi simple montras kiel legi kaj analizi JSON:

df = spark.read.json(path...)

Ĉi tio sufiĉas por disponigi la datumojn al Spark.

En praktiko, la scenaro estas multe pli kompleksa ol nur legi JSON-dosierojn el dosierujo kaj krei datuman kadron. La situacio aspektas tiel: ekzistas jam certa montrofenestro, novaj datumoj alvenas ĉiutage, ili devas esti aldonitaj al la montrofenestro, ne forgesante, ke la skemo povas malsami.

La kutima skemo por konstrui vendejon estas kiel sekvas:

Ŝtupo 1. La datumoj estas ŝarĝitaj en Hadoop, sekvataj de ĉiutaga plia ŝarĝo kaj aldonitaj al nova sekcio. La rezulto estas dosierujo kun fontaj datumoj, dividitaj laŭtage.

Ŝtupo 2. Dum komenca ŝarĝo, ĉi tiu dosierujo estas legita kaj analizita per Spark. La rezulta datumkadro estas konservita en formato analizebla, ekzemple, en pargeto, kiu tiam povas esti importita en Impala. Ĉi tio kreas celan vendejon kun ĉiuj datumoj, kiuj amasiĝis ĝis ĉi tiu punkto.

Ŝtupo 3. Elŝuto estas kreita, kiu ĝisdatigos la butikfasadon ĉiutage.
La demando pri pliiga ŝarĝo ŝprucas, la bezono dividi la butikfasadon, kaj la demando pri subteno de la ĝenerala aranĝo de la butikfasado.

Ni donu ekzemplon. Ni diru, ke la unua paŝo konstrui deponejon estas efektivigita, kaj alŝuti JSON-dosierojn al dosierujo estas agordita.

Ne estas problemo krei datumkadron de ili kaj poste konservi ilin kiel montrofenestro. Ĉi tiu estas la unua paŝo, kiu troveblas facile en la dokumentado de Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Ĉio ŝajnas esti bona.

Ni legis kaj analizis la JSON, tiam ni konservas la datumkadron kiel pargeton, registrante ĝin en Hive laŭ iu ajn oportuna maniero:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Ni ricevas montrofenestron.

Sed, la sekvan tagon novaj datumoj estis aldonitaj de la fonto. Ni havas dosierujon kun JSON, kaj butikfasado kreita surbaze de ĉi tiu dosierujo. Post ŝarĝo de la sekva parto de datumoj de la fonto, la vendejo ne havas sufiĉajn datumojn por unu tago.

Logika solvo estus dividi la vendejon tage, kio permesos al vi aldoni novan sekcion ĉiun sekvan tagon. La mekanismo por tio ankaŭ estas konata; Spark permesas vin registri subdiskojn aparte.

Unue, ni faras la komencan ŝarĝon, konservante la datumojn kiel priskribite supre, aldonante nur dispartigo. Ĉi tiu ago nomiĝas butikfasado inicialigo kaj estas farita nur unufoje:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

La sekvan tagon ni elŝutas nur la novan sekcion:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Restas nur reregistriĝi en Hive por ĝisdatigi la skemon.
Tamen ĉi tie aperas problemoj.

Unua problemo. Pli aŭ malpli frue, la rezulta pargeto ne plu estos legebla. Ĉi tio estas pro kiel pargeto kaj JSON traktas malplenajn kampojn malsame.

Ni konsideru tipan situacion. Ekzemple, hieraŭ JSON alvenas:

День 1: {"a": {"b": 1}},

kaj hodiaŭ la sama JSON aspektas jene:

День 2: {"a": null}

Ni diru, ke ni havas du malsamajn sekciojn, ĉiu kun unu linio.
Kiam ni legos la tutajn fontajn datumojn, Spark povos determini la tipon, kaj komprenos, ke "a" estas kampo de tipo "strukturo", kun nestita kampo "b" de tipo INT. Sed, se ĉiu sekcio estis konservita aparte, tiam la rezulto estas pargeto kun nekongruaj vandoskemoj:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ĉi tiu situacio estas konata, do opcio estis speciale aldonita por forigi malplenajn kampojn dum analizado de fontaj datumoj:

df = spark.read.json("...", dropFieldIfAllNull=True)

En ĉi tiu kazo, pargeto konsistos el vandoj, kiuj legeblas kune.
Kvankam tiuj, kiuj faris tion praktike, ridetos amare ĉi tie. Kial? Jes, ĉar verŝajne estos du pliaj situacioj. Aŭ tri. Aŭ kvar. La unua, kiu estas preskaŭ certa, estas, ke nombraj tipoj aspektos malsame en malsamaj JSON-dosieroj. Ekzemple, {intField: 1} kaj {intField: 1.1}. Se tiaj kampoj aperas en unu aro, tiam la skemo kunfandita legos ĉion ĝuste, kondukante al la plej preciza tipo. Sed se en malsamaj, tiam unu havos intField: int, kaj la alia havos intField: double.

Por trakti ĉi tiun situacion estas la sekva flago:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nun ni havas dosierujon, kie troviĝas la sekcioj, kiu legeblas en ununuran datumkadron kaj validan pargeton de la tuta butikfasado. Jes? Ne.

Ni devas memori, ke ni registris la tablon en Hive. Hive ne distingas minusklecon en kamponomoj, sed pargeto estas. Tial, sekcioj kun skemoj: field1: int, kaj Field1: int estas la samaj por Hive, sed ne por Spark. Ne forgesu ŝanĝi kamponomojn al minusklo.

Post ĉi tio, ĉio ŝajnas esti bona.

Tamen, ne ĉio estas tiel simpla. Estiĝas dua, ankaŭ konata problemo. Ĉar ĉiu nova sekcio estas konservita aparte, la diskdosierujo enhavos Spark-servdosierojn, ekzemple, la _SUCCESS-operacia sukcesmarko. Ĉi tio rezultigos eraron kiam vi provos pargeti. Por eviti tion, vi devas agordi la agordon malebligante al Spark aldoni servodosierojn al la dosierujo:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Ŝajnas, ke nun ĉiutage nova pargeta vando estas aldonita al la cela butikfasado dosierujo, kie troviĝas la analizitaj datumoj por la tago. Ni antaŭzorgis certigi, ke ne ekzistas sekcioj kun datumtipaj konfliktoj.

Sed ni estas antaŭ tria problemo. Nun la ĝenerala skemo ne estas konata, krome, en Hive la tabelo havas la malĝustan skemon, ĉar ĉiu nova dispartigo plej verŝajne enkondukis distordon en la skemon.

La tabelo devas esti reregistrita. Ĉi tio povas esti farita simple: legu la pargeton de la vendejo denove, prenu la skemon kaj kreu DDL bazitan sur ĝi, per kiu vi povas reregistri la dosierujon en Hive kiel eksteran tabelon, ĝisdatigante la skemon de la cela vendejo.

Ni alfrontas kvaran problemon. Kiam ni registris la tablon por la unua fojo, ni fidis al Spark. Nun ni mem faras ĝin, kaj ni devas memori, ke pargetaj kampoj povas komenciĝi per signoj, kiuj ne estas permesitaj de Hive. Ekzemple, Spark elĵetas liniojn, kiujn ĝi ne povis analizi en la kampo "corrupt_record". Tia kampo ne povas esti registrita en Hive sen eskapi.

Sciante tion, ni ricevas la diagramon:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kodo ("_koruptita_registro", "`_koruptita_registro`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace ("tabelo<`", "tabelo<") faras sekuran DDL, tio estas, anstataŭ:

create table tname (_field1 string, 1field string)

Kun kamponomoj kiel "_field1, 1field", sekura DDL estas farita kie la kamponomoj estas eskapataj: kreu tabelon `tname` (`_field1` ĉeno, `1field` ĉeno).

La demando ŝprucas: kiel ĝuste akiri datumkadron kun kompleta skemo (en pf-kodo)? Kiel akiri ĉi tiun pf? Ĉi tiu estas la kvina problemo. Relegi la diagramon de ĉiuj vandoj el la dosierujo kun pargetaj dosieroj de la cela vendejo? Ĉi tiu metodo estas la plej sekura, sed malfacila.

La skemo jam estas en Hive. Vi povas akiri novan skemon kombinante la skemon de la tuta tabelo kaj la nova sekcio. Ĉi tio signifas, ke vi devas preni la tabelskemon de Hive kaj kombini ĝin kun la skemo de la nova sekcio. Ĉi tio povas esti farita legante testajn metadatumojn de Hive, konservante ĝin en provizora dosierujo, kaj legante ambaŭ sekciojn samtempe uzante Spark.

Esence, estas ĉio, kion vi bezonas: la origina tabelskemo en Hive kaj nova sekcio. Ni ankaŭ havas datumojn. Restas nur akiri novan skemon, kiu kombinas la butikfasadon kaj novajn kampojn de la kreita sekcio:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Poste, ni kreas la tabelregistradon DDL, kiel en la antaŭa fragmento.
Se la tuta ĉeno funkcias ĝuste, nome, estis komenca ŝarĝo, kaj la tabelo estis ĝuste kreita en Hive, tiam ni ricevas ĝisdatigitan tabelskemon.

Kaj la lasta problemo estas, ke vi ne povas facile aldoni sekcion al la Hive-tabelo, ĉar ĝi rompiĝos. Vi devas devigi Hive ripari ĝian sekciostrukturon:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

La simpla tasko legi JSON kaj krei vendejon bazitan sur ĝi rezultas venki kelkajn implicajn malfacilaĵojn, kies solvoj devas esti trovitaj aparte. Kaj kvankam ĉi tiuj solvoj estas simplaj, necesas multe da tempo por trovi ilin.

Por efektivigi la konstruadon de montrofenestro, ni devis:

  • Aldonu sekciojn al la vendejo, forigante servajn dosierojn
  • Traktu malplenajn kampojn en fontaj datumoj, kiujn Spark tajpis
  • Ĵetu simplajn tipojn al ŝnuro
  • Konverti kamponomojn al minusklo
  • Aparta datuma alŝuto kaj tabelregistrado en Hive (DDL-generacio)
  • Ne forgesu eviti kamponomojn, kiuj eble ne kongruas kun Hive
  • Lernu ĝisdatigi tabelregistradon en Hive

Por resumi, ni rimarkas, ke la decido konstrui butikfasadojn estas plena de multaj malfacilaĵoj. Tial, se okazas malfacilaĵoj en efektivigo, estas pli bone turni sin al sperta partnero kun sukcesa kompetenteco.

Dankon pro legi ĉi tiun artikolon, ni esperas ke vi trovos la informon utila.

fonto: www.habr.com

Aldoni komenton