Spark schemaEvolution w praktyce

Drodzy czytelnicy, dzień dobry!

W tym artykule wiodący konsultant obszaru biznesowego Big Data Solutions firmy Neoflex szczegółowo opisuje możliwości budowania prezentacji o zmiennej strukturze przy użyciu Apache Spark.

W ramach projektu analizy danych często pojawia się zadanie zbudowania witryn sklepowych w oparciu o luźno ustrukturyzowane dane.

Zwykle są to logi, czyli odpowiedzi z różnych systemów, zapisane w formacie JSON lub XML. Dane są przesyłane do Hadoopa, następnie należy z nich zbudować witrynę sklepową. Dostęp do stworzonej gabloty możemy zorganizować np. poprzez Impalę.

W tym przypadku schemat docelowej witryny sklepowej nie jest wcześniej znany. Co więcej, schematu również nie da się sporządzić z góry, bo to zależy od danych, a mamy do czynienia z tymi danymi o bardzo luźnej strukturze.

Na przykład dzisiaj zarejestrowana została następująca odpowiedź:

{source: "app1", error_code: ""}

a jutro z tego samego systemu przychodzi następująca odpowiedź:

{source: "app1", error_code: "error", description: "Network error"}

W rezultacie do gabloty powinno zostać dodane jeszcze jedno pole – opis i nikt nie wie, czy wejdzie, czy nie.

Zadanie stworzenia witryny sklepowej na takich danych jest dość standardowe, a Spark ma do tego szereg narzędzi. Do analizowania danych źródłowych dostępna jest obsługa zarówno JSON, jak i XML, a dla nieznanego wcześniej schematu zapewniona jest obsługa schemaEvolution.

Na pierwszy rzut oka rozwiązanie wygląda na proste. Musisz wziąć folder z JSON i wczytać go do ramki danych. Spark utworzy schemat, zamieni zagnieżdżone dane w struktury. Co więcej, wszystko należy zapisać na parkiecie, co jest również obsługiwane w Impala, rejestrując witrynę sklepową w metastore Hive.

Wszystko wydaje się proste.

Jednakże z krótkich przykładów zawartych w dokumentacji nie wynika jasno, co w praktyce zrobić z wieloma problemami.

Dokumentacja opisuje podejście nie polegające na tworzeniu witryny sklepowej, ale na wczytywaniu JSON lub XML do ramki danych.

Mianowicie po prostu pokazuje, jak czytać i analizować JSON:

df = spark.read.json(path...)

To wystarczy, aby udostępnić dane Sparkowi.

W praktyce skrypt jest znacznie bardziej skomplikowany niż samo wczytanie plików JSON z folderu i utworzenie ramki danych. Sytuacja wygląda tak: jest już pewna witryna sklepowa, codziennie napływają nowe dane, trzeba je dodać do witryny sklepowej, nie zapominając, że schemat może się różnić.

Typowy schemat budowania gabloty jest następujący:

Krok 1. Dane są ładowane do Hadoop z późniejszym codziennym ponownym ładowaniem i dodawane do nowej partycji. Okazuje się, że folder z danymi początkowymi podzielony jest według dnia.

Krok 2. Podczas początkowego ładowania ten folder jest odczytywany i analizowany przez platformę Spark. Powstała ramka danych jest zapisywana w formacie parsowalnym, na przykład na parkiecie, który można następnie zaimportować do Impala. Tworzy to docelową prezentację ze wszystkimi danymi, które zgromadziły się do tego momentu.

Krok 3. Utworzony zostanie plik do pobrania, który będzie codziennie aktualizował witrynę sklepową.
Pojawia się kwestia stopniowego ładowania, konieczności podziału witryny i zachowania ogólnego schematu witryny.

Weźmy przykład. Załóżmy, że zaimplementowano pierwszy etap budowy repozytorium i pliki JSON zostały przesłane do folderu.

Utworzenie z nich ramki danych, a następnie zapisanie jej jako prezentacji, nie stanowi problemu. To pierwszy krok, który można łatwo znaleźć w dokumentacji Sparka:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Wszystko wydaje się być w porządku.

Czytamy i parsujemy JSON, następnie zapisujemy ramkę danych jako parkiet, rejestrując ją w Hive w dowolny dogodny sposób:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Dostajemy okno.

Jednak następnego dnia dodano nowe dane ze źródła. Mamy folder z JSON i wizytówką utworzoną z tego folderu. Po załadowaniu kolejnej partii danych ze źródła w hurtowni danych brakuje danych z jednego dnia.

Logicznym rozwiązaniem byłoby podzielenie witryny sklepowej według dnia, co umożliwi dodawanie nowej partycji każdego kolejnego dnia. Mechanizm tego jest również dobrze znany, Spark pozwala na osobne pisanie partycji.

Najpierw wykonujemy wstępne ładowanie, zapisując dane jak opisano powyżej, dodając jedynie partycjonowanie. Ta akcja nazywa się inicjalizacją witryny sklepowej i jest wykonywana tylko raz:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Następnego dnia ładujemy tylko nową partycję:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Pozostaje tylko ponownie zarejestrować się w Hive, aby zaktualizować schemat.
Jednak w tym miejscu pojawiają się problemy.

Pierwszy problem. Wcześniej czy później powstały parkiet będzie nieczytelny. Wynika to z odmiennego podejścia parkietu i JSON do pustych pól.

Rozważmy typową sytuację. Na przykład wczoraj przybył JSON:

День 1: {"a": {"b": 1}},

a dzisiaj ten sam JSON wygląda tak:

День 2: {"a": null}

Załóżmy, że mamy dwie różne partycje, każda z jedną linią.
Kiedy przeczytamy całe dane źródłowe, Spark będzie w stanie określić typ i zrozumie, że „a” jest polem typu „struktura”, z zagnieżdżonym polem „b” typu INT. Ale jeśli każda partycja została zapisana osobno, otrzymamy parkiet z niezgodnymi schematami partycji:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Sytuacja jest dobrze znana, dlatego specjalnie dodano opcję - podczas analizowania danych źródłowych usuń puste pola:

df = spark.read.json("...", dropFieldIfAllNull=True)

W takim przypadku parkiet będzie składał się z przegród, które można czytać razem.
Choć ci, którzy to zrobili w praktyce, tutaj gorzko się uśmiechną. Dlaczego? Tak, ponieważ prawdopodobnie będą jeszcze dwie sytuacje. Albo trzy. Albo cztery. Pierwszą, która prawie na pewno wystąpi, jest to, że typy numeryczne będą wyglądać inaczej w różnych plikach JSON. Na przykład {intField: 1} i {intField: 1.1}. Jeśli takie pola zostaną znalezione w jednej partycji, wówczas scalanie schematów odczyta wszystko poprawnie, prowadząc do najdokładniejszego typu. Ale jeśli w różnych, to jeden będzie miał intField: int, a drugi będzie miał intField: double.

Istnieje następująca flaga radząca sobie z tą sytuacją:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Teraz mamy folder, w którym znajdują się partycje, które można wczytać do pojedynczej ramki danych i prawidłowy parkiet całej prezentacji. Tak? NIE.

Musimy pamiętać, że tabelę zarejestrowaliśmy w Hive. W nazwach pól w Hive nie jest rozróżniana wielkość liter, natomiast w nazwie Parquet wielkość liter jest uwzględniana. W związku z tym partycje ze schematami: pole1: int i Pole1: int są takie same dla programu Hive, ale nie dla platformy Spark. Nie zapomnij przekonwertować nazw pól na małe litery.

Potem wszystko wydaje się być w porządku.

Jednak nie wszystko jest takie proste. Jest jeszcze drugi, również dobrze znany problem. Ponieważ każda nowa partycja jest zapisywana osobno, folder partycji będzie zawierał pliki usługi Spark, na przykład flagę powodzenia operacji _SUCCESS. Spowoduje to błąd podczas próby parkietu. Aby tego uniknąć, musisz skonfigurować konfigurację tak, aby uniemożliwić Sparkowi dodawanie plików usług do folderu:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Wygląda na to, że teraz codziennie do docelowego folderu prezentacji, w którym znajdują się przeanalizowane dane z danego dnia, dodawana jest nowa partycja parkietowa. Z góry zadbaliśmy o to, aby nie było partycji z konfliktem typów danych.

Ale mamy trzeci problem. Teraz ogólny schemat nie jest znany, ponadto tabela w Hive ma błędny schemat, ponieważ każda nowa partycja najprawdopodobniej wprowadzała zniekształcenie do schematu.

Należy ponownie zarejestrować stół. Można to zrobić po prostu: ponownie przeczytaj parkiet witryny sklepowej, weź schemat i utwórz na jego podstawie plik DDL, za pomocą którego ponownie zarejestrujesz folder w Hive jako tabelę zewnętrzną, aktualizując schemat docelowej witryny sklepowej.

Mamy czwarty problem. Kiedy rejestrowaliśmy tabelę po raz pierwszy, zdaliśmy się na Sparka. Teraz robimy to sami i musimy pamiętać, że pola parkietowe mogą zaczynać się od znaków, które nie są dozwolone w Hive. Na przykład Spark wyrzuca linie, których nie mógł przeanalizować w polu „corrupt_record”. Takiego pola nie można zarejestrować w Hive bez ucieczki.

Wiedząc o tym, otrzymujemy schemat:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

kod ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",, ",`").replace("tablica<`", "tablica<") sprawia, że ​​DDL jest bezpieczny, tj. zamiast:

create table tname (_field1 string, 1field string)

W przypadku nazw pól takich jak „_pole1, 1 pole” tworzony jest bezpieczny kod DDL, w którym nazwy pól są oznaczone zmianą znaczenia: utwórz tabelę `tname` (ciąg `_pole1`, ciąg `1pole`).

Powstaje pytanie: jak poprawnie uzyskać ramkę danych z pełnym schematem (w kodzie pf)? Jak zdobyć tego pf? To jest piąty problem. Czy ponownie przeczytałeś schemat wszystkich przegród z folderu z plikami parkietu docelowej gabloty? Ta metoda jest najbezpieczniejsza, ale trudna.

Schemat jest już w Hive. Nowy schemat można uzyskać łącząc schemat całej tabeli i nowej partycji. Musisz więc pobrać schemat tabeli z Hive i połączyć go ze schematem nowej partycji. Można to zrobić, czytając metadane testowe z Hive, zapisując je w folderze tymczasowym i używając platformy Spark do jednoczesnego odczytania obu partycji.

Właściwie jest tam wszystko, czego potrzebujesz: oryginalny schemat tabeli w Hive i nowa partycja. Mamy też dane. Pozostaje tylko uzyskać nowy schemat, który łączy schemat storefront i nowe pola z utworzonej partycji:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Następnie tworzymy rejestrację tabeli DDL, jak w poprzednim fragmencie.
Jeśli cały łańcuch działa poprawnie, czyli nastąpiło ładowanie inicjujące, a tabela została poprawnie utworzona w Hive, to otrzymamy zaktualizowany schemat tabeli.

Ostatnim problemem jest to, że nie można po prostu dodać partycji do tabeli Hive, ponieważ zostanie ona uszkodzona. Musisz zmusić Hive do naprawienia struktury partycji:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Proste zadanie odczytania JSON i stworzenia na jego podstawie witryny sklepowej skutkuje pokonaniem szeregu ukrytych trudności, których rozwiązań trzeba szukać osobno. I choć rozwiązania te są proste, ich znalezienie zajmuje dużo czasu.

Aby zrealizować konstrukcję gabloty musiałem:

  • Dodaj partycje do gabloty, pozbywając się plików serwisowych
  • Radź sobie z pustymi polami w danych źródłowych, które wpisał Spark
  • Rzutuj proste typy na ciąg znaków
  • Konwertuj nazwy pól na małe litery
  • Oddzielne przesyłanie danych i rejestracja tabeli w Hive (generowanie DDL)
  • Nie zapomnij uciec od nazw pól, które mogą być niezgodne z Hive
  • Dowiedz się, jak zaktualizować rejestrację tabeli w programie Hive

Podsumowując, zauważamy, że decyzja o budowie witryn sklepowych jest obarczona wieloma pułapkami. Dlatego w przypadku trudności we wdrożeniu lepiej skontaktować się z doświadczonym partnerem, posiadającym udaną wiedzę specjalistyczną.

Dziękujemy za przeczytanie tego artykułu, mamy nadzieję, że informacje okażą się przydatne.

Źródło: www.habr.com

Dodaj komentarz