Схемаи шарораи эволютсия дар амал

Хонандагони азиз, рӯз ба хайр!

Дар ин мақола, мушовири пешбари соҳаи тиҷорати Neoflex Big Data Solutions имконоти сохтани намоишҳои сохтори тағирёбандаро бо истифода аз Apache Spark ба таври муфассал тавсиф мекунад.

Ҳамчун як қисми лоиҳаи таҳлили додаҳо, аксар вақт вазифаи сохтани дӯконҳо дар асоси маълумоти сусти сохторӣ ба миён меояд.

Одатан, инҳо гузоришҳо ё посухҳои системаҳои гуногун мебошанд, ки ҳамчун JSON ё XML захира шудаанд. Маълумот ба Hadoop бор карда мешавад, пас шумо бояд аз онҳо мағозаи дӯкон созед. Мо метавонем дастрасӣ ба намоишгоҳи сохташударо, масалан, тавассути Impala ташкил кунем.

Дар ин ҳолат, схемаи мағозаи мақсаднок пешакӣ маълум нест. Ғайр аз он, нақшаро низ пешакӣ таҳия кардан мумкин нест, зеро он аз маълумот вобаста аст ва мо бо ин маълумоти хеле суст сохторшуда сарукор дорем.

Масалан, имрӯз ҷавоби зерин сабт шудааст:

{source: "app1", error_code: ""}

ва пагох аз хамон система чавоби зерин меояд:

{source: "app1", error_code: "error", description: "Network error"}

Дар натиҷа, ба витрина як майдони дигар - тавсиф илова карда шавад ва касе намедонад, ки он меояд ё не.

Вазифаи эҷоди як мағоза дар чунин маълумот хеле стандартӣ аст ва Spark барои ин як қатор асбобҳо дорад. Барои таҳлили маълумоти манбаъ ҳам барои JSON ва ҳам XML дастгирӣ мавҷуд аст ва барои схемаи қаблан номаълум дастгирӣ барои schemaEvolution таъмин карда мешавад.

Дар назари аввал, ҳалли оддӣ менамояд. Шумо бояд ҷузвдонеро бо JSON гиред ва онро дар чаҳорчӯбаи dataframe хонед. Spark схема эҷод мекунад ва маълумоти лонаро ба сохтор табдил медиҳад. Ғайр аз он, ҳама чизро бояд дар паркет захира кард, ки он дар Импала низ дастгирӣ карда мешавад, тавассути сабти дӯкон дар metastore Hive.

Ҳама чиз оддӣ ба назар мерасад.

Аммо аз мисолхои кутохи хуччатхо маълум нест, ки як катор проблемахои дар амал ба миёномадаро чй бояд кард.

Ҳуҷҷатҳо равишро на сохтани витрина, балки хондани JSON ё XML дар чаҳорчӯбаи dataframe тавсиф мекунанд.

Маҳз, он танҳо нишон медиҳад, ки чӣ тавр хондан ва таҳлили JSON:

df = spark.read.json(path...)

Ин барои дастрас кардани маълумот ба Spark кифоя аст.

Дар амал, скрипт назар ба хондани файлҳои JSON аз ҷузвдон ва эҷоди чаҳорчӯбаи dataframe хеле мураккабтар аст. Вазъият чунин ба назар мерасад: аллакай як дӯкони муайян вуҷуд дорад, ҳар рӯз маълумоти нав ворид мешавад, онҳо бояд ба витрина илова карда шаванд, фаромӯш накунед, ки схема метавонад фарқ кунад.

Нақшаи муқаррарии сохтани намоишгоҳ чунин аст:

Қадами 1. Маълумот ба Hadoop бо боркунии минбаъдаи ҳаррӯза бор карда мешавад ва ба қисмати нав илова карда мешавад. Он папкае пайдо мешавад, ки маълумоти ибтидоӣ ба рӯз тақсим карда шудааст.

Қадами 2. Ҳангоми бори аввал ин ҷузвдон аз ҷониби Spark хонда ва таҳлил карда мешавад. Чорчӯби додашуда дар формати таҳлилшаванда захира карда мешавад, масалан, дар паркет, ки баъдан онро ба Impala ворид кардан мумкин аст. Ин як намоиши ҳадафро бо тамоми маълумоти то ин лаҳза ҷамъоварӣ мекунад.

Қадами 3. Зеркашӣ эҷод карда мешавад, ки ҳар рӯз пеши мағозаро нав мекунад.
Масъалаи борбардории афзоянда, зарурати таксим кардани витрина ва масъалаи нигох доштани схемаи генералии витрина ба миён меояд.

Биёед мисолро гирем. Биёед бигӯем, ки қадами аввалини сохтани анбор амалӣ карда шуд ва файлҳои JSON ба ҷузвдон бор карда мешаванд.

Аз онҳо эҷод кардани чаҳорчӯбаи dataframe ва сипас ҳамчун намоиш захира кардани он мушкиле нест. Ин қадами аввалинест, ки онро дар ҳуҷҷатҳои Spark ба осонӣ ёфтан мумкин аст:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Ҳама чиз хуб ба назар мерасад.

Мо JSON-ро хондем ва таҳлил кардем, пас мо dataframe-ро ҳамчун паркет нигоҳ медорем ва онро дар Hive бо ҳар роҳи мувофиқ сабт мекунем:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Мо тиреза мегирем.

Аммо, рӯзи дигар, маълумоти нав аз манбаъ илова карда шуд. Мо як папка бо JSON дорем ва намоиши аз ин ҷузвдон сохташуда. Пас аз бор кардани партияи навбатии додаҳо аз манбаъ, марти додаҳо арзиши як рӯзро гум мекунад.

Ҳалли мантиқӣ ин тақсим кардани дӯкон ба рӯз хоҳад буд, ки имкон медиҳад ҳар рӯзи дигар қисмати нав илова карда шавад. Механизми ин низ маълум аст, Spark ба шумо имкон медиҳад, ки қисмҳоро алоҳида нависед.

Аввалан, мо бори аввалро иҷро мекунем, маълумотро тавре, ки дар боло тавсиф шудааст, захира мекунем ва танҳо қисмҳоро илова мекунем. Ин амал оғозкунии мағоза номида мешавад ва танҳо як маротиба анҷом дода мешавад:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Рӯзи дигар, мо танҳо як қисми навро бор мекунем:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Танҳо барои навсозии схема дар Hive сабти ном шудан аст.
Бо вуҷуди ин, дар ин ҷо мушкилот ба миён меоянд.

Мушкилоти аввал. Дер ё зуд, паркет дар натиҷа нохонда мешавад. Ин ба он вобаста аст, ки чӣ гуна паркет ва JSON ба майдонҳои холӣ ба таври гуногун муносибат мекунанд.

Биёед як вазъияти маъмулиро дида бароем. Масалан, дирӯз JSON меояд:

День 1: {"a": {"b": 1}},

ва имрӯз ҳамон JSON чунин менамояд:

День 2: {"a": null}

Фарз мекунем, ки мо ду қисмҳои гуногун дорем, ки ҳар яки онҳо як сатр доранд.
Вақте ки мо тамоми маълумоти сарчашмаро мутолиа мекунем, Spark метавонад намудро муайян кунад ва фаҳмад, ки "a" майдони навъи "структур" аст ва майдони дохилии "b" навъи INT аст. Аммо, агар ҳар як қисм алоҳида захира карда шуда бошад, пас мо паркетро бо схемаҳои тақсимоти номувофиқ мегирем:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ин вазъият ба ҳама маълум аст, бинобар ин як вариант махсус илова карда шудааст - ҳангоми таҳлили маълумоти манбаъ, майдонҳои холиро хориҷ кунед:

df = spark.read.json("...", dropFieldIfAllNull=True)

Дар ин ҳолат, паркет аз қисмҳое иборат хоҳад буд, ки онҳоро якҷоя хондан мумкин аст.
Хол он ки онхое, ки дар амал ин корро кардаанд, дар ин чо талх табассум мекунанд. Чаро? Бале, зеро эҳтимол ду ҳолати дигар вуҷуд дорад. Ё се. Ё чор. Якум, ки қариб бешубҳа рух хоҳад дод, ин аст, ки намудҳои ададӣ дар файлҳои гуногуни JSON гуногун хоҳанд буд. Масалан, {intField: 1} ва {intField: 1.1}. Агар чунин майдонҳо дар як қисм пайдо шаванд, он гоҳ якҷояшавии схема ҳама чизро дуруст хонда, ба намуди дақиқтарин оварда мерасонад. Аммо агар дар онҳо гуногун бошад, пас яке intField: int дорад ва дигаре intField: double дорад.

Барои ҳалли ин вазъият парчами зерин мавҷуд аст:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Ҳоло мо папка дорем, ки дар он қисмҳо мавҷуданд, ки онҳоро дар як чаҳорчӯбаи додаҳо хондан мумкин аст ва паркети дурусти тамоми намоишгоҳ. Ҳа? Не.

Мо бояд дар хотир дорем, ки мо ҷадвалро дар Hive ба қайд гирифтаем. Қутб дар номҳои майдонҳо ба ҳарфҳо ҳассос нест, дар ҳоле ки паркет ба ҳарфҳо ҳассос аст. Аз ин рӯ, қисмҳо бо схемаҳо: field1: int ва Field1: int барои Hive якхелаанд, аммо барои Spark не. Фаромӯш накунед, ки номҳои майдонҳоро ба ҳарфҳои хурд табдил диҳед.

Баъд аз ин, ҳама чиз хуб ба назар мерасад.

Бо вуҷуди ин, на ҳама он қадар оддӣ. Масъалаи дуюм, инчунин маълум аст. Азбаски ҳар як қисми нав алоҳида захира карда мешавад, ҷузвдони бахш дорои файлҳои хидматрасонии Spark, масалан, парчами муваффақияти амалиёти _SUCCESS. Ин боиси хатогӣ ҳангоми кӯшиши паркет мегардад. Барои пешгирӣ кардани ин, шумо бояд конфигуратсияро танзим кунед, то Spark аз илова кардани файлҳои хидматӣ ба ҷузвдон пешгирӣ кунад:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Чунин ба назар мерасад, ки ҳоло ҳар рӯз як қисми нави паркет ба ҷузвдони намоиши мақсаднок илова карда мешавад, ки дар он маълумоти таҳлилшуда барои рӯз ҷойгир аст. Мо пешакӣ ғамхорӣ кардем, ки қисмҳои дорои ихтилофи навъи додаҳо вуҷуд надоранд.

Аммо, мо як мушкили сеюм дорем. Ҳоло схемаи умумӣ маълум нест, илова бар ин, ҷадвал дар Hive схемаи нодуруст дорад, зеро ҳар як қисмати нав эҳтимолан таҳрифро ба схема ворид кардааст.

Шумо бояд ҷадвалро аз нав сабт кунед. Инро оддӣ кардан мумкин аст: паркети витринаро бори дигар хонед, схемаро гиред ва дар асоси он DDL эҷод кунед, ки бо он ҷузвдонро дар Hive ҳамчун ҷадвали беруна аз нав сабт кунед ва схемаи витринаро навсозӣ кунед.

Мо проблемаи чорум дорем. Вақте ки мо ҷадвалро бори аввал ба қайд гирифтем, мо ба Spark такя кардем. Ҳоло мо худамон ин корро мекунем ва мо бояд дар хотир дошта бошем, ки майдонҳои паркетӣ метавонанд бо аломатҳое оғоз шаванд, ки барои Hive иҷозат дода намешаванд. Масалан, Spark хатҳоро мепартояд, ки дар майдони "corrupt_record" таҳлил карда натавонистанд. Чунин майдонро дар Hive бидуни фирор сабт кардан мумкин нест.

Бо донистани ин, мо схемаро ба даст меорем:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

рамз ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").place(",", ",`").replace("массив<`", "массив<") DDL-ро бехатар месозад, яъне ба ҷои:

create table tname (_field1 string, 1field string)

Бо номҳои майдонҳо ба монанди "_field1, 1field", DDL-и бехатар дар он ҷое сохта мешавад, ки аз номҳои майдон хориҷ карда шудаанд: ҷадвали `tname`-ро эҷод кунед (сатри `_field1`, `1field`).

Саволе ба миён меояд: чӣ гуна бояд маълумотро бо схемаи мукаммал (дар коди pf) дуруст ба даст овард? Ин pf-ро чӣ гуна бояд гирифт? Ин масъалаи панҷум аст. Нақшаи ҳамаи қисмҳоро аз папка бо файлҳои паркетии витринаи мақсаднок дубора хонед? Ин усул бехатартарин, вале мушкил аст.

Схема аллакай дар Hive аст. Шумо метавонед бо якҷоя кардани схемаи тамоми ҷадвал ва қисмати нав схемаи нав гиред. Ҳамин тавр, шумо бояд схемаи ҷадвалро аз Hive гиред ва онро бо схемаи қисмати нав якҷоя кунед. Инро тавассути хондани метамаълумоти санҷишӣ аз Hive, захира кардани он ба ҷузвдони муваққатӣ ва истифодаи Spark барои хондани ҳарду қисмҳо якбора анҷом додан мумкин аст.

Дар асл, ҳама чизест, ки ба шумо лозим аст: схемаи аслии ҷадвал дар Hive ва қисмати нав. Мо инчунин маълумот дорем. Танҳо барои гирифтани схемаи нав боқӣ мемонад, ки схемаи дӯкон ва майдонҳои навро аз қисмати сохташуда муттаҳид мекунад:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Минбаъд, мо сабти ҷадвали DDL-ро, мисли порчаи қаблӣ эҷод мекунем.
Агар тамоми занҷир дуруст кор кунад, яъне сарбории ибтидоӣ вуҷуд дошта бошад ва ҷадвал дар Hive дуруст сохта шуда бошад, пас мо схемаи ҷадвали навшударо мегирем.

Ва мушкили охирин дар он аст, ки шумо наметавонед танҳо қисматро ба ҷадвали Hive илова кунед, зеро он шикаста мешавад. Шумо бояд Hive-ро маҷбур кунед, ки сохтори тақсимоти онро ислоҳ кунад:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Вазифаи оддии хондани JSON ва эҷоди дӯкон дар асоси он боиси бартараф кардани як қатор душвориҳои номуайян мегардад, ки ҳалли онҳоро шумо бояд алоҳида ҷустуҷӯ кунед. Ва гарчанде ки ин қарорҳо соддаанд, барои пайдо кардани онҳо вақти зиёд лозим аст.

Барои ба амал баровардани сохтмони витрина ман бояд:

  • Ба намоиш қисмҳо илова кунед, аз файлҳои хидматӣ халос шавед
  • Бо майдонҳои холӣ дар маълумоти манбаъ, ки Spark чоп кардааст, кор кунед
  • Навъҳои оддиро ба сатр интиқол диҳед
  • Номҳои майдонҳоро ба ҳарфҳои хурд табдил диҳед
  • Боркунии маълумот ва сабти ҷадвал дар Hive (насли DDL)
  • Фаромӯш накунед, ки аз номҳои майдоне, ки бо Hive номувофиқанд, канорагирӣ кунед
  • Омӯзед, ки чӣ тавр бақайдгирии ҷадвалро дар Hive навсозӣ кунед

Хулоса, мо қайд мекунем, ки қарор дар бораи сохтани витринаҳо бо камбудиҳои зиёд дучор меояд. Аз ин рӯ, дар сурати душворӣ дар татбиқ, беҳтар аст, ки бо шарики ботаҷриба бо таҷрибаи муваффақ тамос гиред.

Ташаккур ба шумо барои хондани ин мақола, мо умедворем, ки шумо маълумоти муфид пайдо мекунед.

Манбаъ: will.com

Илова Эзоҳ