Spark schemaEvolution на практыцы

Паважаныя чытачы, добрага дня!

У дадзеным артыкуле вядучы кансультант бізнес-кірункі Big Data Solutions кампаніі "Неафлекс", падрабязна апісвае варыянты пабудовы вітрын зменнай структуры з выкарыстаннем Apache Spark.

У рамках праекта па аналізе дадзеных, часта ўзнікае задача пабудовы вітрын на аснове слаба структураваных дадзеных.

Звычайна гэта логі, ці адказы розных сістэм, якія захоўваюцца ў выглядзе JSON ці XML. Дадзеныя выгружаюцца ў Hadoop, далей з іх трэба пабудаваць вітрыну. Арганізаваць доступ да створанай вітрыны можам, напрыклад, праз Impala.

У гэтым выпадку схема мэтавай вітрыны папярэдне невядомая. Больш за тое, схема яшчэ і не можа быць складзена загадзя, бо залежыць ад дадзеных, а мы маем справу з гэтымі самымі слаба структураваным дадзенымі.

Напрыклад, сёння лагуецца такі адказ:

{source: "app1", error_code: ""}

а заўтра ад гэтай жа сістэмы прыходзіць такі адказ:

{source: "app1", error_code: "error", description: "Network error"}

У выніку ў вітрыну павінна дадацца яшчэ адно поле - description, і прыйдзе яно ці не, ніхто не ведае.

Задача стварэння вітрыны на такіх дадзеных даволі стандартная, і ў Spark для гэтага ёсць шэраг прылад. Для парсінгу зыходных дадзеных ёсць падтрымка і JSON, і XML, а для невядомай загадзя схемы прадугледжана падтрымка schemaEvolution.

З першага погляду рашэнне выглядае проста. Трэба ўзяць тэчку з JSON і прачытаць у dataframe. Spark створыць схему, укладзеныя дадзеныя ператворыць у структуры. Далей усё трэба захаваць у parquet, які падтрымліваецца ў тым ліку і ў Impala, зарэгістраваўшы вітрыну ў Hive metastore.

Быццам бы ўсё проста.

Аднак, з кароткіх прыкладаў у дакументацыі не зразумела, што рабіць з шэрагам праблем на практыцы.

У дакументацыі апісваецца падыход не для стварэння вітрыны, а для чытання JSON ці XML у dataframe.

У прыватнасці, проста прыводзіцца як прачытаць і распарыць JSON:

df = spark.read.json(path...)

Гэтага дастаткова, каб зрабіць дадзеныя даступнымі для Spark.

На практыку сцэнар значна складаней, чым проста прачытаць JSON файлы з тэчкі, і стварыць dataframe. Сітуацыя выглядае так: ужо ёсць вызначаная вітрына, кожны дзень прыходзяць новыя дадзеныя, іх трэба дадаць у вітрыну, не забываючы, што схема можа адрознівацца.

Звычайная схема пабудовы вітрыны такая:

Крок 1. Дадзеныя загружаюцца ў Hadoop з наступнай штодзённай дазагрузкай і складаюцца ў новую партыцыю. Атрымліваецца партыцыянаваная па днях тэчка з зыходнымі дадзенымі.

Крок 2. Падчас ініцыялізавальнай загрузкі гэтая тэчка чытаецца і парыцца сродкамі Spark. Атрыманы dataframe захоўваецца ў фармат, даступны для аналізу, напрыклад, у parquet, які потым можна імпартаваць у Impala. Так ствараецца мэтавая вітрына са ўсімі дадзенымі, якія назапасіліся да гэтага моманту.

Крок 3. Ствараецца загрузка, якая будзе кожны дзень абнаўляць вітрыну.
Узнікае пытанне інкрыментальнай загрузкі, неабходнасць партыцыянавання вітрыны, і пытанне падтрымкі агульнай схемы вітрыны.

Прывядзем прыклад. Дапусцім, рэалізаваны першы крок пабудовы сховішчы, і настроена выгрузка JSON файлаў у тэчку.

Стварыць з іх dataframe, каб потым захаваць як вітрыну, праблем не складае. Гэта той самы першы крок, які лёгка можна знайсці ў дакументацыі Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Быццам бы ўсё добра.

Мы прачыталі і распарсілі JSON, далей захоўваем dataframe як parquet, рэгіструючы ў Hive любым зручным спосабам:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Атрымліваем вітрыну.

Але на другі дзень дадаліся новыя дадзеныя з крыніцы. У нас ёсць тэчка з JSON, і вітрына, створаная на аснове дадзенай тэчкі. Пасля загрузкі наступнай порцыі дадзеных з крыніцы ў вітрыне бракуе дадзеных за адзін дзень.

Лагічным рашэннем будзе партыцыянаваць вітрыну па днях, што дазволіць кожны наступны дзень дадаваць новую партыцыю. Механізм гэтага таксама добра вядомы, Spark дазваляе запісваць партыцыі асобна.

Спачатку які робіцца ініцыялізавальную загрузку, захоўваючы дадзеныя як было апісана вышэй, дадаўшы толькі партыцыянаванне. Гэтае дзеянне завецца ініцыялізацыя вітрыны і робіцца толькі адзін раз:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

На наступны дзень загружаем толькі новую партыцыю:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Застаецца толькі перарэгістраваць у Hive, каб абнавіць схему.
Аднак тут і ўзнікаюць праблемы.

Першая праблема. Рана ці позна які атрымаўся parquet нельга будзе прачытаць. Гэта злучана з тым, як па-рознаму падыходзяць parquet і JSON да пустых палёў.

Разгледзім тыповую сітуацыю. Напрыклад, учора прыходзіць JSON:

День 1: {"a": {"b": 1}},

а сёння гэты ж JSON выглядае так:

День 2: {"a": null}

Дапушчальны, у нас дзве розных партіціі, у якіх па адным радку.
Калі мы чытаем зыходныя дадзеныя цалкам, Spark здолее вызначыць тып, і зразумее, што "a" - гэта поле тыпу "структура", з укладзеным полем "b" тыпу INT. Але, калі кожная партыцыя была захавана паасобку, то атрымліваецца parquet з несумяшчальнымі схемамі партыцый:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Дадзеная сітуацыя добра вядомая, таму спецыяльна дададзена опцыя - пры парсінгу зыходных дадзеных выдаляць пустыя палі:

df = spark.read.json("...", dropFieldIfAllNull=True)

У гэтым выпадку parquet будзе складацца з партый, якія можна будзе прачытаць разам.
Хаця тыя, хто рабіў гэта на практыцы, тут горка ўсміхнуцца. Чаму? Ды таму, што хутчэй за ўсё ўзнікнуць яшчэ дзве сітуацыі. Або тры. Або чатыры. Першая, якая ўзнікне амаль напэўна, лікавыя тыпы будуць па-рознаму выглядаць у розных JSON файлах. Напрыклад, {intField: 1} і {intField: 1.1}. Калі такія палі трапяцца ў адной партцыі, то мерж схемы прачытае ўсё правільна, прывёўшы да самага дакладнага тыпу. А вось калі ў розных, то ў адной будзе intField: int, а ў іншай intField: double.

Для апрацоўкі гэтай сітуацыі ёсць наступны сцяг:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Цяпер у нас ёсць тэчка, дзе знаходзяцца партіціі, якія можна прачытаць у адзіны dataframe і валідны parquet ўсёй вітрыны. Так? Не.

Трэба ўспомніць, што мы рэгістравалі табліцу ў Hive. Hive не адчувальны да рэгістра ў назвах палёў, а parquet адчувальны. Таму партіціі са схемамі: field1: int, і Field1: int для Hive аднолькавыя, а для Spark не. Трэба не забыцца прывесці назовы палёў да ніжняга рэгістра.

Вось пасля гэтага, здаецца, усё добра.

Аднак не ўсё так проста. Узнікае другая, таксама добра вядомая праблема. Бо кожная новая партыцыя захоўваецца асобна, то ў тэчцы партіціі будуць ляжаць службовыя файлы Spark, напрыклад, сцяг паспяховасці аперацыі _SUCCESS. Гэта прывядзе да памылкі пры спробе parquet. Каб гэтага пазбегнуць, трэба наладзіць канфігурацыю, забараніўшы Spark дапісваць у тэчку службовыя файлы:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Здаецца, зараз кожны дзень у тэчку мэтавай вітрыны дадаецца новая parquet партыцыя, дзе ляжаць размаляваныя дадзеныя за дзень. Мы загадзя заклапаціліся тым, каб не было партый з канфліктам тыпаў дадзеных.

Але, перад намі - трэцяя праблема. Зараз агульная схема не вядомая, больш за тое, у Hive табліца з няправільнай схемай, бо кожная новая партыцыя, хутчэй за ўсё, унесла скажэнне ў схему.

Трэба перарэгістраваць табліцу. Гэта можна зрабіць проста: зноў прачытаць parquet вітрыны, узяць схему і стварыць на яе аснове DDL, з якім зноўку зарэгістраваць тэчку ў Hive як вонкавую табліцу, абнавіўшы схему мэтавай вітрыны.

Перад намі ўзнікае чацвёртая праблема. Калі мы рэгістравалі табліцу першы раз, мы абапіраліся на Spark. Цяпер робім гэта самі, і трэба памятаць, што палі parquet могуць пачынацца з сімвалаў, недапушчальных для Hive. Напрыклад, Spark выкідвае радкі, якія не змог распарсіць у поле "corrupt_record". Такое поле нельга будзе зарэгістраваць у Hive без таго, каб экранаваць.

Ведаючы гэта, атрымліваем схему:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace(«array<`», «array<») робіць бяспечны DDL, гэта значыць замест:

create table tname (_field1 string, 1field string)

З такімі назовамі палёў як «_field1, 1field», робіцца бяспечны DDL, дзе назовы палёў экранаваныя: create table `tname` (`_field1` string, `1field` string).

Узнікае пытанне: як правільна атрымаць dataframe з поўнай схемай (у кодзе pf)? Як атрымаць гэты pf? Гэта пятая праблема. Перачытваць схему ўсіх партыцый з тэчкі з parquet файламі мэтавай вітрыны? Гэта метад самы бяспечны, але цяжкі.

Схема ўжо ёсць у Hive. Атрымаць новую схему можна, аб'яднаўшы схему ўсёй табліцы і новай партыцыі. Значыць трэба схему табліцы браць з Hive і аб'яднаць яе са схемай новай партыцыі. Гэта можна зрабіць, прачытаўшы тэставыя метададзеныя з Hive, захаваўшы іх у часавую тэчку і прачытаўшы з дапамогай Spark абедзве партіціі адразу.

Па сутнасці, ёсць усё, што трэба: зыходная схема табліцы ў Hive і новая партыцыя. Дадзеныя ў нас таксама ёсць. Застаецца толькі атрымаць новую схему, у якой аб'ядноўваецца схема вітрыны і новыя палі са створанай партыцыі:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Далей ствараем DDL рэгістрацыі табліцы, як у папярэднім фрагменце.
Калі ўвесь ланцужок працуе правільна, а менавіта была ініцыялізавальная загрузка, і ў Hive правільна створаная табліца, то мы атрымліваем абноўленую схему табліцы.

І апошняя, праблема складаецца ў тым, што нельга так проста дадаць партыцыю ў табліцу Hive, бо яна будзе зламаная. Неабходна прымусіць Hive паправіць у сябе структуру партыцый:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Простая задача чытання JSON і стварэння на яго аснове вітрыны выліваецца ў пераадоленне шэрагу няяўных цяжкасцяў, рашэнні для якіх даводзіцца шукаць па асобнасці. І хоць гэтыя рашэнні простыя, але на іх пошук ідзе шмат часу.

Каб рэалізаваць пабудову вітрыны, прыйшлося:

  • Дадаваць партыцыі ў вітрыну, пазбавіўшыся службовых файлаў
  • Разабрацца з пустымі палямі ў зыходных дадзеных, якія Spark тыпізаваў
  • Прывесці простыя тыпы да радка
  • Прывесці назвы палёў да ніжняга рэгістра
  • Падзяліць выгрузку дадзеных і рэгістрацыю табліцы ў Hive (стварэнне DDL)
  • Не забыцца экранаваць назвы палёў, якія могуць быць несумяшчальныя з Hive
  • Навучыцца абнаўляць рэгістрацыю табліцы ў Hive

Падводзячы вынік, адзначым, што рашэнне па пабудове вітрын тоіць шмат падводных камянёў. Таму пры ўзнікненні складанасцей у рэалізацыі лепш звярнуцца да вопытнага партнёра з паспяховай экспертызай.

Дзякуем за чытанне дадзенага артыкула, спадзяемся, што інфармацыя апынецца карыснай.

Крыніца: habr.com

Дадаць каментар