Spark skemaEvolúsje yn 'e praktyk

Beste lêzers, goeie dei!

Yn dit artikel beskriuwt de liedende adviseur fan it bedriuwsgebiet Big Data Solutions fan Neoflex yn detail de opsjes foar it bouwen fan showcases mei fariabele struktuer mei Apache Spark.

As ûnderdiel fan in data-analyzeprojekt ûntstiet faak de taak om winkelfronten te bouwen basearre op los strukturearre gegevens.

Meastal binne dit logs, as antwurden fan ferskate systemen, bewarre as JSON of XML. De gegevens wurde opladen nei Hadoop, dan moatte jo in winkelfront fan har bouwe. Wy kinne organisearje tagong ta de makke showcase, bygelyks, fia Impala.

Yn dit gefal is it skema fan 'e doelwinkelfront net foarôf bekend. Boppedat kin it skema ek net foarôf opsteld wurde, om't it ôfhinget fan de gegevens, en wy hawwe te krijen mei dizze tige los strukturearre gegevens.

Bygelyks, hjoed wurdt it folgjende antwurd oanmeld:

{source: "app1", error_code: ""}

en moarn út itselde systeem komt it folgjende antwurd:

{source: "app1", error_code: "error", description: "Network error"}

As gefolch, ien mear fjild moat wurde tafoege oan de showcase - beskriuwing, en gjinien wit oft it sil komme of net.

De taak fan it meitsjen fan in winkelfront op sokke gegevens is frij standert, en Spark hat in oantal ark foar dit. Foar it parsearjen fan de boarnegegevens is d'r stipe foar sawol JSON as XML, en foar in earder ûnbekend skema wurdt stipe foar schemaEvolution levere.

Op it earste each sjocht de oplossing ienfâldich. Jo moatte in map nimme mei JSON en it lêze yn in dataframe. Spark sil in skema oanmeitsje, geneste gegevens omsette yn struktueren. Fierder moat alles bewarre wurde yn parket, dat ek stipe wurdt yn Impala, troch it registrearjen fan de winkel yn de Hive-metastore.

Alles liket ienfâldich te wêzen.

Ut de koarte foarbylden yn de dokumintaasje is lykwols net dúdlik wat der yn de praktyk mei in tal problemen moat.

De dokumintaasje beskriuwt in oanpak net om in winkelfront te meitsjen, mar om JSON of XML yn in dataframe te lêzen.

It lit nammentlik gewoan sjen hoe't jo JSON lêze en parsearje:

df = spark.read.json(path...)

Dit is genôch om de gegevens beskikber te stellen foar Spark.

Yn 'e praktyk is it skript folle yngewikkelder dan allinich it lêzen fan JSON-bestannen út in map en it meitsjen fan in dataframe. De situaasje sjocht der sa út: d'r is al in bepaalde winkel, nije gegevens komme elke dei binnen, se moatte wurde tafoege oan 'e winkel, net te ferjitten dat it skema kin ferskille.

It gewoane skema foar it bouwen fan in showcase is as folget:

Step 1. De gegevens wurde laden yn Hadoop mei dêrop folgjende deistige opnij laden en tafoege oan in nije partysje. It docht bliken dat in map mei inisjele gegevens ferdield troch dei.

Step 2. Tidens de earste laden wurdt dizze map lêzen en parseard troch Spark. It resultearjende dataframe wurdt bewarre yn in parsearbere formaat, bygelyks yn parket, dat dan kin wurde ymportearre yn Impala. Dit soarget foar in doelshowcase mei alle gegevens dy't oant dit punt hawwe sammele.

Step 3. In download wurdt makke dy't elke dei de winkelfront sil bywurkje.
D'r is in fraach fan inkrementele laden, de needsaak om de showcase te dielen, en de fraach fan it behâld fan it algemiene skema fan 'e showcase.

Litte wy in foarbyld nimme. Litte wy sizze dat de earste stap fan it bouwen fan in repository is ymplementearre, en JSON-bestannen wurde opladen nei in map.

It meitsjen fan in dataframe fan har, dan bewarje it as in showcase, is gjin probleem. Dit is de earste stap dy't maklik te finen is yn 'e Spark-dokumintaasje:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Alles liket goed te wêzen.

Wy lêze en parsearden JSON, dan bewarje wy it dataframe as in parket, en registrearje it op elke handige manier yn Hive:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Wy krije in finster.

Mar de oare deis waarden nije gegevens fan 'e boarne tafoege. Wy hawwe in map mei JSON, en in showcase makke út dizze map. Nei it laden fan de folgjende batch fan gegevens fan 'e boarne, mist de gegevensmart ien dei wearde oan gegevens.

De logyske oplossing soe wêze om de winkelfront by dei te dielen, wêrtroch elke oare deis in nije partysje kin wurde tafoege. It meganisme foar dit is ek bekend, Spark kinne jo te skriuwen partysjes apart.

Earst dogge wy in earste lading, bewarje de gegevens lykas hjirboppe beskreaun, tafoegje allinich partitionearring. Dizze aksje wurdt inisjalisaasje fan winkelfront neamd en wurdt mar ien kear dien:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

De oare deis laden wy allinich in nije partysje:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Alles wat oerbliuwt is opnij registrearje yn Hive om it skema te aktualisearjen.
Dit is lykwols wêr't problemen ûntsteane.

Earste probleem. Ier of letter sil it resultearjende parket net lêsber wêze. Dit komt troch hoe't parket en JSON lege fjilden oars behannelje.

Litte wy in typyske situaasje beskôgje. Bygelyks, juster komt JSON:

День 1: {"a": {"b": 1}},

en hjoed sjocht deselde JSON der sa út:

День 2: {"a": null}

Litte wy sizze dat wy twa ferskillende partysjes hawwe, elk mei ien line.
As wy lêze de hiele boarne gegevens, sil Spark by steat wêze om te bepalen it type, en sil begripe dat "a" is in fjild fan type "struktuer", mei in neste fjild "b" fan type INT. Mar as elke ôfskieding apart waard bewarre, dan krije wy in parket mei ynkompatibele partysjeskema's:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Dizze situaasje is goed bekend, dus in opsje is spesjaal tafoege - by it parsearjen fan de boarnegegevens, fuortsmite lege fjilden:

df = spark.read.json("...", dropFieldIfAllNull=True)

Yn dit gefal sil it parket bestean út partysjes dy't tegearre kinne wurde lêzen.
Hoewol't dejingen dy't dit yn 'e praktyk dien hawwe, sille hjir bitter glimkje. Wêrom? Ja, want der binne nei alle gedachten noch twa situaasjes. Of trije. Of fjouwer. De earste, dy't hast wis sil foarkomme, is dat numerike typen oars sille útsjen yn ferskate JSON-bestannen. Bygelyks {intField: 1} en {intField: 1.1}. As sokke fjilden fûn wurde yn ien dieling, dan sil de gearfoeging fan it skema alles goed lêze, wat liedt ta it meast krekte type. Mar as yn ferskillende, dan sil ien hawwe intField: int, en de oare sil hawwe intField: dûbele.

D'r is de folgjende flagge om dizze situaasje te behanneljen:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

No hawwe wy in map wêr't partysjes binne dy't kinne wurde lêzen yn ien dataframe en in jildich parket fan 'e heule showcase. Ja? Nee.

Wy moatte betinke dat wy registrearre de tabel yn Hive. Hive is net hoofdlettergefoel yn fjildnammen, wylst parket is hoofdlettergefoel. Dêrom binne partysjes mei skema's: field1: int, en Field1: int itselde foar Hive, mar net foar Spark. Ferjit net de fjildnammen te konvertearjen nei lytse letters.

Dêrnei liket alles goed te wêzen.

Lykwols, net allegear sa ienfâldich. Der is in twadde, ek bekend probleem. Sûnt elke nije partysje wurdt opslein apart, sil de partition map befetsje Spark tsjinst triemmen, Bygelyks, de _SUCCESS operaasje súkses flagge. Dit sil resultearje yn in flater as jo besykje te parket. Om dit te foarkommen, moatte jo de konfiguraasje ynstelle om te foarkommen dat Spark tsjinstbestannen tafoegje oan de map:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

It liket derop dat no elke dei in nije parquetpartysje wurdt tafoege oan 'e doelwittenskipmap, wêr't de parsearde gegevens foar de dei sitte. Wy soargen foarôf dat d'r gjin partysjes wiene mei in datatypekonflikt.

Mar, wy hawwe in tredde probleem. No is it algemiene skema net bekend, boppedat hat de tabel yn Hive in ferkeard skema, om't elke nije partysje nei alle gedachten in ferfoarming yn it skema ynfierd hat.

Jo moatte opnij registrearje de tafel. Dit kin ienfâldich dien wurde: lês it parket fan 'e winkelfront nochris, nim it skema en meitsje derop in DDL oan, wêrmei't jo de map yn Hive opnij registrearje as in eksterne tabel, it bywurkjen fan it skema fan 'e doelwinkelfront.

Wy hawwe in fjirde probleem. Doe't wy registrearre de tafel foar de earste kear, wy fertroud op Spark. No wy dogge it sels, en wy moatte betinke dat parket fjilden kinne begjinne mei tekens dy't net tastien foar Hive. Bygelyks, Spark smyt rigels út dy't it net koe parse yn it fjild "corrupt_record". Sa'n fjild kin net registrearre wurde yn Hive sûnder te ûntkommen.

As wy dit witte, krije wy it skema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

koade ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") makket feilige DDL, dus ynstee fan:

create table tname (_field1 string, 1field string)

Mei fjildnammen lykas "_field1, 1field", wurdt feilige DDL makke wêr't de fjildnammen ûntkommen binne: meitsje tabel `tname` (`_field1` string, `1field` string).

De fraach ûntstiet: hoe kinne jo in dataframe goed krije mei in folslein skema (yn pf-koade)? Hoe kinne jo dizze pf krije? Dit is it fyfde probleem. Lês it skema fan alle partysjes út 'e map opnij mei parketbestannen fan' e doelshowcase? Dizze metoade is de feilichste, mar dreech.

It skema is al yn Hive. Jo kinne in nij skema krije troch it kombinearjen fan it skema fan 'e hiele tabel en de nije partysje. Dat jo moatte it tabelskema fan Hive nimme en it kombinearje mei it skema fan 'e nije dieling. Dit kin dien wurde troch de testmetadata fan Hive te lêzen, it te bewarjen yn in tydlike map, en Spark te brûken om beide partysjes tagelyk te lêzen.

Yn feite is d'r alles wat jo nedich binne: it orizjinele tabelskema yn Hive en de nije partysje. Wy hawwe ek gegevens. It bliuwt allinich om in nij skema te krijen dat it winkelfrontskema kombineart en nije fjilden fan 'e oanmakke dieling:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Dêrnei meitsje wy de tabelregistraasje DDL, lykas yn it foarige snippet.
As de hiele ketting goed wurket, nammentlik d'r wie in initialisearjende lading, en de tabel is korrekt makke yn Hive, dan krije wy in aktualisearre tabelskema.

En it lêste probleem is dat jo net samar in ôfskieding taheakje kinne oan in Hive-tafel, om't it sil wurde brutsen. Jo moatte Hive twinge om syn partysjestruktuer te reparearjen:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

De ienfâldige taak om JSON te lêzen en dêrop in winkelfront te meitsjen resulteart yn it oerwinnen fan in oantal ymplisite swierrichheden, oplossings wêrfoar jo apart sykje moatte. En hoewol dizze oplossingen ienfâldich binne, nimt it in protte tiid om se te finen.

Om de bou fan 'e showcase út te fieren, moast ik:

  • Foegje partysjes ta oan 'e showcase, ferwiderje fan tsjinstbestannen
  • Omgean mei lege fjilden yn boarne gegevens dy't Spark hat typt
  • Giet ienfâldige typen oan in tekenrige
  • Konvertearje fjildnammen nei lytse letters
  • Separate upload fan gegevens en tabelregistraasje yn Hive (DDL-generaasje)
  • Ferjit net om fjildnammen te ûntkommen dy't miskien net kompatibel binne mei Hive
  • Learje hoe't jo tabelregistraasje yn Hive bywurkje

Gearfetsjend konstatearje wy dat it beslút om winkelfinsters te bouwen is fol mei in protte falkûlen. Dêrom, yn gefal fan swierrichheden yn útfiering, is it better om kontakt op mei in betûfte partner mei súksesfol saakkundigens.

Betanke foar it lêzen fan dit artikel, wy hoopje dat jo de ynformaasje nuttich fine.

Boarne: www.habr.com

Add a comment