Spark SchemaEvolution an der Praxis

Léif Lieser, Gudde Mëtteg!

An dësem Artikel beschreift de féierende Beroder fir de Big Data Solutions Geschäftsberäich vun Neoflex am Detail Optiounen fir Variabel Strukturen Storefronts mat Apache Spark ze bauen.

Als Deel vun engem Dateanalyseprojet entsteet dacks d'Aufgab fir Vitrinen ze bauen op Basis vu locker strukturéierten Donnéeën.

Typesch sinn dës Logbicher, oder Äntwerte vu verschiddene Systemer, gespäichert a Form vu JSON oder XML. D'Daten ginn op Hadoop eropgelueden, da muss e Storefront dovunner gebaut ginn. Mir kënnen Zougang zu der geschaf storefront organiséieren, zum Beispill, duerch Impala.

An dësem Fall ass de Layout vun der Zil- Storefront onbekannt am Viraus. Desweideren kann de Schema net am Viraus opgestallt ginn, well et hänkt vun den Donnéeën of, a mir hunn mat dëse ganz schwaach strukturéierten Donnéeën ze dinn.

Zum Beispill, haut ass déi folgend Äntwert protokolléiert:

{source: "app1", error_code: ""}

a muer kënnt déi folgend Äntwert vum selwechte System:

{source: "app1", error_code: "error", description: "Network error"}

Als Resultat, soll en anert Feld op de storefront dobäi ginn - Beschreiwung, a kee weess ob et wäert kommen oder net.

D'Aufgab fir e Mart op esou Donnéeën ze kreéieren ass zimlech Standard, an Spark huet eng Rei Tools fir dëst. Fir Quelldaten ze analyséieren gëtt et Ënnerstëtzung fir béid JSON an XML, a fir e virdru onbekannt Schema gëtt schemaEvolution Ënnerstëtzung zur Verfügung gestallt.

Op den éischte Bléck gesäit d'Léisung einfach aus. Dir musst den Dossier mat JSON huelen an en an den Dataframe liesen. Spark wäert e Schema erstellen an déi nestéiert Donnéeën a Strukturen ëmsetzen. Als nächst muss alles am Parquet gespäichert ginn, deen och an Impala ënnerstëtzt gëtt, andeems Dir de Storefront am Hive Metastore registréiert.

Alles schéngt einfach ze sinn.

Wéi och ëmmer, aus de kuerze Beispiller an der Dokumentatioun ass et net kloer wat mat enger Rei vu Probleemer an der Praxis ze maachen ass.

D'Dokumentatioun beschreift eng Approche net fir e Storefront ze kreéieren, mee fir JSON oder XML an en Dataframe ze liesen.

Et weist nämlech einfach wéi een JSON liest a parséiert:

df = spark.read.json(path...)

Dëst ass genuch fir d'Donnéeën fir Spark verfügbar ze maachen.

An der Praxis ass de Szenario vill méi komplex wéi just JSON Dateien aus engem Dossier ze liesen an en Dataframe ze kreéieren. D'Situatioun gesäit esou aus: et gëtt schonn eng gewësse Vitrine, all Dag kommen nei Donnéeën, se mussen an d'Vitrine bäigefüügt ginn, net ze vergiessen datt de Schema ënnerscheeden kann.

Déi üblech Schema fir e Storefront ze bauen ass wéi follegt:

Schrëtt 1. D'Daten ginn an Hadoop gelueden mat spéider alldeegleche Reloading an op eng nei Partition bäigefüügt. D'Resultat ass en Dossier mat Quelldaten, opgedeelt vum Dag.

Schrëtt 2. Wärend der initialer Luede gëtt dësen Dossier gelies a parséiert mat Spark. Déi resultéierend Dateframe gëtt an engem Format gespäichert dat analyséiert ka ginn, zum Beispill am Parkett, deen dann an Impala importéiert ka ginn. Dëst erstellt en Zil Storefront mat all den Donnéeën déi bis zu dësem Zäitpunkt gesammelt hunn.

Schrëtt 3. En Download gëtt erstallt deen de Storefront all Dag aktualiséiert.
D'Fro vun der inkrementeller Luede stellt sech op, de Besoin fir de Storefront ze partitionéieren, an d'Fro fir den allgemenge Layout vum Storefront z'ënnerstëtzen.

Loosst eis e Beispill ginn. Loosst eis soen datt den éischte Schrëtt fir e Repository ze bauen ass implementéiert, an d'Eroplueden vun JSON Dateien an en Dossier ass konfiguréiert.

En Dateframe vun hinnen ze kreéieren an se dann als Vitrine ze späicheren ass kee Problem. Dëst ass deen éischte Schrëtt deen einfach an der Spark Dokumentatioun fonnt ka ginn:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Alles schéngt gutt ze sinn.

Mir hunn den JSON gelies a parséiert, da späichere mir den Dataframe als Parkett, registréiere se an Hive op all praktesch Manéier:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Mir kréien eng Vitrine.

Awer den nächsten Dag goufen nei Daten aus der Quell bäigefüügt. Mir hunn en Dossier mat JSON, an e Storefront erstallt baséiert op dësem Dossier. Nodeems den nächsten Deel vun Daten aus der Quell gelueden ass, huet de Storefront net genuch Daten fir een Dag.

Eng logesch Léisung wier et de Storefront vum Dag ze partitionéieren, wat Iech erlaabt all nächsten Dag eng nei Partition derbäi ze ginn. De Mechanismus fir dëst ass och bekannt; Spark erlaabt Iech Partitionen separat opzehuelen.

Als éischt maache mir déi initial Luede, späicheren d'Donnéeën wéi uewen beschriwwen, addéiere just Partitionéierung. Dës Aktioun gëtt Storefront Initialiséierung genannt a gëtt nëmmen eemol gemaach:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Den nächsten Dag download mir nëmmen déi nei Partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Alles wat bleift ass sech an Hive nei ze registréieren fir de Schema ze aktualiséieren.
Allerdéngs sinn hei Problemer.

Éischte Problem. Fréier oder spéider wäert de resultéierende Parkett net méi liesbar sinn. Dëst ass wéinst wéi Parkett an JSON eidel Felder anescht behandelen.

Loosst eis eng typesch Situatioun betruechten. Zum Beispill, gëschter kënnt JSON:

День 1: {"a": {"b": 1}},

an haut gesäit dee selwechte JSON esou aus:

День 2: {"a": null}

Loosst eis soen datt mir zwou verschidde Partitionen hunn, all mat enger Zeil.
Wa mir déi ganz Quelldaten liesen, Spark wäert kënnen den Typ ze bestëmmen, a wäert verstoen, datt "a" e Feld vun Typ "Struktur" ass, mat engem nest Feld "b" Typ INT. Awer wann all Partition getrennt gespäichert gouf, dann ass d'Resultat e Parkett mat inkompatibelen Partitionschemaen:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Dës Situatioun ass bekannt, sou datt eng Optioun speziell bäigefüügt gouf fir eidel Felder ze läschen wann Dir Quelldaten parséiert:

df = spark.read.json("...", dropFieldIfAllNull=True)

An dësem Fall besteet de Parkett aus Partitionen, déi zesumme gelies kënne ginn.
Obwuel déi, déi dat an der Praxis gemaach hunn, batter laachen. Firwat? Jo, well wahrscheinlech nach zwou Situatiounen entstoen. Oder dräi. Oder véier. Déi éischt, déi bal sécher ass, ass datt numeresch Aarte a verschiddene JSON Dateien anescht ausgesinn. Zum Beispill {intField: 1} an {intField: 1.1}. Wann esou Felder an engem Batch erschéngen, da liest de Schema Fusioun alles richteg, wat zu de genausten Typ féiert. Awer wann a verschiddene, da wäert een intField: int hunn, an deen aneren intField: duebel.

Fir dës Situatioun ze handhaben ass de folgende Fändel:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Elo hu mir en Dossier wou d'Partitionen sinn, deen an engem eenzegen Dataframe an e gültege Parkett vum ganze Storefront gelies ka ginn. Jo? Nee.

Mir mussen drun erënneren datt mir den Dësch an Hive registréiert hunn. Hive ass net Fall sensibel am Feld Nimm, mee Parkett ass. Dofir sinn Partitionen mat Schemaen: field1: int, a Field1: int d'selwecht fir Hive, awer net fir Spark. Vergiesst net d'Feldnimm op kleng Buschtawen z'änneren.

Duerno schéngt alles gutt ze sinn.

Wéi och ëmmer, net all sou einfach. En zweeten, och bekannte Problem entsteet. Zënter all nei Partition getrennt gespäichert ass, enthält de Partition Dossier Spark Service Dateien, zum Beispill den _SUCCESS Operatioun Erfolleg Fändel. Dëst wäert zu engem Feeler Resultat wann versicht Parkett. Fir dëst ze vermeiden, musst Dir d'Konfiguratioun konfiguréieren andeems Dir verhënnert datt Spark Servicedateien an den Dossier bäidréit:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Et schéngt, datt elo all Dag eng nei Parkettpartition an den Zil-Storefront-Dossier bäigefüügt gëtt, wou d'parséiert Donnéeën fir den Dag sinn. Mir hunn am Viraus opgepasst fir sécherzestellen datt et keng Partitionen mat Datentypkonflikter waren.

Mä mir sti mat engem drëtte Problem. Elo ass den allgemenge Schema net bekannt, ausserdeem, am Hive huet den Dësch de falsche Schema, well all nei Partition héchstwahrscheinlech eng Verzerrung an de Schema agefouert huet.

Den Dësch muss nei ugemellt ginn. Dëst kann einfach gemaach ginn: liest nach eng Kéier de Parkett vum Storefront, huelt de Schema an erstellt en DDL baséiert op deem, mat deem Dir den Dossier an Hive als externen Dësch registréiere kënnt, d'Schema vum Zil Storefront aktualiséieren.

Mir stellen e véierte Problem. Wéi mir den Dësch fir d'éischte Kéier ugemellt hunn, hu mir op Spark vertraut. Elo maache mir et selwer, a mir mussen drun erënneren datt Parkettfelder mat Charaktere fänken, déi net vun Hive erlaabt sinn. Zum Beispill, Spark werft Linnen eraus déi et net am Feld "corrupt_record" parse konnt. Esou e Feld kann net an Hive registréiert ginn ouni ze flüchten.

Wann Dir dëst wësst, kréien mir den Diagramm:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Code ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") mécht sécher DDL, dat ass, amplaz:

create table tname (_field1 string, 1field string)

Mat Feldnimm wéi "_field1, 1field", gëtt eng sécher DDL gemaach, wou d'Feldnimm entkomm sinn: Table 'tname' erstellen ('_field1' String, '1field' String).

D'Fro stellt sech: wéi korrekt en Dataframe mat engem komplette Schema ze kréien (am pf Code)? Wéi kritt een dës Pf? Dëst ass de fënnefte Problem. Re-liest d'Diagramm vun all Partitionen aus dem Dossier mat Parkettdateien vum Zilgeschäft? Dës Method ass déi sécherst, awer schwéier.

De Schema ass schonn am Hive. Dir kënnt en neie Schema kréien andeems Dir de Schema vun der ganzer Tabell an der neier Partition kombinéiert. Dëst bedeit datt Dir den Dëschschema vun Hive musst huelen an et mat dem Schema vun der neier Partition kombinéieren. Dëst kann gemaach ginn andeems Dir Testmetadaten aus Hive liest, se an en temporäre Dossier späichert, a béid Partitionen gläichzäiteg mat Spark liesen.

Wesentlech gëtt et alles wat Dir braucht: den ursprénglechen Dëschschema an Hive an eng nei Partition. Mir hunn och Daten. Alles wat bleift ass en neit Schema ze kréien deen de Storefront Schema an nei Felder vun der erstallter Partition kombinéiert:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Als nächst erstallt mir den Dëschregistrierung DDL, wéi am fréiere Fragment.
Wann déi ganz Kette richteg funktionnéiert, nämlech et war eng initial Belaaschtung, an den Dësch gouf korrekt am Hive erstallt, da kréie mir en aktualiséiert Dëschschema.

De leschte Problem ass datt Dir net einfach eng Partition zu engem Hive Dësch kënnt addéieren, well se brécht. Dir musst Hive forcéieren fir seng Partitionsstruktur ze fixéieren:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Déi einfach Aufgab fir JSON ze liesen an e Storefront op der Basis ze kreéieren resultéiert eng Rei implizit Schwieregkeeten ze iwwerwannen, d'Léisungen fir déi separat musse fonnt ginn. An obwuel dës Léisunge einfach sinn, brauch et vill Zäit fir se ze fannen.

Fir de Bau vun enger Vitrine ëmzesetzen, hu mir missen:

  • Füügt Partitionen un de Storefront, lass vu Servicedateien
  • Deal mat eidel Felder a Quelldaten déi Spark getippt huet
  • Gitt einfach Typen op String
  • Konvertéiert Feldnimm op kleng Buschtawen
  • Separat Donnéeën Eroplueden an Dëschregistrierung am Hive (DDL Kreatioun)
  • Denkt drun Feldnimm ze flüchten déi vläicht net mat Hive kompatibel sinn
  • Léiert den Dësch Aschreiwung an Hive ze aktualiséieren

Fir ze resuméieren, bemierken mir datt d'Entscheedung fir Storefronts ze bauen ass mat villen Fallen gefeelt. Dofir, wann et Schwieregkeeten an der Ëmsetzung entstinn, ass et besser fir en erfuerene Partner mat erfollegräicher Expertise ze wenden.

Merci fir d'Liesen vun dësem Artikel, mir hoffen Dir fannt d'Informatioun nëtzlech.

Source: will.com

Setzt e Commentaire