Delta Lake Dive: handhaving en evolutie van het schema

Hallo, Habr! Ik presenteer onder uw aandacht een vertaling van het artikel "Duiken in Delta Lake: schemahandhaving en evolutie" auteurs Burak Yavuz, Brenner Heintz en Denny Lee, die werd voorbereid vooruitlopend op de start van de cursus Gegevens ingenieur van OTUS.

Delta Lake Dive: handhaving en evolutie van het schema

Gegevens verzamelen zich, net als onze ervaring, voortdurend en evolueren voortdurend. Om bij te blijven moeten onze mentale modellen van de wereld zich aanpassen aan nieuwe gegevens, waarvan sommige nieuwe dimensies bevatten – nieuwe manieren om dingen waar te nemen waar we voorheen geen idee van hadden. Deze mentale modellen verschillen niet veel van de tabelschema's die bepalen hoe we nieuwe informatie categoriseren en verwerken.

Dit brengt ons bij de kwestie van schemabeheer. Naarmate zakelijke uitdagingen en vereisten in de loop van de tijd veranderen, verandert ook de structuur van uw gegevens. Delta Lake maakt het gemakkelijk om nieuwe metingen te introduceren als gegevens veranderen. Gebruikers hebben toegang tot eenvoudige semantiek om hun tabelschema's te beheren. Deze tools omvatten Schema Enforcement, dat gebruikers beschermt tegen het onbedoeld vervuilen van hun tabellen met fouten of onnodige gegevens, en Schema Evolution, waarmee nieuwe kolommen met waardevolle gegevens automatisch aan de juiste locaties kunnen worden toegevoegd. In dit artikel gaan we dieper in op het gebruik van deze tools.

Tabelschema's begrijpen

Elk DataFrame in Apache Spark bevat een schema dat de vorm van de gegevens definieert, zoals gegevenstypen, kolommen en metagegevens. Met Delta Lake wordt het tabelschema opgeslagen in JSON-indeling in het transactielogboek.

Wat is regelingshandhaving?

Schema Enforcement, ook wel bekend als Schema Validation, is een beveiligingsmechanisme in Delta Lake dat de gegevenskwaliteit garandeert door records te weigeren die niet overeenkomen met het schema van de tabel. Net als de gastvrouw bij de receptie van een populair restaurant waar alleen gereserveerd kan worden, controleert ze of elke kolom met gegevens die in de tabel wordt ingevoerd, in de overeenkomstige lijst met verwachte kolommen staat (met andere woorden: of er voor elk ervan een "reservering" bestaat). ), en wijst alle records af met kolommen die niet in de lijst voorkomen.

Hoe werkt schemahandhaving?

Delta Lake maakt gebruik van schema-on-write-controle, wat betekent dat alle nieuwe schrijfbewerkingen naar de tabel tijdens het schrijven worden gecontroleerd op compatibiliteit met het schema van de doeltabel. Als het schema inconsistent is, breekt Delta Lake de transactie volledig af (er worden geen gegevens geschreven) en genereert een uitzondering om de gebruiker op de hoogte te stellen van de inconsistentie.
Delta Lake gebruikt de volgende regels om te bepalen of een record compatibel is met een tabel. Beschrijfbaar dataframe:

  • mag geen extra kolommen bevatten die niet in het schema van de doeltabel voorkomen. Omgekeerd is alles in orde als de binnenkomende gegevens niet absoluut alle kolommen uit de tabel bevatten - aan deze kolommen worden eenvoudigweg null-waarden toegewezen.
  • mogen geen kolomgegevenstypen hebben die verschillen van de gegevenstypen van de kolommen in de doeltabel. Als de doeltabelkolom StringType-gegevens bevat, maar de corresponderende kolom in het DataFrame IntegerType-gegevens bevat, genereert schemahandhaving een uitzondering en voorkomt dat de schrijfbewerking plaatsvindt.
  • mogen geen kolomnamen bevatten die alleen verschillen in hoofd- en kleine letters. Dit betekent dat de kolommen 'Foo' en 'foo' niet in dezelfde tabel kunnen worden gedefinieerd. Hoewel Spark kan worden gebruikt in de hoofdlettergevoelige of hoofdletterongevoelige (standaard) modus, behoudt Delta Lake hoofdletters, maar is het ongevoelig binnen de schemaopslag. Parquet is hoofdlettergevoelig bij het opslaan en retourneren van kolomgegevens. Om mogelijke fouten, gegevensbeschadiging of gegevensverlies te voorkomen (iets wat we persoonlijk hebben ervaren bij Databricks), hebben we besloten deze beperking toe te voegen.

Laten we, om dit te illustreren, eens kijken naar wat er in de onderstaande code gebeurt als we proberen een aantal nieuw gegenereerde kolommen toe te voegen aan een Delta Lake-tabel die nog niet is geconfigureerd om ze te accepteren.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

In plaats van automatisch nieuwe kolommen toe te voegen, legt Delta Lake een schema op en stopt met schrijven. Om te helpen bepalen welke kolom (of reeks kolommen) het verschil veroorzaakt, voert Spark beide schema's uit de stacktracering uit ter vergelijking.

Wat is het voordeel van het afdwingen van een schema?

Omdat schemahandhaving een vrij strenge controle is, is het een uitstekend hulpmiddel om te gebruiken als poortwachter naar een schone, volledig getransformeerde dataset die klaar is voor productie of consumptie. Meestal toegepast op tabellen die rechtstreeks gegevens invoeren:

  • Machine learning-algoritmen
  • BI-dashboards
  • Data-analyse en visualisatietools
  • Elk productiesysteem dat zeer gestructureerde, sterk getypeerde semantische schema's vereist.

Om hun gegevens voor te bereiden op deze laatste hindernis, gebruiken veel gebruikers een eenvoudige ‘multi-hop’-architectuur die geleidelijk structuur in hun tabellen introduceert. Als u hier meer over wilt weten, kunt u het artikel lezen Machine learning op productieniveau met Delta Lake.

Natuurlijk kan schemahandhaving overal in uw pijplijn worden gebruikt, maar onthoud dat het streamen naar een tabel in dit geval frustrerend kan zijn omdat u bijvoorbeeld bent vergeten dat u een andere kolom aan de binnenkomende gegevens hebt toegevoegd.

Voorkomen van dataverwatering

Nu vraag je je misschien af: waar gaat het allemaal over? Soms kan een onverwachte 'schema-mismatch'-fout u in uw workflow laten struikelen, vooral als u nieuw bent bij Delta Lake. Waarom laat ik het schema niet gewoon veranderen als dat nodig is, zodat ik mijn DataFrame hoe dan ook kan schrijven?

Zoals het oude gezegde luidt: “Een ons voorkomen is een pond genezen waard.” Als u er niet voor zorgt dat u uw schema afdwingt, zullen compatibiliteitsproblemen met gegevenstypes op een gegeven moment de kop opsteken - ogenschijnlijk homogene ruwe gegevensbronnen kunnen randgevallen, beschadigde kolommen, verkeerd opgemaakte toewijzingen of andere enge dingen bevatten om over te dromen. nachtmerries. De beste aanpak is om deze vijanden bij de poort tegen te houden (met schema-afdwinging) en ze in het licht aan te pakken, in plaats van later wanneer ze op de loer liggen in de donkere diepten van je productiecode.

Het afdwingen van een schema geeft u de zekerheid dat het schema van uw tabel niet zal veranderen, tenzij u de wijziging goedkeurt. Dit voorkomt gegevensverdunning, die kan optreden wanneer er zo vaak nieuwe kolommen worden toegevoegd dat voorheen waardevolle, gecomprimeerde tabellen hun betekenis en bruikbaarheid verliezen als gevolg van de overstroming van gegevens. Door u aan te moedigen doelbewust te zijn, hoge normen te stellen en hoge kwaliteit te verwachten, doet schemahandhaving precies waarvoor het is ontworpen: u helpen gewetensvol te blijven en uw spreadsheets schoon te houden.

Als je bij nader inzien besluit dat je echt noodzakelijk voeg een nieuwe kolom toe - geen probleem, hieronder vindt u een oplossing van één regel. De oplossing is de evolutie van het circuit!

Wat is schema-evolutie?

Schema-evolutie is een functie waarmee gebruikers het huidige tabelschema eenvoudig kunnen wijzigen op basis van gegevens die in de loop van de tijd veranderen. Het wordt meestal gebruikt bij het uitvoeren van een toevoeg- of herschrijfbewerking om het schema automatisch aan te passen zodat het een of meer nieuwe kolommen bevat.

Hoe werkt schema-evolutie?

In navolging van het voorbeeld uit de vorige sectie kunnen ontwikkelaars eenvoudig schema-evolutie gebruiken om nieuwe kolommen toe te voegen die eerder zijn afgewezen vanwege inconsistentie in het schema. Circuitevolutie wordt geactiveerd door toevoeging .option('mergeSchema', 'true') aan uw Spark-team .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Voer de volgende Spark SQL-query uit om de grafiek weer te geven

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Delta Lake Dive: handhaving en evolutie van het schema
Als alternatief kunt u deze optie voor de gehele Spark-sessie instellen door toe te voegen spark.databricks.delta.schema.autoMerge = True naar de Spark-configuratie. Maar wees hier voorzichtig mee, aangezien het afdwingen van schema's u niet langer waarschuwt voor onbedoelde inconsistenties in het schema.

Door de parameter in de aanvraag op te nemen mergeSchema, worden alle kolommen die aanwezig zijn in het DataFrame maar niet in de doeltabel automatisch toegevoegd aan het einde van het schema als onderdeel van een schrijftransactie. Geneste velden kunnen ook worden toegevoegd en deze worden ook toegevoegd aan het einde van de overeenkomstige structuurkolommen.

Datumingenieurs en datawetenschappers kunnen deze optie gebruiken om nieuwe kolommen (misschien een onlangs bijgehouden statistiek of de kolom met verkoopprestaties van deze maand) toe te voegen aan hun bestaande machine learning-productietabellen zonder bestaande modellen op basis van oude kolommen te verbreken.

De volgende typen schemawijzigingen zijn toegestaan ​​als onderdeel van de schema-evolutie tijdens het toevoegen of herschrijven van een tabel:

  • Nieuwe kolommen toevoegen (dit is het meest voorkomende scenario)
  • Gegevenstypen wijzigen van NullType -> elk ander type of promoten van ByteType -> ShortType -> IntegerType

Andere wijzigingen die binnen de schema-evolutie niet zijn toegestaan, vereisen dat het schema en de gegevens worden herschreven door toevoegingen .option("overwriteSchema", "true"). Als bijvoorbeeld de kolom 'Foo' oorspronkelijk een geheel getal was en het nieuwe schema een tekenreeksgegevenstype was, zouden alle Parquet(data)-bestanden herschreven moeten worden. Dergelijke veranderingen omvatten:

  • het verwijderen van een kolom
  • het gegevenstype van een bestaande kolom wijzigen (in-place)
  • het hernoemen van kolommen die alleen verschillen in hoofdlettergebruik (bijvoorbeeld 'Foo' en 'foo')

Ten slotte wordt expliciete DDL met de volgende release van Spark 3.0 volledig ondersteund (met behulp van ALTER TABLE), waardoor gebruikers de volgende acties op tabelschema's kunnen uitvoeren:

  • kolommen toevoegen
  • kolomopmerkingen wijzigen
  • tabeleigenschappen instellen die het gedrag van de tabel bepalen, zoals het instellen van de tijdsduur dat een transactielogboek wordt opgeslagen.

Wat is het voordeel van circuitevolutie?

Schema-evolutie kan worden gebruikt wanneer u maar wilt van plan verander het schema van uw tabel (in tegenstelling tot wanneer u per ongeluk kolommen aan uw DataFrame hebt toegevoegd die er niet zouden moeten zijn). Dit is de gemakkelijkste manier om uw schema te migreren, omdat hiermee automatisch de juiste kolomnamen en gegevenstypen worden toegevoegd zonder dat u deze expliciet hoeft te declareren.

Conclusie

Schemahandhaving wijst alle nieuwe kolommen of andere schemawijzigingen af ​​die niet compatibel zijn met uw tabel. Door deze hoge normen vast te stellen en te handhaven, kunnen analisten en ingenieurs erop vertrouwen dat hun gegevens het hoogste niveau van integriteit hebben en deze duidelijk en helder communiceren, waardoor ze betere zakelijke beslissingen kunnen nemen.

Aan de andere kant vult schema-evolutie de handhaving aan door te vereenvoudigen zogenaamd automatische schemawijzigingen. Het hoeft immers niet moeilijk te zijn om een ​​kolom toe te voegen.

De gedwongen toepassing van het schema is yang, terwijl de evolutie van het schema yin is. Wanneer ze samen worden gebruikt, maken deze functies ruisonderdrukking en signaalafstemming eenvoudiger dan ooit.

We willen ook Mukul Murthy en Pranav Anand bedanken voor hun bijdragen aan dit artikel.

Andere artikelen in deze serie:

Duik in Delta Lake: het transactielogboek uitpakken

gerelateerde artikelen

Machine learning op productieniveau met Delta Lake

Wat is een datameer?

Lees meer over de cursus

Bron: www.habr.com

Voeg een reactie