Delta Lake Dive: Pagpapatupad at Ebolusyon ng Scheme

Hello, Habr! Ipinakita ko sa iyong pansin ang isang pagsasalin ng artikulo "Dive Into Delta Lake: Schema Enforcement & Evolution" ni Burak Yavuz, Brenner Heintz at Denny Lee, na inihanda bilang pag-asam ng paglulunsad ng kurso Data Engineer mula sa OTUS.

Delta Lake Dive: Pagpapatupad at Ebolusyon ng Scheme

Ang data, tulad ng aming karanasan, ay patuloy na nag-iipon at nagbabago. Upang makasabay, ang ating mga mental na modelo ng mundo ay dapat umangkop sa bagong data, ang ilan sa mga ito ay naglalaman ng mga bagong dimensyon—mga bagong paraan ng pagmamasid sa mga bagay na hindi natin alam noon. Ang mga mental na modelong ito ay hindi katulad ng mga schema sa mga spreadsheet na tumutukoy kung paano namin inuuri at pinoproseso ang bagong impormasyon.

Dinadala tayo nito sa isyu ng pamamahala ng schema. Habang nagbabago ang mga layunin at kinakailangan sa negosyo sa paglipas ng panahon, gayundin ang istraktura ng iyong data. Pinapadali ng Delta Lake ang pagpasok ng mga bagong dimensyon habang nagbabago ang data. May access ang mga user sa mga simpleng semantika para sa pamamahala ng kanilang mga schema ng talahanayan. Kasama sa mga tool na ito ang Schema Enforcement, na nagpoprotekta sa mga user mula sa hindi sinasadyang pagkalat ng kanilang mga talahanayan ng mga error o hindi kinakailangang data, at Schema Evolution, na awtomatikong nagdaragdag ng mga bagong column na naglalaman ng mahalagang data sa mga naaangkop na lokasyon. Sa artikulong ito, sumisid tayo nang mas malalim sa paggamit ng mga tool na ito.

Pag-unawa sa mga schema ng talahanayan

Ang bawat DataFrame sa Apache Spark ay naglalaman ng schema na tumutukoy sa form ng data, gaya ng mga uri ng data, column, at metadata. Sa Delta Lake, iniimbak ang schema ng talahanayan sa JSON na format sa loob ng log ng transaksyon.

Ano ang pagpapatupad ng scheme?

Ang Schema Enforcement, na kilala rin bilang Schema Validation, ay isang mekanismo ng seguridad sa Delta Lake na nagsisiguro sa kalidad ng data sa pamamagitan ng pagtanggi sa mga talaan na hindi sumusunod sa schema ng talahanayan. Tulad ng isang hostess sa isang sikat na restaurant na tumatanggap lang ng mga reservation, sinusuri ng Schema Enforcement kung ang bawat column ng data na ipinasok sa talahanayan ay nasa isang katumbas na listahan ng mga inaasahang column (sa madaling salita, kung mayroong "reservation" para sa bawat isa) at tinatanggihan ang anumang mga record na may mga column na wala sa listahan.

Paano gumagana ang pagpapatupad ng schema?

Gumagamit ang Delta Lake ng schema-on-write checking, ibig sabihin, lahat ng bagong pagsusulat sa isang talahanayan ay sinusuri para sa pagiging tugma sa schema ng target na talahanayan sa oras ng pagsulat. Kung hindi tugma ang schema, ganap na i-abort ng Delta Lake ang transaksyon (walang nakasulat na data) at magtataas ng exception para ipaalam sa user ang hindi pagkakapare-pareho.
Ginagamit ng Delta Lake ang mga sumusunod na panuntunan upang matukoy kung ang isang tala ay tugma sa isang talahanayan. Ang DataFrame na isinusulat:

  • Hindi ito maaaring maglaman ng mga karagdagang column na wala sa schema ng target na talahanayan. Sa kabaligtaran, mainam kung ang papasok na data ay hindi naglalaman ng bawat column mula sa talahanayan—ang mga column na iyon ay itatalaga lamang ng mga null value.
  • hindi maaaring magkaroon ng mga uri ng data ng column na naiiba sa mga uri ng data ng column sa target na talahanayan. Kung ang isang column sa target na table ay naglalaman ng StringType data, ngunit ang kaukulang column sa DataFrame ay naglalaman ng IntegerType data, ang pagpapatupad ng schema ay maglalagay ng exception at pipigilan ang write operation na mangyari.
  • hindi maaaring maglaman ng mga pangalan ng column na naiiba lamang kung sakali. Nangangahulugan ito na hindi ka maaaring magkaroon ng mga column na pinangalanang 'Foo' at 'foo' na tinukoy sa parehong talahanayan. Habang maaaring gamitin ang Spark sa case-sensitive o case-insensitive (bilang default), pinapanatili ng Delta Lake ang case ngunit case-insensitive kapag nag-iimbak ng schema. Ang parquet ay case-sensitive kapag nag-iimbak at kumukuha ng impormasyon ng column. Upang maiwasan ang mga potensyal na error, katiwalian ng data, o pagkawala ng data (na personal naming naranasan sa Databricks), nagpasya kaming idagdag ang paghihigpit na ito.

Upang ilarawan ito, tingnan natin kung ano ang mangyayari sa code sa ibaba kapag sinubukan nating magdagdag ng ilang bagong nabuong column sa isang talahanayan ng Delta Lake na hindi pa naka-configure upang tanggapin ang mga ito.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Sa halip na awtomatikong magdagdag ng mga bagong column, nagpapatupad ang Delta Lake ng schema at huminto sa pagsusulat. Upang makatulong na matukoy kung aling column (o mga hanay ng mga column) ang nagdudulot ng pagkakaiba, ipinapakita ng Spark ang parehong mga schema mula sa stack trace para sa paghahambing.

Ano ang pakinabang ng pagpapatupad ng iskema?

Dahil ang pagpapatupad ng schema ay isang medyo mahigpit na pagsusuri, ito ay isang mahusay na tool para sa paggamit bilang isang gatekeeper sa isang malinis, ganap na binagong dataset na handa para sa produksyon o pagkonsumo. Karaniwan itong inilalapat sa mga talahanayan na direktang nagpapakain ng data:

  • Mga algorithm ng machine learning
  • Mga dashboard ng BI
  • Mga tool sa analytics at visualization ng data
  • Anumang production system na nangangailangan ng mahigpit na structured, mahigpit na na-type na semantic schemas.

Upang ihanda ang kanilang data para sa huling hadlang na ito, maraming user ang gumagamit ng simpleng "multi-hop" na arkitektura na unti-unting nagpapakilala ng istraktura sa kanilang mga talahanayan. Para sa karagdagang impormasyon, maaari mong basahin ang artikulo Production-grade machine learning sa Delta Lake.

Siyempre, maaari mong gamitin ang pagpapatupad ng schema saanman sa iyong pipeline, ngunit tandaan na ang pagsulat sa isang talahanayan sa kasong ito ay maaaring nakakadismaya, halimbawa, dahil nakalimutan mong nagdagdag ka ng isa pang column sa papasok na data.

Pag-iwas sa liquefaction ng data

Sa puntong ito, maaaring nagtataka ka kung tungkol saan ang lahat ng kaguluhan? Pagkatapos ng lahat, kung minsan ang isang hindi inaasahang "schema mismatch" na error ay maaaring masira sa iyong daloy ng trabaho, lalo na kung bago ka sa Delta Lake. Bakit hindi na lang hayaang magbago ang schema kung kinakailangan para maisulat ko ang aking DataFrame kahit na ano?

Tulad ng sinasabi ng matandang kasabihan, "isang onsa ng pag-iwas ay nagkakahalaga ng kalahating kilong lunas." Sa isang punto, kung hindi ka mag-iingat na ipatupad ang iyong schema, ang mga isyu sa compatibility sa uri ng data ay mauuwi sa kanilang mga pangit na ulo—na tila homogenous na raw data source ay maaaring maglaman ng mga edge case, mga corrupt na column, mga malformed mapping, o iba pang bangungot. Ang pinakamahusay na diskarte ay upang pigilan ang mga kaaway na ito sa gate-na may pagpapatupad ng schema-at harapin ang mga ito sa bukas, sa halip na mamaya, kapag nagsimula silang magtago sa madilim na kailaliman ng iyong production code.

Tinitiyak ng pagpapatupad ng schema na hindi magbabago ang schema ng iyong talahanayan maliban kung tahasan mong aprubahan ang pagbabago. Pinipigilan nito ang pagbabanto ng data, na maaaring mangyari kapag ang mga bagong column ay idinagdag nang napakadalas na ang dating mahalaga, compact na mga talahanayan ay nawawalan ng kahulugan at pagiging kapaki-pakinabang dahil sa baha ng data. Sa pamamagitan ng paghikayat sa iyong maging sinasadya, magtakda ng matataas na pamantayan, at umasa sa mataas na kalidad, ginagawa ng pagpapatupad ng schema kung ano mismo ang nilalayon nitong gawin—tumutulong sa iyong mapanatili ang integridad at panatilihing malinis ang iyong mga talahanayan.

Kung sa karagdagang pagsasaalang-alang ay nagpasya ka na talagang gagawin mo kailangan Ang pagdaragdag ng bagong column ay walang problema; isang linyang pag-aayos ay ibinigay sa ibaba. Ang solusyon ay schema evolution!

Ano ang schema evolution?

Ang ebolusyon ng schema ay isang tampok na nagbibigay-daan sa mga user na madaling baguhin ang kasalukuyang schema ng talahanayan upang ma-accommodate ang pagbabago ng data sa paglipas ng panahon. Ito ay pinakakaraniwang ginagamit sa panahon ng pagpasok o pagsulat muli upang awtomatikong iakma ang schema upang magsama ng isa o higit pang mga bagong column.

Paano gumagana ang schema evolution?

Kasunod ng halimbawa mula sa nakaraang seksyon, madaling magamit ng mga developer ang schema evolution upang magdagdag ng mga bagong column na dati nang tinanggihan dahil sa hindi pagsunod sa schema. Ang ebolusyon ng schema ay isinaaktibo sa pamamagitan ng pagdaragdag .option('mergeSchema', 'true') sa iyong Spark team .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Upang tingnan ang graph, patakbuhin ang sumusunod na query sa Spark SQL

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Delta Lake Dive: Pagpapatupad at Ebolusyon ng Scheme
Bilang kahalili, maaari mong itakda ang opsyong ito para sa buong session ng Spark sa pamamagitan ng pagdaragdag spark.databricks.delta.schema.autoMerge = True sa pagsasaayos ng Spark. Gayunpaman, gamitin ito nang may pag-iingat, dahil hindi ka na babalaan ng pagpapatupad ng schema tungkol sa hindi sinasadyang mga hindi pagkakapare-pareho ng schema.

Sa pamamagitan ng pagsasama ng isang parameter sa kahilingan mergeSchemaAng lahat ng column na nasa DataFrame ngunit nawawala sa target na talahanayan ay awtomatikong idinagdag sa schema sa panahon ng write transaction. Maaari ding magdagdag ng mga nested field, at idaragdag din ang mga ito sa mga kaukulang column sa structure.

Maaaring gamitin ng mga data engineer at scientist ang opsyong ito para magdagdag ng mga bagong column (marahil isang kamakailang sinusubaybayang sukatan o column ng mga numero ng benta para sa buwang ito) sa kanilang mga umiiral nang machine learning production table nang hindi sinisira ang mga kasalukuyang modelo batay sa mga lumang column.

Ang mga sumusunod na uri ng mga pagbabago sa schema ay pinapayagan bilang bahagi ng ebolusyon ng schema sa panahon ng pagpapasok o muling pagsusulat ng talahanayan:

  • Pagdaragdag ng mga bagong column (ito ang pinakakaraniwang senaryo)
  • Pagbabago ng mga uri ng data mula sa NullType -> anumang iba pang uri o pag-promote mula sa ByteType -> ShortType -> IntegerType

Ang iba pang mga pagbabago na hindi pinapayagan sa loob ng ebolusyon ng schema ay nangangailangan ng schema at data na ma-overwrite sa pamamagitan ng pagdaragdag .option("overwriteSchema", "true")Halimbawa, kung ang column na "Foo" ay orihinal na isang integer, at ang bagong schema ay isang uri ng data ng string, ang lahat ng Parquet (data) na file ay kailangang muling isulat. Kabilang sa mga naturang pagbabago ang:

  • pagtanggal ng column
  • pagbabago ng uri ng data ng isang umiiral na column (nasa lugar)
  • pagpapalit ng pangalan sa mga column na naiiba lamang sa kaso (hal. "Foo" at "foo")

Sa wakas, sa susunod na release, ang Spark 3.0, ang tahasang DDL (gamit ang ALTER TABLE) ay ganap na susuportahan, na nagpapahintulot sa mga user na gawin ang mga sumusunod na aksyon sa mga schema ng talahanayan:

  • pagdaragdag ng mga column
  • pagbabago ng mga komento sa column
  • Pagtatakda ng mga katangian ng talahanayan na tumutukoy kung paano kumikilos ang talahanayan, gaya ng pagtatakda ng panahon ng pagpapanatili ng log ng transaksyon.

Ano ang pakinabang ng ebolusyon ng schema?

Maaaring gamitin ang ebolusyon ng schema sa tuwing ikaw balak Baguhin ang schema ng iyong talahanayan (kumpara sa aksidenteng pagdaragdag ng mga column sa iyong DataFrame na hindi dapat naroroon). Ito ang pinakamadaling paraan upang i-migrate ang iyong schema dahil awtomatiko nitong idinaragdag ang mga tamang pangalan ng column at mga uri ng data nang hindi kinakailangang ideklara ang mga ito nang tahasan.

Konklusyon

Tinatanggihan ng pagpapatupad ng schema ang anumang mga bagong column o iba pang pagbabago sa schema na hindi tugma sa iyong talahanayan. Sa pamamagitan ng pagtatakda at pagpapanatili ng matataas na pamantayang ito, ang mga analyst at inhinyero ay maaaring magtiwala na ang kanilang data ay may pinakamataas na antas ng integridad, na nangangatuwiran tungkol dito nang malinaw at maigsi, na nagbibigay-daan sa kanila na gumawa ng mas epektibong mga desisyon sa negosyo.

Sa kabilang banda, ang ebolusyon ng scheme ay umaakma sa pagpapatupad sa pamamagitan ng pagpapasimple diumano Mga awtomatikong pagbabago sa schema. Pagkatapos ng lahat, ang pagdaragdag ng isang column ay hindi dapat ganoon kahirap.

Ang circuit enforcement ay ang yang sa circuit ng yin ng ebolusyon. Kapag ginamit nang magkasama, ginagawang mas madali ng mga feature na ito ang pagsugpo ng ingay at pag-tune ng signal kaysa dati.

Nais din naming pasalamatan sina Mukul Murthy at Pranav Anand para sa kanilang mga kontribusyon sa artikulong ito.

Iba pang mga artikulo sa seryeng ito:

Paglilibot sa Delta Lake: Pag-unpack ng Log ng Transaksyon

Play na video

Mga kaugnay na artikulo

Production-grade machine learning sa Delta Lake

Ano ang isang data lake?

Alamin ang higit pa tungkol sa kurso

Pinagmulan: www.habr.com

Magdagdag ng komento