Mergulho em Delta Lake: Aplicação e Evolução do Esquema

Olá, Habr! Apresento a sua atenção uma tradução do artigo "Mergulhando no Lago Delta: Aplicação e Evolução do Esquema" autores Burak Yavuz, Brenner Heintz e Denny Lee, que foi preparado antes do início do curso Engenheiro de dados de OTUS.

Mergulho em Delta Lake: Aplicação e Evolução do Esquema

Os dados, assim como a nossa experiência, estão em constante acumulação e evolução. Para acompanhar, os nossos modelos mentais do mundo devem adaptar-se a novos dados, alguns dos quais contêm novas dimensões – novas formas de observar coisas das quais não tínhamos ideia antes. Esses modelos mentais não são muito diferentes dos esquemas de tabela que determinam como categorizamos e processamos novas informações.

Isso nos leva à questão do gerenciamento de esquema. À medida que os desafios e requisitos de negócios mudam com o tempo, a estrutura dos seus dados também muda. Delta Lake facilita a introdução de novas medições à medida que os dados mudam. Os usuários têm acesso a uma semântica simples para gerenciar seus esquemas de tabela. Essas ferramentas incluem Schema Enforcement, que protege os usuários de poluir involuntariamente suas tabelas com erros ou dados desnecessários, e Schema Evolution, que permite que novas colunas de dados valiosos sejam adicionadas automaticamente aos locais apropriados. Neste artigo, nos aprofundaremos no uso dessas ferramentas.

Noções básicas sobre esquemas de tabela

Cada DataFrame no Apache Spark contém um esquema que define a forma dos dados, como tipos de dados, colunas e metadados. Com Delta Lake, o esquema da tabela é armazenado no formato JSON dentro do log de transações.

O que é aplicação do esquema?

Schema Enforcement, também conhecido como Schema Validation, é um mecanismo de segurança no Delta Lake que garante a qualidade dos dados rejeitando registros que não correspondem ao esquema da tabela. Assim como a recepcionista da recepção de um restaurante popular somente com reserva, ela verifica se cada coluna de dados inseridos na tabela está na lista correspondente de colunas esperadas (em outras palavras, se existe uma “reserva” para cada uma delas ) e rejeita quaisquer registros com colunas que não estejam na lista.

Como funciona a aplicação do esquema?

Delta Lake usa verificação de esquema na gravação, o que significa que todas as novas gravações na tabela são verificadas quanto à compatibilidade com o esquema da tabela de destino no momento da gravação. Se o esquema for inconsistente, o Delta Lake anula totalmente a transação (nenhum dado é gravado) e gera uma exceção para notificar o usuário sobre a inconsistência.
Delta Lake usa as regras a seguir para determinar se um registro é compatível com uma tabela. DataFrame a ser escrito:

  • não pode conter colunas adicionais que não estejam no esquema da tabela de destino. Por outro lado, está tudo bem se os dados recebidos não contiverem absolutamente todas as colunas da tabela - essas colunas serão simplesmente atribuídas a valores nulos.
  • não pode ter tipos de dados de coluna diferentes dos tipos de dados das colunas na tabela de destino. Se a coluna da tabela de destino contiver dados StringType, mas a coluna correspondente no DataFrame contiver dados IntegerType, a imposição do esquema lançará uma exceção e impedirá que a operação de gravação ocorra.
  • não pode conter nomes de colunas que diferem apenas em maiúsculas e minúsculas. Isso significa que você não pode ter colunas denominadas 'Foo' e 'foo' definidas na mesma tabela. Embora o Spark possa ser usado no modo que diferencia maiúsculas de minúsculas ou que não diferencia maiúsculas de minúsculas (padrão), o Delta Lake preserva maiúsculas de minúsculas, mas não faz distinção entre maiúsculas e minúsculas no armazenamento do esquema. Parquet diferencia maiúsculas de minúsculas ao armazenar e retornar informações de coluna. Para evitar possíveis erros, corrupção de dados ou perda de dados (que experimentamos pessoalmente no Databricks), decidimos adicionar esta limitação.

Para ilustrar isso, vamos dar uma olhada no que acontece no código abaixo quando tentamos adicionar algumas colunas recém-geradas a uma tabela Delta Lake que ainda não está configurada para aceitá-las.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Em vez de adicionar novas colunas automaticamente, Delta Lake impõe um esquema e interrompe a gravação. Para ajudar a determinar qual coluna (ou conjunto de colunas) está causando a discrepância, o Spark gera ambos os esquemas do rastreamento de pilha para comparação.

Qual é a vantagem de impor um esquema?

Como a aplicação do esquema é uma verificação bastante rigorosa, é uma excelente ferramenta para usar como guardião de um conjunto de dados limpo e totalmente transformado, pronto para produção ou consumo. Normalmente aplicado a tabelas que alimentam dados diretamente:

  • Algoritmos de aprendizado de máquina
  • Painéis de BI
  • Ferramentas de análise e visualização de dados
  • Qualquer sistema de produção que exija esquemas semânticos altamente estruturados e fortemente tipados.

Para preparar seus dados para esse obstáculo final, muitos usuários usam uma arquitetura simples de “multi-hop” que gradualmente introduz estrutura em suas tabelas. Para saber mais sobre isso, você pode conferir o artigo Aprendizado de máquina de nível de produção com Delta Lake.

É claro que a imposição de esquema pode ser usada em qualquer lugar do pipeline, mas lembre-se de que o streaming para uma tabela nesse caso pode ser frustrante porque, por exemplo, você esqueceu que adicionou outra coluna aos dados recebidos.

Evitando a diluição de dados

Agora você deve estar se perguntando: por que tanto alarido? Afinal, às vezes um erro inesperado de “incompatibilidade de esquema” pode atrapalhar seu fluxo de trabalho, especialmente se você for novo no Delta Lake. Por que não deixar o esquema mudar conforme necessário para que eu possa escrever meu DataFrame de qualquer maneira?

Como diz o velho ditado, “um grama de prevenção vale um quilo de cura”. Em algum momento, se você não tomar cuidado para aplicar seu esquema, problemas de compatibilidade de tipo de dados aparecerão - fontes de dados brutos aparentemente homogêneas podem conter casos extremos, colunas corrompidas, mapeamentos malformados ou outras coisas assustadoras com as quais sonhar. pesadelos. A melhor abordagem é parar esses inimigos no portão - com aplicação de esquema - e lidar com eles à luz, em vez de mais tarde, quando eles começarem a se esconder nas profundezas escuras do seu código de produção.

A aplicação de um esquema dá a você a garantia de que o esquema da sua tabela não será alterado, a menos que você aprove a alteração. Isso evita a diluição de dados, que pode ocorrer quando novas colunas são adicionadas com tanta frequência que tabelas compactadas anteriormente valiosas perdem seu significado e utilidade devido à inundação de dados. Ao incentivá-lo a ser intencional, estabelecer padrões elevados e esperar alta qualidade, a aplicação de esquemas faz exatamente o que foi projetada para fazer: ajudá-lo a permanecer consciente e a manter suas planilhas limpas.

Se, após uma análise mais aprofundada, você decidir que realmente necessário adicione uma nova coluna - sem problemas, abaixo está uma correção de uma linha. A solução é a evolução do circuito!

O que é evolução do esquema?

A evolução do esquema é um recurso que permite aos usuários alterar facilmente o esquema da tabela atual de acordo com os dados que mudam ao longo do tempo. É mais frequentemente usado ao executar uma operação de acréscimo ou reescrita para adaptar automaticamente o esquema para incluir uma ou mais novas colunas.

Como funciona a evolução do esquema?

Seguindo o exemplo da seção anterior, os desenvolvedores podem facilmente usar a evolução do esquema para adicionar novas colunas que foram rejeitadas anteriormente devido à inconsistência do esquema. A evolução do circuito é ativada adicionando .option('mergeSchema', 'true') para sua equipe Spark .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Para visualizar o gráfico, execute a seguinte consulta Spark SQL

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Mergulho em Delta Lake: Aplicação e Evolução do Esquema
Alternativamente, você pode definir esta opção para toda a sessão do Spark adicionando spark.databricks.delta.schema.autoMerge = True para a configuração do Spark. Mas use isso com cautela, pois a imposição de esquema não irá mais alertá-lo sobre inconsistências de esquema não intencionais.

Incluindo o parâmetro na solicitação mergeSchema, todas as colunas presentes no DataFrame, mas não na tabela de destino, são automaticamente adicionadas ao final do esquema como parte de uma transação de gravação. Campos aninhados também podem ser adicionados e também serão adicionados ao final das colunas da estrutura correspondente.

Os engenheiros de data e os cientistas de dados podem usar esta opção para adicionar novas colunas (talvez uma métrica monitorada recentemente ou a coluna de desempenho de vendas deste mês) às suas tabelas de produção de machine learning existentes sem quebrar os modelos existentes com base em colunas antigas.

Os seguintes tipos de alterações de esquema são permitidos como parte da evolução do esquema durante uma adição ou reescrita de tabela:

  • Adicionando novas colunas (este é o cenário mais comum)
  • Alterando os tipos de dados de NullType -> qualquer outro tipo ou promovendo de ByteType -> ShortType -> IntegerType

Outras alterações não permitidas na evolução do esquema exigem que o esquema e os dados sejam reescritos adicionando .option("overwriteSchema", "true"). Por exemplo, no caso em que a coluna "Foo" era originalmente um número inteiro e o novo esquema era um tipo de dados string, todos os arquivos Parquet(data) precisariam ser reescritos. Essas mudanças incluem:

  • deletando uma coluna
  • alterando o tipo de dados de uma coluna existente (no local)
  • renomear colunas que diferem apenas em maiúsculas e minúsculas (por exemplo, "Foo" e "foo")

Finalmente, com a próxima versão do Spark 3.0, o DDL explícito será totalmente suportado (usando ALTER TABLE), permitindo que os usuários executem as seguintes ações em esquemas de tabela:

  • adicionando colunas
  • alterando comentários da coluna
  • definir propriedades da tabela que controlam o comportamento da tabela, como definir o período de tempo em que um log de transações é armazenado.

Qual é o benefício da evolução do circuito?

A evolução do esquema pode ser usada sempre que você pretende altere o esquema da sua tabela (ao contrário de quando você adicionou acidentalmente colunas ao seu DataFrame que não deveriam estar lá). Esta é a maneira mais fácil de migrar seu esquema porque adiciona automaticamente os nomes de colunas e tipos de dados corretos sem precisar declará-los explicitamente.

Conclusão

A imposição de esquema rejeita quaisquer novas colunas ou outras alterações de esquema que não sejam compatíveis com sua tabela. Ao estabelecer e manter estes padrões elevados, os analistas e engenheiros podem confiar que os seus dados têm o mais alto nível de integridade, comunicando-os de forma clara e clara, permitindo-lhes tomar melhores decisões de negócios.

Por outro lado, a evolução do esquema complementa a aplicação, simplificando supostamente alterações automáticas de esquema. Afinal, não deveria ser difícil adicionar uma coluna.

A aplicação forçada do esquema é yang, onde a evolução do esquema é yin. Quando usados ​​em conjunto, esses recursos tornam a supressão de ruído e o ajuste de sinal mais fáceis do que nunca.

Gostaríamos também de agradecer a Mukul Murthy e Pranav Anand por suas contribuições para este artigo.

Outros artigos desta série:

Mergulhe no Delta Lake: descompactando o log de transações

Artigos relacionados

Aprendizado de máquina de nível de produção com Delta Lake

O que é um lago de dados?

Saiba mais sobre o curso

Fonte: habr.com

Adicionar um comentário