Qığılcım sxemi Təcrübədə təkamül

Əziz oxucular, gününüz xeyir!

Bu məqalədə Neoflex-in Big Data Solutions biznes sahəsinin aparıcı məsləhətçisi Apache Spark-dan istifadə edərək dəyişən struktur vitrinlərinin qurulması variantlarını ətraflı təsvir edir.

Məlumatların təhlili layihəsinin bir hissəsi olaraq, tez-tez boş strukturlaşdırılmış məlumatlara əsaslanan vitrinlərin qurulması vəzifəsi ortaya çıxır.

Adətən bunlar JSON və ya XML kimi yadda saxlanılan müxtəlif sistemlərin qeydləri və ya cavablarıdır. Məlumat Hadoop-a yüklənir, sonra onlardan bir vitrin qurmalısınız. Biz, məsələn, Impala vasitəsilə yaradılmış vitrinə girişi təşkil edə bilərik.

Bu halda, hədəf vitrininin sxemi əvvəlcədən məlum deyil. Üstəlik, sxem də əvvəlcədən tərtib edilə bilməz, çünki bu, məlumatlardan asılıdır və biz bu çox zəif strukturlaşdırılmış məlumatlarla məşğul oluruq.

Məsələn, bu gün aşağıdakı cavab qeyd olunur:

{source: "app1", error_code: ""}

və sabah eyni sistemdən aşağıdakı cavab gəlir:

{source: "app1", error_code: "error", description: "Network error"}

Nəticədə vitrinə başqa bir sahə əlavə edilməlidir - təsvir və onun gəlib-gəlməyəcəyini heç kim bilmir.

Bu cür məlumatlar üzərində mart yaratmaq vəzifəsi kifayət qədər standartdır və Spark-ın bunun üçün bir sıra alətləri var. Mənbə məlumatlarının təhlili üçün həm JSON, həm də XML üçün dəstək var və əvvəllər məlum olmayan sxem üçün schemaEvolution dəstəyi təmin edilir.

İlk baxışdan həll yolu sadə görünür. Siz JSON ilə bir qovluq götürməli və onu dataframe-də oxumalısınız. Spark sxem yaradacaq, iç-içə məlumatları strukturlara çevirəcək. Bundan əlavə, hər şeyi Hive metastore-da vitrin qeydiyyatdan keçirərək Impala-da dəstəklənən parketdə saxlamaq lazımdır.

Hər şey sadə görünür.

Bununla belə, praktikada bir sıra problemlərlə nə etmək lazım olduğu sənədlərdəki qısa nümunələrdən aydın deyil.

Sənədlər vitrin yaratmaq üçün deyil, JSON və ya XML-i dataframe-də oxumaq üçün yanaşmanı təsvir edir.

Yəni, sadəcə JSON-u necə oxumaq və təhlil etmək lazım olduğunu göstərir:

df = spark.read.json(path...)

Bu, məlumatları Spark-a təqdim etmək üçün kifayətdir.

Praktikada skript sadəcə JSON fayllarını qovluqdan oxumaqdan və dataframe yaratmaqdan daha mürəkkəbdir. Vəziyyət belə görünür: artıq müəyyən bir vitrin var, hər gün yeni məlumatlar daxil olur, sxemin fərqli ola biləcəyini unutmadan vitrinə əlavə etmək lazımdır.

Bir vitrin qurmaq üçün adi sxem aşağıdakı kimidir:

1 Adım. Məlumat sonrakı gündəlik yenidən yükləmə ilə Hadoop-a yüklənir və yeni bölməyə əlavə olunur. Günə bölünmüş ilkin məlumatları olan bir qovluq çıxır.

2 Adım. İlkin yükləmə zamanı bu qovluq Spark tərəfindən oxunur və təhlil edilir. Nəticə məlumat çərçivəsi təhlil edilə bilən formatda, məsələn, parketdə saxlanılır və sonra Impalaya idxal edilə bilər. Bu, bu nöqtəyə qədər toplanmış bütün məlumatlarla hədəf vitrin yaradır.

3 Adım. Hər gün vitrin yenilənəcək yükləmə yaradılır.
Artan yüklənmə, vitrin arakəsmələrə ehtiyac və vitrin ümumi sxeminin saxlanılması məsələsi var.

Bir misal verək. Deyək ki, repozitoriyanın qurulmasının ilk addımı həyata keçirilib və JSON fayllarının qovluğa yüklənməsi konfiqurasiya edilib.

Onlardan dataframe yaratmaq, sonra onu vitrin kimi saxlamaq problem deyil. Bu, Spark sənədlərində asanlıqla tapıla bilən ilk addımdır:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Hər şey qaydasındadır.

Biz JSON-u oxuduq və təhlil etdik, sonra dataframeni parket kimi saxlayırıq, onu Hive-da istənilən rahat şəkildə qeyd edirik:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Bir pəncərə alırıq.

Ancaq ertəsi gün mənbədən yeni məlumatlar əlavə edildi. JSON ilə qovluğumuz və bu qovluqdan yaradılmış vitrinimiz var. Mənbədən məlumatların növbəti partiyasını yüklədikdən sonra data marketdə bir günlük məlumat çatışmır.

Məntiqi həll vitrin gün ərzində bölmək olardı ki, bu da hər növbəti gün yeni bölmə əlavə etməyə imkan verəcək. Bunun mexanizmi də yaxşı məlumdur, Spark arakəsmələri ayrıca yazmağa imkan verir.

Birincisi, biz ilkin yükləməni həyata keçiririk, məlumatları yuxarıda təsvir edildiyi kimi saxlayırıq, yalnız bölmə əlavə edirik. Bu əməliyyat vitrin başlanğıcı adlanır və yalnız bir dəfə edilir:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Ertəsi gün biz yalnız yeni bölməni yükləyirik:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Sxemi yeniləmək üçün Hive-da yenidən qeydiyyatdan keçmək qalır.
Ancaq burada problemlər yaranır.

Birinci problem. Gec-tez ortaya çıxan parket oxunmaz olacaq. Bu, parket və JSON-un boş sahələrə fərqli yanaşması ilə bağlıdır.

Tipik bir vəziyyəti nəzərdən keçirək. Məsələn, dünən JSON gəldi:

День 1: {"a": {"b": 1}},

və bu gün eyni JSON belə görünür:

День 2: {"a": null}

Tutaq ki, hər birində bir xətt olan iki fərqli bölməmiz var.
Bütün mənbə məlumatlarını oxuduqda, Spark növü müəyyən edə biləcək və "a" in "struktur" tipli bir sahə olduğunu və INT tipli "b" yuvası sahəsi olduğunu başa düşəcək. Ancaq hər bir bölmə ayrıca saxlanılıbsa, onda uyğun olmayan bölmə sxemləri olan bir parket alırıq:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Bu vəziyyət yaxşı məlumdur, buna görə də xüsusi olaraq bir seçim əlavə edilmişdir - mənbə məlumatlarını təhlil edərkən boş sahələri çıxarın:

df = spark.read.json("...", dropFieldIfAllNull=True)

Bu vəziyyətdə, parket birlikdə oxuna bilən arakəsmələrdən ibarət olacaqdır.
Baxmayaraq ki, bunu praktikada edənlər burada acı-acı gülümsəyəcəklər. Niyə? Bəli, çünki daha iki vəziyyətin olması ehtimalı var. Və ya üç. Və ya dörd. Birincisi, demək olar ki, baş verəcək, rəqəmsal növlərin müxtəlif JSON fayllarında fərqli görünməsidir. Məsələn, {intField: 1} və {intField: 1.1}. Əgər belə sahələr bir bölmədə tapılarsa, o zaman birləşmə sxemi hər şeyi düzgün oxuyacaq və ən dəqiq tipə gətirib çıxaracaq. Fərqli olanlar varsa, onda birində intField: int, digərində isə intField: double olacaq.

Bu vəziyyəti idarə etmək üçün aşağıdakı bayraq var:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

İndi bizdə arakəsmələrin yerləşdiyi bir qovluq var, onu bir dataframe və bütün vitrin üçün etibarlı parket kimi oxumaq olar. Bəli? Yox.

Biz masanı Hive-də qeyd etdiyimizi xatırlamalıyıq. Pətək sahə adlarında hərflərə həssas deyil, parket isə hərflərə həssasdır. Buna görə də, sxemləri olan bölmələr: field1: int və Field1: int Hive üçün eynidir, lakin Spark üçün deyil. Sahə adlarını kiçik hərflərə çevirməyi unutmayın.

Bundan sonra hər şey yaxşı görünür.

Ancaq hər şey o qədər də sadə deyil. İkinci, həm də məlum problem var. Hər bir yeni bölmə ayrıca saxlandığından, bölmə qovluğunda Spark xidmət faylları, məsələn, _SUCCESS əməliyyatının uğur bayrağı olacaq. Bu, parket çəkməyə çalışarkən səhvlə nəticələnəcək. Bunun qarşısını almaq üçün Spark-ın qovluğa xidmət faylları əlavə etməsinin qarşısını almaq üçün konfiqurasiyanı konfiqurasiya etməlisiniz:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Görünür, indi hər gün yeni bir parket bölməsi gün üçün təhlil edilmiş məlumatların yerləşdiyi hədəf vitrin qovluğuna əlavə olunur. Məlumat növü ilə ziddiyyət təşkil edən bölmələrin olmamasına əvvəlcədən diqqət yetirdik.

Ancaq üçüncü problemimiz var. İndi ümumi sxem məlum deyil, üstəlik, Hive-dəki cədvəl yanlış sxemə malikdir, çünki hər yeni bölmə çox güman ki, sxemə bir təhrif təqdim etdi.

Cədvəli yenidən qeydiyyatdan keçirməlisiniz. Bunu sadəcə etmək olar: vitrin parketini yenidən oxuyun, sxemi götürün və onun əsasında DDL yaradın, bununla da Hive-da qovluğu xarici cədvəl kimi yenidən qeydiyyatdan keçirin, hədəf vitrin sxemini yeniləyin.

Dördüncü problemimiz var. Cədvəli ilk dəfə qeydiyyatdan keçirəndə Spark-a etibar etdik. İndi biz bunu özümüz edirik və parket sahələri Hive üçün icazə verilməyən simvollarla başlaya biləcəyini xatırlamalıyıq. Məsələn, Spark "corrupt_record" sahəsində təhlil edə bilmədiyi sətirləri atır. Belə bir sahə qaçış olmadan Hive-da qeydə alına bilməz.

Bunu bilərək, sxemi alırıq:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kod ("_corrupt_record", "`_corrupt_record`") + " " + f[1].əvəz edin(":", "`:").əvəz edin("<", "<`").əvəz edin(",", ",`").replace("massiv<`", "massiv<") təhlükəsiz DDL edir, yəni:

create table tname (_field1 string, 1field string)

"_field1, 1field" kimi sahə adları ilə təhlükəsiz DDL sahə adlarının qaçdığı yerlərdə edilir: `tname` cədvəli yaradın (`_field1` sətri, `1field` sətri).

Sual yaranır: tam sxemi (pf kodunda) olan dataframe-ni necə düzgün əldə etmək olar? Bu pf-ni necə əldə etmək olar? Bu, beşinci problemdir. Hədəf vitrininin parket faylları olan qovluqdan bütün bölmələrin sxemini yenidən oxuyun? Bu üsul ən təhlükəsizdir, lakin çətindir.

Sxema artıq Hive-dadır. Bütün cədvəlin sxemini və yeni bölməni birləşdirərək yeni sxem əldə edə bilərsiniz. Beləliklə, siz Hive-dan cədvəl sxemini götürməli və onu yeni bölmənin sxemi ilə birləşdirməlisiniz. Bu, Hive-dan test metadatasını oxumaqla, onu müvəqqəti qovluğa saxlamaqla və hər iki bölməni birdən oxumaq üçün Spark-dan istifadə etməklə edilə bilər.

Əslində, sizə lazım olan hər şey var: Hive-da orijinal cədvəl sxemi və yeni bölmə. Məlumatlarımız da var. Yalnız vitrin sxemini və yaradılmış bölmədən yeni sahələri birləşdirən yeni sxem əldə etmək qalır:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Sonra, əvvəlki parçada olduğu kimi cədvəl qeydiyyatı DDL yaradırıq.
Bütün zəncir düzgün işləyirsə, yəni başlanğıc yükü olubsa və Hive-də cədvəl düzgün yaradılıbsa, yenilənmiş cədvəl sxemini alırıq.

Son problem, Hive cədvəlinə asanlıqla bölmə əlavə edə bilməməyinizdir, çünki o, qırılacaq. Hive-ı bölmə quruluşunu düzəltməyə məcbur etməlisiniz:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON-u oxumaq və onun əsasında vitrin yaratmaq kimi sadə tapşırıq bir sıra gizli çətinliklərin, həllərin ayrıca axtarmalı olduğunuz həllərin aradan qaldırılması ilə nəticələnir. Və bu həllər sadə olsa da, onları tapmaq çox vaxt aparır.

Vitrin tikintisini həyata keçirmək üçün mən etməli oldum:

  • Xidmət fayllarından xilas olaraq vitrinə bölmələr əlavə edin
  • Spark-ın yazdığı mənbə məlumatında boş sahələrlə məşğul olun
  • Sadə növləri sətirə köçürün
  • Sahə adlarını kiçik hərflərə çevirin
  • Hive-da ayrı məlumat yükləməsi və cədvəl qeydiyyatı (DDL nəsli)
  • Hive ilə uyğun gəlməyən sahə adlarından qaçmağı unutmayın
  • Hive-da cədvəl qeydiyyatını necə yeniləməyi öyrənin

Xülasə etmək üçün qeyd edirik ki, vitrinlər qurmaq qərarı bir çox tələlərlə doludur. Buna görə də, həyata keçirməkdə çətinliklər yaranarsa, uğurlu təcrübəyə malik təcrübəli tərəfdaşa müraciət etmək daha yaxşıdır.

Bu məqaləni oxuduğunuz üçün təşəkkür edirik, ümid edirik ki, məlumat faydalı olacaq.

Mənbə: www.habr.com

Добавить комментарий