Percikan skemaEvolusi dalam amalan

Pembaca yang dihormati, selamat hari!

Dalam artikel ini, perunding terkemuka kawasan perniagaan Penyelesaian Data Besar Neoflex menerangkan secara terperinci pilihan untuk membina pameran struktur berubah menggunakan Apache Spark.

Sebagai sebahagian daripada projek analisis data, tugas membina etalase berdasarkan data berstruktur longgar sering timbul.

Biasanya ini adalah log, atau respons daripada pelbagai sistem, disimpan sebagai JSON atau XML. Data dimuat naik ke Hadoop, maka anda perlu membina etalase daripadanya. Kami boleh mengatur akses kepada pameran yang dibuat, contohnya, melalui Impala.

Dalam kes ini, skema etalase sasaran tidak diketahui terlebih dahulu. Selain itu, skim ini juga tidak boleh disediakan terlebih dahulu, kerana ia bergantung pada data, dan kami sedang berurusan dengan data berstruktur yang sangat longgar ini.

Sebagai contoh, hari ini respons berikut direkodkan:

{source: "app1", error_code: ""}

dan esok dari sistem yang sama datang jawapan berikut:

{source: "app1", error_code: "error", description: "Network error"}

Akibatnya, satu lagi medan harus ditambahkan pada pameran - penerangan, dan tiada siapa yang tahu sama ada ia akan datang atau tidak.

Tugas untuk mencipta etalase pada data sedemikian adalah agak standard, dan Spark mempunyai beberapa alat untuk ini. Untuk menghuraikan data sumber, terdapat sokongan untuk JSON dan XML, dan untuk skema yang tidak diketahui sebelum ini, sokongan untuk schemaEvolution disediakan.

Pada pandangan pertama, penyelesaiannya kelihatan mudah. Anda perlu mengambil folder dengan JSON dan membacanya ke dalam bingkai data. Spark akan membuat skema, menukar data bersarang kepada struktur. Selanjutnya, segala-galanya perlu disimpan dalam parket, yang juga disokong dalam Impala, dengan mendaftarkan etalase dalam metastore Hive.

Semuanya nampak mudah.

Walau bagaimanapun, tidak jelas daripada contoh ringkas dalam dokumentasi apa yang perlu dilakukan dengan beberapa masalah dalam amalan.

Dokumentasi menerangkan pendekatan bukan untuk mencipta etalase, tetapi untuk membaca JSON atau XML ke dalam bingkai data.

Iaitu, ia hanya menunjukkan cara membaca dan menghuraikan JSON:

df = spark.read.json(path...)

Ini sudah cukup untuk menjadikan data tersedia kepada Spark.

Dalam amalan, skrip adalah lebih rumit daripada hanya membaca fail JSON dari folder dan mencipta bingkai data. Keadaannya kelihatan seperti ini: sudah ada etalase tertentu, data baharu masuk setiap hari, mereka perlu ditambah ke etalase, tidak lupa bahawa skema mungkin berbeza.

Skim biasa untuk membina pameran adalah seperti berikut:

Langkah 1. Data dimuatkan ke dalam Hadoop dengan pemuatan semula harian berikutnya dan ditambah pada partition baharu. Ternyata folder dengan data awal dibahagikan mengikut hari.

Langkah 2. Semasa pemuatan awal, folder ini dibaca dan dihuraikan oleh Spark. Bingkai data yang terhasil disimpan dalam format parsable, contohnya, dalam parket, yang kemudiannya boleh diimport ke Impala. Ini mewujudkan pameran sasaran dengan semua data yang telah terkumpul sehingga tahap ini.

Langkah 3. Muat turun dibuat yang akan mengemas kini etalase setiap hari.
Terdapat persoalan mengenai pemuatan tambahan, keperluan untuk membahagikan showcase, dan persoalan mengekalkan skema umum showcase.

Mari kita ambil contoh. Katakan langkah pertama membina repositori telah dilaksanakan dan fail JSON dimuat naik ke folder.

Mencipta bingkai data daripada mereka, kemudian menyimpannya sebagai pameran, tidak menjadi masalah. Ini adalah langkah pertama yang boleh didapati dengan mudah dalam dokumentasi Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Semuanya nampak baik-baik saja.

Kami membaca dan menghuraikan JSON, kemudian kami menyimpan bingkai data sebagai parket, mendaftarkannya dalam Hive dengan cara yang mudah:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Kami mendapat tingkap.

Tetapi, keesokan harinya, data baharu daripada sumber itu telah ditambah. Kami mempunyai folder dengan JSON dan pameran yang dibuat daripada folder ini. Selepas memuatkan kumpulan data seterusnya daripada sumber, data mart kehilangan data bernilai satu hari.

Penyelesaian logik adalah untuk membahagikan etalase mengikut hari, yang akan membolehkan menambah partition baharu setiap hari berikutnya. Mekanisme untuk ini juga terkenal, Spark membolehkan anda menulis partition secara berasingan.

Mula-mula, kami melakukan beban awal, menyimpan data seperti yang diterangkan di atas, hanya menambah pembahagian. Tindakan ini dipanggil permulaan etalase dan dilakukan sekali sahaja:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Keesokan harinya, kami hanya memuatkan partition baharu:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Yang tinggal hanyalah mendaftar semula dalam Hive untuk mengemas kini skema.
Namun, di sinilah masalah timbul.

Masalah pertama. Lambat laun, parket yang terhasil tidak boleh dibaca. Ini disebabkan oleh cara parket dan JSON merawat medan kosong secara berbeza.

Mari kita pertimbangkan situasi biasa. Sebagai contoh, semalam JSON tiba:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

dan hari ini JSON yang sama kelihatan seperti ini:

Π”Π΅Π½ΡŒ 2: {"a": null}

Katakan kita mempunyai dua partition berbeza, setiap satu dengan satu baris.
Apabila kita membaca keseluruhan data sumber, Spark akan dapat menentukan jenis, dan akan memahami bahawa "a" ialah medan jenis "struktur", dengan medan bersarang "b" jenis INT. Tetapi, jika setiap partition disimpan secara berasingan, maka kami mendapat parket dengan skema partition yang tidak serasi:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Keadaan ini diketahui umum, jadi pilihan telah ditambah khas - apabila menghuraikan data sumber, alih keluar medan kosong:

df = spark.read.json("...", dropFieldIfAllNull=True)

Dalam kes ini, parket akan terdiri daripada partition yang boleh dibaca bersama.
Walaupun mereka yang telah melakukan ini dalam amalan akan tersenyum pahit di sini. kenapa? Ya, kerana kemungkinan terdapat dua lagi situasi. Atau tiga. Atau empat. Yang pertama, yang hampir pasti akan berlaku, ialah jenis angka akan kelihatan berbeza dalam fail JSON yang berbeza. Contohnya, {intField: 1} dan {intField: 1.1}. Jika medan sedemikian ditemui dalam satu partition, maka gabungan skema akan membaca semuanya dengan betul, membawa kepada jenis yang paling tepat. Tetapi jika dalam yang berbeza, maka satu akan mempunyai intField: int, dan satu lagi akan mempunyai intField: double.

Terdapat bendera berikut untuk menangani situasi ini:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Kini kami mempunyai folder di mana terdapat sekatan yang boleh dibaca ke dalam satu bingkai data dan parket yang sah untuk keseluruhan pameran. ya? Tidak.

Kita mesti ingat bahawa kita telah mendaftarkan jadual dalam Hive. Hive tidak sensitif huruf besar dan kecil dalam nama medan, manakala parket sensitif huruf besar. Oleh itu, partition dengan skema: field1: int, dan Field1: int adalah sama untuk Hive, tetapi bukan untuk Spark. Jangan lupa untuk menukar nama medan kepada huruf kecil.

Selepas itu, semuanya kelihatan baik-baik saja.

Walau bagaimanapun, tidak semuanya begitu mudah. Terdapat masalah kedua, juga terkenal. Memandangkan setiap partition baharu disimpan secara berasingan, folder partition akan mengandungi fail perkhidmatan Spark, contohnya, bendera kejayaan operasi _SUCCESS. Ini akan mengakibatkan ralat semasa mencuba parket. Untuk mengelakkan ini, anda perlu mengkonfigurasi konfigurasi untuk menghalang Spark daripada menambah fail perkhidmatan ke folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Nampaknya kini setiap hari partition parket baharu ditambahkan pada folder pameran sasaran, di mana data yang dihuraikan untuk hari itu berada. Kami berhati-hati terlebih dahulu bahawa tiada partition dengan konflik jenis data.

Tetapi, kita mempunyai masalah ketiga. Kini skema umum tidak diketahui, lebih-lebih lagi, jadual dalam Hive mempunyai skema yang salah, kerana setiap partition baharu berkemungkinan besar memperkenalkan herotan ke dalam skema.

Anda perlu mendaftar semula jadual. Ini boleh dilakukan dengan mudah: baca parket etalase sekali lagi, ambil skema dan buat DDL berdasarkannya, untuk mendaftar semula folder dalam Hive sebagai jadual luaran, mengemas kini skema etalase sasaran.

Kami mempunyai masalah keempat. Apabila kami mendaftarkan jadual untuk kali pertama, kami bergantung pada Spark. Sekarang kita melakukannya sendiri, dan kita perlu ingat bahawa medan parket boleh bermula dengan aksara yang tidak dibenarkan untuk Hive. Sebagai contoh, Spark membuang baris yang tidak dapat dihuraikan dalam medan "corrupt_record". Medan sedemikian tidak boleh didaftarkan dalam Hive tanpa dilepaskan.

Mengetahui ini, kami mendapat skema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kod ("_record_corrupt", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") menjadikan DDL selamat, iaitu bukannya:

create table tname (_field1 string, 1field string)

Dengan nama medan seperti "_field1, 1field", DDL selamat dibuat di mana nama medan dilepaskan: buat jadual `tname` (rentetan `_field1`, rentetan `1field`).

Persoalannya timbul: bagaimana untuk mendapatkan bingkai data dengan skema lengkap (dalam kod pf) dengan betul? Bagaimana untuk mendapatkan pf ini? Ini adalah masalah kelima. Baca semula skema semua partition dari folder dengan fail parket pameran sasaran? Kaedah ini adalah yang paling selamat, tetapi sukar.

Skema sudah ada dalam Hive. Anda boleh mendapatkan skema baharu dengan menggabungkan skema keseluruhan jadual dan partition baharu. Oleh itu, anda perlu mengambil skema jadual dari Hive dan menggabungkannya dengan skema partition baharu. Ini boleh dilakukan dengan membaca metadata ujian daripada Hive, menyimpannya ke folder sementara dan menggunakan Spark untuk membaca kedua-dua partition sekaligus.

Sebenarnya, terdapat semua yang anda perlukan: skema jadual asal dalam Hive dan partition baharu. Kami juga mempunyai data. Ia kekal hanya untuk mendapatkan skema baharu yang menggabungkan skema etalase dan medan baharu daripada partition yang dibuat:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Seterusnya, kami mencipta DDL pendaftaran jadual, seperti dalam coretan sebelumnya.
Jika keseluruhan rantai berfungsi dengan betul, iaitu, terdapat beban permulaan, dan jadual telah dibuat dengan betul dalam Hive, maka kami mendapat skema jadual yang dikemas kini.

Dan masalah terakhir ialah anda tidak boleh hanya menambah partition pada jadual Hive, kerana ia akan rosak. Anda perlu memaksa Hive untuk membetulkan struktur partitionnya:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Tugas mudah membaca JSON dan mencipta etalase berdasarkannya menghasilkan mengatasi beberapa kesukaran tersirat, penyelesaian yang perlu anda cari secara berasingan. Dan walaupun penyelesaian ini mudah, ia memerlukan banyak masa untuk mencarinya.

Untuk melaksanakan pembinaan pameran, saya terpaksa:

  • Tambahkan partition pada showcase, menyingkirkan fail perkhidmatan
  • Berurusan dengan medan kosong dalam data sumber yang telah ditaip oleh Spark
  • Hantar jenis mudah kepada rentetan
  • Tukar nama medan kepada huruf kecil
  • Muat naik data yang berasingan dan pendaftaran jadual dalam Hive (penjanaan DDL)
  • Jangan lupa untuk melepaskan nama medan yang mungkin tidak serasi dengan Hive
  • Ketahui cara mengemas kini pendaftaran jadual dalam Hive

Kesimpulannya, kami perhatikan bahawa keputusan untuk membina tingkap kedai penuh dengan banyak perangkap. Oleh itu, sekiranya berlaku kesukaran dalam pelaksanaan, adalah lebih baik untuk menghubungi rakan kongsi yang berpengalaman dengan kepakaran yang berjaya.

Terima kasih kerana membaca artikel ini, kami harap anda mendapat maklumat yang berguna.

Sumber: www.habr.com

Tambah komen