Hello, everyone! We present a translation of the article.
Data, like our experience, is constantly accumulating and evolving. To keep up, our mental models of the world must adapt to new data, some of which contains new dimensions: new ways of observing things we previously had no idea about. These mental models are not much different from table schemas, which determine how we categorize and process new information.
This brings us to the question of schema management. As business problems and requirements change over time, so does the structure of your data. Delta Lake makes it easy to introduce new dimensions as the data changes. Users have access to simple semantics for managing their table schemas. These tools include Schema Enforcement, which protects users from unintentionally polluting their tables with errors or unneeded data, and Schema Evolution, which allows new columns of valuable data to be added automatically where they belong. This article takes a closer look at how to use these tools.
Understanding Table Schemas
Every DataFrame in Apache Spark contains a schema that defines the shape of the data: data types, columns, and metadata. In Delta Lake, the table schema is stored in JSON format inside the transaction log.
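To make the stored form concrete, here is a minimal sketch of what a Spark-style schema looks like as JSON and how it can be read with nothing but the Python standard library. The `schema_string` value is hand-written for illustration, modeled on the loans table used later in this article; it is an assumption, not a copy of a real transaction log entry.

```python
import json

# Hand-written example of a Spark struct schema serialized as JSON
# (illustrative only; not taken from a real Delta transaction log).
schema_string = """
{
  "type": "struct",
  "fields": [
    {"name": "addr_state", "type": "string", "nullable": true, "metadata": {}},
    {"name": "count", "type": "long", "nullable": true, "metadata": {}}
  ]
}
"""

schema = json.loads(schema_string)

# Map each column name to its declared data type.
column_types = {field["name"]: field["type"] for field in schema["fields"]}
print(column_types)
```

Because the schema is plain JSON, any tool that can parse JSON can inspect a table's columns and types without starting Spark.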
What Is Schema Enforcement?
Schema enforcement, also known as schema validation, is a protective mechanism in Delta Lake that ensures data quality by rejecting writes that do not match the table's schema. Like the host at the front desk of a popular reservation-only restaurant, it checks whether each column of the data being written to the table is on the list of expected columns (in other words, whether each one has a "reservation") and rejects any write containing columns that are not on the list.
How Does Schema Enforcement Work?
Delta Lake uses schema validation on write, which means that every new write to a table is checked for compatibility with the target table's schema at write time. If the schema is not compatible, Delta Lake cancels the transaction entirely (no data is written) and raises an exception to tell the user about the mismatch.
Delta Lake uses the following rules to determine whether a write is compatible with the table. The DataFrame being written:
- Cannot contain additional columns that are not in the target table's schema. Conversely, it is fine if the incoming data does not contain every column in the table; those columns are simply assigned null values.
- Cannot have column data types that differ from the data types of the columns in the target table. If a column in the target table contains StringType data but the corresponding column in the DataFrame contains IntegerType data, schema enforcement raises an exception and prevents the write from taking place.
- Cannot contain column names that differ only by case. This means you cannot have columns named "Foo" and "foo" defined in the same table. Spark can be used in case-sensitive or case-insensitive (default) mode, whereas Delta Lake preserves case but is case-insensitive when storing the schema. Parquet is case-sensitive when storing and returning column information. To avoid possible errors, data corruption, or data loss (which we have personally experienced at Databricks), we decided to add this restriction.
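The three rules above can be sketched in plain Python. This is a simplified model under stated assumptions, not Delta Lake's implementation: schemas are plain dicts of column name to type name, and `SchemaMismatchError`, `check_compatible`, and `append` are hypothetical names introduced only for this illustration.

```python
class SchemaMismatchError(Exception):
    """Raised when incoming data is not compatible with the table schema."""


def check_compatible(table_schema, data_schema):
    """Apply the three rules; schemas are dicts of column name -> type name."""
    # Rule 3: column names must not differ only by case.
    lowered = [name.lower() for name in data_schema]
    if len(set(lowered)) != len(lowered):
        raise SchemaMismatchError("column names differ only by case")

    table_by_lower = {name.lower(): dtype for name, dtype in table_schema.items()}
    for name, dtype in data_schema.items():
        # Rule 1: no columns that the target table does not have.
        if name.lower() not in table_by_lower:
            raise SchemaMismatchError(f"unexpected column: {name}")
        # Rule 2: data types must match the target column.
        expected = table_by_lower[name.lower()]
        if dtype != expected:
            raise SchemaMismatchError(
                f"column {name}: expected {expected}, got {dtype}"
            )


def append(table_rows, table_schema, new_rows, data_schema):
    """Validate first, then write; a failed check leaves the table untouched."""
    check_compatible(table_schema, data_schema)
    for row in new_rows:
        lowered_row = {key.lower(): value for key, value in row.items()}
        # Columns missing from the incoming data are filled with None (null).
        table_rows.append(
            {col: lowered_row.get(col.lower()) for col in table_schema}
        )


table_schema = {"addr_state": "string", "count": "long"}
table_rows = []

# A subset of the table's columns is accepted; "count" becomes None.
append(table_rows, table_schema, [{"addr_state": "CA"}], {"addr_state": "string"})

# An extra column is rejected before anything is written.
try:
    append(
        table_rows,
        table_schema,
        [{"addr_state": "NY", "count": 3, "amount": 12.5}],
        {"addr_state": "string", "count": "long", "amount": "double"},
    )
except SchemaMismatchError as error:
    rejected = str(error)
```

The check runs before any row is appended, which mirrors the all-or-nothing behavior described earlier: a rejected write leaves the table exactly as it was.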
To illustrate, let's look at what happens in the code below when we try to append newly generated columns to a Delta Lake table that is not yet configured to accept them.
# Generate a DataFrame of loans that we will append to our Delta Lake table
loans = sql("""
    SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
    CAST(rand(10) * 10000 * count AS double) AS amount
    FROM loan_by_state_delta
""")
# Print the schema of the original DataFrame
original_loans.printSchema()
# root
#  |-- addr_state: string (nullable = true)
#  |-- count: integer (nullable = true)
# Print the schema of the new DataFrame
loans.printSchema()
# root
#  |-- addr_state: string (nullable = true)
#  |-- count: long (nullable = true)
#  |-- amount: double (nullable = true)  # new column
# Attempt to append the new DataFrame (with the new column) to the existing table
(loans.write.format("delta")
    .mode("append")
    .save(DELTALAKE_PATH))
# Returns:
# A schema mismatch detected when writing to the Delta table.
# To enable schema migration, please set:
# '.option("mergeSchema", "true")'
# Table schema:
# root
# -- addr_state: string (nullable = true)
# -- count: long (nullable = true)
# Data schema:
# root
# -- addr_state: string (nullable = true)
# -- count: long (nullable = true)
# -- amount: double (nullable = true)
# If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.
Instead of automatically adding the new column, Delta Lake enforces the schema and stops the write. To help determine which column (or set of columns) caused the mismatch, Spark prints both schemas in the stack trace for comparison.
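When the two printed schemas are long, comparing them by eye gets tedious. The following is a small sketch of a helper that reports which columns differ. It assumes both schemas have already been reduced to plain dicts of column name to type name; `diff_schemas` is a hypothetical helper, not part of Spark or Delta Lake.

```python
def diff_schemas(table_schema, data_schema):
    """Return (extra columns, type conflicts) of the data relative to the table."""
    # Columns present in the data but unknown to the table.
    extra = [name for name in data_schema if name not in table_schema]
    # Columns present in both, but with different types: name -> (table, data).
    conflicts = {
        name: (table_schema[name], dtype)
        for name, dtype in data_schema.items()
        if name in table_schema and table_schema[name] != dtype
    }
    return extra, conflicts


# The two schemas from the error message above.
table_schema = {"addr_state": "string", "count": "long"}
data_schema = {"addr_state": "string", "count": "long", "amount": "double"}

extra, conflicts = diff_schemas(table_schema, data_schema)
print("columns not in table:", extra)
print("type conflicts:", conflicts)
```

For the schemas in the error message, the only difference is the extra `amount` column, which is exactly what blocked the write.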
How Is Schema Enforcement Useful?
Because schema enforcement is a fairly strict check, it is an excellent tool to use as a gatekeeper for a clean, fully transformed data set that is ready for production or consumption. It is typically applied to tables that directly feed:
- Machine learning algorithms
- BI dashboards
- Data analytics and visualization tools
- Any production system that requires highly structured, strongly typed semantic schemas
To prepare their data for this final hurdle, many users employ a simple "multi-hop" architecture that gradually adds structure to their tables. You can read more about this in the related article.
Of course, schema enforcement can be used anywhere in your pipeline, but keep in mind that having a streaming write to a table fail can be frustrating, for example because you forgot that you added another column to the incoming data.
Preventing Data Dilution
By now you may be wondering what all the fuss is about. After all, an unexpected "schema mismatch" error can sometimes trip up your workflow, especially if you are new to Delta Lake. Why not simply let the schema change as needed so that you can write your DataFrame no matter what?
As the old saying goes, "an ounce of prevention is worth a pound of cure." If you do not take care to enforce your schema, at some point data type compatibility issues will rear their ugly heads: seemingly homogeneous raw data sources can contain edge cases, corrupted columns, malformed mappings, or other frightening things you have nightmares about. The best approach is to stop these enemies at the gate with schema enforcement and deal with them in the daylight, rather than later, when they start lurking in the dark depths of your production code.
Schema enforcement gives you assurance that your table's schema will not change unless you approve the change yourself. This prevents data dilution, which can occur when new columns are added so frequently that formerly valuable, concise tables lose their meaning and usefulness to the flood of data. By encouraging you to be intentional, set high standards, and expect high quality, schema enforcement does exactly what it was designed to do: it keeps you honest and your tables clean.
If, on further consideration, you decide that you really do need to add a new column, no problem: it is a one-line fix, described below. The solution is schema evolution!
What Is Schema Evolution?
Schema evolution is a feature that lets users easily change a table's current schema to accommodate data that changes over time. It is most commonly used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.
How Does Schema Evolution Work?
Following the example from the previous section, developers can easily use schema evolution to add the new columns that were previously rejected because of a schema mismatch. Schema evolution is activated by adding .option('mergeSchema', 'true') to your .write or .writeStream Spark command.
# Add the mergeSchema option
(loans.write.format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .save(DELTALAKE_SILVER_PATH))
To view the plot, run the following Spark SQL query.
%sql
-- Create a plot with the new column to confirm that the write succeeded
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`) DESC
LIMIT 10
Alternatively, you can set this option for the entire Spark session by adding spark.databricks.delta.schema.autoMerge.enabled = true to your Spark configuration. Use this with caution, however, because schema enforcement will no longer warn you about unintended schema mismatches.
When you include the mergeSchema option in your query, any columns that are present in the DataFrame but not in the target table are automatically added to the end of the schema as part of the write transaction. Nested fields can also be added, and they are appended to the end of their respective struct columns.
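The ordering behavior described above can be sketched in a few lines of Python. This is a toy model, not Delta Lake's merge logic: a schema is assumed to be a dict of column name to type name, a struct column is represented as a nested dict, and `merge_schema` along with the `borrower` and `age` names are hypothetical.

```python
def merge_schema(table_schema, data_schema):
    """Return a new schema: table columns first, new columns appended at the end.

    A schema is a dict of name -> type, where a struct type is itself a dict.
    """
    merged = dict(table_schema)  # dicts preserve insertion order
    for name, dtype in data_schema.items():
        if name not in merged:
            merged[name] = dtype  # a new column goes to the end
        elif isinstance(merged[name], dict) and isinstance(dtype, dict):
            # Recurse so new nested fields land at the end of their struct.
            merged[name] = merge_schema(merged[name], dtype)
        elif merged[name] != dtype:
            raise ValueError(f"incompatible types for column {name}")
    return merged


table = {"addr_state": "string", "borrower": {"id": "long"}}
data = {
    "amount": "double",
    "borrower": {"id": "long", "age": "integer"},
    "addr_state": "string",
}

merged = merge_schema(table, data)
print(list(merged))              # top-level column order after the merge
print(list(merged["borrower"]))  # field order inside the struct
```

Note that the order of columns in the incoming data does not matter: existing columns keep their positions, and only genuinely new names are appended.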
Data engineers and data scientists can use this option to add new columns (perhaps a recently tracked metric, or a column of this month's sales figures) to their existing machine learning production tables without breaking existing models that rely on the old columns.
The following types of schema changes are permitted as part of schema evolution during a table append or overwrite:
- Adding new columns (this is the most common scenario)
- Changing data types from NullType to any other type, or upcasting from ByteType -> ShortType -> IntegerType
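The type rule in the list above can be expressed as a small predicate. This sketch encodes only the conversions named in this article and assumes types are given as their Spark type names in string form; `can_evolve` and `UPCAST_ORDER` are hypothetical names used for illustration.

```python
# Upcast chain named in the text: ByteType -> ShortType -> IntegerType.
UPCAST_ORDER = ["ByteType", "ShortType", "IntegerType"]


def can_evolve(old_type, new_type):
    """Return True if old_type may change to new_type under schema evolution."""
    if old_type == new_type:
        return True  # nothing changes
    if old_type == "NullType":
        return True  # NullType may become any other type
    if old_type in UPCAST_ORDER and new_type in UPCAST_ORDER:
        # Only widening is allowed, never narrowing.
        return UPCAST_ORDER.index(old_type) < UPCAST_ORDER.index(new_type)
    return False


print(can_evolve("ByteType", "IntegerType"))
print(can_evolve("IntegerType", "ShortType"))
print(can_evolve("NullType", "StringType"))
print(can_evolve("IntegerType", "StringType"))
```

Only widening conversions pass, because every value of the narrower type still fits in the wider one, so existing data files remain readable.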
Other changes, which are not permitted under schema evolution, require the schema and the data to be overwritten by adding .option("overwriteSchema", "true"). For example, if the column "Foo" was originally an integer and the new schema makes it a string data type, all of the Parquet (data) files need to be rewritten. Such changes include:
- Dropping a column
- Changing the data type of an existing column (in place)
- Renaming columns whose names differ only by case (for example, "Foo" and "foo")
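As a rough illustration of the three cases above, the sketch below compares an old and a new schema and lists the changes that would call for an overwrite. It is deliberately simplified: schemas are assumed to be dicts of column name to type name, every type change is reported (the permitted upcasts are ignored), and `changes_requiring_overwrite` and the `legacy` column are hypothetical.

```python
def changes_requiring_overwrite(old_schema, new_schema):
    """List differences between two name -> type dicts that need a rewrite."""
    changes = []
    new_by_lower = {name.lower(): name for name in new_schema}
    for name, dtype in old_schema.items():
        if name in new_schema:
            # Same column name: only the data type can have changed.
            if new_schema[name] != dtype:
                changes.append(
                    f"type change: {name} {dtype} -> {new_schema[name]}"
                )
        elif name.lower() in new_by_lower:
            # The name exists only with different capitalization.
            changes.append(
                f"case-only rename: {name} -> {new_by_lower[name.lower()]}"
            )
        else:
            changes.append(f"dropped column: {name}")
    return changes


old = {"Foo": "integer", "state": "string", "legacy": "double"}
new = {"foo": "integer", "state": "long"}

changes = changes_requiring_overwrite(old, new)
for change in changes:
    print(change)
```

An empty result would mean that none of the three listed cases applies to the pair of schemas being compared.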
Finally, with the upcoming release of Spark 3.0, explicit DDL (using ALTER TABLE) will be fully supported, allowing users to perform the following actions on table schemas:
- Adding columns
- Changing column comments
- Setting table properties that control the behavior of the table, such as the retention duration of the transaction log
How Is Schema Evolution Useful?
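Schema evolution can be used whenever you intend to change the schema of your table (as opposed to cases where you accidentally added columns to your DataFrame that should not be there). It is the easiest way to migrate your schema, because it automatically adds the correct column names and data types without your having to declare them explicitly.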
Summary
Schema enforcement rejects any new columns or other schema changes that are not compatible with your table. By setting and upholding these high standards, analysts and engineers can trust that their data has the highest level of integrity and can reason about it clearly, which lets them make better business decisions.
Schema evolution, on the other hand, complements enforcement by making it easy for intended schema changes to happen automatically. After all, adding a column should not be hard.
Schema enforcement is the yang to schema evolution's yin. Used together, these features make it easier than ever to suppress the noise and tune in to the signal.
We would also like to thank Mukul Murthy and Pranav Anand for their contributions to this article.
Source: habr.com