Dive into Delta Lake: Schema Enforcement and Evolution

Hey Habr! I present to your attention a translation of the article "Diving Into Delta Lake: Schema Enforcement & Evolution" by Burak Yavuz, Brenner Heintz, and Denny Lee, prepared in anticipation of the start of the Data Engineer course at OTUS.


Data, like our experience, is constantly accumulating and evolving. To keep up, our mental models of the world must adapt to new data, some of which contains new dimensions—new ways to observe things we had no idea about before. These mental models are not much different from the table schemas that define how we classify and process new information.

This brings us to the issue of schema management. As business objectives and requirements change over time, so does the structure of your data. Delta Lake makes it easy to incorporate new dimensions as the data changes. Users have access to simple semantics for managing their table schemas. These tools include Schema Enforcement, which protects users from inadvertently polluting their tables with errors or unnecessary data, and Schema Evolution, which allows new columns of valuable data to be automatically added in the appropriate places. In this article, we will delve into the use of these tools.

Understanding table schemas

Each DataFrame in Apache Spark contains a schema that defines the shape of the data such as data types, columns, and metadata. With Delta Lake, the table schema is stored in JSON format inside the transaction log.
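For illustration, here is a minimal sketch (not from the original article) of inspecting a schema; it assumes a SparkSession named spark and a hypothetical Delta table path:

# A minimal sketch of inspecting schemas; the table path below is hypothetical
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Every DataFrame carries a schema: column names, data types and nullability
schema = StructType([
    StructField("addr_state", StringType(), True),
    StructField("count", LongType(), True),
])
df = spark.createDataFrame([("IA", 10), ("CA", 25)], schema)
df.printSchema()

# A Delta table persists the same information as JSON in its transaction log;
# reading the table back exposes it as an ordinary DataFrame schema
spark.read.format("delta").load("/delta/loan_by_state_delta").printSchema()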

What is Schema Enforcement?

Schema Enforcement, also known as Schema Validation, is a protection mechanism in Delta Lake that guarantees data quality by rejecting writes that do not match the table schema. Like the host at the front desk of a popular restaurant that only accepts reservations, it checks whether each column of data written to the table appears in the list of expected columns (in other words, whether there is a "reservation" for each of them), and rejects any writes whose columns are not on the list.

How does schema enforcement work?

Delta Lake uses schema validation on write, which means that all new writes to the table are checked for compatibility with the target table's schema at write time. If the schema is incompatible, Delta Lake cancels the transaction entirely (no data is written) and throws an exception to inform the user of the mismatch.
Delta Lake uses the following rules to determine whether a write from a DataFrame to a table is compatible. The DataFrame being written:

  • cannot contain additional columns that are not present in the schema of the target table. Conversely, it is fine if the incoming data does not contain every column from the table; those columns will simply be assigned null values.
  • cannot have column data types that differ from the column data types in the target table. If a column in the target table contains StringType data, but the corresponding column in the DataFrame contains IntegerType data, schema enforcement will throw an exception and prevent the write from taking place (a short sketch of this case follows the list).
  • cannot contain column names that differ only in case. This means that you cannot have columns named 'Foo' and 'foo' defined in the same table. While Spark can be used in case-sensitive or case-insensitive (the default) mode, Delta Lake preserves case but is case-insensitive when storing the schema. Parquet is case-sensitive when storing and returning column information. To avoid potential errors, data corruption, or data loss (which we have personally experienced at Databricks), we decided to add this restriction.
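To make the second rule concrete, here is a hypothetical sketch (not part of the original example): the target table is assumed to store count as a string, so appending a DataFrame whose count column is numeric is rejected.

# Hypothetical sketch of rule 2: the target table stores `count` as a string,
# while the incoming DataFrame provides it as a long, so the append is rejected
from pyspark.sql import Row

bad_loans = spark.createDataFrame([Row(addr_state="TX", count=42)])

try:
    bad_loans.write.format("delta") \
             .mode("append") \
             .save("/delta/loan_by_state_delta_strings")   # hypothetical path
except Exception as e:
    # Schema enforcement cancels the transaction; nothing is written
    print("Write rejected:", e)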

To illustrate this, let's take a look at what happens in the code below when trying to add some newly generated columns to a Delta Lake table that is not yet configured to accept them.

# Generate a DataFrame of loans that we will append to our Delta Lake table
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Print the original DataFrame's schema
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Print the new DataFrame's schema
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Attempt to append the new DataFrame (with the new column) to the existing table
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Instead of automatically adding the new columns, Delta Lake enforces the schema and stops the write. To help determine which column (or set of columns) is causing the mismatch, Spark prints both schemas in the stack trace for comparison.
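If you would rather handle this situation programmatically than let the job fail, one approach (a sketch, not part of the original example; loans and DELTALAKE_PATH are assumed to be defined as above) is to catch the analysis error that Spark raises:

# Sketch: trap the schema mismatch instead of letting the job fail outright
from pyspark.sql.utils import AnalysisException

try:
    loans.write.format("delta") \
         .mode("append") \
         .save(DELTALAKE_PATH)
except AnalysisException as e:
    # The exception message contains both the table schema and the data schema,
    # which makes it easy to spot the offending column(s)
    print("Schema mismatch detected:\n", e)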

What is the benefit of schema enforcement?

Because schema enforcement is a fairly strict check, it is a great tool to use as a gatekeeper for a clean, fully transformed dataset that is ready for production or consumption. It is typically applied to tables that feed data directly into:

  • Machine learning algorithms
  • BI dashboards
  • Data analytics and visualization tools
  • Any production system that requires highly structured, strongly typed semantic schemas.

To prepare their data for this final hurdle, many users use a simple "multi-hop" architecture that gradually introduces structure into their tables. To learn more about this, you can read the article Production grade machine learning with Delta Lake.

Of course, schema enforcement can be used anywhere in your pipeline, but keep in mind that a streaming write to a table can fail in a frustrating way if, for example, you forgot that you added another column to the incoming data.
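For reference, streaming writes go through the same validation as batch writes. The sketch below is hypothetical (the rate source, checkpoint location and table path are illustrative): if the target table already exists with a different schema, the stream stops with the same schema mismatch error.

# Hypothetical sketch: streaming writes are subject to the same schema validation.
# The built-in rate source emits `timestamp` and `value` columns; paths are illustrative.
stream_df = (spark.readStream
                  .format("rate")
                  .load()
                  .withColumnRenamed("value", "count"))

query = (stream_df.writeStream
                  .format("delta")
                  .outputMode("append")
                  .option("checkpointLocation", "/tmp/checkpoints/loans")
                  .start("/tmp/delta/loans"))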

Preventing data dilution

By this point, you might be wondering: what's all the fuss about? After all, an unexpected "schema mismatch" error can sometimes trip you up in your workflow, especially if you're new to Delta Lake. Why not just let the schema change as needed so that I can write my DataFrame no matter what?

As the old saying goes, "an ounce of prevention is worth a pound of cure." At some point, if you don't take care to enforce your schema, data type compatibility issues will rear their ugly heads: seemingly homogeneous sources of raw data can contain edge cases, corrupted columns, malformed mappings, or other dreaded things that you see in nightmares. The best approach is to stop these enemies at the gate, with schema enforcement, and deal with them in the daylight rather than later, when they start prowling the dark depths of your production code.

Schema enforcement gives you the confidence that your table's schema won't change unless you confirm the change yourself. This prevents data dilution, which can occur when new columns are added so frequently that previously valuable, concise tables lose their value and usefulness due to the data deluge. By encouraging you to be intentional, set high standards, and expect high quality, schema enforcement does exactly what it was designed to do: help you stay conscientious and keep your tables clean.

If, upon further consideration, you decide that you really do need to add a new column, no problem: a one-line fix is shown below. The solution is schema evolution!

What is schema evolution?

Schema evolution is a feature that allows users to easily change the current schema of a table to match data that changes over time. It is most commonly used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.

How does schema evolution work?

Following the example from the previous section, developers can easily use schema evolution to add new columns that were previously rejected because of a schema mismatch. Schema evolution is activated by adding .option('mergeSchema', 'true') to your Spark .write or .writeStream command.

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

To view the chart, run the following Spark SQL query:

# Create a chart with the new column to confirm that the write succeeded
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`) DESC
LIMIT 10

Alternatively, you can set this option for the entire Spark session by adding spark.databricks.delta.schema.autoMerge = True to the Spark configuration. Use this with caution though, as schema enforcement will no longer warn you about unintentional schema inconsistencies.
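As a sketch, the session-level setting looks like this; note that the exact key name may vary by Delta Lake version (recent releases spell it spark.databricks.delta.schema.autoMerge.enabled), so check the documentation for your version.

# Sketch: enable schema merging for the whole Spark session instead of per write
# (key name may vary by Delta Lake version; recent releases use autoMerge.enabled)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")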

When the mergeSchema option is included in the write, any columns that are present in the DataFrame but absent from the target table are automatically added to the end of the schema as part of the write transaction. Nested fields can also be added, and they are appended to the end of the corresponding struct columns.

Data engineers and data scientists can use this option to add new columns (perhaps a recently tracked metric or a column of this month's sales figures) to their existing production machine learning tables without breaking existing models that rely on the old columns.

The following types of schema changes are allowed as part of schema evolution when appending to or overwriting a table:

  • Adding new columns (this is the most common scenario)
  • Changing a data type from NullType to any other type, or upcasting from ByteType -> ShortType -> IntegerType

Other schema changes, which are not allowed as part of schema evolution, require that the schema and the data be overwritten by adding .option("overwriteSchema", "true") (a sketch follows the list below). For example, if the column "Foo" was originally an integer and the new schema makes it a string, all of the Parquet (data) files would need to be rewritten. These changes include:

  • deleting a column
  • changing the data type of an existing column (in place)
  • renaming columns that differ only in case (for example, "Foo" and "foo")
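A minimal sketch of such an overwrite (the column name "Foo" and the cast are illustrative; DELTALAKE_SILVER_PATH is assumed to be defined as above):

# Sketch: rewrite the table data and schema in a single overwrite.
# Here the hypothetical column "Foo" is re-typed from integer to string.
from pyspark.sql.functions import col

recast = spark.read.format("delta").load(DELTALAKE_SILVER_PATH) \
              .withColumn("Foo", col("Foo").cast("string"))

recast.write.format("delta") \
      .mode("overwrite") \
      .option("overwriteSchema", "true") \
      .save(DELTALAKE_SILVER_PATH)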

Finally, with the upcoming release of Spark 3.0, explicit DDL (using ALTER TABLE) will be fully supported, allowing users to perform the following actions on table schemas (a sketch follows the list):

  • adding columns
  • changing column comments
  • setting table properties that determine how the table behaves, such as how long the transaction log is retained.
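A sketch of what such DDL might look like via Spark SQL (the table name, column, comment and property value are illustrative):

# Sketch of explicit DDL on a Delta table via Spark SQL (names and values are illustrative)
spark.sql("ALTER TABLE loan_by_state_delta ADD COLUMNS (amount double)")
spark.sql("ALTER TABLE loan_by_state_delta ALTER COLUMN amount COMMENT 'loan amount in USD'")
spark.sql("ALTER TABLE loan_by_state_delta "
          "SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 30 days')")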

What is the benefit of schema evolution?

Schema evolution can be used whenever you intend to change the schema of your table (as opposed to cases where you accidentally added columns to your DataFrame that shouldn't be there). It is the easiest way to migrate your schema, because it automatically adds the correct column names and data types without you having to declare them explicitly.

Conclusion

Schema enforcement rejects any new columns or other schema changes that are not compatible with your table. By setting and maintaining these high standards, analysts and engineers can rely on their data to have the highest level of integrity, reasoning about it clearly and concisely, allowing them to make better business decisions.

On the other hand, schema evolution complements enforcement by making intended schema changes easy and automatic. After all, adding a column shouldn't be hard.

Schema enforcement is the yang to schema evolution's yin. Used together, these features make it easier than ever to filter out the noise and tune in to the signal.

We would also like to thank Mukul Murthy and Pranav Anand for their contributions to this article.

Other articles in this series:

Dive into Delta Lake: unpacking the transaction log

Related Articles

Production grade machine learning with Delta Lake

What is a data lake?
