Zanurz się w Delta Lake: egzekwowanie schematów i ewolucja

Hej Habro! Zwracam uwagę na tłumaczenie artykułu „Nurkowanie w jeziorze Delta: egzekwowanie schematu i ewolucja” autorów Burak Yavuz, Brenner Heintz i Denny Lee, który został przygotowany w oczekiwaniu na rozpoczęcie kursu Inżynier danych z firmy OTUS.

Zanurz się w Delta Lake: egzekwowanie schematów i ewolucja

Dane, podobnie jak nasze doświadczenie, stale się gromadzą i ewoluują. Aby dotrzymać kroku, nasze mentalne modele świata muszą dostosować się do nowych danych, a niektóre z nich zawierają nowe wymiary – nowe sposoby obserwacji rzeczy, o których wcześniej nie mieliśmy pojęcia. Te modele mentalne nie różnią się zbytnio od schematów tabelarycznych, które określają, w jaki sposób kategoryzujemy i przetwarzamy nowe informacje.

To prowadzi nas do kwestii zarządzania schematami. W miarę jak wyzwania i wymagania biznesowe zmieniają się z biegiem czasu, zmienia się także struktura Twoich danych. Delta Lake ułatwia wprowadzanie nowych pomiarów w przypadku zmiany danych. Użytkownicy mają dostęp do prostej semantyki umożliwiającej zarządzanie schematami tabel. Narzędzia te obejmują Schema Enforcement, które chroni użytkowników przed niezamierzonym zanieczyszczaniem tabel błędami lub niepotrzebnymi danymi, oraz Schema Evolution, które pozwala na automatyczne dodawanie nowych kolumn cennych danych w odpowiednich lokalizacjach. W tym artykule przyjrzymy się bliżej korzystaniu z tych narzędzi.

Zrozumienie schematów tabel

Każda ramka DataFrame w Apache Spark zawiera schemat definiujący formę danych, na przykład typy danych, kolumny i metadane. W przypadku usługi Delta Lake schemat tabeli jest przechowywany w formacie JSON w dzienniku transakcji.

Co to jest egzekwowanie schematu?

Wymuszanie schematu, znane również jako sprawdzanie poprawności schematu, to mechanizm zabezpieczeń w usłudze Delta Lake, który zapewnia jakość danych poprzez odrzucanie rekordów, które nie pasują do schematu tabeli. Podobnie jak hostessa w recepcji popularnej restauracji z możliwością rezerwacji, sprawdza, czy każda kolumna danych wpisanych do tabeli znajduje się w odpowiedniej liście oczekiwanych kolumn (innymi słowy, czy dla każdej z nich istnieje „rezerwacja” ) i odrzuca wszelkie rekordy z kolumnami, których nie ma na liście.

Jak działa egzekwowanie schematu?

Delta Lake korzysta ze sprawdzania schematu podczas zapisu, co oznacza, że ​​wszystkie nowe zapisy do tabeli są sprawdzane pod kątem zgodności ze schematem tabeli docelowej w czasie zapisu. Jeśli schemat jest niespójny, Delta Lake całkowicie przerywa transakcję (żadne dane nie są zapisywane) i zgłasza wyjątek, aby powiadomić użytkownika o niespójności.
Delta Lake używa następujących reguł w celu ustalenia, czy rekord jest zgodny z tabelą. Zapisywalna ramka danych:

  • nie może zawierać dodatkowych kolumn, które nie znajdują się w schemacie tabeli docelowej. I odwrotnie, wszystko jest w porządku, jeśli przychodzące dane nie zawierają absolutnie wszystkich kolumn z tabeli - tym kolumnom zostanie po prostu przypisana wartość null.
  • nie mogą mieć typów danych kolumn innych niż typy danych kolumn w tabeli docelowej. Jeśli kolumna tabeli docelowej zawiera dane typu StringType, ale odpowiadająca jej kolumna w ramce DataFrame zawiera dane typu IntegerType, wymuszenie schematu zgłosi wyjątek i uniemożliwi wykonanie operacji zapisu.
  • nie może zawierać nazw kolumn różniących się tylko wielkością liter. Oznacza to, że w tej samej tabeli nie można zdefiniować kolumn o nazwach „Foo” i „foo”. Chociaż platformy Spark można używać w trybie uwzględniania wielkości liter lub bez uwzględniania wielkości liter (domyślnie), Delta Lake zachowuje wielkość liter, ale nie uwzględnia wielkości liter w magazynie schematu. Podczas przechowywania i zwracania informacji o kolumnach w Parquet rozróżniana jest wielkość liter. Aby uniknąć możliwych błędów, uszkodzenia lub utraty danych (czego osobiście doświadczyliśmy w Databricks), zdecydowaliśmy się dodać to ograniczenie.

Aby to zilustrować, przyjrzyjmy się, co dzieje się w poniższym kodzie, gdy próbujemy dodać nowo wygenerowane kolumny do tabeli Delta Lake, która nie jest jeszcze skonfigurowana do ich akceptowania.

# Сгенерируем DataFrame ссуд, который мы добавим в нашу таблицу Delta Lake
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Вывести исходную схему DataFrame
original_loans.printSchema()

root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
 
# Вывести новую схему DataFrame
loans.printSchema()
 
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
 
# Попытка добавить новый DataFrame (с новым столбцом) в существующую таблицу
loans.write.format("delta") 
           .mode("append") 
           .save(DELTALAKE_PATH)

Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

Zamiast automatycznie dodawać nowe kolumny, Delta Lake narzuca schemat i przestaje pisać. Aby pomóc określić, która kolumna (lub zestaw kolumn) powoduje rozbieżność, Spark wyprowadza oba schematy ze śladu stosu w celu porównania.

Jaka jest korzyść z egzekwowania schematu?

Ponieważ egzekwowanie schematu jest dość rygorystyczną kontrolą, jest to doskonałe narzędzie do stosowania jako strażnik czystego, w pełni przekształconego zestawu danych, który jest gotowy do produkcji lub konsumpcji. Zwykle stosowane do tabel bezpośrednio dostarczających dane:

  • Algorytmy uczenia maszynowego
  • Panele BI
  • Narzędzia do analizy i wizualizacji danych
  • Dowolny system produkcyjny, który wymaga wysoce ustrukturyzowanych, silnie typowanych schematów semantycznych.

Aby przygotować dane na tę ostatnią przeszkodę, wielu użytkowników korzysta z prostej architektury „multi-hop”, która stopniowo wprowadza strukturę do ich tabel. Aby dowiedzieć się więcej na ten temat, możesz zapoznać się z artykułem Uczenie maszynowe klasy produkcyjnej za pomocą Delta Lake.

Oczywiście egzekwowanie schematu można zastosować w dowolnym miejscu potoku, należy jednak pamiętać, że przesyłanie strumieniowe do tabeli w tym przypadku może być frustrujące, ponieważ na przykład zapomniałeś, że do przychodzących danych dodałeś kolejną kolumnę.

Zapobieganie rozwodnieniu danych

Być może zastanawiasz się teraz, o co to całe zamieszanie? W końcu czasami nieoczekiwany błąd „niedopasowania schematu” może przeszkodzić Ci w przepływie pracy, zwłaszcza jeśli dopiero zaczynasz korzystać z Delta Lake. Dlaczego po prostu nie pozwolić, aby schemat zmienił się w razie potrzeby, abym mógł napisać moją ramkę DataFrame bez względu na wszystko?

Jak mówi stare powiedzenie: „uncja zapobiegania jest warta funta leczenia”. W pewnym momencie, jeśli nie zadbasz o wyegzekwowanie swojego schematu, problemy ze zgodnością typów danych wyjdą na wierzch — pozornie jednorodne, surowe źródła danych mogą zawierać przypadki Edge, uszkodzone kolumny, zniekształcone mapowania lub inne przerażające rzeczy, o których możesz marzyć koszmary. Najlepszym podejściem jest zatrzymanie tych wrogów przy bramie – poprzez egzekwowanie schematu – i rozprawienie się z nimi na światło dzienne, a nie później, gdy zaczną czaić się w ciemnych głębinach kodu produkcyjnego.

Wymuszanie schematu daje pewność, że schemat tabeli nie ulegnie zmianie, chyba że zatwierdzisz zmianę. Zapobiega to rozwodnieniu danych, które może wystąpić, gdy nowe kolumny są dodawane tak często, że wcześniej cenne, skompresowane tabele tracą swoje znaczenie i użyteczność z powodu zalewu danych. Zachęcając Cię do działania celowego, wyznaczania wysokich standardów i oczekiwania wysokiej jakości, egzekwowanie schematów robi dokładnie to, do czego zostało zaprojektowane — pomaga zachować sumienność i czystość arkuszy kalkulacyjnych.

Jeśli po dalszym zastanowieniu zdecydujesz, że naprawdę potrzeba dodaj nową kolumnę - nie ma problemu, poniżej znajduje się jednoliniowa poprawka. Rozwiązaniem jest ewolucja obwodu!

Co to jest ewolucja schematu?

Ewolucja schematu to funkcja, która pozwala użytkownikom łatwo zmieniać bieżący schemat tabeli zgodnie z danymi zmieniającymi się w czasie. Jest najczęściej używany podczas wykonywania operacji dołączania lub przepisywania w celu automatycznego dostosowania schematu w celu uwzględnienia jednej lub większej liczby nowych kolumn.

Jak działa ewolucja schematu?

Idąc za przykładem z poprzedniej sekcji, programiści mogą łatwo wykorzystać ewolucję schematu, aby dodać nowe kolumny, które wcześniej zostały odrzucone z powodu niespójności schematu. Ewolucja obwodu jest aktywowana poprzez dodanie .option('mergeSchema', 'true') do swojego zespołu Spark .write или .writeStream.

# Добавьте параметр mergeSchema
loans.write.format("delta") 
           .option("mergeSchema", "true") 
           .mode("append") 
           .save(DELTALAKE_SILVER_PATH)

Aby wyświetlić wykres, uruchom następujące zapytanie Spark SQL

# Создайте график с новым столбцом, чтобы подтвердить, что запись прошла успешно
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

Zanurz się w Delta Lake: egzekwowanie schematów i ewolucja
Alternatywnie możesz ustawić tę opcję dla całej sesji Spark, dodając spark.databricks.delta.schema.autoMerge = True do konfiguracji Sparka. Należy jednak zachować ostrożność, ponieważ wymuszanie schematu nie będzie już ostrzegać o niezamierzonych niespójnościach schematu.

Dołączając parametr do żądania mergeSchema, wszystkie kolumny znajdujące się w ramce DataFrame, ale nie w tabeli docelowej, są automatycznie dodawane na końcu schematu w ramach transakcji zapisu. Można również dodać pola zagnieżdżone, które również zostaną dodane na końcu odpowiednich kolumn struktury.

Inżynierowie dat i analitycy danych mogą skorzystać z tej opcji, aby dodać nowe kolumny (na przykład ostatnio prześledzone dane lub kolumnę wyników sprzedaży w tym miesiącu) do istniejących tabel produkcyjnych opartych na uczeniu maszynowym bez przerywania istniejących modeli opartych na starych kolumnach.

W ramach ewolucji schematu podczas dodawania lub przepisywania tabeli dozwolone są następujące typy zmian schematu:

  • Dodawanie nowych kolumn (jest to najczęstszy scenariusz)
  • Zmiana typów danych z NullType -> dowolny inny typ lub promowanie z ByteType -> ShortType -> IntegerType

Inne zmiany niedozwolone w ramach ewolucji schematu wymagają przepisania schematu i danych poprzez dodanie .option("overwriteSchema", "true"). Na przykład w przypadku, gdy kolumna „Foo” była pierwotnie liczbą całkowitą, a nowy schemat byłby typem danych typu string, wówczas wszystkie pliki Parquet(data) musiałyby zostać przepisane. Zmiany takie obejmują:

  • usunięcie kolumny
  • zmiana typu danych istniejącej kolumny (in-place)
  • zmiana nazw kolumn różniących się tylko wielkością liter (na przykład „Foo” i „foo”)

Wreszcie, w następnej wersji Spark 3.0, jawny DDL będzie w pełni obsługiwany (przy użyciu ALTER TABLE), umożliwiając użytkownikom wykonywanie następujących działań na schematach tabel:

  • dodawanie kolumn
  • zmiana komentarzy w kolumnach
  • ustawianie właściwości tabeli kontrolujących zachowanie tabeli, na przykład ustawianie czasu przechowywania dziennika transakcji.

Jakie są korzyści z ewolucji obwodów?

Ewolucji schematu można używać w dowolnym momencie zamierzać zmień schemat swojej tabeli (w przeciwieństwie do przypadku, gdy przypadkowo dodałeś kolumny do ramki DataFrame, których nie powinno tam być). Jest to najłatwiejszy sposób migracji schematu, ponieważ automatycznie dodaje prawidłowe nazwy kolumn i typy danych bez konieczności ich jawnego deklarowania.

wniosek

Wymuszanie schematu odrzuca wszelkie nowe kolumny lub inne zmiany schematu, które nie są zgodne z Twoją tabelą. Ustalając i utrzymując te wysokie standardy, analitycy i inżynierowie mogą mieć pewność, że ich dane charakteryzują się najwyższym poziomem integralności, komunikując je w sposób jasny i jasny, co pozwala im podejmować lepsze decyzje biznesowe.

Z drugiej strony ewolucja schematu uzupełnia egzekwowanie poprzez uproszczenie przypuszczalny automatyczne zmiany schematu. W końcu dodanie kolumny nie powinno być trudne.

Wymuszone zastosowanie schematu to yang, podczas gdy ewolucja schematu to yin. W połączeniu te funkcje sprawiają, że tłumienie szumów i dostrajanie sygnału jest łatwiejsze niż kiedykolwiek.

Chcielibyśmy także podziękować Mukulowi Murthy’emu i Pranavowi Anandowi za ich wkład w powstanie tego artykułu.

Inne artykuły z tej serii:

Zanurz się w jeziorze Delta: rozpakowywanie dziennika transakcji

Powiązane artykuły

Uczenie maszynowe klasy produkcyjnej za pomocą Delta Lake

Co to jest jezioro danych?

Dowiedz się więcej o kursie

Źródło: www.habr.com

Dodaj komentarz