Pratikte Spark schemaEvolution

Sevgili okuyucular, iyi günler!

Bu makalede, Neoflex'in Büyük Veri Çözümleri iş alanının önde gelen danışmanı, Apache Spark kullanarak değişken yapı gösterileri oluşturma seçeneklerini ayrıntılı olarak açıklamaktadır.

Bir veri analizi projesinin parçası olarak, genellikle gevşek yapılandırılmış verilere dayalı vitrinler oluşturma görevi ortaya çıkar.

Genellikle bunlar, JSON veya XML olarak kaydedilen çeşitli sistemlerden gelen günlükler veya yanıtlardır. Veriler Hadoop'a yüklenir, ardından bunlardan bir vitrin oluşturmanız gerekir. Oluşturulan vitrine erişimi, örneğin Impala aracılığıyla organize edebiliriz.

Bu durumda, hedef vitrinin şeması önceden bilinmemektedir. Ayrıca, verilere bağlı olduğu için şema da önceden çizilemez ve bu çok gevşek yapılandırılmış verilerle uğraşıyoruz.

Örneğin, bugün aşağıdaki yanıt günlüğe kaydedilir:

{source: "app1", error_code: ""}

ve yarın aynı sistemden şu cevap geliyor:

{source: "app1", error_code: "error", description: "Network error"}

Sonuç olarak vitrine bir alan daha eklenmeli - açıklama ve gelip gelmeyeceğini kimse bilmiyor.

Bu tür veriler üzerinde vitrin oluşturma görevi oldukça standarttır ve Spark'ın bunun için bir dizi aracı vardır. Kaynak verileri ayrıştırmak için hem JSON hem de XML desteği vardır ve daha önce bilinmeyen bir şema için schemaEvolution desteği sağlanır.

İlk bakışta çözüm basit görünüyor. JSON ile bir klasör almanız ve onu bir veri çerçevesine okumanız gerekir. Spark bir şema oluşturacak, iç içe geçmiş verileri yapılara dönüştürecektir. Ayrıca, vitrini Hive metastore'a kaydederek Impala'da da desteklenen parkede her şeyin kaydedilmesi gerekir.

Her şey basit görünüyor.

Ancak, pratikte bir takım problemlerle ne yapılacağı dokümantasyondaki kısa örneklerden net değildir.

Belgeler, bir vitrin oluşturmaya değil, JSON veya XML'i bir veri çerçevesine okumaya yönelik bir yaklaşımı açıklar.

Yani, basitçe JSON'un nasıl okunacağını ve ayrıştırılacağını gösterir:

df = spark.read.json(path...)

Bu, verileri Spark'ın kullanımına sunmak için yeterlidir.

Pratikte betik, bir klasörden JSON dosyalarını okumaktan ve bir veri çerçevesi oluşturmaktan çok daha karmaşıktır. Durum şuna benziyor: zaten belirli bir vitrin var, her gün yeni veriler geliyor, şemanın farklı olabileceğini unutmadan vitrine eklenmesi gerekiyor.

Bir vitrin oluşturmak için olağan şema aşağıdaki gibidir:

1 adım. Veriler, müteakip günlük yeniden yükleme ile Hadoop'a yüklenir ve yeni bir bölüme eklenir. İlk verileri güne göre bölümlenmiş bir klasör ortaya çıkıyor.

2 adım. İlk yükleme sırasında bu klasör Spark tarafından okunur ve ayrıştırılır. Ortaya çıkan veri çerçevesi ayrıştırılabilir bir formatta, örneğin parkeye kaydedilir ve daha sonra Impala'ya aktarılabilir. Bu, bu noktaya kadar birikmiş tüm verilerle bir hedef vitrin oluşturur.

3 adım. Vitrini her gün güncelleyecek bir indirme oluşturulur.
Kademeli yükleme sorunu, vitrini bölme ihtiyacı ve vitrinin genel şemasını koruma sorunu var.

Bir örnek alalım. Bir depo oluşturmanın ilk adımının uygulandığını ve JSON dosyalarının bir klasöre yüklendiğini varsayalım.

Onlardan bir veri çerçevesi oluşturmak ve ardından bunu bir vitrin olarak kaydetmek sorun değil. Bu, Spark belgelerinde kolayca bulunabilen ilk adımdır:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Her şey yolunda gibi görünüyor.

JSON'u okuduk ve ayrıştırdık, ardından veri çerçevesini herhangi bir uygun şekilde Hive'a kaydederek bir parke olarak kaydettik:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Bir penceremiz var.

Ancak ertesi gün kaynaktan yeni veriler eklendi. JSON ile bir klasörümüz ve bu klasörden oluşturulmuş bir vitrinimiz var. Kaynaktan bir sonraki veri yığınını yükledikten sonra, data mart'ta bir günlük veri eksik.

Mantıklı çözüm, vitrini güne göre bölümlemek olacaktır; bu, her gün yeni bir bölüm eklenmesine izin verecektir. Bunun mekanizması da iyi bilinmektedir, Spark bölümleri ayrı ayrı yazmanıza izin verir.

İlk olarak, verileri yukarıda açıklandığı gibi kaydederek ve yalnızca bölümleme ekleyerek bir başlangıç ​​yüklemesi yapıyoruz. Bu eyleme vitrin başlatma adı verilir ve yalnızca bir kez yapılır:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Ertesi gün, yalnızca yeni bir bölüm yüklüyoruz:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Geriye kalan tek şey, şemayı güncellemek için Hive'a yeniden kaydolmaktır.
Ancak, sorunların ortaya çıktığı yer burasıdır.

İlk sorun. Er ya da geç, ortaya çıkan parke okunamaz hale gelecektir. Bunun nedeni, parke ve JSON'un boş alanları nasıl farklı şekilde ele aldığıdır.

Tipik bir durumu ele alalım. Örneğin, dün JSON geldi:

День 1: {"a": {"b": 1}},

ve bugün aynı JSON şöyle görünüyor:

День 2: {"a": null}

Diyelim ki her biri bir satıra sahip iki farklı bölümümüz var.
Tüm kaynak verileri okuduğumuzda, Spark türü belirleyebilecek ve "a"nın, INT türünde iç içe geçmiş bir "b" alanı olan "yapı" türünde bir alan olduğunu anlayacaktır. Ancak, her bölüm ayrı ayrı kaydedilmişse, uyumsuz bölüm şemalarına sahip bir parke elde ederiz:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Bu durum iyi bilinmektedir, bu nedenle özel olarak bir seçenek eklenmiştir - kaynak verileri ayrıştırırken boş alanları kaldırın:

df = spark.read.json("...", dropFieldIfAllNull=True)

Bu durumda parke birlikte okunabilen bölmelerden oluşacaktır.
Bunu pratikte yapmış olanlar burada acı bir şekilde gülümseyecek olsa da. Neden? Evet, çünkü muhtemelen iki durum daha olacak. Veya üç. Veya dört. Neredeyse kesinlikle gerçekleşecek olan ilki, sayısal türlerin farklı JSON dosyalarında farklı görüneceğidir. Örneğin, {intField: 1} ve {intField: 1.1}. Bu tür alanlar bir bölümde bulunursa, şema birleştirme her şeyi doğru okuyarak en doğru türe yol açar. Ancak farklı olanlarda ise, birinde intField: int, diğerinde intField: double olacaktır.

Bu durumu işlemek için aşağıdaki bayrak var:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Artık tek bir veri çerçevesinde okunabilen bölümlerin ve tüm vitrinin geçerli bir parkesinin olduğu bir klasörümüz var. Evet? HAYIR.

Tabloyu Hive'a kaydettiğimizi hatırlamalıyız. Hive, alan adlarında büyük/küçük harfe duyarlı değildir, parke ise büyük/küçük harfe duyarlıdır. Bu nedenle, şu şemalara sahip bölümler: alan1: int ve Alan1: int Hive için aynıdır, ancak Spark için değildir. Alan adlarını küçük harfe çevirmeyi unutmayınız.

Bundan sonra, her şey yolunda görünüyor.

Ancak, hepsi o kadar basit değil. İyi bilinen ikinci bir sorun daha var. Her yeni bölüm ayrı olarak kaydedildiğinden, bölüm klasörü örneğin _SUCCESS işlem başarı bayrağı gibi Spark hizmet dosyalarını içerecektir. Bu, parke yapmaya çalışırken bir hataya neden olacaktır. Bundan kaçınmak için yapılandırmayı, Spark'ın hizmet dosyalarını klasöre eklemesini engelleyecek şekilde yapılandırmanız gerekir:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Görünüşe göre artık her gün, o gün için ayrıştırılan verilerin bulunduğu hedef vitrin klasörüne yeni bir parke bölümü ekleniyor. Veri türü çakışması olan bölümlerin olmamasına önceden özen gösterdik.

Ancak üçüncü bir sorunumuz var. Artık genel şema bilinmiyor, ayrıca Hive'daki tablo yanlış bir şemaya sahip, çünkü her yeni bölüm büyük olasılıkla şemaya bir bozulma getirdi.

Tabloyu yeniden kaydetmeniz gerekiyor. Bu basitçe yapılabilir: vitrinin parkesini tekrar okuyun, şemayı alın ve ona dayalı bir DDL oluşturun; bununla Hive'daki klasörü harici bir tablo olarak yeniden kaydedin ve hedef vitrinin şemasını güncelleyin.

Dördüncü bir sorunumuz var. Tabloyu ilk kez kaydettiğimizde Spark'a güvendik. Artık kendimiz yapıyoruz ve parke alanların Hive için izin verilmeyen karakterlerle başlayabileceğini unutmamamız gerekiyor. Örneğin, Spark ayrıştıramadığı satırları "corrupt_record" alanına atar. Böyle bir alan kaçış yapılmadan Hive'a kaydedilemez.

Bunu bilerek, şemayı elde ederiz:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kod ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("dizi<`", "dizi<") güvenli DDL yapar, yani:

create table tname (_field1 string, 1field string)

"_field1, 1field" gibi alan adlarıyla, alan adlarının öncelendiği yerde güvenli DDL yapılır: "tname" tablosu oluşturun ('_field1' dizesi, '1field' dizesi).

Şu soru ortaya çıkıyor: tam bir şemaya sahip bir veri çerçevesi nasıl düzgün bir şekilde elde edilir (pf kodunda)? Bu pf nasıl alınır? Bu beşinci sorun. Hedef vitrinin parke dosyalarıyla klasördeki tüm bölümlerin şemasını yeniden okuyun? Bu yöntem en güvenli, ancak zordur.

Şema zaten Hive'da. Tüm tablonun şemasını ve yeni bölümü birleştirerek yeni bir şema elde edebilirsiniz. Yani tablo şemasını Hive'dan alıp yeni bölümün şemasıyla birleştirmeniz gerekiyor. Bu, test meta verilerini Hive'dan okuyarak, onu geçici bir klasöre kaydederek ve her iki bölümü aynı anda okumak için Spark kullanarak yapılabilir.

Aslında ihtiyacınız olan her şey var: Hive'daki orijinal tablo şeması ve yeni bölüm. Verilerimiz de var. Yalnızca vitrin şemasını ve oluşturulan bölümden yeni alanları birleştiren yeni bir şema almak için kalır:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ardından, önceki kod parçacığında olduğu gibi tablo kaydı DDL'sini oluşturuyoruz.
Tüm zincir doğru çalışıyorsa, yani bir başlatma yükü varsa ve tablo Hive'da doğru şekilde oluşturulmuşsa, güncellenmiş bir tablo şeması alırız.

Ve son sorun şu ki, bir Hive tablosuna öylece bir bölüm ekleyemezsiniz çünkü o bozulacaktır. Hive'ı bölüm yapısını düzeltmeye zorlamanız gerekir:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON'u okuma ve buna dayalı bir vitrin oluşturma basit görevi, ayrı ayrı aramanız gereken çözümler olan bir dizi örtük zorluğun üstesinden gelmenizi sağlar. Ve bu çözümler basit olsa da onları bulmak çok zaman alıyor.

Vitrin yapımını uygulamak için şunları yapmam gerekiyordu:

  • Servis dosyalarından kurtularak vitrine bölümler ekleyin
  • Spark'ın yazdığı kaynak verilerdeki boş alanlarla ilgilenin
  • Basit türleri bir dizeye yayınla
  • Alan adlarını küçük harfe çevir
  • Hive'da ayrı veri yükleme ve tablo kaydı (DDL üretimi)
  • Hive ile uyumsuz olabilecek alan adlarından kaçmayı unutmayın
  • Hive'da tablo kaydını nasıl güncelleyeceğinizi öğrenin

Özetle, vitrin inşa etme kararının birçok tuzakla dolu olduğunu not ediyoruz. Bu nedenle, uygulamada zorluklar olması durumunda, başarılı uzmanlığa sahip deneyimli bir ortakla iletişime geçmek daha iyidir.

Bu makaleyi okuduğunuz için teşekkür ederiz, bilgileri yararlı bulacağınızı umuyoruz.

Kaynak: habr.com

Yorum ekle