Spark схем Практикт хувьсал

Эрхэм уншигчид, энэ өдрийн мэнд!

Энэ нийтлэлд Neoflex-ийн Big Data Solutions бизнесийн салбарын тэргүүлэх зөвлөх нь Apache Spark-ийг ашиглан хувьсах бүтцийн үзүүлэнг бий болгох хувилбаруудыг дэлгэрэнгүй тайлбарласан болно.

Өгөгдлийн шинжилгээний төслийн нэг хэсэг болгон сул бүтэцтэй өгөгдөл дээр тулгуурлан дэлгүүрийн лангуу байгуулах ажил ихэвчлэн гарч ирдэг.

Ихэнхдээ эдгээр нь JSON эсвэл XML хэлбэрээр хадгалагдсан янз бүрийн системүүдийн бүртгэлүүд эсвэл хариултууд юм. Мэдээллийг Hadoop-д байршуулсан бол та тэдгээрээс дэлгүүрийн нүүрийг бүтээх хэрэгтэй. Бид жишээ нь Impala-ээр дамжуулан үүсгэсэн үзүүлэн рүү нэвтрэх боломжийг зохион байгуулж болно.

Энэ тохиолдолд зорилтот дэлгүүрийн лангууны схемийг урьдчилан мэдэхгүй байна. Түүгээр ч зогсохгүй, схемийг урьдчилан гаргах боломжгүй, учир нь энэ нь өгөгдлөөс хамаардаг бөгөөд бид эдгээр маш сул бүтэцтэй өгөгдөлтэй харьцаж байна.

Жишээлбэл, өнөөдөр дараах хариуг бүртгэсэн байна:

{source: "app1", error_code: ""}

мөн маргааш ижил системээс дараах хариулт гарч ирнэ.

{source: "app1", error_code: "error", description: "Network error"}

Үүний үр дүнд үзэсгэлэнд дахин нэг талбар нэмэгдэх ёстой - тайлбар, энэ нь ирэх эсэхийг хэн ч мэдэхгүй.

Ийм өгөгдөл дээр дэлгүүрийн нүүр хуудас үүсгэх ажил нь нэлээд стандарт бөгөөд Spark нь үүнд зориулсан хэд хэдэн хэрэгсэлтэй байдаг. Эх өгөгдлийг задлан шинжлэхийн тулд JSON болон XML-ийн аль алинд нь дэмжлэг байдаг бөгөөд урьд өмнө мэдэгддэггүй схемийн хувьд schemaEvolution-ийн дэмжлэг үзүүлдэг.

Өнгөц харахад шийдэл нь энгийн мэт. Та JSON-тэй хавтас аваад үүнийг дата фреймд унших хэрэгтэй. Spark нь схем үүсгэж, үүрлэсэн өгөгдлийг бүтэц болгон хувиргах болно. Цаашилбал, бүх зүйлийг Паркетан дээр хадгалах шаардлагатай бөгөөд үүнийг Impala-д дэмждэг бөгөөд Hive мета дэлгүүрт дэлгүүрийн нүүрийг бүртгүүлэх хэрэгтэй.

Бүх зүйл энгийн юм шиг санагддаг.

Гэсэн хэдий ч практикт хэд хэдэн асуудалтай тулгарвал юу хийх нь баримт бичигт байгаа богино жишээнүүдээс тодорхойгүй байна.

Баримт бичиг нь дэлгүүрийн нүүр хуудас үүсгэх биш харин JSON эсвэл XML-г дата фрейм болгон унших аргыг тайлбарладаг.

Тухайлбал, энэ нь зүгээр л JSON-г хэрхэн уншиж, задлахыг харуулж байна:

df = spark.read.json(path...)

Энэ нь өгөгдлийг Spark-д ашиглах боломжтой болгоход хангалттай юм.

Практикт скрипт нь JSON файлуудыг хавтаснаас уншиж, дата фрейм үүсгэхээс хамаагүй илүү төвөгтэй байдаг. Нөхцөл байдал иймэрхүү харагдаж байна: аль хэдийн тодорхой дэлгүүрийн нүүр хуудас байна, шинэ мэдээлэл өдөр бүр орж ирдэг, тэдгээрийг дэлгүүрийн нүүрэн дээр нэмэх шаардлагатай бөгөөд схем нь өөр байж болохыг мартаж болохгүй.

Үзэсгэлэнгийн барилгын ердийн схем нь дараах байдалтай байна.

1 алхам. Өгөгдлийг Hadoop руу ачаалж, дараа нь өдөр бүр дахин ачаалж, шинэ хуваалтад нэмдэг. Энэ нь өдөрт хуваагдсан анхны өгөгдөл бүхий хавтас болж байна.

2 алхам. Эхний ачааллын үед энэ фолдерыг Spark уншиж, задлан шинжилдэг. Үүссэн өгөгдлийн фрейм нь паркетан дээр, жишээ нь паркетан дээр хадгалагдаж, дараа нь Импала руу оруулж болно. Энэ нь өнөөг хүртэл хуримтлагдсан бүх өгөгдөл бүхий зорилтот үзүүлэнг бий болгодог.

3 алхам. Дэлгүүрийн нүүрийг өдөр бүр шинэчилдэг татан авалтыг бий болгосон.
Өсөн нэмэгдэж буй ачааллын асуудал, үзэсгэлэнгийн хэсгийг хуваах хэрэгцээ, үзэсгэлэнгийн ерөнхий схемийг хадгалах асуудал байна.

Нэг жишээ татъя. Хадгалах газар байгуулах эхний алхам хэрэгжиж, JSON файлуудыг хавтас руу байршуулсан гэж бодъё.

Тэднээс дата фрейм үүсгэж, дараа нь үзүүлэн болгон хадгалах нь асуудал биш юм. Энэ бол Spark баримт бичигт хялбархан олж болох хамгийн эхний алхам юм:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Бүх зүйл сайхан байгаа бололтой.

Бид JSON-г уншиж, задлан шинжилсний дараа өгөгдлийн фреймийг паркет болгон хадгалж, Hive-д ямар ч тохиромжтой аргаар бүртгэнэ.

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Бид цонх авдаг.

Гэвч маргааш нь эх сурвалжаас авсан шинэ мэдээлэл нэмэгдсэн. Бидэнд JSON-тай фолдер байгаа бөгөөд энэ фолдероос үүсгэсэн үзэсгэлэн бий. Эх сурвалжаас дараагийн багц өгөгдлүүдийг ачаалсны дараа data mart-д нэг өдрийн өгөгдөл дутуу байна.

Логик шийдэл бол дэлгүүрийн нүүрийг өдөр бүр хуваах явдал бөгөөд энэ нь дараагийн өдөр бүр шинэ хуваалт нэмэх боломжийг олгоно. Үүний механизм нь бас мэдэгдэж байгаа бөгөөд Spark нь хуваалтыг тусад нь бичих боломжийг олгодог.

Нэгдүгээрт, бид эхний ачааллыг хийж, дээр дурдсанчлан өгөгдлийг хадгалж, зөвхөн хуваалтыг нэмнэ. Энэ үйлдлийг дэлгүүрийн нүүрийг эхлүүлэх гэж нэрлэдэг бөгөөд зөвхөн нэг л удаа хийгддэг:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Дараагийн өдөр нь бид зөвхөн шинэ хуваалтыг ачаална:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Үлдсэн зүйл бол схемийг шинэчлэхийн тулд Hive-д дахин бүртгүүлэх явдал юм.
Гэсэн хэдий ч энд асуудал үүсдэг.

Эхний асуудал. Эрт орой хэзээ нэгэн цагт үүссэн паркет нь унших боломжгүй болно. Энэ нь паркет болон JSON нь хоосон талбарт хэрхэн ялгаатай ханддагтай холбоотой юм.

Ердийн нөхцөл байдлыг авч үзье. Жишээлбэл, өчигдөр JSON ирсэн:

День 1: {"a": {"b": 1}},

Өнөөдөр ижил JSON дараах байдалтай байна:

День 2: {"a": null}

Бид тус бүр нэг мөртэй хоёр өөр хуваалттай гэж үзье.
Бид эх өгөгдлийг бүхэлд нь уншихад Spark төрлийг тодорхойлох боломжтой бөгөөд "a" нь "бүтцийн" төрлийн талбар, INT төрлийн "b" талбартай гэдгийг ойлгох болно. Гэхдээ хэрэв хуваалт бүрийг тусад нь хадгалсан бол бид тохирохгүй хуваалтын схемтэй паркет авна.

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Энэ нөхцөл байдал сайн мэдэгдэж байгаа тул тусгайлан сонголтыг нэмсэн - эх өгөгдлийг задлан шинжлэхдээ хоосон талбаруудыг устгана уу:

df = spark.read.json("...", dropFieldIfAllNull=True)

Энэ тохиолдолд паркетан нь хамтдаа уншиж болох хуваалтуудаас бүрдэнэ.
Хэдийгээр практик дээр үүнийг хийсэн хүмүүс энд гашуунаар инээмсэглэх болно. Яагаад? Тийм ээ, учир нь дахиад хоёр нөхцөл байдал үүсэх магадлалтай. Эсвэл гурав. Эсвэл дөрөв. Эхнийх нь бараг л тохиолдох нь янз бүрийн JSON файлд тоон төрлүүд өөр харагдах болно. Жишээлбэл, {intField: 1} ба {intField: 1.1}. Хэрэв ийм талбарууд нэг хуваалтаас олдвол схемийг нэгтгэх нь бүх зүйлийг зөв уншиж, хамгийн зөв төрөлд хүргэдэг. Харин өөр нь байвал нэг нь intField: int, нөгөө нь intField: double байх болно.

Энэ нөхцөл байдлыг зохицуулах дараах туг байна:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Одоо бидэнд нэг өгөгдлийн фреймд унших боломжтой хуваалтууд болон бүхэл бүтэн үзэсгэлэнгийн хүчинтэй паркет бүхий хавтас бий. Тийм үү? Үгүй

Бид хүснэгтийг Hive-д бүртгүүлсэн гэдгээ санах ёстой. Талбайн нэрний хувьд Hive нь жижиг жижиг жижиг үсгийг ялгадаггүй бол паркет нь том жижиг жижиг үсгээр ялгадаг. Тиймээс схемүүдтэй хуваалтууд: талбар1: int, болон Field1: int нь Hive-ийн хувьд адилхан боловч Spark-ийн хувьд биш. Талбайн нэрийг жижиг үсгээр хөрвүүлэхээ бүү мартаарай.

Үүний дараа бүх зүйл сайхан байх шиг байна.

Гэсэн хэдий ч бүх зүйл тийм ч энгийн биш юм. Хоёрдахь, бас сайн мэддэг асуудал бий. Шинэ хуваалт бүрийг тусад нь хадгалдаг тул хуваалтын хавтсанд Spark үйлчилгээний файлууд байх болно, жишээлбэл, _SUCCESS үйлдлийн амжилтын туг. Энэ нь паркет хийх оролдлого хийхэд алдаа гарна. Үүнээс зайлсхийхийн тулд та Spark-ийг фолдерт үйлчилгээний файл нэмэхээс урьдчилан сэргийлэх тохиргоог хийх хэрэгтэй.

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Одоо өдөр бүр шинэ паркетан хуваалт нь тухайн өдрийн задлан шинжилсэн өгөгдөл байрлах зорилтот үзэсгэлэнгийн хавтсанд нэмэгдэж байх шиг байна. Бид өгөгдлийн төрлийн зөрчилтэй ямар ч хуваалт байхгүй байхыг урьдчилан анхаарч үзсэн.

Гэхдээ бидэнд гурав дахь асуудал байна. Одоо ерөнхий схем нь тодорхойгүй байна, үүнээс гадна шинэ хуваалт бүр схемд гажуудал оруулах магадлалтай тул Hive дахь хүснэгт буруу схемтэй байна.

Та хүснэгтийг дахин бүртгүүлэх хэрэгтэй. Үүнийг энгийнээр хийж болно: дэлгүүрийн лангууны паркетыг дахин уншиж, схемийг аваад түүн дээр үндэслэн DDL үүсгэн, Hive дахь хавтсыг гадаад хүснэгт болгон дахин бүртгүүлж, зорилтот дэлгүүрийн лангууны схемийг шинэчилнэ үү.

Бидэнд дөрөв дэх асуудал байна. Бид хүснэгтийг анх бүртгүүлэхдээ Spark-д найдаж байсан. Одоо бид өөрсдөө үүнийг хийдэг бөгөөд паркетан талбайнууд нь Hive-д зөвшөөрөгдөөгүй тэмдэгтүүдээс эхэлж болно гэдгийг санах хэрэгтэй. Жишээлбэл, Spark нь "corrupt_record" талбарт задлан шинжилж чадаагүй мөрүүдийг шиддэг. Ийм талбарыг зугтахгүйгээр Hive-д бүртгэх боломжгүй.

Үүнийг мэдсэнээр бид схемийг авна:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Хууль ("_corrupt_record", "`_corrupt_record`") + " " + f[1].орлуулах(":", "`:").орлуулах("<", "<`").орлуулах(",", ",`").replace("массив<`", "массив<") аюулгүй DDL болгодог, өөрөөр хэлбэл:

create table tname (_field1 string, 1field string)

"_field1, 1field" гэх мэт талбарын нэрээр талбарын нэрс зугтсан газар аюулгүй DDL-г хийдэг: `tname` хүснэгт үүсгэ (`_field1` string, `1field` string).

Асуулт гарч ирнэ: бүрэн схемтэй (pf кодоор) өгөгдлийн фреймийг хэрхэн зөв авах вэ? Энэ pf-г яаж авах вэ? Энэ бол тав дахь асуудал юм. Зорилтот үзэсгэлэнгийн паркет файл бүхий хавтсаас бүх хуваалтын схемийг дахин уншина уу? Энэ арга нь хамгийн аюулгүй боловч хэцүү байдаг.

Уг схем нь Hive-д аль хэдийн байна. Та бүхэл бүтэн хүснэгтийн схем болон шинэ хуваалтыг нэгтгэснээр шинэ схем авах боломжтой. Тиймээс та Hive-ээс хүснэгтийн схемийг авч, шинэ хуваалтын схемтэй нэгтгэх хэрэгтэй. Үүнийг Hive-аас туршилтын мета өгөгдлийг уншиж, түр хавтсанд хадгалах, Spark ашиглан хоёр хуваалтыг нэгэн зэрэг унших замаар хийж болно.

Үнэн хэрэгтээ танд хэрэгтэй бүх зүйл байна: Hive дахь анхны хүснэгтийн схем болон шинэ хуваалт. Бидэнд бас өгөгдөл бий. Үүсгэсэн хуваалтаас дэлгүүрийн нүүрэн талын схем болон шинэ талбаруудыг хослуулсан шинэ схемийг авахад л үлддэг.

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Дараа нь бид өмнөх хэсгүүдийн адил хүснэгтийн бүртгэлийн DDL-г үүсгэнэ.
Хэрэв бүхэл бүтэн гинж зөв ажиллаж байвал, тухайлбал, эхлүүлэх ачаалал байгаа бөгөөд Hive дээр хүснэгтийг зөв үүсгэсэн бол бид шинэчилсэн хүснэгтийн схемийг авах болно.

Хамгийн сүүлчийн асуудал бол та Hive хүснэгтэд хуваалт нэмж болохгүй, учир нь энэ нь эвдрэх болно. Хуваалтын бүтцийг засахын тулд та Hive-г хүчлэх хэрэгтэй:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON-г уншиж, түүн дээр тулгуурлан дэлгүүрийн нүүр хуудас үүсгэх энгийн ажил нь хэд хэдэн далд бэрхшээлийг даван туулж, тусад нь хайж олох шийдлүүдийг олох болно. Хэдийгээр эдгээр шийдлүүд нь энгийн боловч тэдгээрийг олоход маш их цаг зарцуулдаг.

Үзэсгэлэнгийн барилгын ажлыг хэрэгжүүлэхийн тулд би дараахь зүйлийг хийх ёстой байв.

  • Үйлчилгээний файлуудаас салж, үзэсгэлэнд хуваалт нэмж оруулаарай
  • Spark-ийн бичсэн эх өгөгдлийн хоосон талбаруудыг шийдвэрлэх
  • Энгийн төрлүүдийг мөр рүү дамжуулах
  • Талбайн нэрийг жижиг үсгээр хөрвүүлэх
  • Hive-д тусдаа өгөгдөл байршуулах, хүснэгтийн бүртгэл хийх (DDL үүсгэх)
  • Hive-тэй нийцэхгүй байж болох талбарын нэрсээс зайлсхийхээ бүү мартаарай
  • Hive дахь хүснэгтийн бүртгэлийг хэрхэн шинэчлэх талаар олж мэдээрэй

Дүгнэж хэлэхэд, дэлгүүрийн цонх барих шийдвэр нь олон бэрхшээлийг дагуулж байгааг бид тэмдэглэж байна. Тиймээс хэрэгжүүлэхэд хүндрэлтэй байгаа тохиолдолд амжилттай туршлагатай туршлагатай хамтрагчтай холбоо барих нь дээр.

Энэ нийтлэлийг уншсан танд баярлалаа, танд хэрэгтэй мэдээлэл байна гэж найдаж байна.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх