Spark schémaEvoluce v praxi

Vážení čtenáři, dobrý den!

V tomto článku přední konzultant obchodní oblasti Big Data Solutions společnosti Neoflex podrobně popisuje možnosti budování vitrín s variabilní strukturou pomocí Apache Spark.

V rámci projektu analýzy dat často vyvstává úkol vybudovat výkladní skříně na základě volně strukturovaných dat.

Obvykle se jedná o protokoly nebo odpovědi z různých systémů, uložené jako JSON nebo XML. Data se nahrávají do Hadoopu, pak z nich musíte postavit obchod. Přístup do vytvořené vitríny můžeme zorganizovat například přes Impalu.

V tomto případě není schéma cílového obchodu předem známé. Navíc schéma také nelze sestavit předem, protože závisí na datech a my máme co do činění s těmito velmi volně strukturovanými daty.

Například dnes je zaznamenána následující odpověď:

{source: "app1", error_code: ""}

a zítra ze stejného systému přijde následující odpověď:

{source: "app1", error_code: "error", description: "Network error"}

Ve výsledku by mělo do vitríny přibýt ještě jedno pole – popis a nikdo neví, zda přijde nebo ne.

Úkol vytvořit výkladní skříň na takových datech je docela standardní a Spark pro to má řadu nástrojů. Pro analýzu zdrojových dat existuje podpora pro JSON i XML a pro dříve neznámé schéma je poskytována podpora pro schemaEvolution.

Řešení na první pohled vypadá jednoduše. Musíte vzít složku s JSON a načíst ji do datového rámce. Spark vytvoří schéma, změní vnořená data na struktury. Dále je potřeba vše uložit do parketu, což je podporováno i v Impale, registrací obchodu v Hive metastore.

Všechno se zdá být jednoduché.

Z krátkých příkladů v dokumentaci však není jasné, co s řadou problémů v praxi dělat.

Dokumentace popisuje přístup nevytvářet obchod, ale číst JSON nebo XML do datového rámce.

Konkrétně to jednoduše ukazuje, jak číst a analyzovat JSON:

df = spark.read.json(path...)

To stačí ke zpřístupnění dat Sparku.

V praxi je skript mnohem složitější než pouhé čtení souborů JSON ze složky a vytvoření datového rámce. Situace vypadá takto: určitá výloha již existuje, každý den přicházejí nová data, je třeba je přidat do výlohy, přičemž nezapomínejte, že schéma se může lišit.

Obvyklé schéma pro stavbu vitríny je následující:

Krok 1. Data se načtou do Hadoopu s následným každodenním znovunačtením a přidají se do nového oddílu. Ukáže se složka s počátečními daty rozdělená podle dnů.

Krok 2. Během počátečního načtení je tato složka čtena a analyzována aplikací Spark. Výsledný datový rámec je uložen v parsovatelném formátu, například v parketách, který lze poté importovat do Impala. Tím se vytvoří cílová vitrína se všemi daty, která se do tohoto okamžiku nashromáždila.

Krok 3. Vytvoří se stahování, které aktualizuje výklad každý den.
Je zde otázka postupného načítání, potřeba rozdělit vitrínu a otázka zachování obecného schématu vitríny.

Vezměme si příklad. Řekněme, že byl implementován první krok budování úložiště a soubory JSON jsou nahrány do složky.

Vytvořit z nich datový rámec a poté jej uložit jako ukázku není problém. Toto je úplně první krok, který lze snadno najít v dokumentaci Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Všechno se zdá být v pořádku.

Načteme a analyzujeme JSON, pak uložíme datový rámec jako parket a zaregistrujeme ho v Hive jakýmkoli pohodlným způsobem:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Dostáváme okno.

Ale další den byla přidána nová data ze zdroje. Máme složku s JSON a z této složky vytvořenou ukázku. Po načtení další dávky dat ze zdroje v datovém tržišti chybí data za jeden den.

Logickým řešením by bylo rozdělení obchodu podle dne, což umožní přidání nového oddílu každý další den. Mechanismus pro to je také dobře známý, Spark umožňuje zapisovat oddíly samostatně.

Nejprve provedeme počáteční načtení, uložíme data, jak je popsáno výše, přidáme pouze rozdělení. Tato akce se nazývá inicializace výlohy a provádí se pouze jednou:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Následující den načteme pouze nový oddíl:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Zbývá se znovu zaregistrovat v Hive, aby se schéma aktualizovalo.
Zde však nastávají problémy.

První problém. Dříve nebo později bude výsledná parketa nečitelná. To je způsobeno tím, jak parkety a JSON zacházejí s prázdnými poli odlišně.

Podívejme se na typickou situaci. Například včera přišel JSON:

День 1: {"a": {"b": 1}},

a dnes stejný JSON vypadá takto:

День 2: {"a": null}

Řekněme, že máme dva různé oddíly, každý s jedním řádkem.
Když si přečteme celá zdrojová data, Spark bude schopen určit typ a pochopí, že „a“ je pole typu „struktura“ s vnořeným polem „b“ typu INT. Ale pokud byl každý oddíl uložen samostatně, dostaneme parket s nekompatibilními schématy oddílů:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Tato situace je dobře známá, proto byla speciálně přidána možnost - při analýze zdrojových dat odstraňte prázdná pole:

df = spark.read.json("...", dropFieldIfAllNull=True)

V tomto případě se parkety budou skládat z oddílů, které lze číst společně.
I když ti, kteří to udělali v praxi, se zde budou hořce usmívat. Proč? Ano, protože pravděpodobně nastanou další dvě situace. Nebo tři. Nebo čtyři. První, ke kterému téměř jistě dojde, je, že číselné typy budou v různých souborech JSON vypadat jinak. Například {intField: 1} a {intField: 1.1}. Pokud jsou taková pole nalezena v jednom oddílu, pak sloučení schémat přečte vše správně, což povede k nejpřesnějšímu typu. Ale pokud v různých, pak jeden bude mít intField: int a druhý bude mít intField: double.

Pro řešení této situace existuje následující příznak:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nyní máme složku, kde jsou oddíly, které lze načíst do jednoho datového rámce a platnou parketu celé vitríny. Ano? Ne.

Musíme si pamatovat, že jsme stůl zaregistrovali v Hive. Hive nerozlišuje velká a malá písmena v názvech polí, zatímco parkety rozlišují velká a malá písmena. Proto jsou oddíly se schématy: pole1: int a Pole1: int stejné pro Hive, ale ne pro Spark. Nezapomeňte převést názvy polí na malá písmena.

Poté se zdá být vše v pořádku.

Nicméně, ne všechno tak jednoduché. Je tu druhý, také známý problém. Protože se každý nový oddíl ukládá samostatně, složka oddílu bude obsahovat soubory služby Spark, například příznak _SUCCESS operace úspěchu. To bude mít za následek chybu při pokusu o parkety. Abyste tomu zabránili, musíte nakonfigurovat konfiguraci tak, abyste zabránili Sparku přidávat soubory služeb do složky:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Zdá se, že nyní je každý den přidán nový parketový oddíl do cílové složky vitríny, kde se nacházejí analyzovaná data pro daný den. Předem jsme se postarali o to, aby neexistovaly žádné oddíly s konfliktem datových typů.

Máme však třetí problém. Nyní obecné schéma není známé, navíc tabulka v Hive má nesprávné schéma, protože každý nový oddíl s největší pravděpodobností vnesl do schématu zkreslení.

Je třeba znovu zaregistrovat stůl. To lze provést jednoduše: znovu si přečtěte parketu výkladu, vezměte schéma a na jeho základě vytvořte DDL, pomocí kterého znovu zaregistrujete složku v Hive jako externí tabulku a aktualizujete schéma cílového výkladu.

Máme čtvrtý problém. Když jsme stůl registrovali poprvé, spoléhali jsme na Spark. Nyní to děláme sami a musíme si uvědomit, že parketová pole mohou začínat znaky, které nejsou pro Hive povoleny. Například Spark vyhodí řádky, které nemohl analyzovat v poli "corrupt_record". Takové pole nemůže být zaregistrováno v Hive, aniž by bylo escapováno.

Když to víme, dostaneme schéma:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kód ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("pole<`", "pole<") dělá bezpečný DDL, tj. místo:

create table tname (_field1 string, 1field string)

S názvy polí jako "_field1, 1field" se vytvoří bezpečné DDL tam, kde jsou názvy polí uvozeny: vytvořte tabulku `tname` (řetězec `_field1`, řetězec `1field`).

Nabízí se otázka: jak správně získat dataframe s kompletním schématem (v pf kódu)? Jak získat toto pf? To je pátý problém. Přečíst znovu schéma všech oddílů ze složky s parketovými soubory cílové vitríny? Tato metoda je nejbezpečnější, ale obtížná.

Schéma je již v Hive. Nové schéma můžete získat kombinací schématu celé tabulky a nového oddílu. Takže musíte vzít schéma tabulky z Hive a zkombinovat je se schématem nového oddílu. To lze provést načtením testovacích metadat z Hive, jejich uložením do dočasné složky a použitím Sparku ke čtení obou oddílů najednou.

Ve skutečnosti je zde vše, co potřebujete: původní schéma tabulky v Hive a nový oddíl. Máme také data. Zbývá pouze získat nové schéma, které kombinuje schéma obchodu a nová pole z vytvořeného oddílu:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Dále vytvoříme tabulku registrace DDL, jako v předchozím úryvku.
Pokud celý řetězec funguje správně, konkrétně došlo k inicializačnímu zatížení a tabulka byla v Hive správně vytvořena, dostaneme aktualizované schéma tabulky.

A poslední problém je, že do tabulky Hive nemůžete jen tak přidat oddíl, protože bude rozbitý. Musíte přinutit Hive, aby opravil svou strukturu oddílů:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Jednoduchý úkol přečíst JSON a vytvořit na něm založenou výlohu vede k překonání řady implicitních potíží, jejichž řešení musíte hledat samostatně. A přestože jsou tato řešení jednoduchá, jejich nalezení zabere spoustu času.

K realizaci konstrukce vitríny jsem musel:

  • Přidejte oddíly do vitríny a zbavte se souborů služeb
  • Vypořádejte se s prázdnými poli ve zdrojových datech, která Spark zadal
  • Odeslání jednoduchých typů do řetězce
  • Převeďte názvy polí na malá písmena
  • Samostatné nahrávání dat a registrace tabulky v Hive (generování DDL)
  • Nezapomeňte uvést názvy polí, která mohou být nekompatibilní s Hive
  • Přečtěte si, jak aktualizovat registraci stolu v Hive

Shrneme-li to, poznamenáváme, že rozhodnutí postavit výlohy je plné mnoha úskalí. V případě potíží s implementací je proto lepší kontaktovat zkušeného partnera s úspěšnou odborností.

Děkujeme, že jste si přečetli tento článek, doufáme, že pro vás budou informace užitečné.

Zdroj: www.habr.com

Přidat komentář