مخطط الشرارة - التطور في الممارسة العملية

القراء الأعزاء ، يوم جيد!

في هذه المقالة ، يصف المستشار الرائد لمنطقة أعمال حلول البيانات الضخمة في Neoflex بالتفصيل خيارات بناء واجهات عرض ذات بنية متغيرة باستخدام Apache Spark.

كجزء من مشروع تحليل البيانات ، غالبًا ما تنشأ مهمة بناء واجهات المحلات بناءً على بيانات منظمة بشكل غير منظم.

عادةً ما تكون هذه سجلات ، أو استجابات من أنظمة مختلفة ، محفوظة بتنسيق JSON أو XML. يتم تحميل البيانات إلى Hadoop ، فأنت بحاجة إلى إنشاء واجهة متجر منها. يمكننا تنظيم الوصول إلى الواجهة التي تم إنشاؤها ، على سبيل المثال ، من خلال Impala.

في هذه الحالة ، لا يكون مخطط واجهة المتجر المستهدفة معروفًا مسبقًا. علاوة على ذلك ، لا يمكن أيضًا إعداد المخطط مسبقًا ، لأنه يعتمد على البيانات ، ونحن نتعامل مع هذه البيانات المنظمة بشكل فضفاض للغاية.

على سبيل المثال ، يتم تسجيل الاستجابة التالية اليوم:

{source: "app1", error_code: ""}

وغدا من نفس النظام تأتي الإجابة التالية:

{source: "app1", error_code: "error", description: "Network error"}

نتيجة لذلك ، يجب إضافة حقل آخر إلى الواجهة - الوصف ، ولا أحد يعرف ما إذا كان سيأتي أم لا.

مهمة إنشاء واجهة متجر على هذه البيانات قياسية جدًا ، ولدى Spark عدد من الأدوات لذلك. لتحليل البيانات المصدر ، هناك دعم لكل من JSON و XML ، وبالنسبة لمخطط غير معروف سابقًا ، يتم توفير دعم لـ schemaEvolution.

للوهلة الأولى ، يبدو الحل بسيطًا. يجب أن تأخذ مجلدًا باستخدام JSON وقراءته في إطار بيانات. سيقوم Spark بإنشاء مخطط ، وتحويل البيانات المتداخلة إلى هياكل. علاوة على ذلك ، يجب حفظ كل شيء في الباركيه ، والذي يتم دعمه أيضًا في Impala ، من خلال تسجيل واجهة المتجر في Hive metastore.

يبدو أن كل شيء بسيط.

ومع ذلك ، ليس من الواضح من الأمثلة القصيرة في التوثيق ما يجب القيام به مع عدد من المشاكل في الممارسة العملية.

يصف التوثيق طريقة ليس لإنشاء واجهة محل ، ولكن لقراءة JSON أو XML في إطار بيانات.

وهي توضح ببساطة كيفية قراءة وتحليل JSON:

df = spark.read.json(path...)

هذا يكفي لإتاحة البيانات لشركة Spark.

من الناحية العملية ، يعد البرنامج النصي أكثر تعقيدًا من مجرد قراءة ملفات JSON من مجلد وإنشاء إطار بيانات. يبدو الموقف كما يلي: هناك بالفعل واجهة متجر معينة ، وتأتي البيانات الجديدة كل يوم ، ويجب إضافتها إلى واجهة المتجر ، دون أن ننسى أن المخطط قد يختلف.

المخطط المعتاد لبناء الواجهة هو كما يلي:

الخطوة 1. يتم تحميل البيانات في Hadoop مع إعادة التحميل اليومية اللاحقة وإضافتها إلى قسم جديد. اتضح أنه مجلد يحتوي على بيانات أولية مقسمة حسب اليوم.

الخطوة 2. أثناء التحميل الأولي ، تتم قراءة هذا المجلد وتحليله بواسطة Spark. يتم حفظ إطار البيانات الناتج بتنسيق قابل للتحليل ، على سبيل المثال ، في الباركيه ، والذي يمكن بعد ذلك استيراده إلى إمبالا. يؤدي هذا إلى إنشاء واجهة عرض مستهدفة تحتوي على جميع البيانات التي تراكمت حتى هذه النقطة.

الخطوة 3. يتم إنشاء تنزيل لتحديث واجهة المتجر كل يوم.
هناك مسألة التحميل الإضافي ، والحاجة إلى تقسيم الواجهة ، ومسألة الحفاظ على المخطط العام للعرض.

لنأخذ مثالا. لنفترض أنه تم تنفيذ الخطوة الأولى لبناء مستودع ، وتم تحميل ملفات JSON إلى مجلد.

إن إنشاء إطار بيانات منهم ، ثم حفظه كواجهة عرض ، ليس مشكلة. هذه هي الخطوة الأولى التي يمكن العثور عليها بسهولة في وثائق Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

يبدو أن كل شيء يكون على ما يرام.

قرأنا وحللنا JSON ، ثم نحفظ إطار البيانات كباركيه ، ونقوم بتسجيله في Hive بأي طريقة مناسبة:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

نحصل على نافذة.

لكن في اليوم التالي ، تمت إضافة بيانات جديدة من المصدر. لدينا مجلد به JSON ، وعرض تم إنشاؤه من هذا المجلد. بعد تحميل الدفعة التالية من البيانات من المصدر ، يفقد سوق البيانات قيمة البيانات ليوم واحد.

سيكون الحل المنطقي هو تقسيم واجهة المتجر حسب اليوم ، مما سيسمح بإضافة قسم جديد كل يوم تالي. آلية ذلك معروفة أيضًا ، يسمح لك Spark بكتابة الأقسام بشكل منفصل.

أولاً ، نقوم بتحميل أولي ، وحفظ البيانات كما هو موضح أعلاه ، وإضافة التقسيم فقط. يسمى هذا الإجراء تهيئة واجهة المحل ويتم تنفيذه مرة واحدة فقط:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

في اليوم التالي ، نقوم بتحميل قسم جديد فقط:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

كل ما تبقى هو إعادة التسجيل في Hive لتحديث مخطط قاعدة البيانات.
ومع ذلك ، هذا هو المكان الذي تنشأ فيه المشاكل.

المشكلة الأولى. عاجلاً أم آجلاً ، سيكون الباركيه الناتج غير قابل للقراءة. هذا يرجع إلى كيفية تعامل الباركيه و JSON مع الحقول الفارغة بشكل مختلف.

لنفكر في حالة نموذجية. على سبيل المثال ، وصل JSON أمس:

День 1: {"a": {"b": 1}},

واليوم تبدو JSON نفسها كما يلي:

День 2: {"a": null}

لنفترض أن لدينا قسمين مختلفين ، كل منهما يحتوي على سطر واحد.
عندما نقرأ بيانات المصدر بالكامل ، سيكون Spark قادرًا على تحديد النوع ، وسيفهم أن "a" هو حقل من النوع "هيكل" ، مع حقل متداخل "b" من النوع INT. ولكن ، إذا تم حفظ كل قسم بشكل منفصل ، فسنحصل على باركيه مع أنظمة تقسيم غير متوافقة:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

هذا الموقف معروف جيدًا ، لذلك تمت إضافة خيار بشكل خاص - عند تحليل البيانات المصدر ، قم بإزالة الحقول الفارغة:

df = spark.read.json("...", dropFieldIfAllNull=True)

في هذه الحالة ، تتكون الباركيه من أقسام يمكن قراءتها معًا.
على الرغم من أن أولئك الذين فعلوا ذلك في الممارسة العملية سوف يبتسمون بمرارة هنا. لماذا؟ نعم ، لأنه من المحتمل وجود حالتين أخريين. أو ثلاثة. أو أربعة. الأول ، الذي سيحدث بشكل شبه مؤكد ، هو أن الأنواع الرقمية ستبدو مختلفة في ملفات JSON المختلفة. على سبيل المثال ، {intField: 1} و {intField: 1.1}. إذا تم العثور على مثل هذه الحقول في قسم واحد ، فسيقوم دمج المخطط بقراءة كل شيء بشكل صحيح ، مما يؤدي إلى النوع الأكثر دقة. ولكن إذا كان في حالات مختلفة ، فسيكون لدى أحدهما intField: int ، والآخر سيكون intField: double.

هناك العلم التالي للتعامل مع هذا الموقف:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

الآن لدينا مجلد به أقسام يمكن قراءتها في إطار بيانات واحد وباركيه صالح للعرض بأكمله. نعم؟ لا.

يجب أن نتذكر أننا سجلنا الجدول في الخلية. الخلية ليست حساسة لحالة الأحرف في أسماء الحقول ، بينما الباركيه حساس لحالة الأحرف. لذلك ، الأقسام ذات المخططات: field1: int ، و Field1: int هي نفسها في Hive ، لكن ليس لـ Spark. لا تنس تحويل أسماء الحقول إلى أحرف صغيرة.

بعد ذلك ، يبدو أن كل شيء على ما يرام.

ومع ذلك ، ليس كل شيء بهذه البساطة. هناك مشكلة ثانية معروفة أيضًا. نظرًا لأنه يتم حفظ كل قسم جديد بشكل منفصل ، سيحتوي مجلد القسم على ملفات خدمة Spark ، على سبيل المثال ، علامة نجاح عملية _SUCCESS. سيؤدي ذلك إلى حدوث خطأ عند محاولة استخدام الباركيه. لتجنب ذلك ، تحتاج إلى تكوين التكوين لمنع Spark من إضافة ملفات الخدمة إلى المجلد:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

يبدو أنه يتم الآن إضافة قسم باركيه جديد كل يوم إلى مجلد العرض الهدف ، حيث توجد البيانات المحللة لهذا اليوم. لقد حرصنا مسبقًا على عدم وجود أقسام بها تعارض في نوع البيانات.

لكن لدينا مشكلة ثالثة. الآن المخطط العام غير معروف ، علاوة على ذلك ، يحتوي الجدول في Hive على مخطط غير صحيح ، نظرًا لأن كل قسم جديد على الأرجح قد أدخل تشويهًا في المخطط.

تحتاج إلى إعادة تسجيل الجدول. يمكن القيام بذلك ببساطة: قراءة باركيه واجهة المتجر مرة أخرى ، واتخاذ المخطط وإنشاء DDL بناءً عليه ، والذي من خلاله يمكن إعادة تسجيل المجلد في Hive كجدول خارجي ، وتحديث مخطط واجهة المتجر الهدف.

لدينا مشكلة رابعة. في المرة الأولى التي سجلنا فيها طاولة ، اعتمدنا على Spark. الآن نقوم بذلك بأنفسنا ، وعلينا أن نتذكر أن حقول الباركيه يمكن أن تبدأ بأحرف غير مسموح بها لـ Hive. على سبيل المثال ، يطرح Spark سطورًا يتعذر عليه تحليلها في الحقل "corrupt_record". لا يمكن تسجيل مثل هذا الحقل في Hive دون هروب.

بمعرفة هذا ، نحصل على المخطط:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

قانون ("_corrupt_record"، "` _corrupt_record` ") +" "+ f [1] .replace (": "،" `:"). استبدل ("<"، "<` "). استبدل ("، "، "،" "). استبدل (" المصفوفة <`" ، "المصفوفة <") يجعل DDL آمنًا ، أي بدلاً من:

create table tname (_field1 string, 1field string)

باستخدام أسماء الحقول مثل "_field1 ، 1field" ، يتم عمل DDL الآمن حيث يتم تخطي أسماء الحقول: إنشاء جدول `tname` (سلسلة` _field1` ، سلسلة `1field`).

السؤال الذي يطرح نفسه: كيف تحصل بشكل صحيح على إطار بيانات بمخطط كامل (في كود pf)؟ كيف تحصل على هذا pf؟ هذه هي المشكلة الخامسة. إعادة قراءة مخطط جميع الأقسام من المجلد مع ملفات الباركيه للعرض الهدف؟ هذه الطريقة هي الأكثر أمانًا ولكنها صعبة.

المخطط موجود بالفعل في الخلية. يمكنك الحصول على مخطط جديد من خلال الجمع بين مخطط الجدول بأكمله والقسم الجديد. لذلك عليك أن تأخذ مخطط الجدول من Hive ودمجه مع مخطط القسم الجديد. يمكن القيام بذلك عن طريق قراءة البيانات الأولية للاختبار من Hive ، وحفظها في مجلد مؤقت ، واستخدام Spark لقراءة كلا القسمين في وقت واحد.

في الواقع ، هناك كل ما تحتاجه: مخطط الجدول الأصلي في Hive والقسم الجديد. لدينا أيضًا بيانات. يبقى فقط الحصول على مخطط جديد يجمع بين مخطط واجهة المتجر والحقول الجديدة من القسم الذي تم إنشاؤه:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

بعد ذلك ، نقوم بإنشاء جدول التسجيل DDL ، كما في المقتطف السابق.
إذا كانت السلسلة بأكملها تعمل بشكل صحيح ، أي كان هناك تحميل تهيئة ، وتم إنشاء الجدول بشكل صحيح في الخلية ، فسنحصل على مخطط جدول محدث.

والمشكلة الأخيرة هي أنه لا يمكنك فقط إضافة قسم إلى جدول Hive ، لأنه سيتم كسره. تحتاج إلى إجبار Hive على إصلاح هيكل القسم الخاص به:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

تؤدي المهمة البسيطة المتمثلة في قراءة JSON وإنشاء واجهة متجر بناءً عليها في التغلب على عدد من الصعوبات الضمنية ، والحلول التي يجب عليك البحث عنها بشكل منفصل. وعلى الرغم من أن هذه الحلول بسيطة ، إلا أن العثور عليها يستغرق وقتًا طويلاً.

لتنفيذ بناء الواجهة ، كان علي:

  • أضف أقسامًا إلى الواجهة ، تخلص من ملفات الخدمة
  • تعامل مع الحقول الفارغة في بيانات المصدر التي كتبتها Spark
  • إرسال أنواع بسيطة إلى سلسلة
  • تحويل أسماء الحقول إلى أحرف صغيرة
  • تحميل بيانات منفصل وتسجيل الجدول في Hive (إنشاء DDL)
  • لا تنس الهروب من أسماء الحقول التي قد تكون غير متوافقة مع Hive
  • تعرف على كيفية تحديث تسجيل الجدول في Hive

بإيجاز ، نلاحظ أن قرار بناء نوافذ المتاجر محفوف بالعديد من المزالق. لذلك ، في حالة وجود صعوبات في التنفيذ ، فمن الأفضل الاتصال بشريك متمرس لديه خبرة ناجحة.

شكرا لقراءة هذا المقال ، نأمل أن تجد المعلومات مفيدة.

المصدر: www.habr.com

إضافة تعليق