צלול לתוך אגם Delta: Schema Enforcement and Evolution

היי הבר! אני מציג לתשומת לבך את התרגום של המאמר "צלילה לתוך אגם דלתא: אכיפת סכימה ואבולוציה" המחברים בורק יבוז, ברנר היינץ ודני לי, שהוכן לקראת תחילת הקורס מהנדס נתונים מ-OTUS.

צלול לתוך אגם Delta: Schema Enforcement and Evolution

נתונים, כמו הניסיון שלנו, מצטברים ומתפתחים כל הזמן. כדי לעמוד בקצב, המודלים המנטליים שלנו של העולם חייבים להסתגל לנתונים חדשים, שחלקם מכילים מימדים חדשים - דרכים חדשות להתבונן בדברים שלא היה לנו מושג לגביהם קודם לכן. המודלים המנטליים הללו אינם שונים בהרבה מסכימות הטבלה שקובעות כיצד אנו מקטלגים ומעבדים מידע חדש.

זה מביא אותנו לסוגיית ניהול הסכימה. ככל שהאתגרים והדרישות העסקיות משתנות עם הזמן, כך גם מבנה הנתונים שלך משתנים. Delta Lake מקל על הצגת מדידות חדשות כאשר הנתונים משתנים. למשתמשים יש גישה לסמנטיקה פשוטה כדי לנהל את סכימות הטבלה שלהם. הכלים הללו כוללים Schema Enforcement, שמגנה על המשתמשים מלזהם את הטבלאות שלהם בשוגג או בנתונים מיותרים, ו- Schema Evolution, המאפשרת להוסיף עמודות חדשות של נתונים יקרי ערך באופן אוטומטי למיקומים המתאימים. במאמר זה, נצלול עמוק יותר לשימוש בכלים אלה.

הבנת סכימות טבלה

כל DataFrame ב- Apache Spark מכיל סכימה המגדירה את צורת הנתונים, כגון סוגי נתונים, עמודות ומטא נתונים. עם Delta Lake, סכימת הטבלה מאוחסנת בפורמט JSON בתוך יומן העסקאות.

מהי אכיפת תכנית?

Schema Enforcement, הידוע גם בשם Schema Validation, הוא מנגנון אבטחה ב-Delta Lake המבטיח איכות נתונים על ידי דחיית רשומות שאינן תואמות לסכימת הטבלה. כמו המארחת בדלפק הקבלה של מסעדה פופולרית להזמנות בלבד, היא בודקת האם כל עמודת נתונים המוזנת לטבלה נמצאת ברשימת העמודות הצפויות (במילים אחרות, האם יש "הזמנה" לכל אחת מהן ). ודוחה כל רשומה עם עמודות שאינן ברשימה.

כיצד פועלת אכיפת הסכימה?

Delta Lake משתמש בבדיקת schema-on-write, כלומר כל הכתיבה החדשה לטבלה נבדקת לגבי תאימות עם סכימת טבלת היעד בזמן הכתיבה. אם הסכימה אינה עקבית, Delta Lake מבטל את העסקה לחלוטין (לא נכתבים נתונים) ומעלה חריג כדי להודיע ​​למשתמש על אי העקביות.
Delta Lake משתמש בכללים הבאים כדי לקבוע אם רשומה תואמת לטבלה. DataFrame לכתיבה:

  • לא יכול להכיל עמודות נוספות שאינן בסכימה של טבלת היעד. לעומת זאת, הכל בסדר אם הנתונים הנכנסים אינם מכילים לחלוטין את כל העמודות מהטבלה - לעמודות אלו פשוט יוקצו ערכי null.
  • לא יכול לכלול סוגי נתוני עמודות שונים מסוגי הנתונים של העמודות בטבלת היעד. אם עמודת טבלת היעד מכילה נתוני StringType, אך העמודה המתאימה ב-DataFrame מכילה נתוני IntegerType, אכיפת הסכימה תזרוק חריגה ותמנע את פעולת הכתיבה.
  • לא יכול להכיל שמות עמודות שונים רק במקרה. משמעות הדבר היא שלא ניתן להגדיר עמודות בשם 'Fo' ו-'foo' באותה טבלה. בעוד שניתן להשתמש ב-Spark במצב תלוי רישיות או לא תלוי רישיות (ברירת מחדל), Delta Lake שומר על רישיות אך אינו רגיש בתוך אחסון הסכימה. פרקט הוא רגיש לאותיות גדולות בעת אחסון והחזרת מידע עמודה. כדי למנוע שגיאות אפשריות, פגיעה בנתונים או אובדן נתונים (משהו שחווינו באופן אישי ב-Databricks), החלטנו להוסיף מגבלה זו.

כדי להמחיש זאת, בואו נסתכל על מה שקורה בקוד למטה כשאנחנו מנסים להוסיף כמה עמודות שנוצרו לאחרונה לטבלה של Delta Lake שעדיין לא מוגדרת לקבל אותן.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

במקום להוסיף עמודות חדשות באופן אוטומטי, Delta Lake כופה סכימה ומפסיק לכתוב. כדי לעזור לקבוע איזו עמודה (או קבוצת עמודות) גורמת לאי ההתאמה, Spark מוציא את שתי הסכמות מ-Stack Trace לצורך השוואה.

מה היתרון באכיפת סכימה?

מכיוון שאכיפת סכימה היא בדיקה קפדנית למדי, היא כלי מצוין לשימוש כשומר סף למערך נתונים נקי, שעבר טרנספורמציה מלאה, מוכן לייצור או לצריכה. מיושם בדרך כלל על טבלאות שמזינות נתונים ישירות:

  • אלגוריתמים של למידת מכונה
  • לוחות מחוונים של BI
  • כלי ניתוח נתונים והדמיה
  • כל מערכת ייצור הדורשת סכמות סמנטיות מובנות מאוד, מוקלדות חזקות.

כדי להכין את הנתונים שלהם למכשול האחרון הזה, משתמשים רבים משתמשים בארכיטקטורת "רב-הופ" פשוטה שמכניסה בהדרגה מבנה לטבלאות שלהם. כדי ללמוד עוד על זה, אתה יכול לעיין במאמר למידת מכונה בדרגת ייצור עם Delta Lake.

כמובן שניתן להשתמש באכיפת סכימה בכל מקום בצנרת שלך, אך זכרו שזרימה לטבלה במקרה זה יכולה להיות מתסכלת, כי למשל שכחת שהוספת עמודה נוספת לנתונים הנכנסים.

מניעת דילול נתונים

עד עכשיו אתם אולי תוהים, על מה כל המהומה? אחרי הכל, לפעמים שגיאת "אי התאמה של סכימה" בלתי צפויה יכולה להכשיל אותך בזרימת העבודה שלך, במיוחד אם אתה חדש ב-Delta Lake. למה לא פשוט לתת לסכימה להשתנות לפי הצורך כדי שאוכל לכתוב את ה-DataFrame שלי לא משנה מה?

כפי שאומר הפתגם הישן, "גרם של מניעה שווה קילו של תרופה." בשלב מסוים, אם לא תדאג לאכוף את הסכימה שלך, בעיות תאימות מסוג נתונים יעלו את ראשן המכוער - מקורות נתונים גולמיים הומוגניים לכאורה עשויים להכיל מקרי קצה, עמודות פגומות, מיפויים שגויים או דברים מפחידים אחרים לחלום עליהם. סיוטים. הגישה הטובה ביותר היא לעצור את האויבים האלה בשער - עם אכיפת סכמה - ולהתמודד איתם באור, ולא מאוחר יותר כשהם מתחילים לארוב במעמקים האפלים של קוד הייצור שלך.

אכיפת סכימה מעניקה לך את הביטחון שהסכימה של הטבלה שלך לא תשתנה אלא אם תאשר את השינוי. זה מונע דילול נתונים, שעלול להתרחש כאשר עמודות חדשות מתווספות בתדירות כה גבוהה עד שטבלאות דחוסות בעלות ערך בעבר מאבדות את המשמעות והשימושיות שלהן עקב הצפה של נתונים. על ידי עידודך להיות מכוון, להציב סטנדרטים גבוהים ולצפות לאיכות גבוהה, אכיפת הסכימה עושה בדיוק את מה שהיא תוכננה לעשות - עוזרת לך להישאר מצפונית והגליונות האלקטרוניים שלך נקיים.

אם בשיקול נוסף תחליט שאתה באמת צורך הוסף עמודה חדשה - אין בעיה, להלן תיקון בשורה אחת. הפתרון הוא התפתחות המעגל!

מהי אבולוציה של סכימה?

התפתחות סכימה היא תכונה המאפשרת למשתמשים לשנות בקלות את סכימת הטבלה הנוכחית בהתאם לנתונים המשתנים עם הזמן. הוא משמש לרוב בעת ביצוע פעולת הוספה או כתיבה מחדש כדי להתאים אוטומטית את הסכימה כך שתכלול עמודה חדשה אחת או יותר.

איך עובדת התפתחות הסכימה?

בעקבות הדוגמה מהסעיף הקודם, מפתחים יכולים להשתמש בקלות בפיתוח סכימה כדי להוסיף עמודות חדשות שנדחו בעבר עקב חוסר עקביות בסכימה. התפתחות המעגל מופעלת על ידי הוספה .option('mergeSchema', 'true') לצוות הספארק שלך .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

כדי להציג את הגרף, הפעל את שאילתת Spark SQL הבאה

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

צלול לתוך אגם Delta: Schema Enforcement and Evolution
לחלופין, אתה יכול להגדיר אפשרות זו עבור כל הפעלת Spark על ידי הוספה spark.databricks.delta.schema.autoMerge = True לתצורת Spark. אבל השתמש בזה בזהירות, שכן אכיפת סכימה לא תתריע עוד על חוסר עקביות בסכימה לא מכוונת.

על ידי הכללת הפרמטר בבקשה mergeSchema, כל העמודות שנמצאות ב-DataFrame אך לא בטבלת היעד מתווספות אוטומטית לסוף הסכימה כחלק מעסקת כתיבה. ניתן להוסיף גם שדות מקוננים ואלה יתווספו גם לסוף עמודות המבנה המתאימות.

מהנדסי תאריכים ומדעני נתונים יכולים להשתמש באפשרות זו כדי להוסיף עמודות חדשות (אולי מדד שנבדק לאחרונה או עמודת ביצועי המכירות של החודש) לטבלאות הייצור הקיימות של למידת מכונה מבלי לשבור מודלים קיימים המבוססים על עמודות ישנות.

הסוגים הבאים של שינויים בסכימה מותרים כחלק מהתפתחות הסכימה במהלך הוספה או שכתוב של טבלה:

  • הוספת עמודות חדשות (זהו התרחיש הנפוץ ביותר)
  • שינוי סוגי נתונים מ- NullType -> כל סוג אחר או קידום מ- ByteType -> ShortType -> IntegerType

שינויים אחרים שאינם מותרים בהתפתחות הסכימה דורשים כי הסכימה והנתונים ייכתבו מחדש על ידי הוספה .option("overwriteSchema", "true"). לדוגמה, במקרה שבו העמודה "Foo" הייתה במקור מספר שלם והסכימה החדשה הייתה מסוג נתוני מחרוזת, אז כל קבצי Parquet(data) יצטרך להיכתב מחדש. שינויים כאלה כוללים:

  • מחיקת עמודה
  • שינוי סוג הנתונים של עמודה קיימת (במקום)
  • שינוי שמות של עמודות שנבדלות רק במקרה (לדוגמה, "Foo" ו-"foo")

לבסוף, עם המהדורה הבאה של Spark 3.0, DDL מפורש יקבל תמיכה מלאה (באמצעות ALTER TABLE), המאפשר למשתמשים לבצע את הפעולות הבאות בסכימות טבלה:

  • הוספת עמודות
  • שינוי הערות עמודות
  • הגדרת מאפייני טבלה השולטים בהתנהגות הטבלה, כגון הגדרת משך הזמן שבו יומן טרנזקציות מאוחסן.

מה היתרון של אבולוציה במעגל?

ניתן להשתמש בהתפתחות סכימה בכל פעם שאתה מתכוון שנה את הסכימה של הטבלה שלך (בניגוד למקרה שהוספת בטעות עמודות ל-DataFrame שלך ​​שלא אמורות להיות שם). זוהי הדרך הקלה ביותר להעביר את הסכימה שלך מכיוון שהיא מוסיפה אוטומטית את שמות העמודות וסוגי הנתונים הנכונים מבלי להכריז עליהם במפורש.

מסקנה

אכיפת סכימה דוחה כל עמודה חדשה או שינויי סכימה אחרים שאינם תואמים לטבלה שלך. על ידי קביעת ותחזוקה של סטנדרטים גבוהים אלה, אנליסטים ומהנדסים יכולים לסמוך על כך שלנתונים שלהם יש את רמת האינטגריטי הגבוהה ביותר, מתקשרים אותם בצורה ברורה וברורה, ומאפשרים להם לקבל החלטות עסקיות טובות יותר.

מצד שני, התפתחות הסכימה משלימה את האכיפה על ידי פישוט אמור שינויים אוטומטיים בסכימה. אחרי הכל, זה לא אמור להיות קשה להוסיף עמודה.

היישום הכפוי של הסכימה הוא יאנג, כאשר האבולוציה של הסכימה היא יין. בשימוש יחד, תכונות אלה הופכות את דיכוי הרעשים וכוונון האותות לקלים מאי פעם.

ברצוננו גם להודות למוקול מרתי ופראנב אנאנד על תרומתם למאמר זה.

מאמרים נוספים בסדרה זו:

צלול לתוך אגם דלתא: פריקת יומן העסקאות

הפעל וידאו

מאמרים קשורים

למידת מכונה בדרגת ייצור עם Delta Lake

מהו אגם נתונים?

למידע נוסף על הקורס

מקור: www.habr.com

הוספת תגובה