Plonje nan Delta Lake: Ranfòsman Schema ak Evolisyon

Hey Habr! Mwen prezante nan atansyon ou tradiksyon an nan atik la "Plonje nan Delta Lake: Ranfòsman Schema ak Evolisyon" otè Burak Yavuz, Brenner Heintz ak Denny Lee, ki te prepare nan patisipe nan kòmansman kou a. Enjenyè Done soti nan OTUS.

Plonje nan Delta Lake: Ranfòsman Schema ak Evolisyon

Done, tankou eksperyans nou an, toujou ap akimile ak evolye. Pou kenbe, modèl mantal nou yo nan mond lan dwe adapte yo ak nouvo done, kèk nan yo ki gen nouvo dimansyon-nouvo fason yo obsève bagay nou pa te gen okenn lide sou anvan. Modèl mantal sa yo pa anpil diferan de schémas tablo ki defini fason nou kategorize ak trete nouvo enfòmasyon yo.

Sa a mennen nou nan pwoblèm nan nan jesyon chema. Kòm objektif biznis ak kondisyon yo chanje sou tan, se konsa estrikti nan done ou yo. Delta Lake rann li fasil pou aplike nouvo mezi kòm done yo chanje. Itilizatè yo gen aksè a senp semantik pou jere chema tab yo. Zouti sa yo gen ladan Schema Enforcement, ki pwoteje itilizatè yo kont envolontè polisyon tab yo ak erè oswa done ki pa nesesè, ak Schema Evolution, ki pèmèt nouvo kolòn nan done ki gen anpil valè yo dwe otomatikman ajoute nan kote ki apwopriye yo. Nan atik sa a, nou pral fouye nan itilizasyon zouti sa yo.

Konprann chema tab yo

Chak DataFrame nan Apache Spark gen yon chema ki defini fòm done yo tankou kalite done, kolòn, ak metadata. Avèk Delta Lake, chema tab la estoke nan fòma JSON andedan jounal tranzaksyon an.

Ki sa ki se Schema Enforcement?

Ranfòsman Schema, ke yo rele tou Validasyon Schema, se yon mekanis pwoteksyon nan Delta Lake ki garanti bon jan kalite done lè li rejte dosye ki pa matche ak chema tab la. Tankou yon otès nan biwo devan yon restoran popilè ki aksepte rezèvasyon sèlman, li tcheke pou wè si chak kolòn done ki antre nan tablo a nan lis korespondan kolòn espere (sa vle di, si gen yon "rezèvasyon" pou chak nan yo), epi li rejte nenpòt antre ki gen kolòn ki pa nan lis la.

Ki jan ranfòsman chema travay?

Delta Lake itilize validation chema sou ekriti, ki vle di ke tout nouvo ekriti sou tab la tcheke pou konpatibilite ak chema tab sib la nan moman ekriti. Si chema a pa konsistan, Delta Lake konplètman ranvèse tranzaksyon an (pa gen okenn done ekri) epi li voye yon eksepsyon pou enfòme itilizatè a sou enkonsistans la.
Delta Lake itilize règ sa yo pou detèmine si yon dosye konpatib ak yon tab. Ekri DataFrame:

  • pa kapab genyen kolòn adisyonèl ki pa nan chema tab sib la. Kontrèman, tout bagay anfòm si done yo fèk ap rantre pa genyen absoliman tout kolòn ki soti nan tab la - kolòn sa yo pral tou senpleman asiyen zewo valè.
  • pa ka gen kalite done kolòn ki diferan de kalite done kolòn nan tablo sib la. Si yon kolòn nan tablo sib la gen done StringType, men kolòn ki koresponn lan nan DataFrame a gen done IntegerType, ranfòsman chema pral voye yon eksepsyon epi anpeche operasyon ekriti a fèt.
  • pa ka genyen non kolòn ki diferan sèlman nan ka. Sa vle di ke ou pa ka gen kolòn yo te rele 'Foo' ak 'foo' defini nan menm tablo a. Pandan ke Spark ka itilize nan ka-sansib oswa ka-sansib (defo a), Delta Lake se ka-konsève men ensansib nan depo chema. Partez yo sansib lè li estoke epi retounen enfòmasyon kolòn yo. Pou evite erè posib, koripsyon done oswa pèt done (ki nou pèsonèlman eksperyans nan Databricks), nou deside ajoute limit sa a.

Pou ilistre sa a, ann gade sa k ap pase nan kòd ki anba a lè w ap eseye ajoute kèk kolòn ki fèk pwodwi nan yon tablo Delta Lake ki poko konfigirasyon pou aksepte yo.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Olye otomatikman ajoute nouvo kolòn, Delta Lake ranfòse yon chema epi sispann anrejistreman. Pou ede detèmine ki kolòn (oswa seri yo) ki lakòz dezakò a, Spark parèt tou de chema nan pil tras la pou konparezon.

Ki benefis ki genyen nan ranfòsman chema?

Piske ranfòsman chema se yon chèk jistis solid, li se yon bon zouti pou itilize kòm yon gadyen nan yon seri done pwòp, konplètman transfòme ki pare pou pwodui oswa konsome. Tipikman aplike nan tab ki bay done dirèkteman:

  • Algoritm aprantisaj machin
  • BI tablodbò
  • Zouti analiz done ak vizyalizasyon
  • Nenpòt sistèm pwodiksyon ki mande trè estriktire, fòtman tape chema semantik.

Pou prepare done yo pou dènye obstak sa a, anpil itilizatè yo itilize yon senp achitekti "milti-hop" ki piti piti entwodui estrikti nan tab yo. Pou aprann plis sou sa, ou ka li atik la Aprantisaj machin klas pwodiksyon ak Delta Lake.

Natirèlman, ranfòsman chema ka itilize nenpòt kote nan tiyo ou a, men kenbe nan tèt ou ke difizyon sou tab la ka fwistre nan ka sa a, paske, pou egzanp, ou bliye ke ou ajoute yon lòt kolòn nan done yo antre.

Done eklèsi prevansyon

Nan pwen sa a, ou ta ka mande poukisa battage a? Apre yo tout, pafwa yon erè inatandi "dezakò nan chema" ka twonpe ou nan workflow ou a, espesyalman si w se nouvo nan Delta Lake. Poukisa nou pa jis kite chema a chanje jan sa nesesè pou mwen ka ekri DataFrame mwen an kèlkeswa sa?

Kòm ansyen pwovèb la di, "Yon ons prevansyon vo yon liv gerizon." Nan kèk pwen, si ou pa pran swen pou ranfòse chema ou a, pwoblèm konpatibilite kalite done yo pral dèyè tèt lèd yo - sous done anvan tout koreksyon w sanble omojèn ka gen ladan ka kwen, kolòn kase, kat malfòme, oswa lòt bagay redoutable ke ou rèv. nan move rèv. Pi bon apwòch la se sispann lènmi sa yo nan pòtay la - ak ranfòsman chema - ak fè fas ak yo nan limyè a, pa pita lè yo kòmanse pwowling fon lanmè yo fè nwa nan kòd pwodiksyon ou a.

Ranfòsman chema ba ou konfyans ke chema tab ou a pa pral chanje sof si ou konfime chanjman an tèt ou. Sa a anpeche dilution done ki ka rive lè yo ajoute nouvo kolòn souvan ke tab konprese ki te gen anpil valè yo pèdi valè ak itilite yo akòz inondasyon done yo. Lè w ankouraje w pou w fè entansyonèl, fikse estanda ki wo, epi atann bon jan kalite, ranfòsman chema fè egzakteman sa li te fèt pou fè—ede w rete konsyan epi kenbe fèy calcul ou pwòp.

Si, sou plis konsiderasyon, ou deside ke ou reyèlman bezwen ajoute yon nouvo kolòn - pa gen pwoblèm, anba a se yon ranje yon sèl liy. Solisyon an se evolisyon sikwi!

Ki sa ki evolisyon chema?

Evolisyon chema se yon karakteristik ki pèmèt itilizatè yo chanje fasilman chema aktyèl la nan yon tab pou matche ak done ki chanje sou tan. Li pi souvan itilize lè w ap fè yon operasyon ajoute oswa ranplase pou otomatikman adapte chema a pou enkli youn oswa plis kolòn nouvo.

Ki jan evolisyon chema travay?

Apre egzanp nan seksyon anvan an, devlopè yo ka fasilman itilize evolisyon chema pou ajoute nouvo kolòn ki te deja rejte akòz enkonsistans chema. Evolisyon sikwi aktive pa ajoute .option('mergeSchema', 'true') nan ekip Spark ou a .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Pou wè graf la, kouri rechèch Spark SQL sa a

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Plonje nan Delta Lake: Ranfòsman Schema ak Evolisyon
Altènativman, ou ka mete opsyon sa a pou tout sesyon Spark la lè w ajoute spark.databricks.delta.schema.autoMerge = True nan konfigirasyon Spark la. Men, sèvi ak sa a ak prekosyon, kòm ranfòsman chema p ap avèti ou ankò sou enkonsistans chema envolontè.

Lè w enkli yon paramèt nan demann lan mergeSchema, tout kolòn ki prezan nan DataFrame a men ki pa prezan nan tab la sib yo otomatikman ajoute nan fen chema a kòm yon pati nan tranzaksyon an ekri. Jaden anbrike yo ka ajoute tou, epi sa yo pral ajoute tou nan fen kolòn estrikti ki koresponn yo.

Enjenyè dat yo ak syantis done yo ka itilize opsyon sa a pou ajoute nouvo kolòn (petèt yon metrik ki fèk swiv oswa kolòn chif lavant mwa sa a) nan tab pwodiksyon aprantisaj machin ki egziste deja yo san yo pa kraze modèl ki deja egziste ki baze sou ansyen kolòn yo.

Kalite chanjman sa yo nan chema yo gen dwa kòm yon pati nan yon evolisyon chema pandan y ap ajoute oswa ranplase yon tablo:

  • Ajoute nouvo kolòn (sa a se senaryo ki pi komen)
  • Chanje kalite done ki soti nan NullType -> nenpòt lòt kalite oswa pwomosyon soti nan ByteType -> ShortType -> IntegerType

Lòt chanjman ki pa pèmèt kòm yon pati nan evolisyon chema mande pou chema a ak done dwe ranplase pa ajoute .option("overwriteSchema", "true"). Pou egzanp, nan ka kote kolòn "Foo" la te orijinèlman yon nonb antye relatif ak nouvo chema a ta dwe yon kalite done fisèl, Lè sa a, tout fichye Parquet (done) ta bezwen ranplase. Chanjman sa yo enkli:

  • efase yon kolòn
  • chanje kalite done yon kolòn ki deja egziste (an plas)
  • chanje non kolòn ki diferan sèlman nan ka (pa egzanp, "Foo" ak "foo")

Finalman, ak pwochen lage Spark 3.0, DDL eksplisit (lè l sèvi avèk ALTER TABLE) pral konplètman sipòte, ki pèmèt itilizatè yo fè aksyon sa yo sou chema tab:

  • ajoute kolòn
  • chanje kòmantè kolòn yo
  • tabli pwopriyete tab la ki detèmine kijan tab la konpòte, tankou fikse konbyen tan yo kenbe tranzaksyon an.

Ki benefis evolisyon chema?

Ka evolisyon chema dwe itilize chak fwa ou gen entansyon chanje chema tab ou a (kontrèman ak lè ou te ajoute aksidantèlman kolòn nan DataFrame ou a ki pa ta dwe la). Sa a se fason ki pi fasil pou imigre chema ou a paske li otomatikman ajoute non kolòn kòrèk yo ak kalite done san yo pa oblije deklare yo klèman.

Konklizyon

Ranfòsman Schema rejte nenpòt nouvo kolòn oswa lòt chanjman chema ki pa konpatib ak tab ou a. Lè yo tabli ak kenbe estanda wo sa yo, analis ak enjenyè ka konte sou done yo pou yo gen pi wo nivo entegrite, rezònman sou li klè ak kout, sa ki pèmèt yo pran pi bon desizyon biznis.

Nan lòt men an, evolisyon chema konplete ranfòsman pa senplifye sipoze chanjman otomatik nan chema. Apre yo tout, li pa ta dwe difisil pou ajoute yon kolòn.

Ranfòsman chema se yang, kote evolisyon chema yo yin. Lè yo itilize ansanm, karakteristik sa yo fè rediksyon bri ak akor siyal pi fasil pase tout tan.

Nou ta renmen remèsye tou Mukul Murthy ak Pranav Anand pou kontribisyon yo nan atik sa a.

Lòt atik nan seri sa a:

Plonje nan Delta Lake: debalaj jounal tranzaksyon an

Atik ki gen rapò

Aprantisaj machin klas pwodiksyon ak Delta Lake

Ki sa ki se yon lak done?

Aprann plis sou kou a

Sous: www.habr.com

Add nouvo kòmantè