Спарк сцхемаЕволутион у пракси

Драги читаоци, добар дан!

У овом чланку, водећи консултант Неофлек-овог пословног подручја Биг Дата Солутионс детаљно описује опције за прављење витрина варијабилне структуре користећи Апацхе Спарк.

Као део пројекта анализе података, често се јавља задатак изградње излога на основу лабаво структурираних података.

Обично су то евиденције, или одговори различитих система, сачувани као ЈСОН или КСМЛ. Подаци се отпремају у Хадооп, а затим морате да направите излог од њих. Можемо организовати приступ креираној витрини, на пример, преко Импале.

У овом случају, шема циљног излога није унапред позната. Штавише, шема се такође не може унапред израдити, јер зависи од података, а ми имамо посла са овим врло лабаво структурираним подацима.

На пример, данас се евидентира следећи одговор:

{source: "app1", error_code: ""}

а сутра из истог система стиже следећи одговор:

{source: "app1", error_code: "error", description: "Network error"}

Као резултат тога, још једно поље треба додати у излог - опис, а нико не зна да ли ће доћи или не.

Задатак креирања излога на таквим подацима је прилично стандардан, а Спарк има низ алата за то. За рашчлањивање изворних података постоји подршка за ЈСОН и КСМЛ, а за претходно непознату шему обезбеђена је подршка за сцхемаЕволутион.

На први поглед решење изгледа једноставно. Морате да узмете фасциклу са ЈСОН и прочитате је у оквиру података. Спарк ће креирати шему, претворити угнежђене податке у структуре. Даље, све треба сачувати у паркету, што је подржано и у Импали, регистрацијом излога у Хиве метасторе.

Чини се да је све једноставно.

Међутим, из кратких примера у документацији није јасно шта урадити са низом проблема у пракси.

Документација описује приступ да се не креира излог, већ да се чита ЈСОН или КСМЛ у оквир података.

Наиме, једноставно показује како се чита и анализира ЈСОН:

df = spark.read.json(path...)

Ово је довољно да подаци буду доступни Спарк-у.

У пракси, скрипта је много компликованија од пуког читања ЈСОН датотека из фасцикле и креирања оквира података. Ситуација изгледа овако: већ постоји одређена продавница, нови подаци долазе сваки дан, потребно их је додати на излог, не заборављајући да се шема може разликовати.

Уобичајена шема за изградњу витрине је следећа:

Корак КСНУМКС. Подаци се учитавају у Хадооп уз накнадно свакодневно поновно учитавање и додају на нову партицију. Испоставило се да је фасцикла са почетним подацима подељена по данима.

Корак КСНУМКС. Током почетног учитавања, Спарк чита и анализира ову фасциклу. Добијени оквир података се чува у формату који се може анализирати, на пример, на паркету, који се затим може увести у Импалу. Ово ствара циљни излог са свим подацима који су се акумулирали до ове тачке.

Корак КСНУМКС. Креира се преузимање које ће ажурирати излог сваког дана.
Постоји питање инкременталног учитавања, потребе за преградњом витрине и питање одржавања опште шеме витрине.

Узмимо пример. Рецимо да је први корак изградње спремишта имплементиран, а ЈСОН датотеке су отпремљене у фасциклу.

Прављење оквира података од њих, а затим његово чување као излог, није проблем. Ово је први корак који се лако може наћи у Спарк документацији:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Изгледа да је све у реду.

Прочитали смо и рашчланили ЈСОН, а затим сачували оквир података као паркет, региструјући га у Хиве на било који погодан начин:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Добијамо прозор.

Али, сутрадан су додани нови подаци из извора. Имамо фасциклу са ЈСОН-ом и излог креиран из ове фасцикле. Након учитавања следеће групе података из извора, на тржишту података недостаје једнодневна вредност података.

Логично решење би било да се излог подели по дану, што ће омогућити додавање нове партиције сваког следећег дана. Механизам за ово је такође добро познат, Спарк вам омогућава да засебно пишете партиције.

Прво, вршимо почетно учитавање, чувајући податке као што је горе описано, додајући само партиционисање. Ова радња се зове иницијализација излога и ради се само једном:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Следећег дана учитавамо само нову партицију:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Остаје само да се поново региструјете у Хиве-у да бисте ажурирали шему.
Међутим, ту настају проблеми.

Први проблем. Пре или касније, добијени паркет ће бити нечитљив. То је због начина на који паркет и ЈСОН различито третирају празна поља.

Хајде да размотримо типичну ситуацију. На пример, јуче стиже ЈСОН:

День 1: {"a": {"b": 1}},

а данас исти ЈСОН изгледа овако:

День 2: {"a": null}

Рецимо да имамо две различите партиције, свака са једном линијом.
Када прочитамо цео изворни податак, Спарк ће моћи да одреди тип, и разумеће да је „а“ поље типа „структура“, са угнежђеним пољем „б“ типа ИНТ. Али, ако је свака партиција сачувана посебно, онда добијамо паркет са некомпатибилним шемама преграда:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ова ситуација је добро позната, па је посебно додата опција - када анализирате изворне податке, уклоните празна поља:

df = spark.read.json("...", dropFieldIfAllNull=True)

У овом случају, паркет ће се састојати од преграда које се могу читати заједно.
Иако ће се они који су то урадили у пракси овде горко осмехнути. Зашто? Да, јер ће вероватно бити још две ситуације. Или три. Или четири. Први, који ће се готово сигурно догодити, је да ће нумерички типови изгледати другачије у различитим ЈСОН датотекама. На пример, {интФиелд: 1} и {интФиелд: 1.1}. Ако се таква поља нађу у једној партицији, онда ће обједињавање шеме све исправно прочитати, што ће довести до најтачнијег типа. Али ако у различитим, онда ће један имати интФиелд: инт, а други ће имати интФиелд: доубле.

Постоји следећа заставица за решавање ове ситуације:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Сада имамо фасциклу у којој се налазе партиције које се могу читати у један оквир података и важећи паркет целе витрине. Да? Не.

Морамо запамтити да смо регистровали табелу у Хиве. Кошница не разликује велика и мала слова у називима поља, док паркет разликује велика и мала слова. Према томе, партиције са шемама: фиелд1: инт и Фиелд1: инт су исте за Хиве, али не и за Спарк. Не заборавите да конвертујете имена поља у мала слова.

После тога, чини се да је све у реду.

Међутим, није све тако једноставно. Постоји и други, такође добро познат проблем. Пошто се свака нова партиција чува засебно, фасцикла партиције ће садржати Спарк сервисне датотеке, на пример, ознаку успеха операције _СУЦЦЕСС. Ово ће довести до грешке при покушају паркета. Да бисте то избегли, морате да конфигуришете конфигурацију да спречите Спарк да додаје сервисне датотеке у фасциклу:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Чини се да се сада сваки дан додаје нова партиција паркета у циљну фасциклу витрине, где се налазе рашчлањени подаци за тај дан. Унапред смо се побринули да нема партиција са конфликтом типа података.

Али, имамо трећи проблем. Сада општа шема није позната, штавише, табела у Хиве-у има нетачну шему, пошто је свака нова партиција највероватније унела изобличење у шему.

Морате поново да региструјете сто. То се може урадити једноставно: поново прочитајте паркет излога, узмите шему и на основу ње направите ДДЛ, помоћу којег ћете поново регистровати фасциклу у Хиве-у као спољну табелу, ажурирајући шему циљног излога.

Имамо четврти проблем. Када смо први пут регистровали табелу, ослонили смо се на Спарк. Сада то радимо сами, и треба да запамтимо да паркетна поља могу да почињу са знаковима који нису дозвољени за Хиве. На пример, Спарк избацује линије које није могао да рашчлани у поље „цоррупт_рецорд“. Такво поље се не може регистровати у кошници а да се не избегне.

Знајући ово, добијамо шему:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Код ("_цоррупт_рецорд", "`_цоррупт_рецорд`") + " " + ф[1].реплаце(":", "`:").реплаце("<", "<`").реплаце(",", ",`").реплаце("низ<`", "низ<") чини безбедан ДДЛ, тј. уместо:

create table tname (_field1 string, 1field string)

Са именима поља као што су "_фиелд1, 1фиелд", прави се сигуран ДДЛ где су називи поља избачени: креирајте табелу `тнаме` (стринг `_фиелд1`, стринг `1фиелд`).

Поставља се питање: како правилно добити оквир података са комплетном шемом (у пф коду)? Како добити овај пф? Ово је пети проблем. Поново прочитати шему свих партиција из фасцикле са фајловима паркета циљне витрине? Овај метод је најсигурнији, али тежак.

Шема је већ у Хиве-у. Можете добити нову шему комбиновањем шеме целе табеле и нове партиције. Дакле, потребно је да узмете шему табеле из Хиве-а и комбинујете је са шемом нове партиције. Ово се може урадити читањем метаподатака теста из Хиве-а, чувањем их у привременој фасцикли и коришћењем Спарк-а за читање обе партиције одједном.

У ствари, постоји све што вам треба: оригинална шема табеле у Хиве-у и нова партиција. Имамо и податке. Остаје само да добијете нову шему која комбинује шему излога и нова поља из креиране партиције:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Затим креирамо ДДЛ регистрацију табеле, као у претходном исечку.
Ако цео ланац ради исправно, наиме, дошло је до иницијализационог оптерећења и табела је исправно креирана у Хиве-у, онда добијамо ажурирану шему табеле.

И последњи проблем је што не можете само да додате партицију у табелу Хиве, јер ће бити покварена. Морате натерати Хиве да поправи структуру партиције:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Једноставан задатак читања ЈСОН-а и креирања излога на основу њега резултира превазилажењем низа имплицитних потешкоћа, решења за која морате посебно да тражите. И иако су ова решења једноставна, потребно је доста времена да се пронађу.

Да бих имплементирао конструкцију витрине, морао сам:

  • Додајте партиције у излог, ослободите се сервисних датотека
  • Бавите се празним пољима у изворним подацима које је Спарк откуцао
  • Пребаците једноставне типове на низ
  • Претворите имена поља у мала слова
  • Одвојено отпремање података и регистрација табеле у Хиве (ДДЛ генерација)
  • Не заборавите да избегнете имена поља која могу бити некомпатибилна са Хиве-ом
  • Научите како да ажурирате регистрацију табеле у Хиве-у

Сумирајући, напомињемо да је одлука о изградњи излога препуна многих замки. Стога, у случају потешкоћа у имплементацији, боље је контактирати искусног партнера са успешном стручношћу.

Хвала вам што сте прочитали овај чланак, надамо се да ће вам информације бити корисне.

Извор: ввв.хабр.цом

Додај коментар