Искра шемаЕволуција во пракса

Драги читатели, добар ден!

Во оваа статија, водечкиот консултант на деловната област Big Data Solutions на Neoflex детално ги опишува опциите за градење витрини со променлива структура користејќи Apache Spark.

Како дел од проектот за анализа на податоци, често се појавува задачата за градење излози врз основа на лабаво структурирани податоци.

Обично тоа се дневници или одговори од различни системи, зачувани како JSON или XML. Податоците се поставени на Hadoop, а потоа треба да изградите излог од нив. Можеме да организираме пристап до креираната изложба, на пример, преку Impala.

Во овој случај, шемата на целниот излог не е позната однапред. Покрај тоа, шемата, исто така, не може да се подготви однапред, бидејќи зависи од податоците, а ние се занимаваме со овие многу лабаво структурирани податоци.

На пример, денес е евидентиран следниов одговор:

{source: "app1", error_code: ""}

а утре од истиот систем доаѓа следниот одговор:

{source: "app1", error_code: "error", description: "Network error"}

Како резултат на тоа, треба да се додаде уште едно поле во витрината - опис, а никој не знае дали ќе дојде или не.

Задачата да се создаде излог на такви податоци е прилично стандардна, а Spark има голем број алатки за ова. За парсирање на изворните податоци, има поддршка и за JSON и за XML, а за претходно непозната шема, се обезбедува поддршка за schemaEvolution.

На прв поглед, решението изгледа едноставно. Треба да земете папка со JSON и да ја прочитате во податочна рамка. Spark ќе создаде шема, ќе ги претвори вгнездените податоци во структури. Понатаму, сè треба да се зачува во паркет, кој исто така е поддржан во Импала, со регистрирање на излогот во метасторот Hive.

Се чини дека сè е едноставно.

Меѓутоа, од кратките примери во документацијата не е јасно што да се прави со голем број проблеми во практиката.

Документацијата опишува пристап да не се креира излог, туку да се чита JSON или XML во податочна рамка.

Имено, едноставно покажува како се чита и анализира JSON:

df = spark.read.json(path...)

Ова е доволно за податоците да бидат достапни на Spark.

Во пракса, скриптата е многу посложена од само читање JSON-датотеки од папка и создавање податочна рамка. Ситуацијата изгледа вака: веќе има одреден излог, секој ден пристигнуваат нови податоци, тие треба да се додадат на излогот, не заборавајќи дека шемата може да се разликува.

Вообичаената шема за изградба на изложба е како што следува:

Чекор 1. Податоците се вчитуваат во Hadoop со последователно дневно повторно вчитување и се додаваат во нова партиција. Излегува папка со првични податоци поделени по ден.

Чекор 2. За време на почетното вчитување, оваа папка се чита и се анализира од Spark. Резултирачката податочна рамка се зачувува во формат што може да се анализира, на пример, во паркет, кој потоа може да се увезе во Impala. Ова создава целна изложба со сите податоци што се акумулирани до овој момент.

Чекор 3. Создадено е преземање што ќе го ажурира излогот секој ден.
Има прашање за постепено вчитување, потреба од поделба на витрината и прашање за одржување на општата шема на витрината.

Да земеме пример. Да речеме дека првиот чекор за градење складиште е имплементиран, а JSON-датотеките се поставуваат во папка.

Создавањето податочна рамка од нив, а потоа зачувувањето како приказ, не е проблем. Ова е првиот чекор што лесно може да се најде во документацијата на Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Се чини дека сè е во ред.

Го читаме и анализираме JSON, а потоа ја зачувуваме податочната рамка како паркет, регистрирајќи ја во Hive на кој било пригоден начин:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Добиваме прозорец.

Но, следниот ден се додадени нови податоци од изворот. Имаме папка со JSON и излог создаден од оваа папка. По вчитувањето на следната серија на податоци од изворот, на податочната март му недостасуваат податоци во вредност од еден ден.

Логичното решение би било да се поделува излогот по ден, што ќе овозможи додавање на нова партиција секој нареден ден. Механизмот за ова е исто така добро познат, Spark ви овозможува да пишувате партиции одделно.

Прво, правиме почетно оптоварување, зачувувајќи ги податоците како што е опишано погоре, додавајќи само партиција. Оваа акција се нарекува иницијализација на излогот и се прави само еднаш:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Следниот ден вчитуваме само нова партиција:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Останува само повторно да се регистрирате во Hive за да ја ажурирате шемата.
Сепак, тука се појавуваат проблеми.

Првиот проблем. Порано или подоцна, добиениот паркет ќе биде нечитлив. Ова се должи на тоа како паркетот и JSON различно ги третираат празните полиња.

Да разгледаме типична ситуација. На пример, вчера пристигнува JSON:

День 1: {"a": {"b": 1}},

и денес истиот JSON изгледа вака:

День 2: {"a": null}

Да речеме дека имаме две различни партиции, секоја со една линија.
Кога ќе ги прочитаме сите изворни податоци, Spark ќе може да го одреди типот и ќе разбере дека „a“ е поле од типот „structure“, со вгнездено поле „b“ од типот INT. Но, ако секоја партиција е зачувана посебно, тогаш добиваме паркет со некомпатибилни шеми за партиции:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Оваа ситуација е добро позната, така што е специјално додадена опција - при парсирање на изворните податоци, отстранете ги празните полиња:

df = spark.read.json("...", dropFieldIfAllNull=True)

Во овој случај, паркетот ќе се состои од прегради кои можат да се читаат заедно.
Иако оние што го направиле ова во пракса, овде горко ќе се насмевнат. Зошто? Да, затоа што најверојатно ќе има уште две ситуации. Или три. Или четири. Првиот, кој речиси сигурно ќе се случи, е дека нумеричките типови ќе изгледаат различно во различни JSON-датотеки. На пример, {intField: 1} и {intField: 1.1}. Ако таквите полиња се најдат во една партиција, тогаш спојувањето на шемата ќе прочита сè правилно, што ќе доведе до најточниот тип. Но, ако во различни, тогаш едниот ќе има intField: int, а другиот ќе има intField: двојно.

Постои следново знаме за справување со оваа ситуација:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Сега имаме папка каде што има партиции кои можат да се читаат во една податочна рамка и валиден паркет на целата изложба. Да? Бр.

Мора да запомниме дека ја регистриравме табелата во Hive. Hive не е осетлив на големи букви во имињата на полињата, додека паркетот е осетлив на големи букви. Затоа, партициите со шеми: field1: int и Field1: int се исти за Hive, но не и за Spark. Не заборавајте да ги конвертирате имињата на полињата во мали букви.

После тоа, се чини дека се е во ред.

Сепак, не се толку едноставно. Има и втор, исто така добро познат проблем. Бидејќи секоја нова партиција се зачувува одделно, папката со партиции ќе содржи датотеки за услугата Spark, на пример, знаменцето за успешност на операцијата _SUCCESS. Ова ќе резултира со грешка при обидот за паркет. За да го избегнете ова, треба да ја конфигурирате конфигурацијата за да спречите Spark да додава услужни датотеки во папката:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Се чини дека сега секој ден се додава нова паркетна партиција во целната витрина папка, каде што се наоѓаат анализираните податоци за тој ден. Однапред се погриживме да нема партиции со конфликт на типови на податоци.

Но, имаме трет проблем. Сега општата шема не е позната, згора на тоа, табелата во Hive има неточна шема, бидејќи секоја нова партиција најверојатно воведувала изобличување во шемата.

Треба повторно да ја регистрирате табелата. Ова може да се направи едноставно: повторно прочитајте го паркетот на излогот, земете ја шемата и креирајте DDL врз основа на неа, со која ќе ја пререгистрирате папката во Hive како надворешна табела, ажурирајќи ја шемата на целниот излог.

Имаме четврти проблем. Кога ја регистриравме табелата за прв пат, се потпревме на Spark. Сега го правиме тоа сами и треба да запомниме дека полињата за паркет може да започнат со знаци кои не се дозволени за Hive. На пример, Spark исфрла линии што не може да ги анализира во полето „corrupt_record“. Таквото поле не може да се регистрира во Hive без да се избега.

Знаејќи го ова, ја добиваме шемата:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace ("<", "<`").замени(",", ","").replace ("низа<`", "низа<") го прави безбедно DDL, т.е. наместо:

create table tname (_field1 string, 1field string)

Со имиња на полиња како „_field1, 1field“, се прави безбедно DDL каде што се бегаат имињата на полињата: креирајте табела `tname` (`_field1` стринг, `1field` стринг).

Се поставува прашањето: како правилно да се добие податочна рамка со комплетна шема (во pf код)? Како да го добиете овој PF? Ова е петти проблем. Препрочитајте ја шемата на сите партиции од папката со паркетни датотеки на целната витрина? Овој метод е најбезбеден, но тежок.

Шемата е веќе во Hive. Можете да добиете нова шема со комбинирање на шемата на целата табела и новата партиција. Значи, треба да ја земете шемата на табелата од Hive и да ја комбинирате со шемата на новата партиција. Ова може да се направи со читање на тест метаподатоците од Hive, зачувување во привремена папка и користење на Spark за читање на двете партиции одеднаш.

Всушност, има се што ви треба: оригиналната шема на табелата во Hive и новата партиција. Имаме и податоци. Останува само да се добие нова шема која ја комбинира шемата на излогот и новите полиња од креираната партиција:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Следно, ја креираме регистрацијата на табелата DDL, како и во претходниот фрагмент.
Ако целиот синџир работи правилно, имено, имало иницијализирачко оптоварување, а табелата е правилно креирана во Hive, тогаш добиваме ажурирана шема на табелата.

И последниот проблем е што не можете само да додадете партиција на табелата Hive, бидејќи ќе биде скршена. Треба да го принудите Hive да ја поправи структурата на партицијата:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Едноставната задача за читање на JSON и создавање излог врз основа на него резултира со надминување на голем број имплицитни тешкотии, решенија за кои треба да барате одделно. И иако овие решенија се едноставни, потребно е многу време да се најдат.

За да ја спроведам изградбата на изложбата, морав:

  • Додајте партиции во витрината, ослободувајќи се од услужните датотеки
  • Справете се со празните полиња во изворните податоци што ги напиша Spark
  • Фрлајте едноставни типови на низа
  • Претворете ги имињата на полињата во мали букви
  • Посебно поставување податоци и регистрација на табели во Hive (генерација на DDL)
  • Не заборавајте да ги избегнете имињата на полињата што можеби се некомпатибилни со Hive
  • Научете како да ја ажурирате регистрацијата на табелата во Hive

Сумирајќи, забележуваме дека одлуката за изградба на излози е полна со многу стапици. Затоа, во случај на потешкотии во спроведувањето, подобро е да контактирате со искусен партнер со успешна експертиза.

Ви благодариме што ја прочитавте оваа статија, се надеваме дека информациите ви се корисни.

Извор: www.habr.com

Додадете коментар