عملی طور پر اسپارک اسکیما ارتقاء

پیارے قارئین، اچھا دن!

اس مضمون میں، Neoflex کے بگ ڈیٹا سلوشنز بزنس ایریا کے معروف کنسلٹنٹ نے Apache Spark کا استعمال کرتے ہوئے متغیر ڈھانچے کے شوکیس بنانے کے اختیارات کو تفصیل سے بیان کیا ہے۔

ڈیٹا کے تجزیہ کے منصوبے کے حصے کے طور پر، ڈھیلے ڈھانچے والے ڈیٹا پر مبنی اسٹور فرنٹ بنانے کا کام اکثر پیدا ہوتا ہے۔

عام طور پر یہ لاگز، یا مختلف سسٹمز کے جوابات ہوتے ہیں، جنہیں JSON یا XML کے بطور محفوظ کیا جاتا ہے۔ ڈیٹا کو ہڈوپ پر اپ لوڈ کیا جاتا ہے، پھر آپ کو ان سے اسٹور فرنٹ بنانے کی ضرورت ہوتی ہے۔ ہم تخلیق کردہ شوکیس تک رسائی کو منظم کر سکتے ہیں، مثال کے طور پر، Impala کے ذریعے۔

اس صورت میں، ٹارگٹ اسٹور فرنٹ کا اسکیما پہلے سے معلوم نہیں ہے۔ مزید برآں، اسکیم کو بھی پہلے سے تیار نہیں کیا جا سکتا، کیونکہ یہ ڈیٹا پر منحصر ہے، اور ہم ان بہت ہی ڈھیلے ڈھانچے والے ڈیٹا سے نمٹ رہے ہیں۔

مثال کے طور پر، آج درج ذیل جواب کو لاگ کیا گیا ہے:

{source: "app1", error_code: ""}

اور کل اسی نظام سے درج ذیل جواب آئے گا:

{source: "app1", error_code: "error", description: "Network error"}

نتیجے کے طور پر، شوکیس میں ایک اور فیلڈ کا اضافہ کیا جانا چاہئے - تفصیل، اور کوئی نہیں جانتا کہ آیا یہ آئے گا یا نہیں۔

اس طرح کے ڈیٹا پر اسٹور فرنٹ بنانے کا کام کافی معیاری ہے، اور اسپارک کے پاس اس کے لیے بہت سے ٹولز ہیں۔ سورس ڈیٹا کو پارس کرنے کے لیے، JSON اور XML دونوں کے لیے سپورٹ موجود ہے، اور پہلے سے نامعلوم اسکیما کے لیے، schemaEvolution کے لیے سپورٹ فراہم کی گئی ہے۔

پہلی نظر میں، حل آسان لگتا ہے. آپ کو JSON کے ساتھ ایک فولڈر لینے اور اسے ڈیٹا فریم میں پڑھنے کی ضرورت ہے۔ اسپارک ایک اسکیما بنائے گا، نیسٹڈ ڈیٹا کو ڈھانچے میں بدل دے گا۔ مزید، ہر چیز کو پارکیٹ میں محفوظ کرنے کی ضرورت ہے، جسے امپالا میں بھی مدد ملتی ہے، Hive میٹاسٹور میں اسٹور فرنٹ کو رجسٹر کرکے۔

سب کچھ آسان لگتا ہے۔

تاہم، دستاویزات میں دی گئی مختصر مثالوں سے یہ واضح نہیں ہے کہ عملی طور پر متعدد مسائل کا کیا کرنا ہے۔

دستاویزات اسٹور فرنٹ بنانے کے لیے نہیں بلکہ JSON یا XML کو ڈیٹا فریم میں پڑھنے کے لیے ایک نقطہ نظر کی وضاحت کرتی ہے۔

یعنی، یہ آسانی سے دکھاتا ہے کہ JSON کو کیسے پڑھنا اور پارس کرنا ہے:

df = spark.read.json(path...)

یہ اسپارک کو ڈیٹا دستیاب کرنے کے لیے کافی ہے۔

عملی طور پر، اسکرپٹ صرف ایک فولڈر سے JSON فائلوں کو پڑھنے اور ڈیٹا فریم بنانے سے کہیں زیادہ پیچیدہ ہے۔ صورت حال اس طرح نظر آتی ہے: پہلے سے ہی ایک مخصوص اسٹور فرنٹ موجود ہے، ہر روز نیا ڈیٹا آتا ہے، انہیں اسٹور فرنٹ میں شامل کرنے کی ضرورت ہے، یہ نہ بھولیں کہ اسکیم مختلف ہوسکتی ہے۔

شوکیس بنانے کی معمول کی اسکیم مندرجہ ذیل ہے:

1 قدم ہے. ڈیٹا کو بعد میں روزانہ دوبارہ لوڈ کرنے کے ساتھ ہڈوپ میں لوڈ کیا جاتا ہے اور ایک نئے پارٹیشن میں شامل کیا جاتا ہے۔ یہ ایک فولڈر نکلتا ہے جس میں ابتدائی ڈیٹا دن کے حساب سے تقسیم ہوتا ہے۔

2 قدم ہے. ابتدائی لوڈ کے دوران، اس فولڈر کو سپارک کے ذریعے پڑھا اور پارس کیا جاتا ہے۔ نتیجے میں ڈیٹا فریم قابل تجزیہ شکل میں محفوظ کیا جاتا ہے، مثال کے طور پر، پارکیٹ میں، جسے پھر امپالا میں درآمد کیا جا سکتا ہے۔ یہ اس وقت تک جمع ہونے والے تمام ڈیٹا کے ساتھ ایک ہدف کی نمائش بناتا ہے۔

3 قدم ہے. ایک ڈاؤن لوڈ بنایا گیا ہے جو ہر روز اسٹور فرنٹ کو اپ ڈیٹ کرے گا۔
انکریمنٹل لوڈنگ کا سوال ہے، شوکیس کو تقسیم کرنے کی ضرورت ہے، اور شوکیس کی عمومی اسکیم کو برقرار رکھنے کا سوال ہے۔

آئیے ایک مثال لیتے ہیں۔ ہم کہتے ہیں کہ ریپوزٹری کی تعمیر کا پہلا مرحلہ لاگو کیا گیا ہے، اور JSON فائلوں کو ایک فولڈر میں اپ لوڈ کیا جاتا ہے.

ان سے ڈیٹا فریم بنانا، پھر اسے شوکیس کے طور پر محفوظ کرنا، کوئی مسئلہ نہیں ہے۔ یہ پہلا قدم ہے جو آسانی سے Spark دستاویزات میں پایا جا سکتا ہے:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

لگتا ہے سب کچھ ٹھیک ہے۔

ہم JSON کو پڑھتے اور پارس کرتے ہیں، پھر ہم ڈیٹا فریم کو ایک پارکیٹ کے طور پر محفوظ کرتے ہیں، اسے Hive میں کسی بھی آسان طریقے سے رجسٹر کرتے ہیں:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

ہمیں ایک کھڑکی ملتی ہے۔

لیکن، اگلے دن، ذریعہ سے نیا ڈیٹا شامل کیا گیا تھا. ہمارے پاس JSON کے ساتھ ایک فولڈر ہے، اور اس فولڈر سے ایک شوکیس بنایا گیا ہے۔ ماخذ سے ڈیٹا کا اگلا بیچ لوڈ کرنے کے بعد، ڈیٹا مارٹ میں ایک دن کا ڈیٹا غائب ہے۔

منطقی حل یہ ہوگا کہ اسٹور فرنٹ کو دن بہ دن تقسیم کیا جائے، جو ہر اگلے دن ایک نیا پارٹیشن شامل کرنے کی اجازت دے گا۔ اس کا طریقہ کار بھی مشہور ہے، اسپارک آپ کو الگ الگ پارٹیشن لکھنے کی اجازت دیتا ہے۔

سب سے پہلے، ہم ایک ابتدائی بوجھ کرتے ہیں، جیسا کہ اوپر بیان کیا گیا ہے ڈیٹا کو بچاتے ہیں، صرف تقسیم کاری کا اضافہ کرتے ہیں۔ اس عمل کو اسٹور فرنٹ انیشیلائزیشن کہا جاتا ہے اور صرف ایک بار کیا جاتا ہے:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

اگلے دن، ہم صرف ایک نیا پارٹیشن لوڈ کرتے ہیں:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

اسکیما کو اپ ڈیٹ کرنے کے لیے Hive میں دوبارہ رجسٹر ہونا باقی ہے۔
تاہم، یہ وہ جگہ ہے جہاں مسائل پیدا ہوتے ہیں۔

پہلا مسئلہ۔ جلد یا بدیر، نتیجے میں آنے والی لکڑی پڑھے جانے کے قابل نہیں ہوگی۔ یہ اس وجہ سے ہے کہ کس طرح پارکیٹ اور JSON خالی کھیتوں کے ساتھ مختلف سلوک کرتے ہیں۔

آئیے ایک عام صورت حال پر غور کریں۔ مثال کے طور پر، کل JSON آیا:

День 1: {"a": {"b": 1}},

اور آج وہی JSON اس طرح نظر آتا ہے:

День 2: {"a": null}

ہم کہتے ہیں کہ ہمارے پاس دو مختلف پارٹیشنز ہیں، ہر ایک لائن کے ساتھ۔
جب ہم پورے سورس ڈیٹا کو پڑھتے ہیں، تو Spark قسم کا تعین کرنے کے قابل ہو جائے گا، اور یہ سمجھے گا کہ "a" قسم "سٹرکچر" کا ایک فیلڈ ہے، جس میں INT قسم کی ایک نیسٹڈ فیلڈ "b" ہے۔ لیکن، اگر ہر پارٹیشن کو الگ الگ محفوظ کیا گیا تھا، تو ہمیں غیر مطابقت پذیر پارٹیشن اسکیموں کے ساتھ ایک پارکیٹ ملتا ہے:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

یہ صورت حال مشہور ہے، اس لیے ایک آپشن خاص طور پر شامل کیا گیا ہے - سورس ڈیٹا کو پارس کرتے وقت، خالی فیلڈز کو ہٹا دیں:

df = spark.read.json("...", dropFieldIfAllNull=True)

اس صورت میں، پارکیٹ پارٹیشنز پر مشتمل ہوگا جسے ایک ساتھ پڑھا جا سکتا ہے۔
حالانکہ جن لوگوں نے عملی طور پر ایسا کیا ہے وہ یہاں کڑک سے مسکرائیں گے۔ کیوں؟ جی ہاں، کیونکہ دو مزید حالات ہونے کا امکان ہے۔ یا تین۔ یا چار۔ پہلی، جو تقریباً یقینی طور پر واقع ہوگی، یہ ہے کہ عددی اقسام مختلف JSON فائلوں میں مختلف نظر آئیں گی۔ مثال کے طور پر، {intField: 1} اور {intField: 1.1}۔ اگر اس طرح کے فیلڈز ایک پارٹیشن میں پائے جاتے ہیں، تو اسکیما مرج ہر چیز کو صحیح طریقے سے پڑھے گا، جس کی وجہ سے سب سے زیادہ درست قسم ہوگی۔ لیکن اگر مختلف میں، تو ایک کے پاس intField: int، اور دوسرے کے پاس intField: double ہوگا۔

اس صورت حال سے نمٹنے کے لیے درج ذیل جھنڈا ہے:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

اب ہمارے پاس ایک فولڈر ہے جہاں پارٹیشنز ہیں جنہیں ایک ہی ڈیٹا فریم میں پڑھا جا سکتا ہے اور پورے شوکیس کی ایک درست پارکیٹ۔ جی ہاں؟ نہیں.

ہمیں یاد رکھنا چاہیے کہ ہم نے ٹیبل کو Hive میں رجسٹر کیا تھا۔ کھیتوں کے ناموں میں Hive کیس حساس نہیں ہے، جبکہ پارکیٹ کیس حساس ہے۔ لہذا، سکیموں کے ساتھ پارٹیشنز: field1: int، اور Field1: int Hive کے لیے ایک جیسے ہیں، لیکن Spark کے لیے نہیں۔ فیلڈ کے ناموں کو لوئر کیس میں تبدیل کرنا نہ بھولیں۔

اس کے بعد سب کچھ ٹھیک ہونے لگتا ہے۔

تاہم، سب اتنا آسان نہیں ہے۔ ایک دوسرا، معروف مسئلہ بھی ہے۔ چونکہ ہر نیا پارٹیشن الگ سے محفوظ کیا جاتا ہے، اس لیے پارٹیشن فولڈر میں Spark سروس فائلیں ہوں گی، مثال کے طور پر، _SUCCESS آپریشن کامیابی کا جھنڈا۔ اس کے نتیجے میں پارکیٹ کرنے کی کوشش کرتے وقت ایک خرابی ہوگی۔ اس سے بچنے کے لیے، آپ کو اسپارک کو فولڈر میں سروس فائلیں شامل کرنے سے روکنے کے لیے کنفیگریشن کو ترتیب دینے کی ضرورت ہے۔

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

ایسا لگتا ہے کہ اب ہر روز ایک نیا پارکیٹ پارٹیشن ٹارگٹ شوکیس فولڈر میں شامل کیا جاتا ہے، جہاں اس دن کا تجزیہ شدہ ڈیٹا موجود ہوتا ہے۔ ہم نے پہلے سے اس بات کا خیال رکھا تھا کہ ڈیٹا ٹائپ تنازعہ کے ساتھ کوئی پارٹیشنز نہیں ہیں۔

لیکن، ہمارے پاس ایک تیسرا مسئلہ ہے۔ اب عام اسکیما معلوم نہیں ہے، مزید یہ کہ Hive میں موجود ٹیبل میں ایک غلط اسکیما ہے، کیونکہ ہر نئے پارٹیشن نے اسکیما میں تحریف کا امکان ظاہر کیا ہے۔

آپ کو ٹیبل کو دوبارہ رجسٹر کرنے کی ضرورت ہے۔ یہ آسانی سے کیا جا سکتا ہے: اسٹور فرنٹ کی لکڑی کو دوبارہ پڑھیں، اسکیما لیں اور اس کی بنیاد پر ایک DDL بنائیں، جس کے ساتھ Hive میں فولڈر کو ایک بیرونی ٹیبل کے طور پر دوبارہ رجسٹر کرنا، ہدف اسٹور فرنٹ کے اسکیما کو اپ ڈیٹ کرنا۔

ہمارا چوتھا مسئلہ ہے۔ جب ہم نے پہلی بار ٹیبل رجسٹر کیا تو ہم نے اسپارک پر انحصار کیا۔ اب ہم یہ خود کرتے ہیں، اور ہمیں یہ یاد رکھنے کی ضرورت ہے کہ پارکیٹ فیلڈز ایسے حروف سے شروع ہو سکتے ہیں جن کی Hive کے لیے اجازت نہیں ہے۔ مثال کے طور پر، اسپارک نے ایسی لکیریں پھینک دی ہیں جنہیں یہ "کرپٹ_ریکارڈ" فیلڈ میں پارس نہیں کر سکتی۔ اس طرح کے فیلڈ کو بچائے بغیر Hive میں رجسٹر نہیں کیا جا سکتا۔

یہ جان کر، ہمیں اسکیم ملتی ہے:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

کوڈ ("_کرپٹ_ریکارڈ"، "`_کرپٹ_ریکارڈ`") + "" + f[1]۔تبدیل کریں(":", "`:")۔تبدیل کریں("<", "<`")۔تبدیل کریں(",", "،`")) تبدیل کریں(" array<`", "array<") محفوظ DDL بناتا ہے، یعنی اس کے بجائے:

create table tname (_field1 string, 1field string)

فیلڈ کے ناموں جیسے "_field1، 1field" کے ساتھ، محفوظ DDL بنایا جاتا ہے جہاں فیلڈ کے نام بچ جاتے ہیں: ٹیبل `tname` (`_field1` string, `1field` string) بنائیں۔

سوال یہ پیدا ہوتا ہے: مکمل اسکیما (پی ایف کوڈ میں) کے ساتھ ڈیٹا فریم کو صحیح طریقے سے کیسے حاصل کیا جائے؟ یہ پی ایف کیسے حاصل کیا جائے؟ یہ پانچواں مسئلہ ہے۔ ٹارگٹ شوکیس کی پارکیٹ فائلوں کے ساتھ فولڈر سے تمام پارٹیشنز کی اسکیم کو دوبارہ پڑھیں؟ یہ طریقہ سب سے محفوظ ہے، لیکن مشکل ہے.

سکیما پہلے سے ہی Hive میں ہے۔ آپ پورے ٹیبل کے سکیما اور نئے پارٹیشن کو ملا کر ایک نیا سکیما حاصل کر سکتے ہیں۔ لہذا آپ کو Hive سے ٹیبل اسکیما لینے اور اسے نئے پارٹیشن کے اسکیما کے ساتھ جوڑنے کی ضرورت ہے۔ یہ Hive سے ٹیسٹ میٹا ڈیٹا کو پڑھ کر، اسے عارضی فولڈر میں محفوظ کرکے، اور دونوں پارٹیشنز کو ایک ساتھ پڑھنے کے لیے Spark کا استعمال کرکے کیا جا سکتا ہے۔

درحقیقت، آپ کی ضرورت کی ہر چیز موجود ہے: Hive میں اصل ٹیبل اسکیما اور نئی پارٹیشن۔ ہمارے پاس ڈیٹا بھی ہے۔ یہ صرف ایک نیا اسکیما حاصل کرنا باقی ہے جو اسٹور فرنٹ اسکیما اور تخلیق شدہ پارٹیشن سے نئے فیلڈز کو یکجا کرتا ہے:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

اگلا، ہم ٹیبل رجسٹریشن DDL بناتے ہیں، جیسا کہ پچھلے ٹکڑوں میں تھا۔
اگر پوری زنجیر صحیح طریقے سے کام کرتی ہے، یعنی، ایک ابتدائی بوجھ تھا، اور Hive میں ٹیبل صحیح طریقے سے بنایا گیا تھا، تو ہمیں ایک تازہ ترین ٹیبل اسکیما ملتا ہے۔

اور آخری مسئلہ یہ ہے کہ آپ صرف Hive ٹیبل میں پارٹیشن شامل نہیں کر سکتے، کیونکہ یہ ٹوٹ جائے گا۔ آپ کو Hive کو اس کی تقسیم کی ساخت کو ٹھیک کرنے کے لیے مجبور کرنے کی ضرورت ہے:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON کو پڑھنے اور اس کی بنیاد پر ایک اسٹور فرنٹ بنانے کے آسان کام کے نتیجے میں متعدد مضمر مشکلات پر قابو پانا پڑتا ہے، جن کے حل کے لیے آپ کو الگ سے تلاش کرنا ہوگا۔ اور اگرچہ یہ حل آسان ہیں، لیکن انہیں تلاش کرنے میں کافی وقت لگتا ہے۔

شوکیس کی تعمیر کو نافذ کرنے کے لیے، مجھے یہ کرنا پڑا:

  • شوکیس میں پارٹیشنز شامل کریں، سروس فائلوں سے چھٹکارا حاصل کریں۔
  • سورس ڈیٹا میں خالی فیلڈز سے نمٹیں جو Spark نے ٹائپ کیا ہے۔
  • سٹرنگ میں سادہ اقسام کاسٹ کریں۔
  • فیلڈ کے نام کو چھوٹے حروف میں تبدیل کریں۔
  • Hive میں علیحدہ ڈیٹا اپ لوڈ اور ٹیبل رجسٹریشن (DDL جنریشن)
  • فیلڈ کے ناموں سے بچنا نہ بھولیں جو Hive سے مطابقت نہیں رکھتے
  • Hive میں ٹیبل رجسٹریشن کو اپ ڈیٹ کرنے کا طریقہ سیکھیں۔

خلاصہ کرتے ہوئے، ہم نوٹ کرتے ہیں کہ دکان کی کھڑکیاں بنانے کا فیصلہ بہت سے نقصانات سے بھرا ہوا ہے۔ لہذا، عمل درآمد میں مشکلات کی صورت میں، یہ بہتر ہے کہ کامیاب ماہر کے ساتھ تجربہ کار پارٹنر سے رابطہ کریں.

اس مضمون کو پڑھنے کے لیے آپ کا شکریہ، ہم امید کرتے ہیں کہ آپ کو معلومات مفید پائیں گی۔

ماخذ: www.habr.com

نیا تبصرہ شامل کریں