Spark schemaEvolution i praktiken

Kära läsare, god dag!

I den här artikeln beskriver den ledande konsulten för Neoflex affärsområde Big Data Solutions i detalj alternativen för att bygga skyltfönster med variabel struktur med Apache Spark.

Som en del av ett dataanalysprojekt uppstår ofta uppgiften att bygga skyltfönster baserat på löst strukturerad data.

Vanligtvis är dessa loggar, eller svar från olika system, sparade som JSON eller XML. Data laddas upp till Hadoop, sedan måste du bygga ett skyltfönster från dem. Vi kan organisera tillgången till det skapade skyltfönstret, till exempel genom Impala.

I det här fallet är schemat för målskyltfönstret inte känt i förväg. Dessutom kan schemat inte heller utarbetas i förväg, eftersom det beror på uppgifterna, och vi har att göra med dessa mycket löst strukturerade uppgifter.

Till exempel, idag loggas följande svar:

{source: "app1", error_code: ""}

och imorgon från samma system kommer följande svar:

{source: "app1", error_code: "error", description: "Network error"}

Som ett resultat bör ytterligare ett fält läggas till i skyltfönstret - beskrivning, och ingen vet om det kommer eller inte.

Uppgiften att skapa ett skyltfönster på sådan data är ganska standard, och Spark har ett antal verktyg för detta. För att analysera källdata finns det stöd för både JSON och XML, och för ett tidigare okänt schema tillhandahålls stöd för schemaEvolution.

Vid första anblicken ser lösningen enkel ut. Du måste ta en mapp med JSON och läsa in den i en dataram. Spark kommer att skapa ett schema, förvandla kapslade data till strukturer. Vidare måste allt sparas i parkett, vilket även stöds i Impala, genom att registrera skyltfönstret i Hive-metastore.

Allt verkar vara enkelt.

Det framgår dock inte av de korta exemplen i dokumentationen vad man ska göra med ett antal problem i praktiken.

Dokumentationen beskriver ett tillvägagångssätt att inte skapa ett skyltfönster, utan att läsa in JSON eller XML i en dataram.

Det visar nämligen helt enkelt hur man läser och tolkar JSON:

df = spark.read.json(path...)

Detta är tillräckligt för att göra data tillgänglig för Spark.

I praktiken är skriptet mycket mer komplicerat än att bara läsa JSON-filer från en mapp och skapa en dataram. Situationen ser ut så här: det finns redan ett visst skyltfönster, nya data kommer in varje dag, de måste läggas till i skyltfönstret, inte att glömma att schemat kan skilja sig åt.

Det vanliga schemat för att bygga ett skyltfönster är som följer:

Steg 1. Data laddas in i Hadoop med efterföljande daglig omladdning och läggs till en ny partition. Det visar sig en mapp med initiala data partitionerade per dag.

Steg 2. Under den första laddningen läses och analyseras denna mapp av Spark. Den resulterande dataramen sparas i ett parserbart format, till exempel i parkett, som sedan kan importeras till Impala. Detta skapar en målbild med all data som har samlats fram till denna punkt.

Steg 3. En nedladdning skapas som uppdaterar skyltfönstret varje dag.
Det är en fråga om inkrementell belastning, behovet av att dela upp montern och frågan om att bibehålla det allmänna schemat för montern.

Låt oss ta ett exempel. Låt oss säga att det första steget för att bygga ett arkiv har implementerats och JSON-filer laddas upp till en mapp.

Att skapa en dataram från dem och sedan spara den som ett skyltfönster är inget problem. Detta är det allra första steget som lätt kan hittas i Spark-dokumentationen:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Allt verkar vara bra.

Vi läste och analyserade JSON, sedan sparar vi dataramen som en parkett och registrerar den i Hive på något bekvämt sätt:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Vi får ett fönster.

Men nästa dag lades nya data från källan till. Vi har en mapp med JSON och ett utställningsfönster skapat från den här mappen. Efter att ha laddat nästa batch med data från källan, saknar datamarknaden en dags data.

Den logiska lösningen skulle vara att partitionera skyltfönstret efter dag, vilket gör det möjligt att lägga till en ny partition varje nästa dag. Mekanismen för detta är också välkänd, Spark låter dig skriva partitioner separat.

Först gör vi en initial laddning, sparar data enligt beskrivningen ovan och lägger endast till partitionering. Denna åtgärd kallas skyltfönsterinitiering och görs endast en gång:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Nästa dag laddar vi bara en ny partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Allt som återstår är att registrera om i Hive för att uppdatera schemat.
Det är dock här som problem uppstår.

Första problemet. Förr eller senare kommer den resulterande parketten att vara oläslig. Detta beror på hur parkett och JSON behandlar tomma fält olika.

Låt oss överväga en typisk situation. Till exempel, igår kom JSON:

День 1: {"a": {"b": 1}},

och idag ser samma JSON ut så här:

День 2: {"a": null}

Låt oss säga att vi har två olika partitioner, var och en med en rad.
När vi läser hela källdata kommer Spark att kunna bestämma typen och kommer att förstå att "a" är ett fält av typen "struktur", med ett kapslat fält "b" av typen INT. Men om varje partition sparades separat, får vi en parkett med inkompatibla partitionssystem:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Denna situation är välkänd, så ett alternativ har lagts till speciellt - när du analyserar källdata, ta bort tomma fält:

df = spark.read.json("...", dropFieldIfAllNull=True)

I detta fall kommer parketten att bestå av skiljeväggar som kan läsas ihop.
Även om de som gjort detta i praktiken kommer att le bittert här. Varför? Ja, för det kommer sannolikt att bli två situationer till. Eller tre. Eller fyra. Den första, som nästan säkert kommer att inträffa, är att numeriska typer kommer att se olika ut i olika JSON-filer. Till exempel {intField: 1} och {intField: 1.1}. Om sådana fält finns i en partition kommer schemasammanslagningen att läsa allt korrekt, vilket leder till den mest exakta typen. Men om i olika, kommer den ena att ha intField: int, och den andra kommer att ha intField: double.

Det finns följande flagga för att hantera denna situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nu har vi en mapp där det finns partitioner som kan läsas in i en enda dataram och en giltig parkett av hela montern. Ja? Nej.

Vi måste komma ihåg att vi registrerade tabellen i Hive. Hive är inte skiftlägeskänsligt i fältnamn, medan parkett är skiftlägeskänsligt. Därför är partitioner med scheman: field1: int och Field1: int samma för Hive, men inte för Spark. Glöm inte att konvertera fältnamnen till gemener.

Efter det verkar allt vara bra.

Dock inte allt så enkelt. Det finns ett andra, också välkänt problem. Eftersom varje ny partition sparas separat, kommer partitionsmappen att innehålla Spark-tjänstfiler, till exempel flaggan _SUCCESS operationen lyckad. Detta kommer att resultera i ett fel när du försöker parkett. För att undvika detta måste du konfigurera konfigurationen för att förhindra att Spark lägger till servicefiler i mappen:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Det verkar som att nu varje dag en ny parkettpartition läggs till i målmappen, där den analyserade datan för dagen finns. Vi såg i förväg till att det inte fanns några partitioner med en datatypkonflikt.

Men vi har ett tredje problem. Nu är det allmänna schemat inte känt, dessutom har tabellen i Hive ett felaktigt schema, eftersom varje ny partition med största sannolikhet införde en förvrängning i schemat.

Du måste registrera om bordet. Detta kan göras enkelt: läs parketten på skyltfönstret igen, ta schemat och skapa en DDL baserat på det, med vilken du kan registrera mappen i Hive som en extern tabell, uppdatera schemat för målskyltfönstret.

Vi har ett fjärde problem. När vi registrerade bordet för första gången förlitade vi oss på Spark. Nu gör vi det själva, och vi måste komma ihåg att parkettfält kan börja med tecken som inte är tillåtna för Hive. Till exempel kastar Spark ut rader som den inte kunde analysera i fältet "corrupt_record". Ett sådant fält kan inte registreras i Hive utan att vara escaped.

När vi vet detta får vi schemat:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kod ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") gör säker DDL, dvs istället för:

create table tname (_field1 string, 1field string)

Med fältnamn som "_field1, 1field", skapas säker DDL där fältnamnen escapes: skapa tabell `tname` (`_field1` sträng, `1field` sträng).

Frågan uppstår: hur man korrekt får en dataram med ett komplett schema (i pf-kod)? Hur får man tag i denna pf? Detta är det femte problemet. Läsa om schemat för alla partitioner från mappen med parkettfiler för målutställningen? Denna metod är den säkraste, men svår.

Schemat finns redan i Hive. Du kan få ett nytt schema genom att kombinera schemat för hela tabellen och den nya partitionen. Så du måste ta tabellschemat från Hive och kombinera det med schemat för den nya partitionen. Detta kan göras genom att läsa testmetadata från Hive, spara den i en tillfällig mapp och använda Spark för att läsa båda partitionerna samtidigt.

Faktum är att det finns allt du behöver: det ursprungliga tabellschemat i Hive och den nya partitionen. Vi har också data. Det återstår bara att få ett nytt schema som kombinerar skyltfönsterschemat och nya fält från den skapade partitionen:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Därefter skapar vi tabellregistreringen DDL, som i föregående utdrag.
Om hela kedjan fungerar korrekt, nämligen det fanns en initialiseringsbelastning, och tabellen skapades korrekt i Hive, så får vi ett uppdaterat tabellschema.

Och det sista problemet är att du inte bara kan lägga till en partition till en Hive-tabell, eftersom den kommer att gå sönder. Du måste tvinga Hive att fixa dess partitionsstruktur:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Den enkla uppgiften att läsa JSON och skapa ett skyltfönster baserat på det resulterar i att övervinna ett antal implicita svårigheter, lösningar som du måste leta efter separat. Och även om dessa lösningar är enkla, tar det mycket tid att hitta dem.

För att genomföra konstruktionen av skyltfönstret var jag tvungen att:

  • Lägg till partitioner i skyltfönstret, bli av med servicefiler
  • Hantera tomma fält i källdata som Spark har skrivit
  • Kasta enkla typer till ett snöre
  • Konvertera fältnamn till gemener
  • Separat datauppladdning och tabellregistrering i Hive (DDL-generation)
  • Glöm inte att undvika fältnamn som kan vara inkompatibla med Hive
  • Lär dig hur du uppdaterar bordsregistrering i Hive

Sammanfattningsvis konstaterar vi att beslutet att bygga skyltfönster är kantat av många fallgropar. Därför, vid svårigheter med genomförandet, är det bättre att kontakta en erfaren partner med framgångsrik expertis.

Tack för att du läser den här artikeln, vi hoppas att du hittar informationen användbar.

Källa: will.com

Lägg en kommentar