Spark schemaEvolution در عمل

خوانندگان عزیز، روز بخیر!

در این مقاله، مشاور پیشرو حوزه کسب و کار Big Data Solutions Neoflex گزینه های ساخت ویترین سازه های متغیر را با استفاده از Apache Spark به تفصیل شرح می دهد.

به عنوان بخشی از یک پروژه تجزیه و تحلیل داده ها، وظیفه ساخت ویترین فروشگاه ها بر اساس داده های ساختار ضعیف اغلب مطرح می شود.

معمولاً اینها گزارش‌ها یا پاسخ‌هایی از سیستم‌های مختلف هستند که به صورت JSON یا XML ذخیره می‌شوند. داده ها در Hadoop آپلود می شوند، سپس باید از آنها یک فروشگاه بسازید. ما می توانیم دسترسی به ویترین ایجاد شده را سازماندهی کنیم، به عنوان مثال، از طریق Impala.

در این حالت، طرح ویترین فروشگاه هدف از قبل مشخص نیست. علاوه بر این، این طرح نیز نمی تواند از قبل طراحی شود، زیرا به داده ها بستگی دارد، و ما با این داده های ساختار بسیار ضعیف سروکار داریم.

به عنوان مثال، امروز پاسخ زیر ثبت شده است:

{source: "app1", error_code: ""}

و فردا از همان سیستم جواب زیر می آید:

{source: "app1", error_code: "error", description: "Network error"}

در نتیجه باید یک فیلد دیگر به ویترین اضافه شود - توضیحات و هیچ کس نمی داند که می آید یا نه.

وظیفه ایجاد ویترین فروشگاه روی چنین داده‌هایی کاملاً استاندارد است و Spark ابزارهای زیادی برای این کار دارد. برای تجزیه داده‌های منبع، از JSON و XML پشتیبانی می‌شود، و برای طرحواره‌ای که قبلاً ناشناخته بود، پشتیبانی از schemaEvolution ارائه می‌شود.

در نگاه اول، راه حل ساده به نظر می رسد. شما باید یک پوشه با JSON بردارید و آن را در یک دیتافریم بخوانید. Spark یک طرحواره ایجاد می کند، داده های تودرتو را به ساختار تبدیل می کند. علاوه بر این، همه چیز باید در پارکت ذخیره شود، که در ایمپالا نیز پشتیبانی می شود، با ثبت ویترین فروشگاه در metastore Hive.

به نظر می رسد همه چیز ساده است.

با این حال، از مثال های کوتاه در مستندات مشخص نیست که در عمل با تعدادی از مشکلات چه باید کرد.

مستندات رویکردی را برای ایجاد یک فروشگاه، بلکه برای خواندن JSON یا XML در یک دیتافریم توصیف می‌کند.

یعنی به سادگی نحوه خواندن و تجزیه JSON را نشان می دهد:

df = spark.read.json(path...)

این برای در دسترس قرار دادن داده ها برای Spark کافی است.

در عمل، اسکریپت بسیار پیچیده تر از خواندن فایل های JSON از یک پوشه و ایجاد یک دیتافریم است. وضعیت به این صورت است: در حال حاضر یک فروشگاه خاص وجود دارد، داده های جدید هر روز وارد می شود، آنها باید به ویترین اضافه شوند، فراموش نکنید که این طرح ممکن است متفاوت باشد.

طرح معمول برای ساخت ویترین به شرح زیر است:

مرحله 1. داده ها با بارگذاری مجدد روزانه بعدی در Hadoop بارگذاری می شوند و به یک پارتیشن جدید اضافه می شوند. به نظر می رسد یک پوشه با داده های اولیه پارتیشن بندی شده در روز.

مرحله 2. در طول بارگذاری اولیه، این پوشه توسط Spark خوانده و تجزیه می شود. چارچوب داده به دست آمده در قالب قابل تجزیه ذخیره می شود، به عنوان مثال، در پارکت، که سپس می تواند به ایمپالا وارد شود. این یک ویترین هدف با تمام داده هایی که تا این مرحله جمع شده است ایجاد می کند.

مرحله 3. دانلودی ایجاد شده است که هر روز ویترین فروشگاه را به روز می کند.
بحث بارگذاری افزایشی، نیاز به پارتیشن بندی ویترین و بحث حفظ طرح کلی ویترین وجود دارد.

بیایید یک مثال بزنیم. فرض کنید که مرحله اول ساخت یک مخزن پیاده سازی شده است و فایل های JSON در یک پوشه آپلود می شوند.

ایجاد یک دیتافریم از آنها و سپس ذخیره آن به عنوان ویترین مشکلی نیست. این اولین قدمی است که به راحتی در اسناد Spark یافت می شود:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

به نظر می رسد همه چیز خوب است.

ما JSON را می‌خوانیم و تجزیه می‌کنیم، سپس دیتافریم را به‌عنوان پارکت ذخیره می‌کنیم و آن را به هر روشی راحت در Hive ثبت می‌کنیم:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

ما یک پنجره می گیریم.

اما، روز بعد، داده های جدیدی از منبع اضافه شد. ما یک پوشه با JSON داریم و یک ویترین از این پوشه ایجاد شده است. پس از بارگیری دسته بعدی داده از منبع، داده‌های مارت یک روز از داده‌های خود را از دست می‌دهد.

راه حل منطقی این است که ویترین فروشگاه را به صورت روز پارتیشن بندی کنید، که اجازه می دهد هر روز بعد یک پارتیشن جدید اضافه کنید. مکانیسم این نیز به خوبی شناخته شده است، Spark به شما امکان می دهد پارتیشن ها را جداگانه بنویسید.

ابتدا یک بار اولیه انجام می دهیم، داده ها را همانطور که در بالا توضیح داده شد ذخیره می کنیم و فقط پارتیشن بندی را اضافه می کنیم. این عمل مقداردهی اولیه فروشگاه نامیده می شود و فقط یک بار انجام می شود:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

روز بعد، ما فقط یک پارتیشن جدید را بارگذاری می کنیم:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

تنها چیزی که باقی می ماند این است که برای به روز رسانی طرحواره مجدداً در Hive ثبت نام کنید.
با این حال، اینجاست که مشکلات ایجاد می شود.

مشکل اول دیر یا زود، پارکت به دست آمده قابل خواندن نخواهد بود. این به دلیل رفتار متفاوت پارکت و JSON با فیلدهای خالی است.

بیایید یک موقعیت معمولی را در نظر بگیریم. به عنوان مثال، دیروز JSON وارد شد:

День 1: {"a": {"b": 1}},

و امروز همان JSON به شکل زیر است:

День 2: {"a": null}

فرض کنید دو پارتیشن مختلف داریم که هر کدام یک خط دارند.
وقتی کل داده های منبع را می خوانیم، Spark می تواند نوع را تعیین کند و متوجه می شود که "a" یک فیلد از نوع "structure" است، با یک فیلد تودرتو "b" از نوع INT. اما، اگر هر پارتیشن به طور جداگانه ذخیره شده باشد، یک پارکت با طرح های پارتیشن ناسازگار دریافت می کنیم:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

این وضعیت به خوبی شناخته شده است، بنابراین یک گزینه به طور خاص اضافه شده است - هنگام تجزیه داده های منبع، فیلدهای خالی را حذف کنید:

df = spark.read.json("...", dropFieldIfAllNull=True)

در این صورت پارکت شامل پارتیشن هایی خواهد بود که با هم قابل خواندن هستند.
اگرچه کسانی که این کار را در عمل انجام داده اند اینجا لبخند تلخی خواهند زد. چرا؟ بله، زیرا احتمالاً دو موقعیت دیگر وجود دارد. یا سه. یا چهار. اولین مورد، که تقریباً به طور قطع اتفاق می افتد، این است که انواع عددی در فایل های مختلف JSON متفاوت به نظر می رسند. به عنوان مثال، {intField: 1} و {intField: 1.1}. اگر چنین فیلدهایی در یک پارتیشن یافت شوند، ادغام طرحواره همه چیز را به درستی می خواند و به دقیق ترین نوع منجر می شود. اما اگر در موارد مختلف باشد، یکی دارای intField: int و دیگری دارای intField: double خواهد بود.

پرچم زیر برای رسیدگی به این وضعیت وجود دارد:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

اکنون ما یک پوشه داریم که در آن پارتیشن هایی وجود دارد که می توان آنها را در یک دیتافریم واحد و یک پارکت معتبر از کل ویترین خواند. آره؟ خیر

باید به یاد داشته باشیم که جدول را در Hive ثبت کردیم. کندو در نام رشته ها به حروف بزرگ و کوچک حساس نیست، در حالی که پارکت به حروف کوچک و بزرگ حساس است. بنابراین، پارتیشن‌های دارای طرحواره: field1: int و Field1: int برای Hive یکسان هستند، اما برای Spark نه. فراموش نکنید که نام فیلدها را به حروف کوچک تبدیل کنید.

بعد از آن به نظر می رسد همه چیز خوب است.

با این حال، همه چیز به این سادگی نیست. یک مشکل دوم نیز وجود دارد که به خوبی شناخته شده است. از آنجایی که هر پارتیشن جدید به طور جداگانه ذخیره می شود، پوشه پارتیشن حاوی فایل های سرویس Spark است، به عنوان مثال، پرچم موفقیت عملیات _SUCCESS. این امر منجر به خطا در هنگام تلاش برای پارکت می شود. برای جلوگیری از این امر، باید پیکربندی را به گونه ای پیکربندی کنید که اسپارک فایل های سرویس را به پوشه اضافه نکند:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

به نظر می رسد که اکنون هر روز یک پارتیشن پارکت جدید به پوشه ویترین هدف اضافه می شود، جایی که داده های تجزیه شده برای روز در آن قرار دارد. ما از قبل مراقب بودیم که هیچ پارتیشنی با تداخل نوع داده وجود نداشته باشد.

اما، ما یک مشکل سوم داریم. اکنون طرح کلی مشخص نیست، علاوه بر این، جدول در Hive دارای یک طرح نادرست است، زیرا هر پارتیشن جدید به احتمال زیاد یک اعوجاج در طرحواره ایجاد می کند.

باید جدول را دوباره ثبت کنید. این را می توان به سادگی انجام داد: دوباره پارکت ویترین را بخوانید، طرح را بردارید و بر اساس آن یک DDL ایجاد کنید، که با آن پوشه را در Hive به عنوان یک جدول خارجی مجدداً ثبت کنید، و طرح ویترین فروشگاه مورد نظر را به روز کنید.

ما مشکل چهارمی داریم. وقتی برای اولین بار جدول را ثبت کردیم، به Spark تکیه کردیم. حالا ما خودمان این کار را انجام می دهیم و باید به خاطر داشته باشیم که زمینه های پارکت می توانند با کاراکترهایی شروع شوند که برای Hive مجاز نیستند. به عنوان مثال، Spark خطوطی را که نمی تواند آن ها را تجزیه کند در قسمت "corrupt_record" پرتاب می کند. چنین فیلدی را نمی توان بدون فرار در Hive ثبت کرد.

با دانستن این موضوع، این طرح را دریافت می کنیم:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

رمز ("_corrupt_record"، "`_corrupt_record`") + "" + f[1].replace(":"، "`:").replace("<"، "<`").replace("," ",`").replace("array<`", "array<") DDL را ایمن می کند، یعنی به جای:

create table tname (_field1 string, 1field string)

با نام‌های فیلد مانند "_field1، 1field"، DDL ایمن در جایی که نام فیلدها فرار می‌شود ساخته می‌شود: جدول "tname" را ایجاد کنید (رشته "_field1"، رشته "1field".

این سوال مطرح می شود: چگونه می توان یک دیتافریم با یک طرح کامل (در کد pf) به درستی دریافت کرد؟ این pf رو چجوری بگیرم؟ این مشکل پنجم است. طرح همه پارتیشن ها را از پوشه با فایل های پارکت ویترین هدف دوباره بخوانید؟ این روش ایمن ترین، اما دشوار است.

این طرح از قبل در Hive است. شما می توانید با ترکیب طرح کل جدول و پارتیشن جدید، طرحی جدید دریافت کنید. بنابراین باید طرح جدول را از Hive بگیرید و با طرح پارتیشن جدید ترکیب کنید. این را می توان با خواندن فراداده آزمایشی از Hive، ذخیره آن در یک پوشه موقت و استفاده از Spark برای خواندن هر دو پارتیشن به طور همزمان انجام داد.

در واقع، هر چیزی که نیاز دارید وجود دارد: طرح اصلی جدول در Hive و پارتیشن جدید. ما هم داده داریم. فقط برای بدست آوردن یک طرح جدید که طرح ویترین فروشگاه و فیلدهای جدید را از پارتیشن ایجاد شده ترکیب می کند، باقی می ماند:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

در مرحله بعد، مانند قطعه قبلی، DDL ثبت جدول را ایجاد می کنیم.
اگر کل زنجیره به درستی کار کند، یعنی یک بار اولیه وجود داشته باشد، و جدول به درستی در Hive ایجاد شده باشد، یک طرح جدول به روز شده دریافت می کنیم.

و آخرین مشکل این است که نمی توانید فقط یک پارتیشن به جدول Hive اضافه کنید، زیرا خراب می شود. باید Hive را مجبور کنید تا ساختار پارتیشن خود را اصلاح کند:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

کار ساده خواندن JSON و ایجاد ویترین فروشگاه بر اساس آن منجر به غلبه بر تعدادی از مشکلات ضمنی می شود، راه حل هایی که باید جداگانه به دنبال آنها بگردید. و اگرچه این راه حل ها ساده هستند، اما یافتن آنها زمان زیادی می برد.

برای اجرای ساخت ویترین مجبور شدم:

  • پارتیشن ها را به ویترین اضافه کنید و از شر فایل های سرویس خلاص شوید
  • با فیلدهای خالی در داده های منبعی که Spark تایپ کرده است، برخورد کنید
  • انواع ساده را به یک رشته بریزید
  • نام فیلدها را به حروف کوچک تبدیل کنید
  • آپلود اطلاعات جداگانه و ثبت جدول در Hive (تولید DDL)
  • فراموش نکنید که از نام فیلدهایی که ممکن است با Hive ناسازگار باشند فرار کنید
  • نحوه به روز رسانی ثبت جدول در Hive را بیاموزید

در جمع بندی، متذکر می شویم که تصمیم برای ساخت ویترین مغازه ها با مشکلات زیادی همراه است. بنابراین در صورت بروز مشکل در اجرا، بهتر است با یک شریک مجرب و دارای تخصص موفق تماس بگیرید.

از اینکه این مقاله را مطالعه کردید متشکریم، امیدواریم که اطلاعات برای شما مفید واقع شود.

منبع: www.habr.com

اضافه کردن نظر