Spark схемасы Эволюция практикада

Урматтуу окурмандар, жакшы күн!

Бул макалада Neoflex'тин Big Data Solutions бизнес чөйрөсүнүн жетектөөчү консультанты Apache Spark аркылуу өзгөрмөлүү структуралардын витриналарын куруу жолдорун кеңири сүрөттөйт.

Маалыматтарды талдоо долбоорунун бир бөлүгү катары, эркин структураланган маалыматтарга негизделген дүкөндөрдүн маңдайкы беттерин куруу милдети көп учурда пайда болот.

Адатта бул JSON же XML катары сакталган ар кандай системалардын журналдары же жооптору. Маалыматтар Hadoop'ко жүктөлөт, андан кийин сиз алардан дүкөндү курушуңуз керек. Биз, мисалы, Impala аркылуу түзүлгөн витринага кирүүнү уюштура алабыз.

Бул учурда, максаттуу дүкөндүн схемасы алдын ала белгилүү эмес. Андан тышкары, схеманы да алдын ала түзүү мүмкүн эмес, анткени ал маалыматтарга көз каранды жана биз бул өтө эркин структураланган маалыматтар менен алектенип жатабыз.

Мисалы, бүгүн төмөнкү жооп катталды:

{source: "app1", error_code: ""}

жана эртең ошол эле системадан төмөнкүдөй жооп келет:

{source: "app1", error_code: "error", description: "Network error"}

Жыйынтыгында витринага дагы бир талаа – сыпаттоо кошулушу керек, анын келер-келбесин эч ким билбейт.

Мындай маалыматтар боюнча дүкөндү түзүү милдети абдан стандарттуу жана Spark бул үчүн бир катар куралдарга ээ. Булак маалыматтарын талдоо үчүн JSON жана XML үчүн колдоо бар, ал эми мурда белгисиз схема үчүн schemaEvolution үчүн колдоо көрсөтүлөт.

Бир караганда, чечим жөнөкөй көрүнөт. Сиз JSON менен папканы алып, аны dataframe'ге окушуңуз керек. Spark схема түзүп, уяланган маалыматтарды структураларга айлантат. Андан тышкары, бардыгын паркетте сактоо керек, ал Импалада да колдоого алынат, дүкөндүн маңдайкы жагын Hive мета дүкөнүндө каттоо менен.

Баары жөнөкөй көрүнөт.

Бирок, иш жузунде бир катар проблемаларды эмне кылуу керектиги документтеги кыска мисалдардан ачык-айкын эмес.

Документте дүкөндүн маңдайчасын түзүү эмес, JSON же XMLди датафрамга окуу ыкмасы сүрөттөлөт.

Тактап айтканда, ал жөн гана JSON кантип окууну жана талдоону көрсөтөт:

df = spark.read.json(path...)

Бул маалыматтарды Spark үчүн жеткиликтүү кылуу үчүн жетиштүү.

Иш жүзүндө, скрипт жөн гана папкадан JSON файлдарын окууга жана dataframe түзүүгө караганда алда канча татаал. Кырдаал мындай көрүнөт: белгилүү бир дүкөн бар, жаңы маалыматтар күн сайын келип турат, аларды дүкөндүн маңдайына кошуу керек, схема ар кандай болушу мүмкүн экенин унутпаш керек.

Витрина куруунун кадимки схемасы төмөнкүдөй:

Step 1. Маалыматтар Hadoop'ко жүктөлөт жана күн сайын кайра жүктөлөт жана жаңы бөлүмгө кошулат. Күнүнө бөлүнгөн баштапкы маалыматтары бар папка чыкты.

Step 2. Баштапкы жүктөө учурунда бул папка Spark тарабынан окулат жана талданат. Алынган dataframe талдоо форматында сакталат, мисалы, паркетте, андан кийин Impala импорттоого болот. Бул ушул учурга чейин топтолгон бардык маалыматтар менен максаттуу көргөзмөнү түзөт.

Step 3. Дүкөндүн маңдайкы жагын күн сайын жаңыртып турган жүктөө түзүлөт.
Бул жерде кошумча жүктөө, витринаны бөлүү зарылдыгы жана витринанын жалпы схемасын сактоо маселеси бар.

Мисал келтирели. Репозиторийди куруунун биринчи кадамы аткарылды жана JSON файлдары папкага жүктөлдү дейли.

Алардан dataframe түзүү, анан аны витрина катары сактоо көйгөй эмес. Бул Spark документтеринде оңой табыла турган эң биринчи кадам:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Баары жакшы окшойт.

Биз JSON-ду окуп, талдап чыктык, андан кийин дата фреймди паркет катары сактайбыз, аны Hiveге каалаган ыңгайлуу жол менен каттайбыз:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Биз терезе алабыз.

Бирок, эртеси булактан жаңы маалыматтар кошулду. Бизде JSON менен папка бар жана бул папкадан түзүлгөн көргөзмө. Булактан берилиштердин кийинки партиясын жүктөгөндөн кийин, datamart бир күндүк маалымат жетишпейт.

Логикалык чечим дүкөндүн алдыңкы бөлүгүн күн сайын бөлүү болот, бул кийинки күнү жаңы бөлүмдү кошууга мүмкүндүк берет. Мунун механизми да белгилүү, Spark бөлүктөрдү өзүнчө жазууга мүмкүндүк берет.

Биринчиден, биз баштапкы жүктөөнү жасайбыз, маалыматтарды жогоруда сүрөттөлгөндөй сактап, бөлүүнү гана кошобуз. Бул аракет дүкөндүн маңдайкы инициализациясы деп аталат жана бир гана жолу аткарылат:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Эртеси күнү биз жаңы бөлүмдү гана жүктөйбүз:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Схеманы жаңыртуу үчүн Уюкта кайра катталуу гана калды.
Бирок, бул жерде көйгөйлөр пайда болот.

Биринчи көйгөй. Эртеби-кечпи, пайда болгон паркет окулбай калат. Бул паркет менен JSON бош талааларга кандай мамиле кылышы менен шартталган.

Бир типтүү жагдайды карап көрөлү. Мисалы, кечээ JSON келди:

День 1: {"a": {"b": 1}},

жана бүгүнкү күндө ошол эле JSON мындай көрүнөт:

День 2: {"a": null}

Келгиле, бизде эки башка бөлүм бар дейли, ар биринде бир сызык бар.
Биз бүткүл баштапкы маалыматтарды окуганда, Spark түрүн аныктай алат жана "a" INT түрүндөгү "b" уяча талаасы менен "структура" тибиндеги талаа экенин түшүнөт. Бирок, ар бир бөлүм өзүнчө сакталган болсо, анда биз шайкеш келбеген бөлүү схемалары менен паркет алабыз:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Бул жагдай жакшы белгилүү, ошондуктан атайын опция кошулган - булак маалыматтарын талдоодо бош талааларды алып салыңыз:

df = spark.read.json("...", dropFieldIfAllNull=True)

Бул учурда, паркет чогуу окуй турган бөлүктөрдөн турат.
Муну иш жүзүндө кылгандар бул жерде ачуу жылмайышат да. Неге? Ооба, анткени дагы эки жагдай болушу мүмкүн. Же үч. Же төрт. Биринчиси, дээрлик сөзсүз пайда болот, ар кандай JSON файлдарында сандык түрлөр ар кандай көрүнөт. Мисалы, {intField: 1} жана {intField: 1.1}. Эгерде мындай талаалар бир бөлүмдө табылса, анда схемаларды бириктирүү бардыгын туура окуп, эң так түргө алып келет. Бирок ар башка болсо, анда биринде intField: int, экинчисинде intField: double болот.

Бул жагдайды чечүү үчүн төмөнкү желек бар:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Эми бизде папка бар, анда бир датафрамда окуй турган бөлүмдөр жана бүт көргөзмөнүн жарактуу паркети бар. Ооба? Жок.

Уюкта үстөлдү каттаганыбызды унутпашыбыз керек. Уюк талаа аталыштарында регистрге сезимтал эмес, ал эми паркет регистрге сезимтал. Ошондуктан, схемалары бар бөлүмдөр: field1: int жана Field1: int Hive үчүн бирдей, бирок Spark үчүн эмес. Талаа аттарын кичине тамгага которууну унутпаңыз.

Ошондон кийин баары жакшы болот окшойт.

Бирок, баары ушунчалык жөнөкөй эмес. Экинчи, ошондой эле белгилүү маселе бар. Ар бир жаңы бөлүм өзүнчө сакталгандыктан, бөлүм папкасында Spark кызматтык файлдары болот, мисалы, _SUCCESS операциясынын ийгиликтүү желеги. Бул паркет жасоодо катага алып келет. Мунун алдын алуу үчүн, Spark папкага кызматтык файлдарды кошууга жол бербөө үчүн конфигурацияны конфигурациялашыңыз керек:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Азыр күн сайын талданган маалыматтар жайгашкан максаттуу витрина папкасына күн сайын жаңы паркет бөлүмү кошулуп жаткандай сезилет. Биз алдын ала маалымат түрүнө карама-каршы келген бөлүктөр жок экенине кам көрдүк.

Бирок бизде үчүнчү маселе бар. Азыр жалпы схема белгисиз, анын үстүнө уюктагы таблицада туура эмес схема бар, анткени ар бир жаңы бөлүм схемага бурмалоо киргизиши мүмкүн.

Сиз үстөлдү кайра катташыңыз керек. Муну жөн гана кылса болот: дүкөндүн маңдайкы паркетинин кайра окуп, схеманы алып, анын негизинде DDL түзүңүз, анын жардамы менен Hiveдеги папканы тышкы таблица катары кайра каттаңыз, максаттуу дүкөндүн маңдайкы жагынын схемасын жаңыртыңыз.

Бизде төртүнчү маселе бар. Биз үстөлдү биринчи жолу каттаганда Sparkка таянганбыз. Эми биз муну өзүбүз жасайбыз жана паркет талаалары Уюкка уруксат берилбеген символдордон башталышы мүмкүн экенин эстен чыгарбашыбыз керек. Мисалы, Spark "corrupt_record" талаасында талдай албаган сызыктарды ыргытат. Мындай талааны уюктан кутулбастан каттоого болбойт.

Муну билип, биз схеманы алабыз:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

коду ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") коопсуз DDL түзөт, б.а. анын ордуна:

create table tname (_field1 string, 1field string)

"_field1, 1field" сыяктуу талаа аталыштары менен коопсуз DDL талаа аталыштары качып кеткен жерде жасалат: `tname` таблицасын түзүңүз (`_field1` сап, `1талаа` сабы).

Суроо туулат: толук схемасы бар датафраймди кантип туура алса болот (pf кодунда)? Бул pf кантип алса болот? Бул бешинчи маселе. Максаттуу витринанын паркет файлдары бар папкадагы бардык бөлүмдөрдүн схемасын кайра окуйсузбу? Бул ыкма эң коопсуз, бирок кыйын.

Схема уюкта мурунтан эле бар. Сиз бүт таблицанын схемасын жана жаңы бөлүмдү бириктирип, жаңы схеманы ала аласыз. Ошентип, сиз Уюктан таблица схемасын алып, аны жаңы бөлүмдүн схемасы менен бириктиришиңиз керек. Бул Уюктан сыноо метадайындарын окуп, аны убактылуу папкага сактоо жана Spark аркылуу эки бөлүмдү бир эле учурда окуу менен жасалса болот.

Чынында, сизге керектүү нерселердин баары бар: Уюктагы оригиналдуу таблица схемасы жана жаңы бөлүм. Бизде да маалыматтар бар. Дүкөндүн схемасын жана түзүлгөн бөлүмдөн жаңы талааларды бириктирген жаңы схеманы алуу үчүн гана калды:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Андан кийин, мурунку үзүндүдөгүдөй, DDL таблицасын түзөбүз.
Эгерде бүт чынжыр туура иштесе, тактап айтканда, инициализациялоо жүктөмү бар болсо жана уюкта таблица туура түзүлгөн болсо, анда биз жаӊыртылган таблица схемасын алабыз.

Ал эми акыркы маселе, сиз Hive таблицасына жөн эле бөлүм кошо албайсыз, анткени ал бузулат. Уюкту анын бөлүү түзүмүн оңдоого мажбурлашыңыз керек:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON окуу жана анын негизинде дүкөндү түзүү жөнөкөй тапшырмасы, сиз өзүнчө издешиңиз керек болгон бир катар жашыруун кыйынчылыктарды, чечимдерди жеңүүгө алып келет. Жана бул чечимдер жөнөкөй болгону менен, аларды табуу үчүн көп убакыт талап кылынат.

Витрина курууну ишке ашыруу үчүн мен төмөнкүлөргө туура келди:

  • Сервис файлдарынан арылуу менен көргөзмөгө бөлүмдөрдү кошуңуз
  • Spark терген баштапкы маалыматтардагы бош талаалар менен иштөө
  • Жөнөкөй типтерди сапка чыгаруу
  • Талаа аттарын кичине тамгага айландырыңыз
  • Hive'де өзүнчө маалыматтарды жүктөө жана таблицаны каттоо (DDL мууну)
  • Hive менен шайкеш келбеген талаа аттарынан качууну унутпаңыз
  • Hive'де таблица каттоосун кантип жаңыртуу керектигин үйрөнүңүз

Жыйынтыктап айтканда, биз дүкөндөрдүн терезелерин куруу чечими көптөгөн тузактар ​​менен коштолгондугун белгилейбиз. Ошондуктан, ишке ашырууда кыйынчылыктар болгон учурда, ийгиликтүү тажрыйбалуу тажрыйбалуу өнөктөш менен байланышуу жакшы.

Бул макаланы окуганыңыз үчүн рахмат, биз сизге пайдалуу маалымат табасыз деп ишенебиз.

Source: www.habr.com

Комментарий кошуу