Spark schemaEvolution i praksis

Kære læsere, god dag!

I denne artikel beskriver den førende konsulent for Neoflex' Big Data Solutions forretningsområde detaljeret mulighederne for at bygge udstillingsvinduer med variabel struktur ved hjælp af Apache Spark.

Som led i et dataanalyseprojekt opstår ofte opgaven med at bygge butiksfacader baseret på løst strukturerede data.

Normalt er disse logfiler, eller svar fra forskellige systemer, gemt som JSON eller XML. Dataene uploades til Hadoop, så skal du bygge en butiksfacade ud fra dem. Vi kan organisere adgang til det oprettede udstillingsvindue, for eksempel gennem Impala.

I dette tilfælde er skemaet for målbutiksfacade ikke kendt på forhånd. Desuden kan ordningen heller ikke udarbejdes på forhånd, da den afhænger af dataene, og vi har med disse meget løst strukturerede data at gøre.

For eksempel logges følgende svar i dag:

{source: "app1", error_code: ""}

og i morgen fra samme system kommer følgende svar:

{source: "app1", error_code: "error", description: "Network error"}

Som følge heraf skal der tilføjes et felt mere til udstillingsvinduet - beskrivelse, og ingen ved, om det kommer eller ej.

Opgaven med at skabe en butiksfacade på sådanne data er ret standard, og Spark har en række værktøjer til dette. Til parsing af kildedata er der understøttelse af både JSON og XML, og for et hidtil ukendt skema er understøttelse af schemaEvolution.

Ved første øjekast ser løsningen simpel ud. Du skal tage en mappe med JSON og læse den ind i en dataramme. Spark vil oprette et skema, omdanne indlejrede data til strukturer. Yderligere skal alt gemmes i parket, som også understøttes i Impala, ved at registrere butiksfacade i Hive metastore.

Alt ser ud til at være enkelt.

Det fremgår dog ikke af de korte eksempler i dokumentationen, hvad man skal gøre med en række problemer i praksis.

Dokumentationen beskriver en tilgang til ikke at oprette en butiksfacade, men at læse JSON eller XML ind i en dataramme.

Det viser nemlig ganske enkelt, hvordan man læser og analyserer JSON:

df = spark.read.json(path...)

Dette er nok til at gøre dataene tilgængelige for Spark.

I praksis er scriptet meget mere kompliceret end blot at læse JSON-filer fra en mappe og oprette en dataramme. Situationen ser sådan ud: der er allerede en vis butiksfacade, nye data kommer ind hver dag, de skal tilføjes til butiksfacadet, ikke at glemme, at ordningen kan variere.

Den sædvanlige ordning for at bygge et udstillingsvindue er som følger:

Trin 1. Dataene indlæses i Hadoop med efterfølgende daglig genindlæsning og tilføjes til en ny partition. Det viser sig en mappe med indledende data opdelt efter dag.

Trin 2. Under den første indlæsning læses og analyseres denne mappe af Spark. Den resulterende dataramme gemmes i et parserbart format, for eksempel i parket, som derefter kan importeres til Impala. Dette skaber et måludstillingsvindue med alle de data, der er akkumuleret indtil dette tidspunkt.

Trin 3. Der oprettes en download, der opdaterer butiksfacadet hver dag.
Der er et spørgsmål om trinvis belastning, behovet for at opdele udstillingsvinduet og spørgsmålet om at opretholde det generelle skema for udstillingsvinduet.

Lad os tage et eksempel. Lad os sige, at det første trin i at bygge et lager er blevet implementeret, og JSON-filer uploades til en mappe.

Det er ikke et problem at oprette en dataramme fra dem og derefter gemme den som et udstillingsvindue. Dette er det allerførste trin, der nemt kan findes i Spark-dokumentationen:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Alt ser ud til at være i orden.

Vi læste og parsede JSON, derefter gemmer vi datarammen som en parket og registrerer den i Hive på enhver bekvem måde:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Vi får et vindue.

Men dagen efter blev der tilføjet nye data fra kilden. Vi har en mappe med JSON og et udstillingsvindue oprettet fra denne mappe. Efter at have indlæst den næste batch af data fra kilden, mangler datamarkedet en dags data.

Den logiske løsning ville være at partitionere butiksfacadet om dagen, hvilket gør det muligt at tilføje en ny partition hver næste dag. Mekanismen for dette er også velkendt, Spark giver dig mulighed for at skrive partitioner separat.

Først udfører vi en indledende indlæsning, gemmer dataene som beskrevet ovenfor, og tilføjer kun partitionering. Denne handling kaldes initialisering af butiksfacade og udføres kun én gang:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Den næste dag indlæser vi kun en ny partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Det eneste, der er tilbage, er at omregistrere i Hive for at opdatere skemaet.
Det er dog her, der opstår problemer.

Første problem. Før eller siden vil den resulterende parket være ulæselig. Dette skyldes, hvordan parket og JSON behandler tomme felter forskelligt.

Lad os overveje en typisk situation. For eksempel ankommer JSON i går:

День 1: {"a": {"b": 1}},

og i dag ser den samme JSON sådan ud:

День 2: {"a": null}

Lad os sige, at vi har to forskellige partitioner, hver med en linje.
Når vi læser hele kildedataene, vil Spark være i stand til at bestemme typen, og vil forstå, at "a" er et felt af typen "struktur", med et indlejret felt "b" af typen INT. Men hvis hver partition blev gemt separat, får vi en parket med inkompatible partitionsskemaer:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Denne situation er velkendt, så en mulighed er specielt tilføjet - når du analyserer kildedata, skal du fjerne tomme felter:

df = spark.read.json("...", dropFieldIfAllNull=True)

I dette tilfælde vil parketten bestå af skillevægge, der kan læses sammen.
Selvom de, der har gjort det i praksis, vil smile bittert her. Hvorfor? Ja, for der er sandsynligvis to situationer mere. Eller tre. Eller fire. Den første, som næsten helt sikkert vil forekomme, er, at numeriske typer vil se anderledes ud i forskellige JSON-filer. For eksempel {intField: 1} og {intField: 1.1}. Hvis sådanne felter findes i en partition, vil skemafletningen læse alt korrekt, hvilket fører til den mest nøjagtige type. Men hvis i forskellige, så vil den ene have intField: int, og den anden vil have intField: double.

Der er følgende flag til at håndtere denne situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nu har vi en mappe, hvor der er skillevægge, der kan læses ind i en enkelt dataramme og en gyldig parket af hele montren. Ja? Ingen.

Vi skal huske, at vi har registreret bordet i Hive. Hive skelner ikke mellem store og små bogstaver i feltnavne, mens parket skelner mellem store og små bogstaver. Derfor er partitioner med skemaer: field1: int og Field1: int de samme for Hive, men ikke for Spark. Glem ikke at konvertere feltnavnene til små bogstaver.

Herefter ser alt ud til at være i orden.

Dog ikke alt så enkelt. Der er et andet, også velkendt problem. Da hver ny partition gemmes separat, vil partitionsmappen indeholde Spark-tjenestefiler, for eksempel flaget _SUCCESS operation succes. Dette vil resultere i en fejl, når du forsøger at parketlægge. For at undgå dette skal du konfigurere konfigurationen for at forhindre Spark i at tilføje servicefiler til mappen:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Det ser ud til, at der nu hver dag bliver tilføjet en ny parketpartition til måludstillingsmappen, hvor de analyserede data for dagen er placeret. Vi sørgede på forhånd for, at der ikke var nogen partitioner med en datatypekonflikt.

Men vi har et tredje problem. Nu er det generelle skema ikke kendt, desuden har tabellen i Hive et forkert skema, da hver ny partition højst sandsynligt har indført en forvrængning i skemaet.

Du skal omregistrere bordet. Dette kan gøres enkelt: læs parketten på butiksfacadet igen, tag skemaet og opret en DDL baseret på det, som du kan genregistrere mappen i Hive med som en ekstern tabel, og opdatere skemaet for målbutiksfacadet.

Vi har et fjerde problem. Første gang vi registrerede et bord, stolede vi på Spark. Nu gør vi det selv, og vi skal huske, at parketfelter kan begynde med tegn, der ikke er tilladt for Hive. For eksempel smider Spark linjer ud, som den ikke kunne parse i feltet "corrupt_record". Et sådant felt kan ikke registreres i Hive uden at blive undslippet.

Ved at vide dette får vi ordningen:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kode ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") gør sikker DDL, dvs. i stedet for:

create table tname (_field1 string, 1field string)

Med feltnavne som "_field1, 1field" laves sikker DDL hvor feltnavnene er escaped: opret tabel `tname` (`_field1` streng, `1field` streng).

Spørgsmålet opstår: hvordan man korrekt får en dataramme med et komplet skema (i pf-kode)? Hvordan får man denne pf? Dette er det femte problem. Genlæse skemaet for alle partitioner fra mappen med parketfiler af målet showcase? Denne metode er den sikreste, men svær.

Skemaet er allerede i Hive. Du kan få et nyt skema ved at kombinere hele tabellens skema og den nye partition. Så du skal tage tabelskemaet fra Hive og kombinere det med skemaet for den nye partition. Dette kan gøres ved at læse testmetadataene fra Hive, gemme dem i en midlertidig mappe og bruge Spark til at læse begge partitioner på én gang.

Faktisk er der alt hvad du behøver: det originale tabelskema i Hive og den nye partition. Vi har også data. Det er kun tilbage at få et nyt skema, der kombinerer butiksfrontskemaet og nye felter fra den oprettede partition:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Dernæst opretter vi tabelregistreringen DDL, som i det forrige uddrag.
Hvis hele kæden fungerer korrekt, nemlig der var en initialiseringsbelastning, og tabellen blev oprettet korrekt i Hive, så får vi et opdateret tabelskema.

Og det sidste problem er, at du ikke bare kan tilføje en partition til en Hive-tabel, fordi den bliver ødelagt. Du skal tvinge Hive til at rette dens partitionsstruktur:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Den enkle opgave at læse JSON og skabe en butiksfacade baseret på det resulterer i at overvinde en række implicitte vanskeligheder, løsninger som du skal lede efter separat. Og selvom disse løsninger er enkle, tager det meget tid at finde dem.

For at implementere konstruktionen af ​​udstillingsvinduet skulle jeg:

  • Tilføj partitioner til udstillingsvinduet, fjern servicefiler
  • Håndter tomme felter i kildedata, som Spark har indtastet
  • Støb simple typer til en snor
  • Konverter feltnavne til små bogstaver
  • Separat dataupload og tabelregistrering i Hive (DDL-generation)
  • Glem ikke at undslippe feltnavne, der kan være inkompatible med Hive
  • Lær, hvordan du opdaterer bordregistrering i Hive

Sammenfattende bemærker vi, at beslutningen om at bygge butiksvinduer er fyldt med mange faldgruber. Derfor, i tilfælde af vanskeligheder med implementeringen, er det bedre at kontakte en erfaren partner med succesfuld ekspertise.

Tak fordi du læste denne artikel, vi håber du finder oplysningerne nyttige.

Kilde: www.habr.com

Tilføj en kommentar