Spark SchemaEvolution in der Praxis

Liebe Leserinnen und Leser, guten Tag!

In diesem Artikel beschreibt der führende Berater für den Geschäftsbereich Big Data Solutions von Neoflex ausführlich Möglichkeiten zum Aufbau von Storefronts mit variabler Struktur mithilfe von Apache Spark.

Im Rahmen eines Datenanalyseprojekts stellt sich häufig die Aufgabe, Schaufenster auf Basis lose strukturierter Daten zu erstellen.

In der Regel handelt es sich dabei um Protokolle oder Antworten verschiedener Systeme, die in Form von JSON oder XML gespeichert werden. Die Daten werden auf Hadoop hochgeladen, dann muss daraus eine Storefront erstellt werden. Den Zugriff auf die erstellte Storefront können wir beispielsweise über Impala organisieren.

In diesem Fall ist das Layout der Zielfassade im Voraus unbekannt. Darüber hinaus kann das Schema nicht im Voraus erstellt werden, da es von den Daten abhängt und wir es mit diesen sehr schwach strukturierten Daten zu tun haben.

Heute wird beispielsweise die folgende Antwort protokolliert:

{source: "app1", error_code: ""}

und morgen kommt die folgende Antwort aus demselben System:

{source: "app1", error_code: "error", description: "Network error"}

Infolgedessen sollte ein weiteres Feld zur Storefront hinzugefügt werden – Beschreibung, und niemand weiß, ob es kommen wird oder nicht.

Die Aufgabe, einen Mart auf der Grundlage solcher Daten zu erstellen, ist ziemlich normal, und Spark verfügt hierfür über eine Reihe von Tools. Für das Parsen von Quelldaten gibt es Unterstützung sowohl für JSON als auch für XML, und für ein bisher unbekanntes Schema wird schemaEvolution-Unterstützung bereitgestellt.

Auf den ersten Blick sieht die Lösung einfach aus. Sie müssen den Ordner mit JSON nehmen und ihn in den Datenrahmen einlesen. Spark erstellt ein Schema und wandelt die verschachtelten Daten in Strukturen um. Als nächstes muss alles im Parkett gespeichert werden, was auch in Impala unterstützt wird, indem die Storefront im Hive-Metastore registriert wird.

Alles scheint einfach zu sein.

Aus den kurzen Beispielen in der Dokumentation geht jedoch nicht klar hervor, wie mit einer Reihe von Problemen in der Praxis umgegangen werden soll.

Die Dokumentation beschreibt einen Ansatz nicht zum Erstellen einer Storefront, sondern zum Einlesen von JSON oder XML in einen Datenrahmen.

Es zeigt nämlich einfach, wie man JSON liest und analysiert:

df = spark.read.json(path...)

Dies reicht aus, um Spark die Daten zur Verfügung zu stellen.

In der Praxis ist das Szenario viel komplexer, als nur JSON-Dateien aus einem Ordner zu lesen und einen Datenrahmen zu erstellen. Die Situation sieht so aus: Es gibt bereits ein bestimmtes Schaufenster, jeden Tag kommen neue Daten hinzu, sie müssen zum Schaufenster hinzugefügt werden, nicht zu vergessen, dass das Schema abweichen kann.

Das übliche Schema für den Bau einer Ladenfront ist wie folgt:

Schritt 1. Die Daten werden in Hadoop geladen, anschließend täglich zusätzlich geladen und einer neuen Partition hinzugefügt. Das Ergebnis ist ein nach Tagen partitionierter Ordner mit Quelldaten.

Schritt 2. Beim ersten Laden wird dieser Ordner mit Spark gelesen und analysiert. Der resultierende Datenrahmen wird in einem Format gespeichert, das analysiert werden kann, beispielsweise in Parkett, das dann in Impala importiert werden kann. Dadurch entsteht eine Ziel-Storefront mit allen bisher angefallenen Daten.

Schritt 3. Es wird ein Download erstellt, der die Storefront täglich aktualisiert.
Es stellt sich die Frage der inkrementellen Belastung, der Notwendigkeit, die Ladenfront zu unterteilen und die Frage der Unterstützung des allgemeinen Layouts der Ladenfront.

Geben wir ein Beispiel. Nehmen wir an, der erste Schritt zum Aufbau eines Repositorys wurde implementiert und das Hochladen von JSON-Dateien in einen Ordner ist konfiguriert.

Daraus einen Datenrahmen zu erstellen und ihn dann als Showcase zu speichern, ist kein Problem. Dies ist der allererste Schritt, der leicht in der Spark-Dokumentation zu finden ist:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Alles scheint in Ordnung zu sein.

Wir haben den JSON gelesen und analysiert, dann speichern wir den Datenrahmen als Parquet und registrieren ihn auf beliebige bequeme Weise in Hive:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Wir bekommen eine Vitrine.

Aber am nächsten Tag wurden neue Daten von der Quelle hinzugefügt. Wir haben einen Ordner mit JSON und eine Storefront, die auf diesem Ordner basiert. Nach dem Laden der nächsten Datenmenge aus der Quelle verfügt die Storefront nicht über genügend Daten für einen Tag.

Eine logische Lösung wäre, die Storefront nach Tagen zu unterteilen, sodass Sie jeden nächsten Tag eine neue Partition hinzufügen können. Der Mechanismus dafür ist ebenfalls bekannt; Spark ermöglicht es, Partitionen separat aufzuzeichnen.

Zuerst führen wir das anfängliche Laden durch, speichern die Daten wie oben beschrieben und fügen nur die Partitionierung hinzu. Diese Aktion wird Storefront-Initialisierung genannt und wird nur einmal durchgeführt:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Am nächsten Tag laden wir nur die neue Partition herunter:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Es bleibt nur noch eine Neuregistrierung in Hive, um das Schema zu aktualisieren.
Hier treten jedoch Probleme auf.

Erstes Problem. Früher oder später wird das entstandene Parkett nicht mehr lesbar sein. Dies liegt daran, dass Parkett und JSON leere Felder unterschiedlich behandeln.

Betrachten wir eine typische Situation. Gestern kam beispielsweise JSON an:

День 1: {"a": {"b": 1}},

und heute sieht derselbe JSON so aus:

День 2: {"a": null}

Nehmen wir an, wir haben zwei verschiedene Partitionen mit jeweils einer Zeile.
Wenn wir die gesamten Quelldaten lesen, kann Spark den Typ bestimmen und verstehen, dass „a“ ein Feld vom Typ „Struktur“ mit einem verschachtelten Feld „b“ vom Typ INT ist. Wenn jedoch jede Partition separat gespeichert wurde, ist das Ergebnis ein Parkett mit inkompatiblen Partitionsschemata:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Diese Situation ist bekannt, daher wurde speziell eine Option hinzugefügt, um leere Felder beim Parsen von Quelldaten zu entfernen:

df = spark.read.json("...", dropFieldIfAllNull=True)

In diesem Fall besteht das Parkett aus zusammenlegbaren Trennwänden.
Obwohl diejenigen, die dies in der Praxis getan haben, bitter lächeln werden. Warum? Ja, denn höchstwahrscheinlich werden noch zwei weitere Situationen auftreten. Oder drei. Oder vier. Der erste, der fast sicher ist, ist, dass numerische Typen in verschiedenen JSON-Dateien unterschiedlich aussehen. Beispiel: {intField: 1} und {intField: 1.1}. Wenn solche Felder in einem Stapel erscheinen, liest die Schemazusammenführung alles korrekt, was zum genauesten Typ führt. Wenn es sich jedoch um verschiedene handelt, hat das eine intField: int und das andere intField: double.

Um mit dieser Situation umzugehen, gibt es das folgende Flag:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Jetzt haben wir einen Ordner, in dem sich die Partitionen befinden, der in einen einzelnen Datenrahmen und einen gültigen Parkett der gesamten Storefront eingelesen werden kann. Ja? Nein.

Wir müssen bedenken, dass wir die Tabelle in Hive registriert haben. Hive unterscheidet bei Feldnamen nicht zwischen Groß- und Kleinschreibung, Parkett hingegen schon. Daher sind Partitionen mit den Schemata „field1: int“ und „Field1: int“ für Hive identisch, nicht jedoch für Spark. Vergessen Sie nicht, Feldnamen in Kleinbuchstaben umzuwandeln.

Danach scheint alles in Ordnung zu sein.

Allerdings ist nicht alles so einfach. Es entsteht ein zweites, ebenfalls bekanntes Problem. Da jede neue Partition separat gespeichert wird, enthält der Partitionsordner Spark-Dienstdateien, beispielsweise das _SUCCESS-Vorgangserfolgsflag. Dies führt zu einem Fehler beim Versuch, Parkett zu verlegen. Um dies zu vermeiden, müssen Sie die Konfiguration konfigurieren, indem Sie verhindern, dass Spark Dienstdateien zum Ordner hinzufügt:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Es scheint, dass jetzt jeden Tag eine neue Parquet-Partition zum Ziel-Storefront-Ordner hinzugefügt wird, in dem sich die analysierten Daten für den Tag befinden. Wir haben im Vorfeld darauf geachtet, dass es keine Partitionen mit Datentypkonflikten gibt.

Aber wir stehen vor einem dritten Problem. Nun ist das allgemeine Schema nicht bekannt, außerdem hat die Tabelle in Hive das falsche Schema, da jede neue Partition höchstwahrscheinlich eine Verzerrung des Schemas mit sich brachte.

Die Tabelle muss neu registriert werden. Das geht ganz einfach: Lesen Sie das Parkett der Storefront erneut, nehmen Sie das Schema und erstellen Sie darauf basierend eine DDL, mit der Sie den Ordner in Hive erneut als externe Tabelle registrieren und so das Schema der Ziel-Storefront aktualisieren können.

Wir stehen vor einem vierten Problem. Als wir die Tabelle zum ersten Mal registrierten, setzten wir auf Spark. Jetzt machen wir es selbst und müssen bedenken, dass Parkettfelder möglicherweise mit Zeichen beginnen, die von Hive nicht zugelassen sind. Spark wirft beispielsweise Zeilen aus, die es nicht im Feld „corrupt_record“ analysieren konnte. Ein solches Feld kann nicht ohne Escapezeichen in Hive registriert werden.

Wenn wir das wissen, erhalten wir das Diagramm:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Code ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(::", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") führt sicheres DDL aus, das heißt, statt:

create table tname (_field1 string, 1field string)

Mit Feldnamen wie „_field1, 1field“ wird eine sichere DDL erstellt, in der die Feldnamen maskiert werden: Tabelle „tname“ erstellen (Zeichenfolge „_field1“, Zeichenfolge „1field“).

Es stellt sich die Frage: Wie erhält man korrekt einen Datenrahmen mit einem vollständigen Schema (im PF-Code)? Wie bekomme ich diesen Pf? Dies ist das fünfte Problem. Das Diagramm aller Partitionen aus dem Ordner mit Parkettdateien der Ziel-Storefront erneut lesen? Diese Methode ist die sicherste, aber schwierigste.

Das Schema ist bereits in Hive. Sie können ein neues Schema erhalten, indem Sie das Schema der gesamten Tabelle und der neuen Partition kombinieren. Das bedeutet, dass Sie das Tabellenschema von Hive übernehmen und es mit dem Schema der neuen Partition kombinieren müssen. Dies kann erreicht werden, indem Testmetadaten aus Hive gelesen, in einem temporären Ordner gespeichert und beide Partitionen gleichzeitig mit Spark gelesen werden.

Im Wesentlichen ist alles vorhanden, was Sie brauchen: das ursprüngliche Tabellenschema in Hive und eine neue Partition. Wir haben auch Daten. Es bleibt nur noch, ein neues Schema zu erhalten, das das Storefront-Schema und neue Felder aus der erstellten Partition kombiniert:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Als nächstes erstellen wir die Tabellenregistrierungs-DDL, wie im vorherigen Fragment.
Wenn die gesamte Kette korrekt funktioniert, also ein Erstladevorgang stattgefunden hat und die Tabelle korrekt in Hive erstellt wurde, erhalten wir ein aktualisiertes Tabellenschema.

Das letzte Problem besteht darin, dass Sie einer Hive-Tabelle nicht einfach eine Partition hinzufügen können, da diese sonst kaputt geht. Sie müssen Hive zwingen, seine Partitionsstruktur zu korrigieren:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Die einfache Aufgabe, JSON zu lesen und darauf basierend eine Storefront zu erstellen, führt zur Überwindung einer Reihe impliziter Schwierigkeiten, deren Lösungen separat gefunden werden müssen. Und obwohl diese Lösungen einfach sind, braucht es viel Zeit, sie zu finden.

Um den Bau einer Vitrine umzusetzen, mussten wir:

  • Fügen Sie der Storefront Partitionen hinzu und entfernen Sie so Servicedateien
  • Umgang mit leeren Feldern in Quelldaten, die Spark eingegeben hat
  • Wandeln Sie einfache Typen in einen String um
  • Feldnamen in Kleinbuchstaben umwandeln
  • Separater Daten-Upload und Tabellenregistrierung in Hive (DDL-Erstellung)
  • Vergessen Sie nicht, Feldnamen zu maskieren, die möglicherweise nicht mit Hive kompatibel sind
  • Erfahren Sie, wie Sie die Tischregistrierung in Hive aktualisieren

Zusammenfassend stellen wir fest, dass die Entscheidung, Ladenfronten zu bauen, mit vielen Fallstricken verbunden ist. Treten daher Schwierigkeiten bei der Umsetzung auf, ist es besser, sich an einen erfahrenen Partner mit erfolgreicher Expertise zu wenden.

Vielen Dank, dass Sie diesen Artikel gelesen haben. Wir hoffen, dass Sie die Informationen nützlich finden.

Source: habr.com

Kommentar hinzufügen