Spark schemaEvolution in de praktijk

Beste lezers, goede dag!

In dit artikel beschrijft de toonaangevende consultant van de business area Big Data Solutions van Neoflex in detail de opties voor het bouwen van showcases met variabele structuren met behulp van Apache Spark.

Als onderdeel van een data-analyseproject ontstaat vaak de taak om storefronts te bouwen op basis van losjes gestructureerde data.

Meestal zijn dit logs, of reacties van verschillende systemen, opgeslagen als JSON of XML. De gegevens worden geüpload naar Hadoop, waarna u er een etalage van moet bouwen. De toegang tot de gecreëerde showcase kunnen we bijvoorbeeld via Impala organiseren.

In dit geval is het schema van de doeletalage niet op voorhand bekend. Bovendien kan het schema ook niet van tevoren worden opgesteld, omdat het afhankelijk is van de gegevens, en we hebben te maken met deze zeer los gestructureerde gegevens.

Vandaag wordt bijvoorbeeld het volgende antwoord gelogd:

{source: "app1", error_code: ""}

en morgen komt uit hetzelfde systeem het volgende antwoord:

{source: "app1", error_code: "error", description: "Network error"}

Als gevolg hiervan moet er nog een veld aan de showcase worden toegevoegd - beschrijving, en niemand weet of het zal komen of niet.

De taak om een ​​etalage op dergelijke gegevens te maken is vrij standaard, en Spark heeft hiervoor een aantal tools. Voor het ontleden van de brongegevens is er ondersteuning voor zowel JSON als XML, en voor een voorheen onbekend schema is er ondersteuning voor schemaEvolution.

Op het eerste gezicht lijkt de oplossing simpel. U moet een map met JSON nemen en deze in een dataframe lezen. Spark maakt een schema en zet geneste gegevens om in structuren. Verder moet alles opgeslagen worden in parket, wat ook ondersteund wordt in Impala, door de storefront te registreren in de Hive metastore.

Alles lijkt eenvoudig te zijn.

Uit de korte voorbeelden in de documentatie blijkt echter niet duidelijk wat te doen met een aantal problemen in de praktijk.

De documentatie beschrijft een aanpak om geen storefront te maken, maar om JSON of XML in een dataframe in te lezen.

Het laat namelijk eenvoudig zien hoe JSON moet worden gelezen en geparseerd:

df = spark.read.json(path...)

Dit is voldoende om de gegevens beschikbaar te maken voor Spark.

In de praktijk is het script veel ingewikkelder dan alleen JSON-bestanden uit een map lezen en een dataframe maken. De situatie ziet er als volgt uit: er is al een bepaalde etalage, er komen elke dag nieuwe gegevens binnen, deze moeten aan de etalage worden toegevoegd, en niet te vergeten dat het schema kan verschillen.

Het gebruikelijke schema voor het bouwen van een vitrine is als volgt:

Stap 1. De gegevens worden in Hadoop geladen, dagelijks opnieuw geladen en toegevoegd aan een nieuwe partitie. Het blijkt een map te zijn met initiële gegevens die per dag zijn gepartitioneerd.

Stap 2. Tijdens de eerste keer laden wordt deze map gelezen en geparseerd door Spark. Het resulterende dataframe wordt opgeslagen in een parseerbaar formaat, bijvoorbeeld in parket, dat vervolgens kan worden geïmporteerd in Impala. Dit creëert een target-showcase met alle gegevens die tot nu toe zijn verzameld.

Stap 3. Er wordt een download gemaakt die de etalage elke dag zal updaten.
Er is een kwestie van incrementele belasting, de noodzaak om de vitrine op te delen en de kwestie van het behoud van het algemene schema van de vitrine.

Laten we een voorbeeld nemen. Laten we zeggen dat de eerste stap van het bouwen van een repository is geïmplementeerd en dat JSON-bestanden naar een map worden geüpload.

Daarvan een dataframe maken en vervolgens opslaan als showcase is geen probleem. Dit is de allereerste stap die gemakkelijk te vinden is in de Spark-documentatie:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Alles lijkt in orde te zijn.

We hebben JSON gelezen en geparseerd, vervolgens slaan we het dataframe op als een parket en registreren we het op een handige manier in Hive:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

We krijgen een raam.

Maar de volgende dag werden er nieuwe gegevens uit de bron toegevoegd. We hebben een map met JSON en een showcase gemaakt op basis van deze map. Na het laden van de volgende batch gegevens van de bron, mist de datamart een dag aan gegevens.

De logische oplossing zou zijn om de etalage per dag in te delen, waardoor elke volgende dag een nieuwe partitie kan worden toegevoegd. Het mechanisme hiervoor is ook bekend, met Spark kun je partities apart schrijven.

Eerst doen we een initiële belasting, waarbij we de gegevens opslaan zoals hierboven beschreven, en alleen partitionering toevoegen. Deze actie wordt storefront-initialisatie genoemd en wordt slechts één keer uitgevoerd:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

De volgende dag laden we alleen een nieuwe partitie:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Het enige dat overblijft is om opnieuw te registreren in Hive om het schema bij te werken.
Hier ontstaan ​​echter problemen.

Eerste probleem. Vroeg of laat zal het resulterende parket onleesbaar zijn. Dit komt doordat parket en JSON lege velden anders behandelen.

Laten we een typische situatie bekijken. Gisteren arriveert bijvoorbeeld JSON:

День 1: {"a": {"b": 1}},

en vandaag ziet dezelfde JSON er zo uit:

День 2: {"a": null}

Laten we zeggen dat we twee verschillende partities hebben, elk met één regel.
Wanneer we de volledige brongegevens lezen, kan Spark het type bepalen en begrijpen dat "a" een veld is van het type "structuur", met een genest veld "b" van het type INT. Maar als elke partitie afzonderlijk is opgeslagen, krijgen we een parket met incompatibele partitieschema's:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Deze situatie is bekend, dus er is speciaal een optie toegevoegd - bij het ontleden van de brongegevens, verwijder lege velden:

df = spark.read.json("...", dropFieldIfAllNull=True)

In dit geval zal het parket bestaan ​​uit scheidingswanden die samen gelezen kunnen worden.
Al zullen degenen die dit in de praktijk hebben gedaan hier bitter glimlachen. Waarom? Ja, want er zijn waarschijnlijk nog twee situaties. Of drie. Of vier. De eerste, die vrijwel zeker zal voorkomen, is dat numerieke typen er anders uitzien in verschillende JSON-bestanden. Bijvoorbeeld {intField: 1} en {intField: 1.1}. Als dergelijke velden in één partitie worden gevonden, zal de schemasamenvoeging alles correct lezen, wat leidt tot het meest nauwkeurige type. Maar als er verschillende zijn, dan heeft de ene intField: int, en de andere heeft intField: double.

Er is de volgende vlag om deze situatie aan te pakken:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nu hebben we een map met partities die kunnen worden gelezen in een enkel dataframe en een geldig parket van de hele showcase. Ja? Nee.

We moeten niet vergeten dat we de tabel in Hive hebben geregistreerd. Hive is niet hoofdlettergevoelig in veldnamen, terwijl parket wel hoofdlettergevoelig is. Daarom zijn partities met schema's: field1: int en Field1: int hetzelfde voor Hive, maar niet voor Spark. Vergeet niet de veldnamen om te zetten in kleine letters.

Daarna lijkt alles in orde te zijn.

Niet allemaal zo eenvoudig. Er is nog een tweede, eveneens bekend probleem. Aangezien elke nieuwe partitie afzonderlijk wordt opgeslagen, bevat de partitiemap Spark-servicebestanden, bijvoorbeeld de succesvlag _SUCCESS. Dit resulteert in een fout bij het parket. Om dit te voorkomen, moet u de configuratie configureren om te voorkomen dat Spark servicebestanden aan de map toevoegt:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Het lijkt erop dat er nu elke dag een nieuwe parketpartitie wordt toegevoegd aan de doel-showcasemap, waar de geparseerde gegevens voor de dag zich bevinden. We hebben er vooraf voor gezorgd dat er geen partities waren met een datatypeconflict.

Maar we hebben nog een derde probleem. Nu is het algemene schema niet bekend, bovendien heeft de tabel in Hive een onjuist schema, aangezien elke nieuwe partitie hoogstwaarschijnlijk een vervorming in het schema heeft geïntroduceerd.

U moet de tabel opnieuw registreren. Dit kan eenvoudig worden gedaan: lees het parket van de storefront opnieuw, neem het schema en maak op basis daarvan een DDL, waarmee de map in Hive opnieuw wordt geregistreerd als een externe tabel, waarbij het schema van de doel-storefront wordt bijgewerkt.

We hebben nog een vierde probleem. Toen we de tafel voor het eerst registreerden, vertrouwden we op Spark. Nu doen we het zelf, en we moeten niet vergeten dat parketvelden kunnen beginnen met tekens die niet zijn toegestaan ​​voor Hive. Spark gooit bijvoorbeeld regels weg die niet konden worden geparseerd in het veld "corrupt_record". Zo'n veld kan niet in Hive worden geregistreerd zonder te worden geëscaped.

Als we dit weten, krijgen we het schema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

code ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") maakt veilige DDL, d.w.z. in plaats van:

create table tname (_field1 string, 1field string)

Met veldnamen zoals "_field1, 1field", wordt veilige DDL gemaakt waar de veldnamen worden ontsnapt: maak tabel `tname` (`_field1` string, `1field` string).

De vraag rijst: hoe krijg je op de juiste manier een dataframe met een compleet schema (in pf-code)? Hoe krijg je deze pf? Dit is het vijfde probleem. Herlees het schema van alle partities uit de map met parketbestanden van de doelshowcase? Deze methode is de veiligste, maar moeilijk.

Het schema bevindt zich al in Hive. U kunt een nieuw schema krijgen door het schema van de hele tabel en de nieuwe partitie te combineren. U moet dus het tabelschema van Hive nemen en dit combineren met het schema van de nieuwe partitie. Dit kan worden gedaan door de testmetadata van Hive te lezen, deze op te slaan in een tijdelijke map en Spark te gebruiken om beide partities tegelijk te lezen.

In feite is er alles wat je nodig hebt: het originele tabelschema in Hive en de nieuwe partitie. We hebben ook gegevens. Het blijft alleen om een ​​​​nieuw schema te krijgen dat het storefront-schema combineert met nieuwe velden van de gemaakte partitie:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Vervolgens maken we de DDL voor tabelregistratie, zoals in het vorige fragment.
Als de hele keten correct werkt, er was namelijk een initialisatiebelasting en de tabel is correct gemaakt in Hive, dan krijgen we een bijgewerkt tabelschema.

En het laatste probleem is dat je niet zomaar een partitie aan een Hive-tabel kunt toevoegen, omdat deze kapot gaat. U moet Hive dwingen om de partitiestructuur te repareren:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

De simpele taak om JSON te lezen en op basis daarvan een storefront te creëren, resulteert in het overwinnen van een aantal impliciete moeilijkheden, oplossingen waar je apart naar moet zoeken. En hoewel deze oplossingen eenvoudig zijn, kost het veel tijd om ze te vinden.

Om de constructie van de vitrine te realiseren, moest ik:

  • Voeg partities toe aan de showcase en verwijder servicebestanden
  • Omgaan met lege velden in brongegevens die Spark heeft getypt
  • Cast eenvoudige typen naar een string
  • Zet veldnamen om in kleine letters
  • Aparte data upload en tabelregistratie in Hive (DDL generatie)
  • Vergeet niet veldnamen te escapen die mogelijk niet compatibel zijn met Hive
  • Meer informatie over het bijwerken van tabelregistratie in Hive

Samenvattend stellen we vast dat de beslissing om etalages te bouwen met veel valkuilen gepaard gaat. Daarom is het in geval van implementatieproblemen beter om contact op te nemen met een ervaren partner met succesvolle expertise.

Bedankt voor het lezen van dit artikel, we hopen dat u de informatie nuttig vindt.

Bron: www.habr.com

Voeg een reactie