GnistskjemaEvolusjon i praksis

Kjære lesere, god dag!

I denne artikkelen beskriver den ledende konsulenten for Neoflex sitt forretningsområde Big Data Solutions i detalj mulighetene for å bygge utstillingsvinduer med variabel struktur ved hjelp av Apache Spark.

Som en del av et dataanalyseprosjekt oppstår ofte oppgaven med å bygge utstillingsvinduer basert på løst strukturerte data.

Vanligvis er dette logger, eller svar fra ulike systemer, lagret som JSON eller XML. Dataene lastes opp til Hadoop, så må du bygge en butikk fra dem. Vi kan organisere tilgang til det opprettede utstillingsvinduet, for eksempel gjennom Impala.

I dette tilfellet er skjemaet til målbutikkfronten ikke kjent på forhånd. Dessuten kan ordningen heller ikke utarbeides på forhånd, da den avhenger av dataene, og vi har å gjøre med disse svært løst strukturerte dataene.

I dag logges for eksempel følgende svar:

{source: "app1", error_code: ""}

og i morgen fra samme system kommer følgende svar:

{source: "app1", error_code: "error", description: "Network error"}

Som et resultat bør ett felt til legges til utstillingsvinduet - beskrivelse, og ingen vet om det kommer eller ikke.

Oppgaven med å lage en butikkfront på slike data er ganske standard, og Spark har en rekke verktøy for dette. For å analysere kildedataene er det støtte for både JSON og XML, og for et tidligere ukjent skjema gis støtte for schemaEvolution.

Ved første øyekast ser løsningen enkel ut. Du må ta en mappe med JSON og lese den inn i en dataramme. Spark vil lage et skjema, gjøre nestede data om til strukturer. Videre må alt lagres i parkett, som også støttes i Impala, ved å registrere butikkfronten i Hive-metastore.

Alt ser ut til å være enkelt.

Det fremgår imidlertid ikke av de korte eksemplene i dokumentasjonen hva man skal gjøre med en rekke problemer i praksis.

Dokumentasjonen beskriver en tilnærming ikke for å lage en butikkfront, men for å lese JSON eller XML inn i en dataramme.

Det viser nemlig ganske enkelt hvordan du leser og analyserer JSON:

df = spark.read.json(path...)

Dette er nok til å gjøre dataene tilgjengelige for Spark.

I praksis er scenariet mye mer komplekst enn å bare lese JSON-filer fra en mappe og lage en dataramme. Situasjonen ser slik ut: det er allerede et visst utstillingsvindu, nye data kommer hver dag, de må legges til utstillingsvinduet, ikke glem at ordningen kan variere.

Det vanlige opplegget for å bygge et utstillingsvindu er som følger:

Trinn 1. Dataene lastes inn i Hadoop med påfølgende daglig omlasting og legges til en ny partisjon. Det viser seg en mappe med innledende data partisjonert etter dag.

Trinn 2. Under den første innlastingen blir denne mappen lest og analysert av Spark. Den resulterende datarammen lagres i et parserbart format, for eksempel i parkett, som deretter kan importeres til Impala. Dette skaper et målutstillingsvindu med alle dataene som har samlet seg frem til dette punktet.

Trinn 3. Det opprettes en nedlasting som vil oppdatere butikkfronten hver dag.
Spørsmålet om inkrementell lasting oppstår, behovet for å dele opp butikkfronten og spørsmålet om å støtte den generelle utformingen av butikkfronten.

La oss ta et eksempel. La oss si at det første trinnet med å bygge et depot er implementert, og JSON-filer lastes opp til en mappe.

Å lage en dataramme fra dem, og deretter lagre den som et utstillingsvindu, er ikke et problem. Dette er det aller første trinnet som enkelt kan finnes i Spark-dokumentasjonen:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Alt ser ut til å være bra.

Vi leste og analyserte JSON, så lagrer vi datarammen som en parkett, og registrerer den i Hive på en praktisk måte:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Vi får et vindu.

Men dagen etter ble nye data fra kilden lagt til. Vi har en mappe med JSON, og et utstillingsvindu opprettet fra denne mappen. Etter å ha lastet inn neste batch med data fra kilden, mangler datamarkedet én dags data.

Den logiske løsningen ville være å partisjonere butikkfronten etter dag, noe som vil tillate å legge til en ny partisjon hver neste dag. Mekanismen for dette er også velkjent, Spark lar deg skrive partisjoner separat.

Først gjør vi en innledende lasting, lagrer dataene som beskrevet ovenfor, og legger bare til partisjonering. Denne handlingen kalles initialisering av butikkfront og gjøres bare én gang:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Neste dag laster vi bare en ny partisjon:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Alt som gjenstår er å registrere deg på nytt i Hive for å oppdatere skjemaet.
Det er imidlertid her problemer oppstår.

Første problem. Før eller siden vil den resulterende parketten være uleselig. Dette skyldes hvordan parkett og JSON behandler tomme felt forskjellig.

La oss vurdere en typisk situasjon. For eksempel, i går kom JSON:

День 1: {"a": {"b": 1}},

og i dag ser den samme JSON slik ut:

День 2: {"a": null}

La oss si at vi har to forskjellige partisjoner, hver med en linje.
Når vi leser hele kildedataene, vil Spark kunne bestemme typen, og vil forstå at "a" er et felt av typen "struktur", med et nestet felt "b" av typen INT. Men hvis hver partisjon ble lagret separat, er resultatet en parkett med inkompatible partisjonsordninger:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Denne situasjonen er velkjent, så et alternativ er spesielt lagt til for å fjerne tomme felt når du analyserer kildedata:

df = spark.read.json("...", dropFieldIfAllNull=True)

I dette tilfellet vil parketten bestå av skillevegger som kan leses sammen.
Selv om de som har gjort dette i praksis vil smile bittert her. Hvorfor? Ja, for det er sannsynligvis to situasjoner til. Eller tre. Eller fire. Den første, som nesten helt sikkert vil forekomme, er at numeriske typer vil se annerledes ut i forskjellige JSON-filer. For eksempel {intField: 1} og {intField: 1.1}. Hvis slike felt finnes i en partisjon, vil skjemasammenslåingen lese alt riktig, noe som fører til den mest nøyaktige typen. Men hvis i forskjellige, vil den ene ha intField: int, og den andre vil ha intField: double.

Det er følgende flagg for å håndtere denne situasjonen:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nå har vi en mappe hvor det er skillevegger som kan leses inn i en enkelt dataramme og en gyldig parkett av hele utstillingsvinduet. Ja? Nei.

Vi må huske at vi registrerte tabellen i Hive. Hive skiller ikke mellom store og små bokstaver i feltnavn, mens parkett skiller mellom store og små bokstaver. Derfor er partisjoner med skjemaer: field1: int og Field1: int de samme for Hive, men ikke for Spark. Ikke glem å konvertere feltnavnene til små bokstaver.

Etter det ser alt ut til å være bra.

Imidlertid er ikke alt så enkelt. Det er et annet, også velkjent problem. Siden hver nye partisjon lagres separat, vil partisjonsmappen inneholde Spark-tjenestefiler, for eksempel suksessflagget _SUCCESS. Dette vil resultere i en feil ved forsøk på parkett. For å unngå dette må du konfigurere konfigurasjonen for å forhindre at Spark legger til tjenestefiler i mappen:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Det ser ut til at nå hver dag legges en ny parkettpartisjon til målbutikkfrontmappen, der de analyserte dataene for dagen er plassert. Vi sørget på forhånd for at det ikke var partisjoner med en datatypekonflikt.

Men vi har et tredje problem. Nå er det generelle skjemaet ikke kjent, dessuten har tabellen i Hive et feil skjema, siden hver ny partisjon mest sannsynlig introduserte en forvrengning i skjemaet.

Du må registrere bordet på nytt. Dette kan gjøres enkelt: les parketten på butikkfronten igjen, ta skjemaet og lag en DDL basert på det, som du kan registrere mappen i Hive på nytt som en ekstern tabell, og oppdatere skjemaet til målbutikkfronten.

Vi står overfor et fjerde problem. Da vi registrerte bordet for første gang, stolte vi på Spark. Nå gjør vi det selv, og vi må huske at parkettfelt kan begynne med tegn som ikke er tillatt av Hive. For eksempel kaster Spark ut linjer som den ikke kunne analysere i «corrupt_record»-feltet. Et slikt felt kan ikke registreres i Hive uten å unnslippe.

Når vi vet dette, får vi ordningen:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kode ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") gjør sikker DDL, dvs. i stedet for:

create table tname (_field1 string, 1field string)

Med feltnavn som "_field1, 1field", lages sikker DDL der feltnavnene er escaped: lag tabell `tname` (`_field1` streng, `1field` streng).

Spørsmålet oppstår: hvordan får man riktig en dataramme med et komplett skjema (i pf-kode)? Hvordan får man tak i denne pf? Dette er det femte problemet. Lese skjemaet for alle partisjoner fra mappen med parkettfiler til målutstillingsvinduet? Denne metoden er den sikreste, men vanskelig.

Skjemaet er allerede i Hive. Du kan få et nytt skjema ved å kombinere skjemaet for hele tabellen og den nye partisjonen. Så du må ta tabellskjemaet fra Hive og kombinere det med skjemaet til den nye partisjonen. Dette kan gjøres ved å lese testmetadataene fra Hive, lagre dem i en midlertidig mappe og bruke Spark til å lese begge partisjonene samtidig.

I hovedsak er det alt du trenger: det originale tabellskjemaet i Hive og en ny partisjon. Vi har også data. Alt som gjenstår er å få et nytt skjema som kombinerer butikkfrontskjemaet og nye felt fra den opprettede partisjonen:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Deretter oppretter vi tabellregistreringen DDL, som i forrige kodebit.
Hvis hele kjeden fungerer riktig, nemlig det var en initialiseringsbelastning, og tabellen ble opprettet riktig i Hive, får vi et oppdatert tabellskjema.

Det siste problemet er at du ikke enkelt kan legge til en partisjon til en Hive-tabell, da den vil gå i stykker. Du må tvinge Hive til å fikse partisjonsstrukturen:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Den enkle oppgaven med å lese JSON og lage en butikkfront basert på den resulterer i å overvinne en rekke implisitte vanskeligheter, løsningene for disse må finnes separat. Og selv om disse løsningene er enkle, tar det mye tid å finne dem.

For å implementere konstruksjonen av utstillingsvinduet, måtte jeg:

  • Legg til partisjoner i butikkfronten, bli kvitt tjenestefiler
  • Håndter tomme felt i kildedata som Spark har skrevet inn
  • Støp enkle typer til en streng
  • Konverter feltnavn til små bokstaver
  • Separat dataopplasting og tabellregistrering i Hive (DDL-generasjon)
  • Ikke glem å unnslippe feltnavn som kanskje ikke er kompatible med Hive
  • Lær hvordan du oppdaterer bordregistrering i Hive

Oppsummert kan vi konstatere at beslutningen om å bygge butikkvinduer er full av mange fallgruver. Derfor, i tilfelle problemer med implementeringen, er det bedre å kontakte en erfaren partner med vellykket ekspertise.

Takk for at du leste denne artikkelen, vi håper du finner informasjonen nyttig.

Kilde: www.habr.com

Legg til en kommentar