Spark schemaEvolution на практике

Уважаемые читатели, доброго дня!

В данной статье ведущий консультант бизнес-направления Big Data Solutions компании «Неофлекс», подробно описывает варианты построения витрин переменной структуры с использованием Apache Spark.

В рамках проекта по анализу данных, часто возникает задача построения витрин на основе слабо структурированных данных.

Обычно это логи, или ответы различных систем, сохраняемые в виде JSON или XML. Данные выгружаются в Hadoop, далее из них нужно построить витрину. Организовать доступ к созданной витрине можем, например, через Impala.

В этом случае схема целевой витрины предварительно неизвестна. Более того, схема еще и не может быть составлена заранее, так как зависит от данных, а мы имеем дело с этими самыми слабо структурированными данными.

Например, сегодня логируется такой ответ:

{source: "app1", error_code: ""}

а завтра от этой же системы приходит такой ответ:

{source: "app1", error_code: "error", description: "Network error"}

В результате в витрину должно добавиться еще одно поле — description, и придет оно или нет, никто не знает.

Задача создания витрины на таких данных довольно стандартная, и у Spark для этого есть ряд инструментов. Для парсинга исходных данных есть поддержка и JSON, и XML, а для неизвестной заранее схемы предусмотрена поддержка schemaEvolution.

С первого взгляда решение выглядит просто. Надо взять папку с JSON и прочитать в dataframe. Spark создаст схему, вложенные данные превратит в структуры. Далее все нужно сохранить в parquet, который поддерживается в том числе и в Impala, зарегистрировав витрину в Hive metastore.

Вроде бы все просто.

Однако, из коротких примеров в документации не ясно, что делать с рядом проблем на практике.

В документации описывается подход не для создания витрины, а для чтения JSON или XML в dataframe.

А именно, просто приводится как прочитать и распарсить JSON:

df = spark.read.json(path...)

Этого достаточно, чтобы сделать данные доступными для Spark.

На практике сценарий гораздо сложнее, чем просто прочитать JSON файлы из папки, и создать dataframe. Ситуация выглядит так: уже есть определенная витрина, каждый день приходят новые данные, их нужно добавить в витрину, не забывая, что схема может отличаться.

Обычная схема построения витрины такая:

Шаг 1. Данные загружаются в Hadoop с последующей ежедневной дозагрузкой и складываются в новую партицию. Получается партиционированная по дням папка с исходными данными.

Шаг 2. В ходе инициализирующей загрузки эта папка читается и парсится средствами Spark. Полученный dataframe сохраняется в формат, доступный для анализа, например, в parquet, который потом можно импортировать в Impala. Так создается целевая витрина со всеми данными, которые накопились к этому моменту.

Шаг 3. Создается загрузка, которая будет каждый день обновлять витрину.
Возникает вопрос инкрементальной загрузки, необходимость партиционирования витрины, и вопрос поддержки общей схемы витрины.

Приведем пример. Допустим, реализован первый шаг построения хранилища, и настроена выгрузка JSON файлов в папку.

Создать из них dataframe, чтобы потом сохранить как витрину, проблем не составляет. Это тот самый первый шаг, который легко можно найти в документации Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Вроде бы все хорошо.

Мы прочитали и распарсили JSON, далее сохраняем dataframe как parquet, регистрируя в Hive любым удобным способом:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Получаем витрину.

Но, на другой день добавились новые данные из источника. У нас есть папка с JSON, и витрина, созданная на основе данной папки. После загрузки следующей порции данных из источника в витрине не хватает данных за один день.

Логичным решением будет партиционировать витрину по дням, что позволит каждый следующий день добавлять новую партицию. Механизм этого тоже хорошо известен, Spark позволяет записывать партиции отдельно.

Сначала делаем инициализирующую загрузку, сохраняя данные как было описано выше, добавив только партиционирование. Это действие называется инициализация витрины и делается только один раз:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

На следующий день загружаем только новую партицию:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Остается только перерегистрировать в Hive, чтобы обновить схему.
Однако, здесь и возникают проблемы.

Первая проблема. Рано или поздно получившийся parquet нельзя будет прочитать. Это связано с тем, как по-разному подходят parquet и JSON к пустым полям.

Рассмотрим типичную ситуацию. Например, вчера приходит JSON:

День 1: {"a": {"b": 1}},

а сегодня этот же JSON выглядит так:

День 2: {"a": null}

Допустим, у нас две разных партиции, в которых по одной строке.
Когда мы читаем исходные данные целиком, Spark сумеет определить тип, и поймет, что «a» — это поле типа «структура», со вложенным полем «b» типа INT. Но, если каждая партиция была сохранена по отдельности, то получается parquet с несовместимыми схемами партиций:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Данная ситуация хорошо известна, поэтому специально добавлена опция — при парсинге исходных данных удалять пустые поля:

df = spark.read.json("...", dropFieldIfAllNull=True)

В этом случае parquet будет состоять из партиций, которые можно будет прочитать вместе.
Хотя те, кто делал это на практике, тут горько усмехнутся. Почему? Да потому, что скорее всего возникнут еще две ситуации. Или три. Или четыре. Первая, которая возникнет почти наверняка, числовые типы будут по-разному выглядеть в разных JSON файлах. Например, {intField: 1} и {intField: 1.1}. Если такие поля попадутся в одной партции, то мерж схемы прочитает все правильно, приведя к самому точному типу. А вот если в разных, то в одной будет intField: int, а в другой intField: double.

Для обработки этой ситуации есть следующий флаг:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Теперь у нас есть папка, где находятся партиции, которые можно прочитать в единый dataframe и валидный parquet всей витрины. Да? Нет.

Надо вспомнить, что мы регистрировали таблицу в Hive. Hive не чувствителен к регистру в названиях полей, а parquet чувствителен. Поэтому партиции со схемами: field1: int, и Field1: int для Hive одинаковые, а для Spark нет. Надо не забыть привести названия полей к нижнему регистру.

Вот после этого, кажется, все хорошо.

Однако, не все так просто. Возникает вторая, тоже хорошо известная проблема. Так как каждая новая партиция сохраняется отдельно, то в папке партиции будут лежать служебные файлы Spark, например, флаг успешности операции _SUCCESS. Это приведет к ошибке при попытке parquet. Чтобы этого избежать, надо настроить конфигурацию, запретив Spark дописывать в папку служебные файлы:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Кажется, теперь каждый день в папку целевой витрины добавляется новая parquet партиция, где лежат распарсенные данные за день. Мы заранее озаботились тем, чтобы не было партиций с конфликтом типов данных.

Но, перед нами — третья проблема. Теперь общая схема не известна, более того, в Hive таблица с неправильной схемой, так как каждая новая партиция, скорее всего, внесла искажение в схему.

Нужно перерегистрировать таблицу. Это можно сделать просто: снова прочитать parquet витрины, взять схему и создать на ее основе DDL, с которым заново зарегистрировать папку в Hive как внешнюю таблицу, обновив схему целевой витрины.

Перед нами возникает четвертая проблема. Когда мы регистрировали таблицу первый раз, мы опирались на Spark. Теперь делаем это сами, и нужно помнить, что поля parquet могут начинаться с символов, недопустимых для Hive. Например, Spark выкидывает строки, которые не смог распарсить в поле «corrupt_record». Такое поле нельзя будет зарегистрировать в Hive без того, чтобы экранировать.

Зная это, получаем схему:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace(«array<`», «array<») делает безопасный DDL, то есть вместо:

create table tname (_field1 string, 1field string)

С такими названиями полей как «_field1, 1field», делается безопасный DDL, где названия полей экранированы: create table `tname` (`_field1` string, `1field` string).

Возникает вопрос: как правильно получить dataframe с полной схемой (в коде pf)? Как получить этот pf? Это пятая проблема. Перечитывать схему всех партиций из папки с parquet файлами целевой витрины? Это метод самый безопасный, но тяжелый.

Схема уже есть в Hive. Получить новую схему можно, объединив схему всей таблицы и новой партиции. Значит надо схему таблицы брать из Hive и объединить ее со схемой новой партиции. Это можно сделать, прочитав тестовые метаданные из Hive, сохранив их во временную папку и прочитав с помощью Spark обе партиции сразу.

По сути, есть все, что нужно: исходная схема таблицы в Hive и новая партиция. Данные у нас тоже есть. Остается лишь получить новую схему, в которой объединяется схема витрины и новые поля из созданной партиции:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Далее создаем DDL регистрации таблицы, как в предыдущем фрагменте.
Если вся цепочка работает правильно, а именно — была инициализирующая загрузка, и в Hive правильно созданная таблица, то мы получаем обновленную схему таблицы.

И последняя, проблема заключается в том, что нельзя так просто добавить партицию в таблицу Hive, так как она будет сломана. Необходимо заставить Hive починить у себя структуру партиций:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Простая задача чтения JSON и создания на его основе витрины выливается в преодоление ряда неявных трудностей, решения для которых приходится искать по отдельности. И хотя эти решения простые, но на их поиск уходит много времени.

Чтобы реализовать построение витрины, пришлось:

  • Добавлять партиции в витрину, избавившись от служебных файлов
  • Разобраться с пустыми полями в исходных данных, которые Spark типизировал
  • Привести простые типы к строке
  • Привести названия полей к нижнему регистру
  • Разделить выгрузку данных и регистрацию таблицы в Hive (создание DDL)
  • Не забыть экранировать названия полей, которые могут быть несовместимы с Hive
  • Научиться обновлять регистрацию таблицы в Hive

Подводя итог, отметим, что решение по построению витрин таит много подводных камней. Поэтому при возникновении сложностей в реализации лучше обратиться к опытному партнеру с успешной экспертизой.

Благодарим за чтение данной статьи, надеемся, что информация окажется полезной.

Источник: habr.com