Delta Lake Dive: applicazione ed evoluzione del regime

Ehi Habr! Presento alla vostra attenzione la traduzione dell'articolo "Immergersi nel Delta Lake: applicazione ed evoluzione dello schema" autori Burak Yavuz, Brenner Heintz e Denny Lee, preparato in previsione dell'inizio del corso Ingegnere dei dati da OTUS.

Delta Lake Dive: applicazione ed evoluzione del regime

I dati, come la nostra esperienza, si accumulano e si evolvono costantemente. Per stare al passo, i nostri modelli mentali del mondo devono adattarsi a nuovi dati, alcuni dei quali contengono nuove dimensioni, nuovi modi di osservare cose di cui prima non avevamo idea. Questi modelli mentali non sono molto diversi dagli schemi tabellari che determinano il modo in cui categorizziamo ed elaboriamo le nuove informazioni.

Questo ci porta al problema della gestione dello schema. Man mano che le sfide e i requisiti aziendali cambiano nel tempo, cambia anche la struttura dei dati. Delta Lake semplifica l'introduzione di nuove misurazioni man mano che i dati cambiano. Gli utenti hanno accesso a una semantica semplice per gestire i propri schemi di tabella. Questi strumenti includono Schema Enforcement, che protegge gli utenti dall'inquinamento involontario delle proprie tabelle con errori o dati non necessari, e Schema Evolution, che consente di aggiungere automaticamente nuove colonne di dati preziosi nelle posizioni appropriate. In questo articolo approfondiremo l'utilizzo di questi strumenti.

Comprensione degli schemi di tabella

Ogni DataFrame in Apache Spark contiene uno schema che definisce la forma dei dati, come tipi di dati, colonne e metadati. Con Delta Lake, lo schema della tabella viene archiviato in formato JSON all'interno del log delle transazioni.

Che cos'è l'applicazione del regime?

Schema Enforcement, noto anche come Schema Validation, è un meccanismo di sicurezza in Delta Lake che garantisce la qualità dei dati rifiutando i record che non corrispondono allo schema della tabella. Come la padrona di casa al front desk di un noto ristorante su prenotazione, controlla se ciascuna colonna di dati inserita nella tabella si trova nella corrispondente lista di colonne previste (in altre parole, se esiste una "prenotazione" per ciascuna di esse ) e rifiuta tutti i record con colonne non presenti nell'elenco.

Come funziona l'applicazione dello schema?

Delta Lake usa il controllo dello schema in scrittura, il che significa che tutte le nuove scritture sulla tabella vengono controllate per verificarne la compatibilità con lo schema della tabella di destinazione in fase di scrittura. Se lo schema è incoerente, Delta Lake interrompe completamente la transazione (non vengono scritti dati) e genera un'eccezione per avvisare l'utente dell'incoerenza.
Delta Lake utilizza le regole seguenti per determinare se un record è compatibile con una tabella. DataFrame scrivibile:

  • non può contenere colonne aggiuntive che non si trovano nello schema della tabella di destinazione. Al contrario, tutto va bene se i dati in arrivo non contengono assolutamente tutte le colonne della tabella: a queste colonne verranno semplicemente assegnati valori nulli.
  • non può avere tipi di dati di colonna diversi dai tipi di dati delle colonne nella tabella di destinazione. Se la colonna della tabella di destinazione contiene dati StringType, ma la colonna corrispondente in DataFrame contiene dati IntegerType, l'imposizione dello schema genererà un'eccezione e impedirà l'esecuzione dell'operazione di scrittura.
  • non può contenere nomi di colonna che differiscono solo nel caso. Ciò significa che non è possibile definire colonne denominate "Foo" e "foo" nella stessa tabella. Mentre Spark può essere utilizzato in modalità con distinzione tra maiuscole e minuscole o senza distinzione tra maiuscole e minuscole (impostazione predefinita), Delta Lake preserva le maiuscole e minuscole ma non fa distinzione tra maiuscole e minuscole all'interno dell'archiviazione dello schema. Parquet fa distinzione tra maiuscole e minuscole durante l'archiviazione e la restituzione delle informazioni sulla colonna. Per evitare possibili errori, danneggiamento o perdita di dati (qualcosa che abbiamo riscontrato personalmente in Databricks), abbiamo deciso di aggiungere questa limitazione.

Per illustrare ciò, diamo un'occhiata a cosa succede nel codice seguente quando proviamo ad aggiungere alcune colonne appena generate a una tabella Delta Lake che non è ancora configurata per accettarle.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Invece di aggiungere automaticamente nuove colonne, Delta Lake impone uno schema e interrompe la scrittura. Per aiutare a determinare quale colonna (o insieme di colonne) causa la discrepanza, Spark restituisce entrambi gli schemi dall'analisi dello stack per il confronto.

Qual è il vantaggio di applicare uno schema?

Poiché l'applicazione dello schema è un controllo abbastanza rigoroso, è uno strumento eccellente da utilizzare come custode di un set di dati pulito e completamente trasformato, pronto per la produzione o il consumo. Tipicamente applicato alle tabelle che alimentano direttamente i dati:

  • Algoritmi di apprendimento automatico
  • Cruscotti BI
  • Strumenti di analisi e visualizzazione dei dati
  • Qualsiasi sistema di produzione che richiede schemi semantici altamente strutturati e fortemente tipizzati.

Per preparare i propri dati a questo ostacolo finale, molti utenti utilizzano una semplice architettura “multi-hop” che introduce gradualmente la struttura nelle tabelle. Per saperne di più su questo, puoi consultare l'articolo Machine learning di livello produttivo con Delta Lake.

Naturalmente, l'applicazione dello schema può essere utilizzata ovunque nella pipeline, ma ricorda che lo streaming su una tabella in questo caso può essere frustrante perché, ad esempio, hai dimenticato di aver aggiunto un'altra colonna ai dati in entrata.

Prevenire la diluizione dei dati

A questo punto ti starai chiedendo: di cosa si tratta? Dopotutto, a volte un errore imprevisto di "mancata corrispondenza dello schema" può farti inciampare nel tuo flusso di lavoro, soprattutto se sei nuovo a Delta Lake. Perché non lasciare semplicemente che lo schema cambi secondo necessità in modo da poter scrivere il mio DataFrame, qualunque cosa accada?

Come dice il vecchio proverbio, “un grammo di prevenzione vale un chilo di cura”. Ad un certo punto, se non ti prendi cura di applicare il tuo schema, i problemi di compatibilità del tipo di dati faranno capolino: origini di dati grezzi apparentemente omogenee possono contenere casi limite, colonne danneggiate, mappature non corrette o altre cose spaventose da sognare in incubi. L'approccio migliore è fermare questi nemici alle porte, con l'applicazione dello schema, e affrontarli alla luce, piuttosto che in un secondo momento, quando iniziano a nascondersi nelle oscure profondità del codice di produzione.

L'applicazione di uno schema ti dà la garanzia che lo schema della tua tabella non cambierà a meno che tu non approvi la modifica. Ciò impedisce la diluizione dei dati, che può verificarsi quando nuove colonne vengono aggiunte così frequentemente che le tabelle compresse precedentemente preziose perdono il loro significato e la loro utilità a causa dell'inondazione di dati. Incoraggiandoti a essere intenzionale, a stabilire standard elevati e ad aspettarti un'elevata qualità, l'applicazione dello schema fa esattamente ciò per cui è stata progettata: aiutarti a rimanere coscienzioso e a mantenere i tuoi fogli di calcolo puliti.

Se dopo un'ulteriore considerazione decidi che lo sei davvero necessario aggiungi una nuova colonna: nessun problema, di seguito è riportata una correzione su una riga. La soluzione è l'evoluzione del circuito!

Cos'è l'evoluzione dello schema?

L'evoluzione dello schema è una funzionalità che consente agli utenti di modificare facilmente lo schema della tabella corrente in base ai dati che cambiano nel tempo. Viene spesso utilizzato quando si esegue un'operazione di aggiunta o riscrittura per adattare automaticamente lo schema in modo da includere una o più nuove colonne.

Come funziona l'evoluzione dello schema?

Seguendo l'esempio della sezione precedente, gli sviluppatori possono utilizzare facilmente l'evoluzione dello schema per aggiungere nuove colonne precedentemente rifiutate a causa dell'incoerenza dello schema. L'evoluzione del circuito si attiva aggiungendo .option('mergeSchema', 'true') al tuo team Spark .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Per visualizzare il grafico, esegui la seguente query Spark SQL

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Delta Lake Dive: applicazione ed evoluzione del regime
In alternativa, puoi impostare questa opzione per l'intera sessione Spark aggiungendo spark.databricks.delta.schema.autoMerge = True alla configurazione Spark. Ma usalo con cautela, poiché l'applicazione dello schema non ti avviserà più di incoerenze involontarie dello schema.

Includendo il parametro nella richiesta mergeSchema, tutte le colonne presenti nel DataFrame ma non nella tabella di destinazione vengono aggiunte automaticamente alla fine dello schema come parte di una transazione di scrittura. È inoltre possibile aggiungere campi nidificati e questi verranno aggiunti anche alla fine delle colonne della struttura corrispondente.

Gli ingegneri dei dati e i data scientist possono utilizzare questa opzione per aggiungere nuove colonne (forse una metrica monitorata di recente o la colonna delle prestazioni di vendita di questo mese) alle tabelle di produzione di machine learning esistenti senza interrompere i modelli esistenti basati su vecchie colonne.

I seguenti tipi di modifiche allo schema sono consentiti come parte dell'evoluzione dello schema durante l'aggiunta o la riscrittura di una tabella:

  • Aggiunta di nuove colonne (questo è lo scenario più comune)
  • Modifica dei tipi di dati da NullType -> qualsiasi altro tipo o promozione da ByteType -> ShortType -> IntegerType

Altre modifiche non consentite nell'ambito dell'evoluzione dello schema richiedono che lo schema e i dati vengano riscritti mediante aggiunta .option("overwriteSchema", "true"). Ad esempio, nel caso in cui la colonna "Foo" fosse originariamente un numero intero e il nuovo schema fosse un tipo di dati stringa, tutti i file Parquet(dati) dovrebbero essere riscritti. Tali modifiche includono:

  • eliminando una colonna
  • modifica del tipo di dati di una colonna esistente (sul posto)
  • rinominare le colonne che differiscono solo per le maiuscole e minuscole (ad esempio "Foo" e "foo")

Infine, con la prossima versione di Spark 3.0, il DDL esplicito sarà completamente supportato (utilizzando ALTER TABLE), consentendo agli utenti di eseguire le seguenti azioni sugli schemi di tabella:

  • aggiunta di colonne
  • modifica dei commenti delle colonne
  • impostazione delle proprietà della tabella che controllano il comportamento della tabella, ad esempio l'impostazione del periodo di tempo in cui viene archiviato un registro delle transazioni.

Qual è il vantaggio dell'evoluzione del circuito?

L'evoluzione dello schema può essere utilizzata ogni volta che lo desideri avere intenzione cambia lo schema della tua tabella (al contrario di quando hai aggiunto accidentalmente colonne al tuo DataFrame che non dovrebbero essere lì). Questo è il modo più semplice per eseguire la migrazione dello schema perché aggiunge automaticamente i nomi delle colonne e i tipi di dati corretti senza doverli dichiarare esplicitamente.

conclusione

L'applicazione dello schema rifiuta qualsiasi nuova colonna o altra modifica dello schema che non sia compatibile con la tabella. Impostando e mantenendo questi standard elevati, analisti e ingegneri possono avere la certezza che i loro dati abbiano il massimo livello di integrità, comunicandoli in modo chiaro e chiaro, consentendo loro di prendere decisioni aziendali migliori.

D’altro canto, l’evoluzione dello schema integra l’applicazione semplificando potenziale modifiche automatiche dello schema. Dopotutto, non dovrebbe essere difficile aggiungere una colonna.

L'applicazione forzata dello schema è yang, dove l'evoluzione dello schema è yin. Se utilizzate insieme, queste funzionalità rendono la soppressione del rumore e la sintonizzazione del segnale più facili che mai.

Vorremmo anche ringraziare Mukul Murthy e Pranav Anand per il loro contributo a questo articolo.

Altri articoli in questa serie:

Immergiti in Delta Lake: decompressione del registro delle transazioni

Articoli correlati

Machine learning di livello produttivo con Delta Lake

Cos'è un data Lake?

Scopri di più sul corso

Fonte: habr.com

Aggiungi un commento