Percikan skema Evolusi dalam praktiknya

Pembaca yang budiman, selamat siang!

Dalam artikel ini, konsultan terkemuka di area bisnis Solusi Big Data Neoflex menjelaskan secara rinci opsi untuk membangun etalase struktur variabel menggunakan Apache Spark.

Sebagai bagian dari proyek analisis data, tugas membangun etalase berdasarkan data yang terstruktur secara longgar sering kali muncul.

Biasanya ini adalah log, atau tanggapan dari berbagai sistem, disimpan sebagai JSON atau XML. Data diunggah ke Hadoop, lalu Anda perlu membuat etalase darinya. Kita bisa mengatur akses ke etalase yang dibuat, misalnya melalui Impala.

Dalam hal ini, skema etalase target tidak diketahui sebelumnya. Selain itu, skema ini juga tidak dapat dibuat terlebih dahulu, karena bergantung pada data, dan kita berurusan dengan data yang terstruktur sangat longgar.

Misalnya, hari ini respons berikut dicatat:

{source: "app1", error_code: ""}

dan besok dari sistem yang sama muncul jawaban berikut:

{source: "app1", error_code: "error", description: "Network error"}

Akibatnya, bidang lain harus ditambahkan ke etalase - deskripsi, dan tidak ada yang tahu apakah itu akan datang atau tidak.

Tugas membuat etalase pada data tersebut cukup standar, dan Spark memiliki sejumlah alat untuk ini. Untuk mengurai data sumber, terdapat dukungan untuk JSON dan XML, dan untuk skema yang sebelumnya tidak diketahui, dukungan untuk skemaEvolution disediakan.

Sekilas, solusinya terlihat sederhana. Anda perlu mengambil folder dengan JSON dan membacanya ke dalam kerangka data. Spark akan membuat skema, mengubah data bersarang menjadi struktur. Selanjutnya, semuanya perlu disimpan di parket, yang juga didukung di Impala, dengan mendaftarkan etalase di metastore Hive.

Segalanya tampak sederhana.

Namun, tidak jelas dari contoh singkat dalam dokumentasi apa yang harus dilakukan terhadap sejumlah masalah dalam praktiknya.

Dokumentasi menjelaskan pendekatan untuk tidak membuat etalase, tetapi untuk membaca JSON atau XML ke dalam kerangka data.

Yaitu, ini hanya menunjukkan cara membaca dan mengurai JSON:

df = spark.read.json(path...)

Ini cukup untuk membuat data tersedia untuk Spark.

Dalam praktiknya, skrip jauh lebih rumit daripada sekadar membaca file JSON dari folder dan membuat kerangka data. Situasinya begini: sudah ada etalase tertentu, data baru masuk setiap hari, perlu ditambahkan ke etalase, jangan lupa skemanya mungkin berbeda.

Skema biasa untuk membangun etalase adalah sebagai berikut:

Langkah 1. Data dimuat ke Hadoop, diikuti dengan memuat ulang setiap hari dan ditambahkan ke partisi baru. Ternyata folder dengan data awal dipartisi berdasarkan hari.

Langkah 2. Selama pemuatan awal, folder ini dibaca dan diuraikan oleh Spark. Kerangka data yang dihasilkan disimpan dalam format parsable, misalnya di parket, yang kemudian dapat diimpor ke Impala. Ini menciptakan tampilan target dengan semua data yang telah terakumulasi hingga saat ini.

Langkah 3. Unduhan dibuat yang akan memperbarui etalase setiap hari.
Ada pertanyaan tentang pemuatan tambahan, kebutuhan untuk mempartisi etalase, dan pertanyaan tentang mempertahankan skema umum etalase.

Mari kita ambil contoh. Katakanlah langkah pertama membangun repositori telah diterapkan, dan file JSON diunggah ke sebuah folder.

Membuat kerangka data dari mereka, lalu menyimpannya sebagai etalase, tidak menjadi masalah. Ini adalah langkah pertama yang dapat dengan mudah ditemukan di dokumentasi Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Semuanya tampaknya baik-baik saja.

Kami membaca dan menguraikan JSON, lalu kami menyimpan kerangka data sebagai parket, mendaftarkannya di Hive dengan cara apa pun yang nyaman:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Kami mendapatkan jendela.

Tapi, keesokan harinya, data baru dari sumbernya ditambahkan. Kami memiliki folder dengan JSON, dan etalase dibuat dari folder ini. Setelah memuat kumpulan data berikutnya dari sumber, data mart kehilangan data untuk satu hari.

Solusi logisnya adalah dengan mempartisi etalase berdasarkan hari, yang memungkinkan penambahan partisi baru setiap hari berikutnya. Mekanismenya juga sudah diketahui, Spark memungkinkan Anda menulis partisi secara terpisah.

Pertama kita melakukan load awal, menyimpan data seperti dijelaskan di atas, hanya menambahkan partisi saja. Tindakan ini disebut inisialisasi etalase dan dilakukan hanya sekali:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Keesokan harinya, kami hanya memuat partisi baru:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Yang tersisa hanyalah mendaftar ulang di Hive untuk memperbarui skema.
Namun, di sinilah permasalahan muncul.

Masalah pertama. Cepat atau lambat, parket yang dihasilkan tidak akan terbaca. Hal ini disebabkan oleh cara parket dan JSON memperlakukan bidang kosong secara berbeda.

Mari kita pertimbangkan situasi yang umum. Misalnya, kemarin JSON tiba:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

dan hari ini JSON yang sama terlihat seperti ini:

Π”Π΅Π½ΡŒ 2: {"a": null}

Katakanlah kita mempunyai dua partisi berbeda, masing-masing dengan satu baris.
Saat kita membaca seluruh data sumber, Spark akan dapat menentukan tipenya, dan akan memahami bahwa "a" adalah bidang bertipe "struktur", dengan bidang bersarang "b" bertipe INT. Namun, jika setiap partisi disimpan secara terpisah, maka kita mendapatkan parket dengan skema partisi yang tidak kompatibel:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Situasi ini sudah diketahui, jadi opsi telah ditambahkan secara khusus - saat mengurai data sumber, hapus bidang kosong:

df = spark.read.json("...", dropFieldIfAllNull=True)

Dalam hal ini parket akan terdiri dari partisi-partisi yang dapat dibaca bersama-sama.
Meskipun mereka yang telah melakukan ini dalam latihan akan tersenyum pahit di sini. Mengapa? Ya, karena kemungkinan besar akan ada dua situasi lagi. Atau tiga. Atau empat. Hal pertama yang hampir pasti akan terjadi adalah tipe numerik akan terlihat berbeda di file JSON yang berbeda. Misalnya, {intField: 1} dan {intField: 1.1}. Jika bidang seperti itu ditemukan dalam satu partisi, maka penggabungan skema akan membaca semuanya dengan benar, sehingga menghasilkan tipe yang paling akurat. Tetapi jika berbeda, maka yang satu akan memiliki intField: int, dan yang lainnya akan memiliki intField: double.

Ada bendera berikut untuk menangani situasi ini:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Sekarang kita memiliki folder di mana terdapat partisi yang dapat dibaca ke dalam satu kerangka data dan parket valid dari keseluruhan etalase. Ya? TIDAK.

Kita harus ingat bahwa kita mendaftarkan tabel tersebut di Hive. Hive tidak peka huruf besar-kecil pada nama bidang, sedangkan parket peka huruf besar-kecil. Oleh karena itu, partisi dengan skema: field1: int, dan Field1: int sama untuk Hive, tetapi tidak untuk Spark. Jangan lupa untuk mengubah nama field menjadi huruf kecil.

Setelah itu, semuanya tampak baik-baik saja.

Namun, tidak semuanya sesederhana itu. Ada masalah kedua yang juga terkenal. Karena setiap partisi baru disimpan secara terpisah, folder partisi akan berisi file layanan Spark, misalnya, tanda keberhasilan operasi _SUCCESS. Hal ini akan mengakibatkan kesalahan saat mencoba memasang parket. Untuk menghindari hal ini, Anda perlu mengonfigurasi konfigurasi untuk mencegah Spark menambahkan file layanan ke folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Tampaknya sekarang setiap hari partisi parket baru ditambahkan ke folder etalase target, tempat data yang diurai untuk hari itu berada. Kami berhati-hati terlebih dahulu agar tidak ada partisi dengan konflik tipe data.

Tapi, kita punya masalah ketiga. Sekarang skema umum tidak diketahui, terlebih lagi, tabel di Hive memiliki skema yang salah, karena setiap partisi baru kemungkinan besar menimbulkan distorsi pada skema.

Anda perlu mendaftarkan ulang meja tersebut. Ini dapat dilakukan secara sederhana: baca kembali parket etalase, ambil skema dan buat DDL berdasarkan itu, yang dapat digunakan untuk mendaftarkan ulang folder di Hive sebagai tabel eksternal, memperbarui skema etalase target.

Kami memiliki masalah keempat. Saat kami mendaftarkan tabel untuk pertama kalinya, kami mengandalkan Spark. Sekarang kita melakukannya sendiri, dan kita perlu ingat bahwa bidang parket bisa dimulai dengan karakter yang tidak diperbolehkan untuk Hive. Misalnya, Spark mengeluarkan baris yang tidak dapat diurai di bidang "corrupt_record". Bidang seperti itu tidak dapat didaftarkan di Hive tanpa di-escape.

Mengetahui hal ini, kita mendapatkan skema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

kode ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",, ",`").replace("array<`", "array<") membuat DDL aman, yaitu alih-alih:

create table tname (_field1 string, 1field string)

Dengan nama bidang seperti "_field1, 1field", DDL aman dibuat di mana nama bidang di-escape: buat tabel `tname` (string `_field1`, string `1field`).

Timbul pertanyaan: bagaimana cara mendapatkan kerangka data dengan skema lengkap (dalam kode pf) dengan benar? Bagaimana cara mendapatkan pf ini? Ini adalah masalah kelima. Baca kembali skema semua partisi dari folder dengan file parket dari etalase target? Cara ini paling aman, namun sulit.

Skemanya sudah ada di Hive. Anda bisa mendapatkan skema baru dengan menggabungkan skema seluruh tabel dan partisi baru. Jadi, Anda perlu mengambil skema tabel dari Hive dan menggabungkannya dengan skema partisi baru. Hal ini dapat dilakukan dengan membaca metadata pengujian dari Hive, menyimpannya ke folder sementara, dan menggunakan Spark untuk membaca kedua partisi sekaligus.

Faktanya, ada semua yang Anda butuhkan: skema tabel asli di Hive dan partisi baru. Kami juga punya datanya. Yang tersisa hanyalah mendapatkan skema baru yang menggabungkan skema etalase dan bidang baru dari partisi yang dibuat:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Selanjutnya kita buat tabel registrasi DDL seperti pada cuplikan sebelumnya.
Jika seluruh rantai berfungsi dengan benar, yaitu ada beban inisialisasi, dan tabel dibuat dengan benar di Hive, maka kita mendapatkan skema tabel yang diperbarui.

Dan masalah terakhir adalah Anda tidak bisa begitu saja menambahkan partisi ke tabel Hive, karena akan rusak. Anda perlu memaksa Hive untuk memperbaiki struktur partisinya:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Tugas sederhana membaca JSON dan membuat etalase berdasarkan itu menghasilkan mengatasi sejumlah kesulitan tersirat, solusi yang harus Anda cari secara terpisah. Meskipun solusi ini sederhana, dibutuhkan banyak waktu untuk menemukannya.

Untuk melaksanakan pembangunan etalase, saya harus:

  • Tambahkan partisi ke etalase, singkirkan file layanan
  • Menangani bidang kosong di data sumber yang diketik Spark
  • Transmisikan tipe sederhana ke string
  • Ubah nama bidang menjadi huruf kecil
  • Pisahkan unggahan data dan registrasi tabel di Hive (generasi DDL)
  • Jangan lupa untuk menghindari nama bidang yang mungkin tidak kompatibel dengan Hive
  • Pelajari cara memperbarui pendaftaran tabel di Hive

Kesimpulannya, kami mencatat bahwa keputusan untuk membangun jendela toko penuh dengan banyak kendala. Oleh karena itu, jika terjadi kesulitan dalam implementasi, lebih baik menghubungi mitra berpengalaman dengan keahlian yang sukses.

Terima kasih telah membaca artikel ini, semoga informasinya bermanfaat.

Sumber: www.habr.com

Tambah komentar