Spark sxemasiEvolyutsiya amalda

Hurmatli o'quvchilar, xayrli kun!

Ushbu maqolada Neoflex kompaniyasining Big Data Solutions biznes sohasining yetakchi maslahatchisi Apache Spark yordamida o'zgaruvchan tuzilmali vitrinalar yaratish imkoniyatlarini batafsil tasvirlab beradi.

Ma'lumotlarni tahlil qilish loyihasining bir qismi sifatida, ko'pincha erkin tuzilgan ma'lumotlarga asoslangan do'konlarni qurish vazifasi paydo bo'ladi.

Odatda bu JSON yoki XML sifatida saqlangan jurnallar yoki turli tizimlarning javoblari. Ma'lumotlar Hadoop-ga yuklanadi, keyin siz ulardan do'kon yaratishingiz kerak. Biz yaratilgan vitrinaga kirishni tashkil qilishimiz mumkin, masalan, Impala orqali.

Bunday holda, maqsadli do'konning sxemasi oldindan ma'lum emas. Bundan tashqari, sxemani ham oldindan tuzib bo'lmaydi, chunki u ma'lumotlarga bog'liq va biz bu juda erkin tuzilgan ma'lumotlar bilan ishlaymiz.

Masalan, bugungi kunda quyidagi javob qayd etilgan:

{source: "app1", error_code: ""}

va ertaga xuddi shu tizimdan quyidagi javob keladi:

{source: "app1", error_code: "error", description: "Network error"}

Natijada, vitrinaga yana bitta maydon qo'shilishi kerak - tavsif va uning kelishi yoki kelmasligi hech kimga ma'lum emas.

Bunday ma'lumotlarga do'kon yaratish vazifasi juda standart va Spark buning uchun bir qator vositalarga ega. Manba ma'lumotlarini tahlil qilish uchun JSON ham, XML ham qo'llab-quvvatlanadi va ilgari noma'lum sxema uchun schemaEvolution qo'llab-quvvatlanadi.

Bir qarashda, yechim oddiy ko'rinadi. JSON bilan jildni olib, uni dataframega o'qishingiz kerak. Spark sxemani yaratadi, o'rnatilgan ma'lumotlarni tuzilmalarga aylantiradi. Bundan tashqari, hamma narsani parketda saqlash kerak, bu Impala-da ham qo'llab-quvvatlanadi, do'konni Hive metastore'da ro'yxatdan o'tkazish orqali.

Hamma narsa oddiy ko'rinadi.

Biroq, hujjatlardagi qisqa misollardan amalda bir qator muammolar bilan nima qilish kerakligi aniq emas.

Hujjatlar do'konni yaratish emas, balki JSON yoki XML-ni dataframe ichiga o'qish yondashuvini tavsiflaydi.

Ya'ni, u JSONni qanday o'qish va tahlil qilishni ko'rsatadi:

df = spark.read.json(path...)

Bu ma'lumotlarni Spark uchun mavjud qilish uchun etarli.

Amalda, skript JSON fayllarini jilddan o'qish va dataframe yaratishdan ko'ra ancha murakkabroq. Vaziyat shunday ko'rinadi: allaqachon ma'lum bir do'kon bor, har kuni yangi ma'lumotlar keladi, ular sxemasi farq qilishi mumkinligini unutmasdan, do'konga qo'shilishi kerak.

Vitrni qurishning odatiy sxemasi quyidagicha:

1 qadam. Ma'lumotlar har kuni qayta yuklash bilan Hadoop-ga yuklanadi va yangi bo'limga qo'shiladi. Dastlabki ma'lumotlar kun bo'yicha bo'lingan papka paydo bo'ldi.

2 qadam. Dastlabki yuklash vaqtida ushbu papka Spark tomonidan o'qiladi va tahlil qilinadi. Olingan dataframe parsable formatda, masalan, parketda saqlanadi, keyin uni Impala-ga import qilish mumkin. Bu shu paytgacha to'plangan barcha ma'lumotlar bilan maqsadli vitrinani yaratadi.

3 qadam. Do'kon ko'rinishini har kuni yangilaydigan yuklab olish yaratiladi.
O'sib borayotgan yuklash, vitrinani bo'lish zarurati va vitrinaning umumiy sxemasini saqlash masalasi mavjud.

Keling, bir misol keltiraylik. Aytaylik, omborni qurishning birinchi bosqichi amalga oshirildi va JSON fayllari jildga yuklandi.

Ulardan dataframe yaratish, keyin uni vitrina sifatida saqlash muammo emas. Bu Spark hujjatlarida osongina topilishi mumkin bo'lgan birinchi qadamdir:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Hammasi yaxshidek tuyuladi.

Biz JSON-ni o'qib chiqdik va tahlil qildik, keyin ma'lumotlar ramkasini parket sifatida saqlaymiz va uni Hive-da istalgan qulay usulda ro'yxatdan o'tkazamiz:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Biz oyna olamiz.

Ammo, ertasi kuni manbadan yangi ma'lumotlar qo'shildi. Bizda JSON bilan jild va shu jilddan yaratilgan vitrin mavjud. Manbadan keyingi ma'lumotlar to'plamini yuklagandan so'ng, ma'lumotlar martida bir kunlik ma'lumotlar etishmayapti.

Mantiqiy yechim do'kon peshtaxtasini kundan-kunga bo'lishdir, bu har kuni yangi bo'lim qo'shish imkonini beradi. Buning mexanizmi ham yaxshi ma'lum, Spark bo'limlarni alohida yozish imkonini beradi.

Birinchidan, biz dastlabki yukni bajaramiz, ma'lumotlarni yuqorida tavsiflanganidek saqlaymiz, faqat bo'limlarni qo'shamiz. Bu harakat vitrini ishga tushirish deb ataladi va faqat bir marta amalga oshiriladi:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Ertasi kuni biz faqat yangi bo'limni yuklaymiz:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Sxemani yangilash uchun Hive-da qayta ro'yxatdan o'tish qoladi.
Biroq, bu erda muammolar paydo bo'ladi.

Birinchi muammo. Ertami-kechmi, paydo bo'lgan parketni o'qib bo'lmaydi. Bu parket va JSON bo'sh maydonlarga qanday munosabatda bo'lishi bilan bog'liq.

Keling, odatiy vaziyatni ko'rib chiqaylik. Masalan, kecha JSON keldi:

День 1: {"a": {"b": 1}},

va bugungi kunda xuddi shu JSON shunday ko'rinadi:

День 2: {"a": null}

Aytaylik, bizda ikkita turli bo'lim bor, ularning har biri bitta qatorga ega.
Butun manba ma'lumotlarini o'qib chiqqach, Spark turini aniqlay oladi va "a" "struktura" tipidagi maydon ekanligini va INT tipidagi "b" ichki o'rnatilgan maydon ekanligini tushunadi. Ammo, agar har bir bo'lim alohida saqlangan bo'lsa, biz mos kelmaydigan bo'lim sxemalari bilan parket olamiz:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Bu holat yaxshi ma'lum, shuning uchun maxsus variant qo'shildi - manba ma'lumotlarini tahlil qilishda bo'sh maydonlarni olib tashlang:

df = spark.read.json("...", dropFieldIfAllNull=True)

Bunday holda, parket birgalikda o'qilishi mumkin bo'lgan qismlardan iborat bo'ladi.
Garchi buni amalda qilganlar bu erda achchiq tabassum qiladilar. Nega? Ha, chunki yana ikkita vaziyat bo'lishi mumkin. Yoki uchta. Yoki to'rtta. Birinchisi, deyarli albatta sodir bo'ladi, raqamli turlar turli JSON fayllarida har xil ko'rinadi. Masalan, {intField: 1} va {intField: 1.1}. Agar bunday maydonlar bitta bo'limda topilsa, u holda sxemani birlashtirish hamma narsani to'g'ri o'qiydi va bu eng aniq turga olib keladi. Ammo har xil bo'lsa, unda birida intField: int, ikkinchisida intField: double bo'ladi.

Ushbu vaziyatni hal qilish uchun quyidagi bayroq mavjud:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Endi bizda bitta dataframega o'qilishi mumkin bo'lgan bo'limlar va butun vitrinaning haqiqiy parketi mavjud bo'lgan papka mavjud. Ha? Yo'q.

Biz jadvalni Hive-da ro'yxatdan o'tkazganimizni eslashimiz kerak. Maydon nomlarida Hive katta-kichik harflarga sezgir emas, parket esa katta-kichik harflarga sezgir. Shuning uchun, sxemali bo'limlar: field1: int va Field1: int Hive uchun bir xil, lekin Spark uchun emas. Maydon nomlarini kichik harflarga aylantirishni unutmang.

Shundan so'ng, hamma narsa yaxshi bo'lib tuyuladi.

Biroq, hammasi ham oddiy emas. Ikkinchi, shuningdek, taniqli muammo bor. Har bir yangi bo'lim alohida saqlanganligi sababli, bo'lim papkasida Spark xizmat fayllari bo'ladi, masalan, _SUCCESS operatsiya muvaffaqiyati bayrog'i. Bu parketga urinishda xatolikka olib keladi. Bunga yo'l qo'ymaslik uchun Spark papkaga xizmat fayllarini qo'shishiga yo'l qo'ymaslik uchun konfiguratsiyani sozlashingiz kerak:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Ko'rinishidan, endi har kuni yangi parket bo'limi kun uchun tahlil qilingan ma'lumotlar joylashgan maqsadli vitrin papkasiga qo'shiladi. Ma'lumotlar turiga zid bo'lgan bo'limlar yo'qligiga oldindan g'amxo'rlik qildik.

Ammo bizda uchinchi muammo bor. Endi umumiy sxema noma'lum, bundan tashqari, Hive-dagi jadval noto'g'ri sxemaga ega, chunki har bir yangi bo'lim, ehtimol, sxemaga buzilish kiritgan.

Jadvalni qayta ro'yxatdan o'tkazishingiz kerak. Buni oddiygina qilish mumkin: vitrina parketini qayta o'qing, sxemani oling va uning asosida DDL yarating, uning yordamida Hive-dagi papkani tashqi jadval sifatida qayta ro'yxatdan o'tkazing, maqsadli vitrina sxemasini yangilang.

Bizda to'rtinchi muammo bor. Jadvalni birinchi marta ro'yxatdan o'tkazganimizda, biz Spark-ga tayandik. Endi biz buni o'zimiz qilamiz va parket maydonlari Hive uchun ruxsat etilmagan belgilar bilan boshlanishi mumkinligini unutmasligimiz kerak. Misol uchun, Spark "corrupt_record" maydonida tahlil qila olmaydigan qatorlarni chiqaradi. Bunday maydonni qochib qutulmagan holda Hive-da ro'yxatdan o'tkazib bo'lmaydi.

Buni bilib, biz sxemani olamiz:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

da ("_corrupt_record", "`_corrupt_record`") + " " + f[1].almashtirish(":", "`:").almashtirish("<", "<`").almashtirish(",", ",`").replace("massiv<`", "massiv<") xavfsiz DDL qiladi, ya'ni:

create table tname (_field1 string, 1field string)

"_field1, 1field" kabi maydon nomlari bilan xavfsiz DDL maydon nomlari qochib ketgan joyda amalga oshiriladi: "tname" jadvalini yarating ("_field1", "1field" qatori).

Savol tug'iladi: to'liq sxema (pf kodida) bilan dataframeni qanday qilib to'g'ri olish mumkin? Bu pfni qanday olish mumkin? Bu beshinchi muammo. Maqsadli vitrinaning parket fayllari bo'lgan papkadan barcha bo'limlarning sxemasini qayta o'qing? Bu usul eng xavfsiz, ammo qiyin.

Sxema allaqachon Hive-da. Butun jadval va yangi bo'limning sxemasini birlashtirib, yangi sxemani olishingiz mumkin. Shunday qilib, siz Hive-dan jadval sxemasini olishingiz va uni yangi bo'lim sxemasi bilan birlashtirishingiz kerak. Buni Hive’dan test metama’lumotlarini o‘qish, uni vaqtinchalik jildga saqlash va ikkala bo‘limni birdaniga o‘qish uchun Spark’dan foydalanish orqali amalga oshirish mumkin.

Aslida, sizga kerak bo'lgan hamma narsa mavjud: Hive-dagi asl jadval sxemasi va yangi bo'lim. Bizda ham ma'lumotlar bor. Yaratilgan bo'limdan do'kon sxemasi va yangi maydonlarni birlashtirgan yangi sxemani olishgina qoladi:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Keyinchalik, oldingi parchada bo'lgani kabi, jadvalni ro'yxatga olish DDL ni yaratamiz.
Agar butun zanjir to'g'ri ishlayotgan bo'lsa, ya'ni ishga tushirish yuki bo'lsa va Hive-da jadval to'g'ri yaratilgan bo'lsa, biz yangilangan jadval sxemasini olamiz.

Va oxirgi muammo shundaki, siz shunchaki Hive jadvaliga bo'lim qo'sha olmaysiz, chunki u buziladi. Hive-ni uning bo'lim tuzilishini tuzatishga majbur qilishingiz kerak:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON-ni o'qish va uning asosida vitrina yaratish kabi oddiy vazifa bir qator noaniq qiyinchiliklarni, yechimlarni alohida izlashga to'g'ri keladi. Va bu echimlar oddiy bo'lsa-da, ularni topish uchun ko'p vaqt talab etiladi.

Vitrina qurilishini amalga oshirish uchun men quyidagilarga majbur bo'ldim:

  • Vitringa bo'limlar qo'shing, xizmat fayllaridan xalos bo'ling
  • Spark kiritgan manba ma'lumotlaridagi bo'sh maydonlar bilan ishlang
  • Oddiy turlarni satrga chiqarish
  • Maydon nomlarini kichik harflarga aylantiring
  • Hive-da alohida ma'lumotlarni yuklash va jadvallarni ro'yxatdan o'tkazish (DDL avlodi)
  • Hive bilan mos kelmasligi mumkin bo'lgan maydon nomlaridan qochishni unutmang
  • Hive-da jadval ro'yxatini qanday yangilashni bilib oling

Xulosa qilib shuni ta'kidlaymizki, do'kon oynalarini qurish qarori ko'plab tuzoqlarga to'la. Shuning uchun, amalga oshirishda qiyinchiliklar yuzaga kelgan taqdirda, muvaffaqiyatli tajribaga ega bo'lgan tajribali sherik bilan bog'lanish yaxshiroqdir.

Ushbu maqolani o'qiganingiz uchun tashakkur, umid qilamizki, siz foydali ma'lumot topasiz.

Manba: www.habr.com

a Izoh qo'shish