Scufundați-vă în Delta Lake: implementarea și evoluția schemelor

Bună, Habr! Vă prezint atenției o traducere a articolului „Sumfărirea în lacul Delta: aplicarea și evoluția schemelor” autorii Burak Yavuz, Brenner Heintz și Denny Lee, care a fost pregătit în așteptarea începerii cursului Inginer de date de la OTUS.

Scufundați-vă în Delta Lake: implementarea și evoluția schemelor

Datele, ca și experiența noastră, se acumulează și evoluează în mod constant. Pentru a ține pasul, modelele noastre mentale ale lumii trebuie să se adapteze la noi date, unele dintre ele conținând noi dimensiuni – noi moduri de a observa lucruri despre care habar n-aveam până acum. Aceste modele mentale nu sunt foarte diferite de schemele de tabel care determină modul în care catalogăm și procesăm noi informații.

Acest lucru ne aduce la problema managementului schemei. Pe măsură ce provocările și cerințele de afaceri se modifică în timp, la fel se modifică și structura datelor dvs. Delta Lake facilitează introducerea de noi măsurători pe măsură ce datele se modifică. Utilizatorii au acces la semantică simplă pentru a-și gestiona schemele de tabel. Aceste instrumente includ Schema Enforcement, care protejează utilizatorii de poluarea neintenționată a tabelelor cu erori sau date inutile, și Schema Evolution, care permite adăugarea automată de noi coloane de date valoroase în locațiile corespunzătoare. În acest articol, ne vom aprofunda în utilizarea acestor instrumente.

Înțelegerea schemelor de tabel

Fiecare DataFrame din Apache Spark conține o schemă care definește forma datelor, cum ar fi tipurile de date, coloanele și metadatele. Cu Delta Lake, schema tabelului este stocată în format JSON în jurnalul de tranzacții.

Ce este aplicarea schemei?

Schema Enforcement, cunoscută și sub numele de Schema Validation, este un mecanism de securitate din Delta Lake care asigură calitatea datelor prin respingerea înregistrărilor care nu se potrivesc cu schema tabelului. La fel ca gazda de la recepția unui restaurant popular cu rezervare, ea verifică dacă fiecare coloană de date introdusă în tabel se află în lista corespunzătoare de coloane așteptate (cu alte cuvinte, dacă există o „rezervare” pentru fiecare dintre ele ), și respinge orice înregistrări cu coloane care nu sunt în listă.

Cum funcționează aplicarea schemei?

Delta Lake folosește verificarea schema-on-write, ceea ce înseamnă că toate scrierile noi în tabel sunt verificate pentru compatibilitate cu schema tabelului țintă în momentul scrierii. Dacă schema este inconsecventă, Delta Lake anulează tranzacția în întregime (nu sunt scrise date) și ridică o excepție pentru a notifica utilizatorul despre inconsecvență.
Delta Lake folosește următoarele reguli pentru a determina dacă o înregistrare este compatibilă cu un tabel. Cadru de date care poate fi scris:

  • nu poate conține coloane suplimentare care nu sunt în schema tabelului țintă. În schimb, totul este în regulă dacă datele primite nu conțin absolut toate coloanele din tabel - acestor coloane li se vor atribui pur și simplu valori nule.
  • nu poate avea tipuri de date coloane diferite de tipurile de date ale coloanelor din tabelul țintă. Dacă coloana tabelului țintă conține date StringType, dar coloana corespunzătoare din DataFrame conține date IntegerType, aplicarea schemei va arunca o excepție și va împiedica operația de scriere.
  • nu poate conține nume de coloane care diferă doar în caz de litere. Aceasta înseamnă că nu puteți avea coloane numite „Foo” și „foo” definite în același tabel. În timp ce Spark poate fi utilizat în modul care distinge majuscule sau minuscule (implicit), Delta Lake păstrează majuscule, dar este insensibil în stocarea schemei. Parchetul ține seama de majuscule și minuscule atunci când stochează și returnează informații despre coloană. Pentru a evita posibile erori, coruperea datelor sau pierderea datelor (ceva ce am experimentat personal la Databricks), am decis să adăugăm această limitare.

Pentru a ilustra acest lucru, să aruncăm o privire la ceea ce se întâmplă în codul de mai jos când încercăm să adăugăm câteva coloane nou generate la un tabel Delta Lake care nu este încă configurat să le accepte.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

În loc să adauge automat coloane noi, Delta Lake impune o schemă și nu mai scrie. Pentru a determina care coloană (sau set de coloane) cauzează discrepanța, Spark scoate ambele scheme din urmărirea stivei pentru comparație.

Care este beneficiul aplicării unei scheme?

Deoarece aplicarea schemei este o verificare destul de strictă, este un instrument excelent pentru a fi folosit ca un gardian al unui set de date curat, complet transformat, care este gata pentru producție sau consum. Se aplică de obicei tabelelor care alimentează direct date:

  • Algoritmi de învățare automată
  • Tablouri de bord BI
  • Instrumente de analiză și vizualizare a datelor
  • Orice sistem de producție care necesită scheme semantice puternic structurate, puternic tipizate.

Pentru a-și pregăti datele pentru acest ultim obstacol, mulți utilizatori folosesc o arhitectură simplă „multi-hop” care introduce treptat structura în tabelele lor. Pentru a afla mai multe despre acest lucru, puteți consulta articolul Învățare automată la nivel de producție cu Delta Lake.

Desigur, aplicarea schemei poate fi folosită oriunde în conducta dvs., dar amintiți-vă că fluxul către un tabel în acest caz poate fi frustrant, deoarece, de exemplu, ați uitat că ați adăugat o altă coloană la datele primite.

Prevenirea diluării datelor

Până acum, s-ar putea să vă întrebați, despre ce este toată agitația? La urma urmei, uneori, o eroare neașteptată de „nepotrivire a schemei” vă poate împiedica în fluxul de lucru, mai ales dacă sunteți nou în Delta Lake. De ce nu lăsați schema să se schimbe după cum este necesar, astfel încât să îmi pot scrie DataFrame indiferent de ce?

După cum spune vechea zicală, „un gram de prevenire merită o jumătate de kilogram de vindecare”. La un moment dat, dacă nu aveți grijă să vă impuneți schema, problemele de compatibilitate cu tipurile de date le vor ridica capul urât - sursele de date brute aparent omogene pot conține cazuri de margine, coloane corupte, mapări incorecte sau alte lucruri înfricoșătoare la care să visați în coşmaruri. Cea mai bună abordare este să oprești acești inamici la poartă - cu aplicarea schemei - și să-i faci față în lumină, mai degrabă decât mai târziu, când încep să pândească în adâncurile întunecate ale codului tău de producție.

Aplicarea unei scheme vă oferă asigurarea că schema tabelului dvs. nu se va modifica decât dacă aprobați modificarea. Acest lucru previne diluarea datelor, care poate apărea atunci când sunt adăugate coloane noi atât de frecvent încât tabelele comprimate valoroase anterior își pierd sensul și utilitatea din cauza inundației de date. Încurajându-vă să fiți intenționați, să stabiliți standarde înalte și să vă așteptați la o calitate înaltă, aplicarea schemei face exact ceea ce a fost concepută pentru a face: vă ajută să rămâneți conștiincioși și foile de calcul curate.

Dacă după o analiză ulterioară decideți că într-adevăr nevoie adăugați o nouă coloană - nicio problemă, mai jos este o remediere pe o singură linie. Soluția este evoluția circuitului!

Ce este evoluția schemei?

Evoluția schemei este o caracteristică care permite utilizatorilor să schimbe cu ușurință schema tabelului curent în funcție de datele care se modifică în timp. Este folosit cel mai adesea atunci când se efectuează o operație de adăugare sau de rescriere pentru a adapta automat schema pentru a include una sau mai multe coloane noi.

Cum funcționează evoluția schemei?

Urmând exemplul din secțiunea anterioară, dezvoltatorii pot folosi cu ușurință evoluția schemei pentru a adăuga coloane noi care au fost respinse anterior din cauza inconsecvenței schemei. Evoluția circuitului este activată prin adăugare .option('mergeSchema', 'true') către echipa ta Spark .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Pentru a vizualiza graficul, rulați următoarea interogare Spark SQL

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Scufundați-vă în Delta Lake: implementarea și evoluția schemelor
Alternativ, puteți seta această opțiune pentru întreaga sesiune Spark prin adăugare spark.databricks.delta.schema.autoMerge = True la configurația Spark. Dar folosiți acest lucru cu prudență, deoarece aplicarea schemei nu vă va mai alerta asupra inconsecvențelor neintenționate ale schemei.

Prin includerea parametrului în cerere mergeSchema, toate coloanele care sunt prezente în DataFrame, dar nu în tabelul țintă, sunt adăugate automat la sfârșitul schemei ca parte a unei tranzacții de scriere. Câmpurile imbricate pot fi, de asemenea, adăugate și acestea vor fi, de asemenea, adăugate la sfârșitul coloanelor structurii corespunzătoare.

Inginerii de date și oamenii de știință de date pot folosi această opțiune pentru a adăuga coloane noi (poate o valoare urmărită recent sau coloana de performanță a vânzărilor din această lună) la tabelele de producție de învățare automată existente, fără a sparge modelele existente bazate pe coloanele vechi.

Următoarele tipuri de modificări ale schemei sunt permise ca parte a evoluției schemei în timpul adăugării sau rescrierii unui tabel:

  • Adăugarea de noi coloane (acesta este cel mai frecvent scenariu)
  • Schimbarea tipurilor de date din NullType -> orice alt tip sau promovarea din ByteType -> ShortType -> IntegerType

Alte modificări nepermise în cadrul evoluției schemei necesită ca schema și datele să fie rescrise prin adăugare .option("overwriteSchema", "true"). De exemplu, în cazul în care coloana „Foo” a fost inițial un număr întreg și noua schemă a fost un tip de date șir, atunci toate fișierele Parquet(date) ar trebui rescrise. Astfel de modificări includ:

  • ștergerea unei coloane
  • modificarea tipului de date al unei coloane existente (in loc)
  • redenumirea coloanelor care diferă numai după caz ​​(de exemplu, „Foo” și „foo”)

În cele din urmă, cu următoarea ediție a Spark 3.0, DDL explicit va fi pe deplin acceptat (folosind ALTER TABLE), permițând utilizatorilor să efectueze următoarele acțiuni pe schemele de tabele:

  • adaugand coloane
  • modificarea comentariilor coloanei
  • stabilirea proprietăților tabelului care controlează comportamentul tabelului, cum ar fi setarea duratei de timp în care este stocat un jurnal de tranzacții.

Care este beneficiul evoluției circuitului?

Evoluția schemei poate fi folosită oricând intenționează modificați schema tabelului dvs. (spre deosebire de când ați adăugat accidental coloane la DataFrame care nu ar trebui să fie acolo). Acesta este cel mai simplu mod de a migra schema dvs., deoarece adaugă automat numele de coloane și tipurile de date corecte, fără a fi nevoie să le declarați în mod explicit.

Concluzie

Aplicarea schemei respinge orice coloane noi sau alte modificări ale schemei care nu sunt compatibile cu tabelul dvs. Prin stabilirea și menținerea acestor standarde înalte, analiștii și inginerii pot avea încredere că datele lor au cel mai înalt nivel de integritate, comunicându-le clar și clar, permițându-le să ia decizii de afaceri mai bune.

Pe de altă parte, evoluția schemei completează aplicarea prin simplificare pretins modificări automate ale schemei. La urma urmei, nu ar trebui să fie dificil să adăugați o coloană.

Aplicarea forțată a schemei este yang, unde evoluția schemei este yin. Când sunt utilizate împreună, aceste caracteristici fac suprimarea zgomotului și reglarea semnalului mai ușor ca niciodată.

De asemenea, am dori să mulțumim lui Mukul Murthy și Pranav Anand pentru contribuțiile lor la acest articol.

Alte articole din această serie:

Scufundați-vă în Delta Lake: despachetarea jurnalului de tranzacții

Articole similare

Învățare automată la nivel de producție cu Delta Lake

Ce este un lac de date?

Aflați mai multe despre curs

Sursa: www.habr.com

Adauga un comentariu