Spark schemaEvolution на практика

Уважаеми читатели, добър ден!

В тази статия водещият консултант на бизнес направление Big Data Solutions на Neoflex описва подробно опциите за изграждане на витрини с променлива структура с помощта на Apache Spark.

Като част от проект за анализ на данни често възниква задачата за изграждане на витрини на базата на слабо структурирани данни.

Обикновено това са регистрационни файлове или отговори от различни системи, записани като JSON или XML. Данните се качват в Hadoop, след което трябва да изградите витрина от тях. Можем да организираме достъп до създадената витрина, например чрез Impala.

В този случай схемата на целевата витрина не е известна предварително. Освен това схемата също не може да бъде изготвена предварително, тъй като зависи от данните, а ние имаме работа с тези много слабо структурирани данни.

Например днес е регистриран следният отговор:

{source: "app1", error_code: ""}

и утре от същата система идва следният отговор:

{source: "app1", error_code: "error", description: "Network error"}

В резултат на това трябва да се добави още едно поле към витрината - описание, и никой не знае дали ще дойде или не.

Задачата за създаване на витрина на такива данни е доста стандартна и Spark разполага с редица инструменти за това. За анализиране на изходните данни има поддръжка както за JSON, така и за XML, а за неизвестна досега схема е осигурена поддръжка за schemaEvolution.

На пръв поглед решението изглежда просто. Трябва да вземете папка с JSON и да я прочетете в рамка с данни. Spark ще създаде схема, ще превърне вложените данни в структури. Освен това всичко трябва да бъде запазено в parquet, което също се поддържа в Impala, чрез регистриране на витрината в metastore на Hive.

Всичко изглежда просто.

От кратките примери в документацията обаче не става ясно какво да се прави с редица проблеми на практика.

Документацията описва подход не за създаване на витрина, а за четене на JSON или XML в рамка с данни.

А именно, той просто показва как се чете и анализира JSON:

df = spark.read.json(path...)

Това е достатъчно, за да направят данните достъпни за Spark.

На практика скриптът е много по-сложен от простото четене на JSON файлове от папка и създаване на рамка с данни. Ситуацията изглежда така: вече има определена витрина, нови данни идват всеки ден, те трябва да бъдат добавени към витрината, като не забравяме, че схемата може да се различава.

Обичайната схема за изграждане на витрина е следната:

Стъпка 1. Данните се зареждат в Hadoop с последващо ежедневно презареждане и се добавят към нов дял. Получава се папка с първоначални данни, разделени по дни.

Стъпка 2. По време на първоначалното зареждане тази папка се чете и анализира от Spark. Полученият кадър с данни се записва във формат с възможност за разбор, например в паркет, който след това може да бъде импортиран в Impala. Това създава целева витрина с всички данни, натрупани до този момент.

Стъпка 3. Създава се изтегляне, което ще актуализира витрината всеки ден.
Има въпрос за постепенно натоварване, необходимост от преграждане на витрината и въпрос за поддържане на общата схема на витрината.

Да вземем пример. Да кажем, че първата стъпка от изграждането на хранилище е изпълнена и JSON файловете са качени в папка.

Създаването на рамка с данни от тях и след това запазването й като витрина не е проблем. Това е първата стъпка, която може лесно да бъде намерена в документацията на Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Изглежда всичко е наред.

Четем и анализираме JSON, след което запазваме рамката с данни като паркет, регистрирайки я в Hive по всеки удобен начин:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Получаваме прозорец.

Но на следващия ден бяха добавени нови данни от източника. Имаме папка с JSON и витрина, създадена от тази папка. След зареждането на следващата партида данни от източника, в витрината с данни липсват данни за един ден.

Логичното решение би било витрината да се раздели по дни, което ще позволи добавянето на нов дял всеки следващ ден. Механизмът за това също е добре известен, Spark ви позволява да пишете дялове отделно.

Първо правим първоначално зареждане, като запазваме данните, както е описано по-горе, като добавяме само разделяне. Това действие се нарича инициализация на витрина и се извършва само веднъж:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

На следващия ден зареждаме само нов дял:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Остава само да се регистрирате отново в Hive, за да актуализирате схемата.
Тук обаче възникват проблемите.

Първи проблем. Рано или късно полученият паркет ще стане нечетлив. Това се дължи на начина, по който parquet и JSON подхождат по различен начин към празните полета.

Нека разгледаме една типична ситуация. Например, вчера пристигна JSON:

День 1: {"a": {"b": 1}},

и днес същият JSON изглежда така:

День 2: {"a": null}

Да кажем, че имаме два различни дяла, всеки с една линия.
Когато прочетем всички изходни данни, Spark ще може да определи типа и ще разбере, че "a" е поле от тип "structure", с вложено поле "b" от тип INT. Но ако всеки дял беше запазен отделно, тогава получаваме паркет с несъвместими схеми на дялове:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Тази ситуация е добре известна, така че специално е добавена опция - когато анализирате изходните данни, премахвайте празните полета:

df = spark.read.json("...", dropFieldIfAllNull=True)

В този случай паркетът ще се състои от прегради, които могат да се четат заедно.
Въпреки че тези, които са направили това на практика, ще се усмихнат горчиво тук. Защо? Да, защото вероятно ще има още две ситуации. Или три. Или четири. Първото, което почти сигурно ще се случи, е, че числовите типове ще изглеждат различно в различните JSON файлове. Например {intField: 1} и {intField: 1.1}. Ако такива полета са намерени в един дял, тогава сливането на схемата ще прочете всичко правилно, което води до най-точния тип. Но ако в различни, тогава единият ще има intField: int, а другият ще има intField: double.

Има следния флаг за справяне с тази ситуация:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Сега имаме папка, в която има дялове, които могат да бъдат прочетени в един кадър с данни и валиден паркет на цялата витрина. да Не.

Трябва да помним, че регистрирахме таблицата в Hive. Hive не е чувствителен към главни и малки букви в имената на полетата, докато parquet е чувствителен към главни и малки букви. Следователно дяловете със схеми: field1: int и Field1: int са еднакви за Hive, но не и за Spark. Не забравяйте да конвертирате имената на полетата в малки букви.

След това изглежда всичко е наред.

Въпреки това, не всичко е толкова просто. Има и втори, също известен проблем. Тъй като всеки нов дял се записва отделно, папката на дяла ще съдържа сервизни файлове на Spark, например флага за успех на операцията _SUCCESS. Това ще доведе до грешка при опит за паркет. За да избегнете това, трябва да конфигурирате конфигурацията, за да попречите на Spark да добавя сервизни файлове към папката:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Изглежда, че сега всеки ден се добавя нов паркетен дял към целевата папка на витрината, където се намират анализираните данни за деня. Предварително се погрижихме да няма дялове с конфликт на типа данни.

Но имаме трети проблем. Сега общата схема не е известна, освен това таблицата в Hive има неправилна схема, тъй като всеки нов дял най-вероятно въвежда изкривяване в схемата.

Трябва да пререгистрирате таблицата. Това може да стане просто: прочетете отново паркета на витрината, вземете схемата и на базата на нея създайте DDL, с който да пререгистрирате папката в Hive като външна таблица, обновявайки схемата на целевата витрина.

Имаме и четвърти проблем. Когато регистрирахме таблицата за първи път, разчитахме на Spark. Сега го правим сами и трябва да помним, че паркетните полета могат да започват със знаци, които не са разрешени за Hive. Например Spark изхвърля редове, които не може да анализира в полето "corrupt_record". Такова поле не може да бъде регистрирано в Hive без да бъде екранирано.

Знаейки това, получаваме схемата:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("масив<`", "масив<") прави безопасен DDL, т.е. вместо:

create table tname (_field1 string, 1field string)

С имена на полета като „_field1, 1field“ се прави безопасен DDL, където имената на полетата са екранирани: създайте таблица `tname` (`_field1` низ, `1field` низ).

Възниква въпросът: как правилно да получите рамка от данни с пълна схема (в pf код)? Как да получа този pf? Това е петият проблем. Прочетете отново схемата на всички дялове от папката с паркетни файлове на целевата витрина? Този метод е най-безопасният, но труден.

Схемата вече е в Hive. Можете да получите нова схема, като комбинирате схемата на цялата таблица и новия дял. Така че трябва да вземете схемата на таблицата от Hive и да я комбинирате със схемата на новия дял. Това може да стане чрез четене на тестовите метаданни от Hive, запазването им във временна папка и използване на Spark за четене на двата дяла наведнъж.

Всъщност има всичко, от което се нуждаете: оригиналната схема на таблицата в Hive и новия дял. Имаме и данни. Остава само да получите нова схема, която комбинира схемата на магазина и нови полета от създадения дял:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

След това създаваме DDL за регистрация на таблица, както в предишния фрагмент.
Ако цялата верига работи правилно, а именно, имаше инициализиращо натоварване и таблицата е създадена правилно в Hive, тогава получаваме актуализирана схема на таблица.

И последният проблем е, че не можете просто да добавите дял към Hive таблица, защото тя ще бъде повредена. Трябва да принудите Hive да коригира структурата на дяловете си:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Простата задача за четене на JSON и създаване на витрина на базата на него води до преодоляване на редица неявни трудности, решения за които трябва да търсите отделно. И въпреки че тези решения са прости, отнема много време, за да ги намерите.

За да реализирам конструкцията на витрината, трябваше:

  • Добавете дялове към витрината, като се отървете от сервизните файлове
  • Работете с празни полета в изходните данни, въведени от Spark
  • Прехвърляне на прости типове към низ
  • Преобразувайте имената на полетата в малки букви
  • Отделно качване на данни и регистрация на таблица в Hive (генериране на DDL)
  • Не забравяйте да екранирате имена на полета, които може да са несъвместими с Hive
  • Научете как да актуализирате регистрацията на маса в Hive

Обобщавайки, отбелязваме, че решението за изграждане на витрини е изпълнено с много клопки. Ето защо, в случай на трудности при изпълнението, по-добре е да се свържете с опитен партньор с успешна експертиза.

Благодарим ви, че прочетохте тази статия, надяваме се информацията да ви бъде полезна.

Източник: www.habr.com

Добавяне на нов коментар