Tauchen Sie ein in Delta Lake: Schemadurchsetzung und -entwicklung

Hey Habr! Ich präsentiere Ihnen die Übersetzung des Artikels „Eintauchen in den Delta Lake: Schemadurchsetzung und -entwicklung“ Autoren Burak Yavuz, Brenner Heintz und Denny Lee, das im Vorfeld des Kursbeginns erstellt wurde Dateningenieur von OTUS.

Tauchen Sie ein in Delta Lake: Schemadurchsetzung und -entwicklung

Daten, wie auch unsere Erfahrungen, sammeln sich ständig an und entwickeln sich weiter. Um Schritt zu halten, müssen sich unsere mentalen Modelle der Welt an neue Daten anpassen, von denen einige neue Dimensionen enthalten – neue Möglichkeiten, Dinge zu beobachten, von denen wir vorher keine Ahnung hatten. Diese mentalen Modelle unterscheiden sich nicht wesentlich von den Tabellenschemata, die definieren, wie wir neue Informationen klassifizieren und verarbeiten.

Dies bringt uns zum Thema Schema-Management. Da sich Geschäftsziele und -anforderungen im Laufe der Zeit ändern, ändert sich auch die Struktur Ihrer Daten. Delta Lake erleichtert die Implementierung neuer Messungen bei Datenänderungen. Benutzer haben Zugriff auf einfache Semantik, um ihre Tabellenschemata zu verwalten. Zu diesen Tools gehören Schema Enforcement, das Benutzer davor schützt, ihre Tabellen versehentlich mit Fehlern oder unnötigen Daten zu verunreinigen, und Schema Evolution, das das automatische Hinzufügen neuer Spalten mit wertvollen Daten an den entsprechenden Stellen ermöglicht. In diesem Artikel werden wir uns mit der Verwendung dieser Tools befassen.

Tabellenschemata verstehen

Jeder DataFrame in Apache Spark enthält ein Schema, das die Form der Daten wie Datentypen, Spalten und Metadaten definiert. Bei Delta Lake wird das Tabellenschema im JSON-Format im Transaktionsprotokoll gespeichert.

Was ist Schema-Durchsetzung?

Schema Enforcement, auch bekannt als Schema Validation, ist ein Schutzmechanismus in Delta Lake, der die Datenqualität garantiert, indem Datensätze abgelehnt werden, die nicht mit dem Tabellenschema übereinstimmen. Wie eine Hostess an der Rezeption eines beliebten Restaurants, das nur Reservierungen entgegennimmt, prüft er, ob jede in die Tabelle eingegebene Datenspalte in der entsprechenden Liste der erwarteten Spalten enthalten ist (mit anderen Worten, ob eine „Buchung“ für vorliegt). (jeder von ihnen) und lehnt alle Einträge mit Spalten ab, die nicht in der Liste enthalten sind.

Wie funktioniert die Schema-Durchsetzung?

Delta Lake verwendet beim Schreiben eine Schemavalidierung, was bedeutet, dass alle neuen Schreibvorgänge in die Tabelle zum Zeitpunkt des Schreibvorgangs auf Kompatibilität mit dem Schema der Zieltabelle überprüft werden. Wenn das Schema inkonsistent ist, macht Delta Lake die Transaktion vollständig rückgängig (es werden keine Daten geschrieben) und löst eine Ausnahme aus, um den Benutzer über die Inkonsistenz zu informieren.
Delta Lake verwendet die folgenden Regeln, um zu bestimmen, ob ein Datensatz mit einer Tabelle kompatibel ist. Geschriebener DataFrame:

  • darf keine zusätzlichen Spalten enthalten, die nicht im Schema der Zieltabelle enthalten sind. Umgekehrt ist alles in Ordnung, wenn die eingehenden Daten nicht absolut alle Spalten der Tabelle enthalten – diesen Spalten werden einfach Nullwerte zugewiesen.
  • darf keine Spaltendatentypen haben, die sich von den Spaltendatentypen in der Zieltabelle unterscheiden. Wenn eine Spalte in der Zieltabelle StringType-Daten enthält, die entsprechende Spalte im DataFrame jedoch IntegerType-Daten, löst die Schemadurchsetzung eine Ausnahme aus und verhindert, dass der Schreibvorgang stattfindet.
  • darf keine Spaltennamen enthalten, die sich nur in der Groß-/Kleinschreibung unterscheiden. Das bedeutet, dass in derselben Tabelle keine Spalten mit den Namen „Foo“ und „foo“ definiert sein können. Während Spark im Modus mit Groß-/Kleinschreibung oder ohne Berücksichtigung der Groß-/Kleinschreibung (Standard) verwendet werden kann, behält Delta Lake die Groß-/Kleinschreibung bei, berücksichtigt aber im Schemaspeicher nicht. Beim Speichern und Zurückgeben von Spalteninformationen berücksichtigt Parquet die Groß-/Kleinschreibung. Um mögliche Fehler, Datenbeschädigungen oder Datenverluste (die wir persönlich in Databricks erlebt haben) zu vermeiden, haben wir uns entschieden, diese Einschränkung hinzuzufügen.

Um dies zu veranschaulichen, schauen wir uns an, was im folgenden Code passiert, wenn versucht wird, einige neu generierte Spalten zu einer Delta-Lake-Tabelle hinzuzufügen, die noch nicht für die Annahme dieser Spalten konfiguriert ist.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Anstatt automatisch neue Spalten hinzuzufügen, erzwingt Delta Lake ein Schema und stoppt die Aufzeichnung. Um festzustellen, welche Spalte (oder welcher Satz davon) die Nichtübereinstimmung verursacht, entfernt Spark beide Schemas zum Vergleich aus dem Trace-Stack.

Was ist der Vorteil der Schema-Durchsetzung?

Da es sich bei der Schemadurchsetzung um eine ziemlich strenge Prüfung handelt, eignet sie sich hervorragend als Gatekeeper eines sauberen, vollständig transformierten Datensatzes, der zur Produktion oder Nutzung bereit ist. Wird normalerweise auf Tabellen angewendet, die Daten direkt einspeisen:

  • Algorithmen für maschinelles Lernen
  • BI-Dashboards
  • Datenanalyse- und Visualisierungstools
  • Jedes Produktionssystem, das hochstrukturierte, stark typisierte semantische Schemata erfordert.

Um ihre Daten auf diese letzte Hürde vorzubereiten, verwenden viele Benutzer eine einfache „Multi-Hop“-Architektur, die schrittweise Struktur in ihre Tabellen einführt. Um mehr darüber zu erfahren, können Sie den Artikel lesen Maschinelles Lernen in Produktionsqualität mit Delta Lake.

Natürlich kann die Schema-Erzwingung überall in Ihrer Pipeline eingesetzt werden, aber bedenken Sie, dass das Streamen von Schreibvorgängen in eine Tabelle in diesem Fall frustrierend sein kann, weil Sie beispielsweise vergessen haben, dass Sie den eingehenden Daten eine weitere Spalte hinzugefügt haben.

Verhinderung der Datenausdünnung

An diesem Punkt fragen Sie sich vielleicht, warum dieser Hype? Schließlich kann ein unerwarteter „Schema-Nichtübereinstimmungs“-Fehler manchmal Ihren Arbeitsablauf stören, insbesondere wenn Sie neu bei Delta Lake sind. Warum lässt man das Schema nicht einfach nach Bedarf ändern, damit ich meinen DataFrame schreiben kann, egal was passiert?

Wie das alte Sprichwort sagt: „Eine Unze Vorbeugung ist ein Pfund Heilung wert.“ Irgendwann, wenn Sie nicht darauf achten, Ihr Schema durchzusetzen, werden Datentypkompatibilitätsprobleme ihre hässlichen Köpfe zum Vorschein bringen – scheinbar homogene Rohdatenquellen können Randfälle, fehlerhafte Spalten, fehlerhafte Zuordnungen oder andere gefürchtete Dinge enthalten, von denen Sie träumen .in Albträumen. Der beste Ansatz besteht darin, diese Feinde an der Pforte zu stoppen – mit der Durchsetzung des Schemas – und sie im Licht zu bekämpfen, und nicht erst später, wenn sie anfangen, in den dunklen Tiefen Ihres Produktionscodes herumzustreifen.

Die Durchsetzung des Schemas gibt Ihnen die Gewissheit, dass sich das Schema Ihrer Tabelle nicht ändert, es sei denn, Sie bestätigen die Änderung selbst. Dies verhindert eine Datenverwässerung, die auftreten kann, wenn neue Spalten so häufig hinzugefügt werden, dass zuvor wertvolle, komprimierte Tabellen durch Datenflut ihren Wert und ihre Nützlichkeit verlieren. Indem die Schema-Durchsetzung Sie dazu ermutigt, gezielt vorzugehen, hohe Standards zu setzen und hohe Qualität zu erwarten, tut sie genau das, wofür sie entwickelt wurde: Sie hilft Ihnen, gewissenhaft zu bleiben und Ihre Tabellenkalkulationen sauber zu halten.

Wenn Sie nach weiteren Überlegungen zu dem Schluss kommen, dass Sie wirklich notwendig Fügen Sie eine neue Spalte hinzu – kein Problem, unten finden Sie eine einzeilige Lösung. Die Lösung heißt Schaltungsentwicklung!

Was ist Schemaentwicklung?

Die Schemaentwicklung ist eine Funktion, die es Benutzern ermöglicht, das aktuelle Schema einer Tabelle einfach zu ändern, um sie an Daten anzupassen, die sich im Laufe der Zeit ändern. Es wird am häufigsten verwendet, wenn ein Vorgang zum Hinzufügen oder Überschreiben durchgeführt wird, um das Schema automatisch so anzupassen, dass es eine oder mehrere neue Spalten enthält.

Wie funktioniert die Schemaentwicklung?

Dem Beispiel im vorherigen Abschnitt folgend, können Entwickler mithilfe der Schemaentwicklung problemlos neue Spalten hinzufügen, die zuvor aufgrund von Schemainkonsistenzen abgelehnt wurden. Die Schaltungsentwicklung wird durch Hinzufügen aktiviert .option('mergeSchema', 'true') an Ihr Spark-Team .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Um das Diagramm anzuzeigen, führen Sie die folgende Spark SQL-Abfrage aus

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Tauchen Sie ein in Delta Lake: Schemadurchsetzung und -entwicklung
Alternativ können Sie diese Option durch Hinzufügen für die gesamte Spark-Sitzung festlegen spark.databricks.delta.schema.autoMerge = True zur Spark-Konfiguration. Gehen Sie dabei jedoch mit Vorsicht vor, da Sie durch die Schema-Durchsetzung nicht mehr vor unbeabsichtigten Schema-Inkonsistenzen gewarnt werden.

Durch Einschließen eines Parameters in die Anfrage mergeSchemaAlle Spalten, die im DataFrame, aber nicht in der Zieltabelle vorhanden sind, werden im Rahmen der Schreibtransaktion automatisch am Ende des Schemas hinzugefügt. Es können auch verschachtelte Felder hinzugefügt werden, die ebenfalls am Ende der entsprechenden Strukturspalten eingefügt werden.

Datumsingenieure und Datenwissenschaftler können diese Option nutzen, um neue Spalten (z. B. eine kürzlich verfolgte Metrik oder die Spalte mit den Verkaufszahlen dieses Monats) zu ihren vorhandenen Produktionstabellen für maschinelles Lernen hinzuzufügen, ohne bestehende Modelle, die auf den alten Spalten basieren, zu beschädigen.

Die folgenden Arten von Schemaänderungen sind im Rahmen einer Schemaentwicklung beim Hinzufügen oder Überschreiben einer Tabelle zulässig:

  • Hinzufügen neuer Spalten (dies ist das häufigste Szenario)
  • Ändern von Datentypen von NullType -> irgendein anderer Typ oder Heraufstufung von ByteType -> ShortType -> IntegerType

Andere Änderungen, die im Rahmen der Schemaentwicklung nicht zulässig sind, erfordern, dass das Schema und die Daten durch Hinzufügen überschrieben werden .option("overwriteSchema", "true"). Wenn beispielsweise die Spalte „Foo“ ursprünglich eine Ganzzahl war und das neue Schema ein String-Datentyp wäre, müssten alle Parquet(data)-Dateien überschrieben werden. Zu diesen Änderungen gehören:

  • Löschen einer Spalte
  • Ändern des Datentyps einer vorhandenen Spalte (an Ort und Stelle)
  • Umbenennen von Spalten, die sich nur in der Groß-/Kleinschreibung unterscheiden (zum Beispiel „Foo“ und „foo“)

Schließlich wird mit der nächsten Version von Spark 3.0 explizites DDL (mit ALTER TABLE) vollständig unterstützt, sodass Benutzer die folgenden Aktionen für Tabellenschemata ausführen können:

  • Spalten hinzufügen
  • Spaltenkommentare ändern
  • Festlegen von Tabelleneigenschaften, die bestimmen, wie sich die Tabelle verhält, z. B. Festlegen, wie lange das Transaktionsprotokoll aufbewahrt wird.

Was ist der Vorteil der Schema-Evolution?

Die schematische Evolution kann jederzeit verwendet werden beabsichtigen Ändern Sie das Schema Ihrer Tabelle (im Gegensatz dazu, wenn Sie versehentlich Spalten zu Ihrem DataFrame hinzugefügt haben, die nicht vorhanden sein sollten). Dies ist die einfachste Möglichkeit, Ihr Schema zu migrieren, da automatisch die richtigen Spaltennamen und Datentypen hinzugefügt werden, ohne dass diese explizit deklariert werden müssen.

Abschluss

Die Schemadurchsetzung lehnt alle neuen Spalten oder andere Schemaänderungen ab, die nicht mit Ihrer Tabelle kompatibel sind. Durch die Festlegung und Aufrechterhaltung dieser hohen Standards können sich Analysten und Ingenieure darauf verlassen, dass ihre Daten ein Höchstmaß an Integrität aufweisen, klar und prägnant darüber nachdenken und so bessere Geschäftsentscheidungen treffen können.

Andererseits ergänzt die Schemaentwicklung die Durchsetzung durch Vereinfachung vermutet automatische Schemaänderungen. Schließlich sollte es nicht schwer sein, eine Spalte hinzuzufügen.

Schemadurchsetzung ist Yang, Schemaentwicklungen sind Yin. Zusammengenommen machen diese Funktionen die Rauschunterdrückung und Signalabstimmung einfacher als je zuvor.

Wir möchten auch Mukul Murthy und Pranav Anand für ihre Beiträge zu diesem Artikel danken.

Weitere Artikel dieser Reihe:

Tauchen Sie ein in Delta Lake: Auspacken des Transaktionsprotokolls

Verwandte Artikel

Maschinelles Lernen in Produktionsqualität mit Delta Lake

Was ist ein Datensee?

Erfahren Sie mehr über den Kurs

Source: habr.com

Kommentar hinzufügen