Delta Lake Dive: Pagpapatupad at Ebolusyon ng Scheme

Hello, Habr! Ipinakita ko sa iyong pansin ang isang pagsasalin ng artikulo "Dive Into Delta Lake: Schema Enforcement & Evolution" mga may-akda na sina Burak Yavuz, Brenner Heintz at Denny Lee, na inihanda sa pag-asam ng pagsisimula ng kurso Data Engineer mula sa OTUS.

Delta Lake Dive: Pagpapatupad at Ebolusyon ng Scheme

Ang data, tulad ng aming karanasan, ay patuloy na nag-iipon at umuunlad. Upang makasabay, ang ating mga mental na modelo ng mundo ay dapat umangkop sa bagong data, ang ilan sa mga ito ay naglalaman ng mga bagong dimensyonβ€”mga bagong paraan ng pagmamasid sa mga bagay na hindi natin alam noon. Ang mga mental model na ito ay hindi gaanong naiiba sa mga schema ng talahanayan na tumutukoy kung paano namin ikinategorya at pinoproseso ang bagong impormasyon.

Dinadala tayo nito sa isyu ng pamamahala ng schema. Habang nagbabago ang mga hamon at kinakailangan sa negosyo sa paglipas ng panahon, nagbabago rin ang istruktura ng iyong data. Pinapadali ng Delta Lake ang pagpapakilala ng mga bagong sukat habang nagbabago ang data. May access ang mga user sa mga simpleng semantika para pamahalaan ang kanilang mga schema ng talahanayan. Kasama sa mga tool na ito ang Schema Enforcement, na nagpoprotekta sa mga user mula sa hindi sinasadyang pagdumi sa kanilang mga talahanayan ng mga error o hindi kinakailangang data, at Schema Evolution, na nagpapahintulot sa mga bagong column ng mahalagang data na awtomatikong maidagdag sa mga naaangkop na lokasyon. Sa artikulong ito, sumisid tayo nang mas malalim sa paggamit ng mga tool na ito.

Pag-unawa sa mga Schema ng Talahanayan

Ang bawat DataFrame sa Apache Spark ay naglalaman ng schema na tumutukoy sa anyo ng data, gaya ng mga uri ng data, column, at metadata. Sa Delta Lake, iniimbak ang schema ng talahanayan sa JSON na format sa loob ng log ng transaksyon.

Ano ang pagpapatupad ng scheme?

Ang Schema Enforcement, na kilala rin bilang Schema Validation, ay isang mekanismo ng seguridad sa Delta Lake na nagsisiguro sa kalidad ng data sa pamamagitan ng pagtanggi sa mga talaan na hindi tumutugma sa schema ng talahanayan. Tulad ng babaing punong-abala sa front desk ng isang sikat na reservation-only na restaurant, tinitingnan niya kung ang bawat column ng data na inilagay sa talahanayan ay nasa kaukulang listahan ng mga inaasahang column (sa madaling salita, kung mayroong "reservation" para sa bawat isa sa kanila. ). at tinatanggihan ang anumang mga talaan na may mga hanay na wala sa listahan.

Paano gumagana ang pagpapatupad ng schema?

Gumagamit ang Delta Lake ng schema-on-write checking, na nangangahulugan na ang lahat ng mga bagong pagsusulat sa talahanayan ay sinusuri para sa pagiging tugma sa schema ng target na talahanayan sa oras ng pagsulat. Kung hindi pare-pareho ang schema, ganap na i-abort ng Delta Lake ang transaksyon (walang nakasulat na data) at magtataas ng exception para ipaalam sa user ang hindi pagkakapare-pareho.
Ginagamit ng Delta Lake ang mga sumusunod na panuntunan upang matukoy kung ang isang tala ay tugma sa isang talahanayan. Naisusulat na DataFrame:

  • hindi maaaring maglaman ng mga karagdagang column na wala sa schema ng target na talahanayan. Sa kabaligtaran, maayos ang lahat kung ang papasok na data ay hindi naglalaman ng ganap na lahat ng mga column mula sa talahanayan - ang mga column na ito ay itatalaga lamang ng mga null value.
  • hindi maaaring magkaroon ng mga uri ng data ng column na iba sa mga uri ng data ng mga column sa target na talahanayan. Kung ang column ng target na table ay naglalaman ng StringType data, ngunit ang kaukulang column sa DataFrame ay naglalaman ng IntegerType data, ang pagpapatupad ng schema ay maglalagay ng exception at pipigilan ang write operation na maganap.
  • hindi maaaring maglaman ng mga pangalan ng column na naiiba lamang kung sakali. Nangangahulugan ito na hindi ka maaaring magkaroon ng mga column na pinangalanang 'Foo' at 'foo' na tinukoy sa parehong talahanayan. Habang ang Spark ay maaaring gamitin sa case-sensitive o case-insensitive (default) na mode, ang Delta Lake ay case-preserve ngunit hindi sensitibo sa loob ng schema storage. Ang parquet ay case sensitive kapag nag-iimbak at nagbabalik ng impormasyon ng column. Upang maiwasan ang mga posibleng error, katiwalian ng data, o pagkawala ng data (na personal naming naranasan sa Databricks), nagpasya kaming idagdag ang limitasyong ito.

Upang ilarawan ito, tingnan natin kung ano ang nangyayari sa code sa ibaba kapag sinubukan nating magdagdag ng ilang bagong nabuong column sa isang talahanayan ng Delta Lake na hindi pa naka-configure upang tanggapin ang mga ito.

# Π‘Π³Π΅Π½Π΅Ρ€ΠΈΡ€ΡƒΠ΅ΠΌ DataFrame ссуд, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΠΌΡ‹ Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² Π½Π°ΡˆΡƒ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# ВывСсти ΠΈΡΡ…ΠΎΠ΄Π½ΡƒΡŽ схСму DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# ВывСсти Π½ΠΎΠ²ΡƒΡŽ схСму DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# ΠŸΠΎΠΏΡ‹Ρ‚ΠΊΠ° Π΄ΠΎΠ±Π°Π²ΠΈΡ‚ΡŒ Π½ΠΎΠ²Ρ‹ΠΉ DataFrame (с Π½ΠΎΠ²Ρ‹ΠΌ столбцом) Π² ΡΡƒΡ‰Π΅ΡΡ‚Π²ΡƒΡŽΡ‰ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Sa halip na awtomatikong magdagdag ng mga bagong column, ang Delta Lake ay nagpapataw ng isang schema at huminto sa pagsusulat. Upang makatulong na matukoy kung aling column (o hanay ng mga column) ang nagdudulot ng pagkakaiba, inilalabas ng Spark ang parehong mga schema mula sa stack trace para sa paghahambing.

Ano ang pakinabang ng pagpapatupad ng schema?

Dahil ang pagpapatupad ng schema ay isang medyo mahigpit na pagsusuri, ito ay isang mahusay na tool upang magamit bilang isang gatekeeper sa isang malinis, ganap na nabagong set ng data na handa na para sa produksyon o pagkonsumo. Karaniwang inilalapat sa mga talahanayan na direktang nagpapakain ng data:

  • Mga algorithm ng machine learning
  • Mga dashboard ng BI
  • Mga tool sa analytics at visualization ng data
  • Anumang sistema ng produksyon na nangangailangan ng mataas na istruktura, malakas na na-type na semantic schemas.

Upang ihanda ang kanilang data para sa huling hadlang na ito, maraming user ang gumagamit ng simpleng "multi-hop" na arkitektura na unti-unting nagpapakilala ng istraktura sa kanilang mga talahanayan. Upang matuto nang higit pa tungkol dito, maaari mong tingnan ang artikulo Production-grade machine learning sa Delta Lake.

Siyempre, maaaring gamitin ang pagpapatupad ng schema saanman sa iyong pipeline, ngunit tandaan na ang streaming sa isang talahanayan sa kasong ito ay maaaring nakakadismaya dahil, halimbawa, nakalimutan mong nagdagdag ka ng isa pang column sa papasok na data.

Pag-iwas sa pagbabanto ng data

Sa ngayon ay maaaring nagtataka ka, ano ang lahat ng kaguluhan? Pagkatapos ng lahat, kung minsan ang isang hindi inaasahang "schema mismatch" na error ay maaaring masira sa iyong daloy ng trabaho, lalo na kung bago ka sa Delta Lake. Bakit hindi na lang hayaang magbago ang schema kung kinakailangan upang maisulat ko ang aking DataFrame kahit na ano?

Tulad ng sinasabi ng matandang kasabihan, "isang onsa ng pag-iwas ay nagkakahalaga ng isang kalahating kilong lunas." Sa isang punto, kung hindi ka mag-iingat na ipatupad ang iyong schema, ang mga isyu sa compatibility sa uri ng data ay mauuwi sa kanilang mga pangit na ulo - ang tila homogenous na raw na pinagmumulan ng data ay maaaring maglaman ng mga gilid na kaso, mga sira na column, maling pagkakabuo ng mga pagmamapa, o iba pang nakakatakot na bagay na pangarapin mga bangungot. Ang pinakamahusay na diskarte ay upang ihinto ang mga kaaway na ito sa gate - na may pagpapatupad ng schema - at harapin ang mga ito sa liwanag, sa halip na sa ibang pagkakataon kapag nagsimula silang magtago sa madilim na kailaliman ng iyong production code.

Ang pagpapatupad ng schema ay nagbibigay sa iyo ng katiyakan na ang schema ng iyong talahanayan ay hindi magbabago maliban kung aprubahan mo ang pagbabago. Pinipigilan nito ang pagbabanto ng data, na maaaring mangyari kapag ang mga bagong column ay idinagdag nang napakadalas na ang dating mahalaga, naka-compress na mga talahanayan ay nawawala ang kanilang kahulugan at pagiging kapaki-pakinabang dahil sa pagbaha ng data. Sa pamamagitan ng paghikayat sa iyong maging sinasadya, magtakda ng matataas na pamantayan, at umasa sa mataas na kalidad, ginagawa ng pagpapatupad ng schema kung ano mismo ang idinisenyo nitong gawinβ€”tumutulong sa iyong manatiling tapat at malinis ang iyong mga spreadsheet.

Kung sa karagdagang pagsasaalang-alang nagpasya kang ikaw talaga kailangan magdagdag ng bagong column - walang problema, sa ibaba ay isang one-line na pag-aayos. Ang solusyon ay ang ebolusyon ng circuit!

Ano ang schema evolution?

Ang Schema evolution ay isang feature na nagbibigay-daan sa mga user na madaling baguhin ang kasalukuyang table schema ayon sa data na nagbabago sa paglipas ng panahon. Ito ay kadalasang ginagamit kapag nagsasagawa ng append o rewrite operation upang awtomatikong iakma ang schema para magsama ng isa o higit pang mga bagong column.

Paano gumagana ang schema evolution?

Kasunod ng halimbawa mula sa nakaraang seksyon, madaling magamit ng mga developer ang schema evolution upang magdagdag ng mga bagong column na dati nang tinanggihan dahil sa hindi pagkakapare-pareho ng schema. Ang ebolusyon ng circuit ay isinaaktibo sa pamamagitan ng pagdaragdag .option('mergeSchema', 'true') sa iyong Spark team .write ΠΈΠ»ΠΈ .writeStream.

# Π”ΠΎΠ±Π°Π²ΡŒΡ‚Π΅ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Upang tingnan ang graph, patakbuhin ang sumusunod na query sa Spark SQL

# Π‘ΠΎΠ·Π΄Π°ΠΉΡ‚Π΅ Π³Ρ€Π°Ρ„ΠΈΠΊ с Π½ΠΎΠ²Ρ‹ΠΌ столбцом, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΠΎΠ΄Ρ‚Π²Π΅Ρ€Π΄ΠΈΡ‚ΡŒ, Ρ‡Ρ‚ΠΎ запись ΠΏΡ€ΠΎΡˆΠ»Π° ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Delta Lake Dive: Pagpapatupad at Ebolusyon ng Scheme
Bilang kahalili, maaari mong itakda ang opsyong ito para sa buong session ng Spark sa pamamagitan ng pagdaragdag spark.databricks.delta.schema.autoMerge = True sa pagsasaayos ng Spark. Ngunit gamitin ito nang may pag-iingat, dahil hindi ka na aalertuhan ng pagpapatupad ng schema sa hindi sinasadyang mga hindi pagkakatugma ng schema.

Sa pamamagitan ng pagsasama ng parameter sa kahilingan mergeSchema, lahat ng column na nasa DataFrame ngunit wala sa target na talahanayan ay awtomatikong idinaragdag sa dulo ng schema bilang bahagi ng isang write transaction. Maaari ding magdagdag ng mga nested field at idaragdag din ang mga ito sa dulo ng mga kaukulang column ng structure.

Maaaring gamitin ng mga inhinyero ng petsa at data scientist ang opsyong ito para magdagdag ng mga bagong column (marahil isang kamakailang sinusubaybayang sukatan o column ng performance ng benta ngayong buwan) sa kanilang mga umiiral nang machine learning production table nang hindi sinisira ang mga kasalukuyang modelo batay sa mga lumang column.

Ang mga sumusunod na uri ng mga pagbabago sa schema ay pinapayagan bilang bahagi ng ebolusyon ng schema sa panahon ng pagdaragdag o muling pagsulat ng talahanayan:

  • Pagdaragdag ng mga bagong column (ito ang pinakakaraniwang senaryo)
  • Pagbabago ng mga uri ng data mula sa NullType -> anumang iba pang uri o pag-promote mula sa ByteType -> ShortType -> IntegerType

Ang iba pang mga pagbabagong hindi pinapayagan sa loob ng ebolusyon ng schema ay nangangailangan na ang schema at data ay muling isulat sa pamamagitan ng pagdaragdag .option("overwriteSchema", "true"). Halimbawa, sa kaso kung saan ang column na "Foo" ay orihinal na isang integer at ang bagong schema ay isang string data type, ang lahat ng Parquet(data) file ay kailangang muling isulat. Kabilang sa mga naturang pagbabago ang:

  • pagtanggal ng column
  • pagbabago ng uri ng data ng isang umiiral na column (in-place)
  • pagpapalit ng pangalan sa mga column na naiiba lamang sa kaso (halimbawa, "Foo" at "foo")

Sa wakas, sa susunod na paglabas ng Spark 3.0, ganap na susuportahan ang tahasang DDL (gamit ang ALTER TABLE), na nagpapahintulot sa mga user na gawin ang mga sumusunod na pagkilos sa mga schema ng talahanayan:

  • pagdaragdag ng mga column
  • pagbabago ng mga komento sa column
  • pagtatakda ng mga katangian ng talahanayan na kumokontrol sa gawi ng talahanayan, tulad ng pagtatakda ng tagal ng oras na nakaimbak ang isang log ng transaksyon.

Ano ang pakinabang ng circuit evolution?

Maaaring gamitin ang ebolusyon ng schema sa tuwing ikaw balak baguhin ang schema ng iyong talahanayan (kumpara sa kapag hindi mo sinasadyang nagdagdag ng mga column sa iyong DataFrame na hindi dapat naroroon). Ito ang pinakamadaling paraan upang i-migrate ang iyong schema dahil awtomatiko nitong idinaragdag ang mga tamang pangalan ng column at mga uri ng data nang hindi kinakailangang tahasang ideklara ang mga ito.

Konklusyon

Tinatanggihan ng pagpapatupad ng schema ang anumang bagong column o iba pang pagbabago sa schema na hindi tugma sa iyong talahanayan. Sa pamamagitan ng pagtatakda at pagpapanatili ng mga matataas na pamantayang ito, mapagkakatiwalaan ng mga analyst at engineer na ang kanilang data ay may pinakamataas na antas ng integridad, malinaw at malinaw na ipinapahayag ito, na nagpapahintulot sa kanila na gumawa ng mas mahusay na mga desisyon sa negosyo.

Sa kabilang banda, ang ebolusyon ng schema ay umaakma sa pagpapatupad sa pamamagitan ng pagpapasimple diumano awtomatikong pagbabago ng schema. Pagkatapos ng lahat, hindi dapat mahirap magdagdag ng column.

Ang sapilitang aplikasyon ng scheme ay yang, kung saan ang ebolusyon ng scheme ay yin. Kapag ginamit nang magkasama, ginagawang mas madali ng mga feature na ito ang pagsugpo ng ingay at pag-tune ng signal kaysa dati.

Nais din naming pasalamatan sina Mukul Murthy at Pranav Anand para sa kanilang mga kontribusyon sa artikulong ito.

Iba pang mga artikulo sa seryeng ito:

Sumisid sa Delta Lake: Pag-unpack ng Log ng Transaksyon

Mga Kaugnay na Artikulo

Production-grade machine learning sa Delta Lake

Ano ang isang data lake?

Alamin ang higit pa tungkol sa kurso

Pinagmulan: www.habr.com

Magdagdag ng komento