Spark schémaEvolúcia v praxi

Vážení čitatelia, dobrý deň!

V tomto článku vedúci konzultant obchodnej oblasti Big Data Solutions spoločnosti Neoflex podrobne popisuje možnosti budovania vitrín s variabilnou štruktúrou pomocou Apache Spark.

V rámci projektu analýzy údajov často vzniká úloha vybudovať výklady založené na voľne štruktúrovaných údajoch.

Zvyčajne ide o protokoly alebo odpovede z rôznych systémov, uložené ako JSON alebo XML. Údaje sa nahrajú na Hadoop, potom z nich musíte vytvoriť výklad. Prístup k vytvorenej vitríne vieme zorganizovať napríklad cez Impala.

V tomto prípade nie je vopred známa schéma cieľového výkladu. Navyše schému nie je možné zostaviť vopred, pretože závisí od údajov a my máme do činenia s týmito veľmi voľne štruktúrovanými údajmi.

Napríklad dnes je zaznamenaná nasledujúca odpoveď:

{source: "app1", error_code: ""}

a zajtra z toho istého systému príde nasledujúca odpoveď:

{source: "app1", error_code: "error", description: "Network error"}

Tým pádom by do vitríny malo pribudnúť ešte jedno pole – popis a nikto nevie, či príde alebo nie.

Úloha vytvorenia výkladu na takýchto údajoch je celkom štandardná a Spark má na to množstvo nástrojov. Na analýzu zdrojových údajov existuje podpora pre JSON aj XML a pre predtým neznámu schému je poskytovaná podpora pre schemaEvolution.

Na prvý pohľad vyzerá riešenie jednoducho. Musíte vziať priečinok s JSON a prečítať ho do dátového rámca. Spark vytvorí schému, premení vnorené dáta na štruktúry. Ďalej je potrebné všetko uložiť do parkiet, čo je podporované aj v Impala, registráciou výkladu v metastore Hive.

Všetko sa zdá byť jednoduché.

Z krátkych príkladov v dokumentácii však nie je jasné, čo s množstvom problémov v praxi robiť.

Dokumentácia popisuje prístup nevytvárať výklad, ale čítať JSON alebo XML do dátového rámca.

Konkrétne to jednoducho ukazuje, ako čítať a analyzovať JSON:

df = spark.read.json(path...)

To stačí na sprístupnenie údajov Sparku.

V praxi je skript oveľa komplikovanejší ako len čítanie JSON súborov z priečinka a vytváranie dátového rámca. Situácia vyzerá takto: určitý výklad už existuje, každý deň prichádzajú nové údaje, treba ich pridať do výkladu, pričom nezabúdame, že schéma sa môže líšiť.

Obvyklá schéma výstavby vitríny je nasledovná:

Krok 1. Dáta sa načítajú do Hadoop s následným každodenným opätovným načítaním a pridajú sa do nového oddielu. Ukázalo sa, že priečinok s počiatočnými údajmi je rozdelený podľa dní.

Krok 2. Počas počiatočného načítania tento priečinok načíta a analyzuje Spark. Výsledný dátový rámec je uložený v analyzovateľnom formáte, napríklad v parketách, ktorý je možné následne importovať do Impala. Tým sa vytvorí cieľová vitrína so všetkými údajmi, ktoré sa doteraz nazhromaždili.

Krok 3. Vytvorí sa sťahovanie, ktoré bude aktualizovať výklad každý deň.
Je tu otázka postupného načítania, potreba rozdeliť vitrínu a otázka zachovania všeobecnej schémy vitríny.

Vezmime si príklad. Povedzme, že bol implementovaný prvý krok budovania úložiska a súbory JSON sa nahrajú do priečinka.

Vytvoriť z nich dátový rámec a potom ho uložiť ako výkladnú skriňu nie je problém. Toto je úplne prvý krok, ktorý možno ľahko nájsť v dokumentácii Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Zdá sa, že je všetko v poriadku.

Čítame a analyzujeme JSON, potom uložíme dátový rámec ako parket a zaregistrujeme ho v Hive akýmkoľvek pohodlným spôsobom:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Dostávame okno.

Nasledujúci deň však boli pridané nové údaje zo zdroja. Máme priečinok s JSON a vitrínu vytvorenú z tohto priečinka. Po načítaní ďalšej dávky údajov zo zdroja v dátovom trhu chýbajú údaje za jeden deň.

Logickým riešením by bolo rozdelenie obchodu podľa dňa, čo umožní pridávanie nového oddielu každý ďalší deň. Mechanizmus na to je tiež dobre známy, Spark umožňuje zapisovať oddiely samostatne.

Najprv vykonáme počiatočné načítanie, uložíme údaje, ako je popísané vyššie, pričom pridáme iba rozdelenie. Táto akcia sa nazýva inicializácia výkladu a vykoná sa iba raz:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Nasledujúci deň načítame iba nový oddiel:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Zostáva len znova sa zaregistrovať v Hive, aby sa schéma aktualizovala.
Práve tu však vznikajú problémy.

Prvý problém. Skôr či neskôr budú výsledné parkety nečitateľné. Je to spôsobené tým, ako parkety a JSON zaobchádzajú s prázdnymi poľami odlišne.

Zoberme si typickú situáciu. Napríklad včera prichádza JSON:

День 1: {"a": {"b": 1}},

a dnes ten istý JSON vyzerá takto:

День 2: {"a": null}

Povedzme, že máme dva rôzne oddiely, každý s jedným riadkom.
Keď si prečítame celé zdrojové dáta, Spark bude schopný určiť typ a pochopí, že „a“ je pole typu „štruktúra“ s vnoreným poľom „b“ typu INT. Ak sa však každý oddiel uloží samostatne, dostaneme parket s nekompatibilnými schémami oddielov:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Táto situácia je dobre známa, preto bola špeciálne pridaná možnosť - pri analýze zdrojových údajov odstráňte prázdne polia:

df = spark.read.json("...", dropFieldIfAllNull=True)

V tomto prípade budú parkety pozostávať z priečok, ktoré sa dajú čítať spolu.
Aj keď tí, ktorí to urobili v praxi, sa tu trpko usmejú. prečo? Áno, pretože pravdepodobne nastanú ďalšie dve situácie. Alebo tri. Alebo štyri. Prvý, ktorý sa takmer určite vyskytne, je, že číselné typy budú v rôznych súboroch JSON vyzerať inak. Napríklad {intField: 1} a {intField: 1.1}. Ak sa takéto polia nachádzajú v jednom oddiele, potom zlúčenie schém prečíta všetko správne, čo vedie k najpresnejšiemu typu. Ale ak v rôznych, potom jeden bude mať intField: int a druhý bude mať intField: double.

Na riešenie tejto situácie existuje nasledujúci príznak:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Teraz máme priečinok, kde sú oddiely, ktoré sa dajú načítať do jedného dátového rámca a platná parketa celej vitríny. Áno? Nie

Musíme si uvedomiť, že stôl sme zaregistrovali v Hive. Úľ nerozlišuje veľké a malé písmená v názvoch polí, zatiaľ čo parkety rozlišujú veľké a malé písmená. Preto sú oddiely so schémami: field1: int a Field1: int rovnaké pre Hive, ale nie pre Spark. Nezabudnite previesť názvy polí na malé písmená.

Potom sa zdá byť všetko v poriadku.

Nie všetko je však také jednoduché. Je tu druhý, tiež dobre známy problém. Keďže každý nový oddiel sa ukladá samostatne, priečinok oddielu bude obsahovať súbory služby Spark, napríklad príznak úspešnosti operácie _SUCCESS. To bude mať za následok chybu pri pokuse o parkety. Aby ste tomu zabránili, musíte nakonfigurovať konfiguráciu, aby ste zabránili Sparku pridávať súbory služieb do priečinka:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Zdá sa, že každý deň sa do cieľového priečinka vitríny, kde sa nachádzajú analyzované údaje pre daný deň, pridáva nový parketový oddiel. Vopred sme sa postarali o to, aby neexistovali žiadne oddiely s konfliktom dátových typov.

Máme však tretí problém. Teraz všeobecná schéma nie je známa, navyše tabuľka v Hive má nesprávnu schému, pretože každá nová partícia s najväčšou pravdepodobnosťou vnáša do schémy skreslenie.

Je potrebné znova zaregistrovať tabuľku. Dá sa to urobiť jednoducho: znova si prečítajte parkety výkladu, vezmite schému a na jej základe vytvorte DDL, pomocou ktorého znova zaregistrujete priečinok v Hive ako externú tabuľku, čím sa aktualizuje schéma cieľového výkladu.

Máme štvrtý problém. Keď sme stôl registrovali prvýkrát, stavili sme na Spark. Teraz to robíme sami a musíme si uvedomiť, že parketové polia môžu začínať znakmi, ktoré nie sú povolené pre Hive. Napríklad Spark vyhodí riadky, ktoré nedokázal analyzovať v poli "corrupt_record". Takéto pole nie je možné zaregistrovať v Hive bez toho, aby ste ho opustili.

Keď to vieme, dostaneme schému:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

kód ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("pole<`", "pole<") robí bezpečný DDL, t.j. namiesto:

create table tname (_field1 string, 1field string)

S názvami polí ako "_field1, 1field" sa vytvorí bezpečné DDL tam, kde sú názvy polí escapované: vytvorte tabuľku `tname` (reťazec `_field1`, reťazec `1field`).

Vzniká otázka: ako správne získať dátový rámec s úplnou schémou (v kóde pf)? Ako získať toto pf? Toto je piaty problém. Znovu prečítať schému všetkých oddielov z priečinka s parketovými súbormi cieľovej vitríny? Táto metóda je najbezpečnejšia, ale náročná.

Schéma je už v Hive. Novú schému môžete získať kombináciou schémy celej tabuľky a nového oddielu. Takže musíte vziať schému tabuľky z Hive a skombinovať ju so schémou nového oddielu. Dá sa to urobiť prečítaním testovacích metadát z Hive, ich uložením do dočasného priečinka a použitím Sparku na čítanie oboch partícií naraz.

V skutočnosti je tu všetko, čo potrebujete: pôvodná tabuľková schéma v Hive a nový oddiel. Máme aj údaje. Zostáva len získať novú schému, ktorá kombinuje schému výkladu a nové polia z vytvoreného oddielu:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ďalej vytvoríme tabuľku registrácie DDL, ako v predchádzajúcom úryvku.
Ak celý reťazec funguje správne, konkrétne došlo k inicializačnému zaťaženiu a tabuľka bola v Hive správne vytvorená, získame aktualizovanú schému tabuľky.

A posledným problémom je, že do tabuľky Hive nemôžete len tak pridať partíciu, pretože tá bude porušená. Musíte prinútiť Hive opraviť štruktúru oddielov:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Jednoduchá úloha čítať JSON a na jeho základe vytvoriť obchod vedie k prekonaniu množstva implicitných ťažkostí, riešení, ktoré musíte hľadať samostatne. A hoci sú tieto riešenia jednoduché, ich nájdenie si vyžaduje veľa času.

Na realizáciu konštrukcie vitríny som musel:

  • Pridajte oddiely do vitríny a zbavte sa súborov služieb
  • Zaoberajte sa prázdnymi poľami v zdrojových údajoch, ktoré Spark zadal
  • Preneste jednoduché typy na reťazec
  • Preveďte názvy polí na malé písmená
  • Oddelené nahrávanie údajov a registrácia tabuľky v Hive (generovanie DDL)
  • Nezabudnite zadať názvy polí, ktoré môžu byť nekompatibilné s Hive
  • Zistite, ako aktualizovať registráciu tabuľky v Hive

Stručne povedané, poznamenávame, že rozhodnutie postaviť výklady je plné mnohých úskalí. Preto je v prípade ťažkostí pri realizácii lepšie kontaktovať skúseného partnera s úspešnou odbornosťou.

Ďakujeme, že ste si prečítali tento článok, dúfame, že vám tieto informácie budú užitočné.

Zdroj: hab.com

Pridať komentár