Spark schemaEvolution на практиці

Шановні читачі, добрий день!

У цій статті провідний консультант бізнес-напряму Big Data Solutions компанії «Неофлекс» детально описує варіанти побудови вітрин змінної структури з використанням Apache Spark.

У рамках проекту з аналізу даних часто виникає завдання побудови вітрин на основі слабо структурованих даних.

Зазвичай це логи або відповіді різних систем, що зберігаються у вигляді JSON або XML. Дані вивантажуються в Hadoop, далі потрібно побудувати вітрину. Організувати доступ до створеної вітрини можемо, наприклад через Impala.

І тут схема цільової вітрини попередньо невідома. Більше того, схема ще й не може бути складена заздалегідь, оскільки залежить від даних, а ми маємо справу з цими слабко структурованими даними.

Наприклад, сьогодні логується така відповідь:

{source: "app1", error_code: ""}

а завтра від цієї системи приходить така відповідь:

{source: "app1", error_code: "error", description: "Network error"}

У результаті вітрину має додатися ще одне поле - description, і прийде воно чи ні, ніхто не знає.

Завдання створення вітрини на таких даних досить стандартне, і Spark для цього має ряд інструментів. p align="justify"> Для парсингу вихідних даних є підтримка і JSON, і XML, а для невідомої заздалегідь схеми передбачена підтримка schemaEvolution.

З першого погляду рішення виглядає просто. Треба взяти папку з JSON і прочитати в dataframe. Spark створить схему, вкладені дані перетворить на структури. Далі все потрібно зберегти в parquet, який підтримується навіть в Impala, зареєструвавши вітрину в Hive metastore.

Начебто все просто.

Однак, з коротких прикладів у документації не ясно, що робити з низкою проблем на практиці.

У документації описується підхід не створення вітрини, а читання JSON чи XML в dataframe.

А саме, просто наводиться як прочитати та розпарсувати JSON:

df = spark.read.json(path...)

Цього достатньо, щоб зробити дані доступними для Spark.

На практиці сценарій набагато складніше, ніж просто прочитати файли JSON з папки, і створити dataframe. Ситуація виглядає так: вже є певна вітрина, щодня приходять нові дані, їх потрібно додати до вітрини, не забуваючи, що схема може відрізнятися.

Звичайна схема побудови вітрини така:

Крок 1. Дані завантажуються в Hadoop з наступним щоденним дозавантаженням і складаються в нову партицію. Виходить партиціонована днями папка з вихідними даними.

Крок 2. У ході ініціалізації завантаження ця папка читається і парситься засобами Spark. Отриманий dataframe зберігається у форматі, доступному для аналізу, наприклад, в parquet, який потім можна імпортувати в Impala. Так створюється цільова вітрина з усіма даними, що накопичилися на цей момент.

Крок 3. Створюється завантаження, яке щодня оновлюватиме вітрину.
Виникає питання інкрементального завантаження, необхідність партиціонування вітрини та питання підтримки загальної схеми вітрини.

Наведемо приклад. Припустимо, реалізовано перший крок побудови сховища, і настроєно вивантаження JSON файлів у папку.

Створити з них dataframe, щоб потім зберегти як вітрину, проблем не становить. Це той перший крок, який легко можна знайти в документації Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Начебто все добре.

Ми прочитали та розпарили JSON, далі зберігаємо dataframe як parquet, реєструючи у Hive будь-яким зручним способом:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Отримуємо вітрину.

Але другого дня додалися нові дані з джерела. У нас є папка з JSON, і вітрина, створена на основі цієї папки. Після завантаження наступної порції даних із джерела у вітрині не вистачає даних за день.

Логічним рішенням буде партиціонувати вітрину щодня, що дозволить кожен наступний день додавати нову партицію. Механізм цього також добре відомий, Spark дозволяє записувати партиції окремо.

Спочатку робимо ініціалізуючу завантаження, зберігаючи дані як було описано вище, додавши лише партиціонування. Ця дія називається ініціалізація вітрини і робиться лише один раз:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Наступного дня завантажуємо лише нову партицію:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Залишається тільки перереєструвати Hive, щоб оновити схему.
Однак тут і виникають проблеми.

Перша проблема. Паркет, який рано чи пізно вийшов, не можна буде прочитати. Це пов'язано з тим, як по-різному підходять parquet та JSON до порожніх полів.

Розглянемо типову ситуацію. Наприклад, вчора приходить JSON:

День 1: {"a": {"b": 1}},

а сьогодні цей же JSON виглядає так:

День 2: {"a": null}

Припустимо, у нас дві різні партиції, в яких по одному рядку.
Коли ми читаємо вихідні дані повністю, Spark зможе визначити тип, і зрозуміє, що "a" - це поле типу "структура", з вкладеним полем "b" типу INT. Але, якщо кожна партиція була збережена окремо, виходить parquet з несумісними схемами партицій:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ця ситуація добре відома, тому спеціально додана опція - при парсингу вихідних даних видаляти порожні поля:

df = spark.read.json("...", dropFieldIfAllNull=True)

У цьому випадку parquet складатиметься з партицій, які можна буде прочитати разом.
Хоча ті, хто робив це на практиці, тут гірко посміхнуться. Чому? Та тому, що, швидше за все, виникнуть ще дві ситуації. Або три. Або чотири. Перша, яка виникне майже напевно, числові типи по-різному виглядатимуть у різних JSON файлах. Наприклад, {intField: 1} та {intField: 1.1}. Якщо такі поля трапляться в одній партції, то мерж схеми прочитає все правильно, привівши до найточнішого типу. А от якщо в різних, то в одній буде intField: int, а в іншій intField: double.

Для обробки цієї ситуації є такий прапор:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Тепер у нас є папка, де знаходяться партиції, які можна прочитати в єдиний dataframe і валідний parquet всієї вітрини. Так? Ні.

Потрібно згадати, що ми реєстрували таблицю в Hive. Hive не чутливий до регістру в назвах полів, а parquet чутливий. Тому партиції зі схемами: field1: int і Field1: int для Hive однакові, а для Spark немає. Треба не забути навести назви полів до нижнього регістру.

Ось після цього, здається, все гаразд.

Однак не все так просто. Виникає друга, теж добре відома проблема. Оскільки кожна нова партиція зберігається окремо, у папці партиції лежатимуть службові файли Spark, наприклад, прапор успішності операції _SUCCESS. Це призведе до помилки у спробі parquet. Щоб цього уникнути, потрібно налаштувати конфігурацію, заборонивши Spark дописувати до папки службові файли:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Здається, тепер щодня до папки цільової вітрини додається нова parquet партиція, де лежать розпарені дані за день. Ми заздалегідь перейнялися тим, щоб не було партицій із конфліктом типів даних.

Але перед нами — третя проблема. Тепер загальна схема не відома, більше того, у Hive таблиця з неправильною схемою, тому що кожна нова партиція, швидше за все, внесла викривлення у схему.

Потрібно перереєструвати таблицю. Це можна зробити просто: знову прочитати parquet вітрини, взяти схему та створити на її основі DDL, з яким наново зареєструвати папку в Hive як зовнішню таблицю, оновивши схему цільової вітрини.

Перед нами постає четверта проблема. Коли ми зареєстрували таблицю вперше, ми спиралися на Spark. Тепер робимо це самі, і слід пам'ятати, що поля parquet можуть починатися з символів, неприпустимих для Hive. Наприклад, Spark викидає рядки, які не зміг розпарити в полі corrupt_record. Таке поле не можна буде зареєструвати в Hive без екранування.

Знаючи це, отримуємо схему:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace(«array<'», «array<») робить безпечний DDL, тобто замість:

create table tname (_field1 string, 1field string)

З такими назвами полів як "_field1, 1field", робиться безпечний DDL, де назви полів екрановані: create table `tname` (`_field1` string, `1field` string).

Виникає питання: як правильно отримати даніфраму з повною схемою (у коді pf)? Як отримати цей pf? Це п'ята проблема. Перечитувати схему всіх партицій із папки із parquet файлами цільової вітрини? Це найбезпечніший, але важкий метод.

Схема вже є у Hive. Отримати нову схему можна, поєднавши схему всієї таблиці та нової партиції. Отже треба схему таблиці купувати з Hive і поєднати її зі схемою нової партиції. Це можна зробити, прочитавши тестові метадані з Hive, зберігши їх у тимчасову папку та прочитавши за допомогою Spark обидві партиції відразу.

По суті є все, що потрібно: вихідна схема таблиці в Hive і нова партиція. Дані у нас також є. Залишається лише отримати нову схему, в якій поєднується схема вітрини та нові поля із створеної партиції:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Далі створюємо DDL реєстрації таблиці, як у попередньому фрагменті.
Якщо весь ланцюжок працює правильно, а саме було ініціалізуюче завантаження, і в Hive правильно створена таблиця, то ми отримуємо оновлену схему таблиці.

І остання проблема полягає в тому, що не можна так просто додати партицію в таблицю Hive, оскільки вона буде зламана. Необхідно змусити Hive відремонтувати у себе структуру партицій:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Просте завдання читання JSON та створення на його основі вітрини виливається у подолання низки неявних труднощів, рішення для яких доводиться шукати окремо. І хоча ці рішення прості, але на їхній пошук йде багато часу.

Щоб реалізувати побудову вітрини, довелося:

  • Додавати партиції у вітрину, позбавившись службових файлів
  • Розібратися з порожніми полями у вихідних даних, які Spark типізував
  • Привести прості типи до рядка
  • Привести назви полів до нижнього регістру
  • Розділити вивантаження даних та реєстрацію таблиці в Hive (створення DDL)
  • Не забути екранувати назви полів, які можуть бути несумісні з Hive
  • Навчитися оновлювати реєстрацію таблиці у Hive

Підсумовуючи, зазначимо, що рішення щодо побудови вітрин таїть багато підводних каменів. Тому у разі виникнення складнощів у реалізації краще звернутися до досвідченого партнера з успішною експертизою.

Дякуємо за читання цієї статті, сподіваємось, що інформація виявиться корисною.

Джерело: habr.com

Додати коментар або відгук