عمل ۾ اسپارڪ اسڪيما ارتقا

پيارا پڙهندڙ، سٺو ڏينهن!

هن آرٽيڪل ۾، Neoflex جي بگ ڊيٽا حلن جي ڪاروباري علائقي جو معروف صلاحڪار تفصيل سان بيان ڪري ٿو اپاچي اسپارڪ استعمال ڪندي متغير ڍانچي جي نمائش لاءِ اختيارن کي.

ڊيٽا جي تجزيي جي منصوبي جي حصي جي طور تي، اسٽوري فرنٽ جي تعمير جو ڪم لوز سان ٺهيل ڊيٽا جي بنياد تي اڪثر پيدا ٿئي ٿو.

عام طور تي اهي لاگ آهن، يا مختلف سسٽم مان جواب، JSON يا XML طور محفوظ ٿيل آهن. ڊيٽا Hadoop تي اپلوڊ ڪئي وئي آهي، پوء توهان کي انهن مان هڪ اسٽور فرنٽ ٺاهڻ جي ضرورت آهي. اسان ٺاهيل شوڪيس تائين رسائي کي منظم ڪري سگهون ٿا، مثال طور، Impala ذريعي.

انهي حالت ۾، ٽارگيٽ اسٽور جي اسڪيما اڳ ۾ معلوم نه آهي. ان کان علاوه، اسڪيم پڻ اڳ ۾ تيار نه ٿي ڪري سگھجي، ڇاڪاڻ ته اهو ڊيٽا تي منحصر آهي، ۽ اسان انهن تمام گهڻي ترتيب واري ڊيٽا سان معاملو ڪري رهيا آهيون.

مثال طور، اڄ هيٺ ڏنل جواب لاگ ٿيل آهي:

{source: "app1", error_code: ""}

۽ سڀاڻي ساڳئي سسٽم مان ايندڙ جواب اچي ٿو:

{source: "app1", error_code: "error", description: "Network error"}

نتيجي طور، ھڪڙو وڌيڪ فيلڊ شامل ڪيو وڃي شوڪيس - تفصيل، ۽ ڪو به نٿو ڄاڻي ته اھو ايندو يا نه.

اهڙي ڊيٽا تي اسٽور فرنٽ ٺاهڻ جو ڪم تمام معياري آهي، ۽ اسپارڪ ان لاءِ ڪيترائي اوزار آهن. ماخذ ڊيٽا کي پارس ڪرڻ لاءِ، JSON ۽ XML ٻنهي لاءِ سپورٽ آهي، ۽ اڳئين اڻڄاتل اسڪيما لاءِ، اسڪيما ارتقاءَ لاءِ سپورٽ مهيا ڪئي وئي آهي.

پهرين نظر ۾، حل سادو نظر اچي ٿو. توهان کي JSON سان گڏ فولڊر وٺڻو پوندو ۽ ان کي ڊيٽا فريم ۾ پڙهڻو پوندو. اسپارڪ هڪ اسڪيما ٺاهيندو، اندر ٿيل ڊيٽا کي جوڙجڪ ۾ ڦيرايو. ان کان علاوه، هر شيء کي parquet ۾ محفوظ ڪرڻ جي ضرورت آهي، جيڪا پڻ Impala ۾ سپورٽ ڪئي وئي آهي، Hive metastore ۾ اسٽور فرنٽ کي رجسٽر ڪندي.

سڀ ڪجھ سادو لڳي ٿو.

بهرحال، اهو واضح ناهي ته دستاويز ۾ مختصر مثالن مان ڇا ڪجي عملي طور تي ڪيترن ئي مسئلن سان.

دستاويز بيان ڪري ٿو هڪ طريقي سان اسٽور فرنٽ ٺاهڻ لاءِ نه، پر JSON يا XML کي ڊيٽا فريم ۾ پڙهڻ لاءِ.

يعني، اهو صرف ڏيکاري ٿو ته JSON کي ڪيئن پڙهڻ ۽ پارس ڪجي:

df = spark.read.json(path...)

اھو ڪافي آھي ڊيٽا کي اسپارڪ تائين پھچائڻ لاءِ.

عملي طور تي، اسڪرپٽ صرف هڪ فولڊر مان JSON فائلون پڙهڻ ۽ ڊيٽا فريم ٺاهڻ کان وڌيڪ پيچيده آهي. صورتحال هن طرح ڏسڻ ۾ اچي ٿي: اتي اڳ ۾ ئي هڪ خاص اسٽور فرنٽ آهي، هر روز نئين ڊيٽا اچي ٿي، انهن کي اسٽور فرنٽ ۾ شامل ڪرڻ جي ضرورت آهي، نه وساريو ته اسڪيم مختلف ٿي سگهي ٿي.

شوڪيس جي تعمير لاء معمولي اسڪيم هن ريت آهي:

1 قدم. ڊيٽا بعد ۾ روزاني ٻيهر لوڊ ڪرڻ سان Hadoop ۾ لوڊ ڪيو ويو آهي ۽ نئين ورهاڱي ۾ شامل ڪيو ويو آهي. اهو هڪ فولڊر ڪڍي ٿو شروعاتي ڊيٽا سان گڏ ڏينهن جي ورهاڱي سان.

2 قدم. شروعاتي لوڊ دوران، هي فولڊر اسپارڪ طرفان پڙهي ۽ پارس ڪيو ويندو آهي. نتيجو وارو ڊيٽا فريم هڪ پارسيبل فارميٽ ۾ محفوظ ڪيو ويو آهي، مثال طور، پارڪ ۾، جنهن کي پوءِ امپالا ۾ درآمد ڪري سگهجي ٿو. هي ٺاهي ٿو هڪ ٽارگيٽ شوڪيس سڀني ڊيٽا سان جيڪو هن نقطي تائين گڏ ڪيو آهي.

3 قدم. هڪ ڊائون لوڊ ٺاهيو ويو آهي جيڪو هر روز اسٽور فرنٽ کي اپڊيٽ ڪندو.
هتي هڪ سوال آهي وڌندڙ لوڊشيڊنگ، شوڪيس کي ورهاڱي جي ضرورت، ۽ شوڪيس جي عام اسڪيم کي برقرار رکڻ جو سوال.

اچو ته هڪ مثال وٺون. اچو ته چوندا آهن ته هڪ مخزن جي تعمير جو پهريون قدم لاڳو ڪيو ويو آهي، ۽ JSON فائلون فولڊر تي اپ لوڊ ڪيون ويون آهن.

انهن مان ڊيٽا فريم ٺاهڻ، پوءِ ان کي شوڪيس طور محفوظ ڪرڻ، ڪو مسئلو ناهي. اهو تمام پهريون قدم آهي جيڪو آساني سان ڳولي سگهجي ٿو اسپارڪ دستاويزن ۾:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

سڀ ڪجھ ٺيڪ ٿيڻ لڳي.

اسان JSON کي پڙهيو ۽ پارس ڪيو، پوءِ اسان ڊيٽا فريم کي پارڪٽ طور محفوظ ڪري، ان کي Hive ۾ ڪنهن به آسان طريقي سان رجسٽر ڪريون:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

اسان کي هڪ ونڊو ملي.

پر، ٻئي ڏينهن، ذريعن کان نئين ڊيٽا شامل ڪئي وئي. اسان وٽ JSON سان گڏ هڪ فولڊر آهي، ۽ هن فولڊر مان ٺهيل هڪ شوڪيس. ذريعن کان ڊيٽا جي ايندڙ بيچ کي لوڊ ڪرڻ کان پوء، ڊيٽا مارٽ هڪ ڏينهن جي قيمتي ڊيٽا غائب آهي.

منطقي حل اهو هوندو ته اسٽور فرنٽ کي ورهاڱي جي ڏينهن ۾، جيڪو هر ايندڙ ڏينهن ۾ نئين ورهاڱي کي شامل ڪرڻ جي اجازت ڏيندو. هن لاء ميڪانيزم پڻ مشهور آهي، اسپارڪ توهان کي الڳ الڳ ورهاڱي لکڻ جي اجازت ڏئي ٿي.

پهريون، اسان هڪ ابتدائي لوڊ ڪندا آهيون، مٿي بيان ڪيل ڊيٽا کي محفوظ ڪندي، صرف ورهاڱي کي شامل ڪندي. هن عمل کي سڏيو ويندو آهي اسٽور فرنٽ جي شروعات ۽ صرف هڪ ڀيرو ڪيو ويندو آهي:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

ٻئي ڏينهن، اسان صرف هڪ نئون ورهاڱي لوڊ ڪريون ٿا:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

باقي اهو آهي ته اسڪيما کي اپڊيٽ ڪرڻ لاءِ Hive ۾ ٻيهر رجسٽر ٿيو.
بهرحال، اهو آهي جتي مسئلا پيدا ٿين ٿا.

پهريون مسئلو. جلد يا بعد ۾، نتيجي ۾ parquet اڻ پڙهيل ٿيندو. اهو ئي سبب آهي ته ڪيئن پارڪ ۽ JSON خالي شعبن کي مختلف طريقي سان علاج ڪن ٿا.

اچو ته هڪ عام صورتحال تي غور ڪريو. مثال طور، ڪالهه JSON اچي ٿو:

День 1: {"a": {"b": 1}},

۽ اڄ ساڳيو JSON هن طرح نظر اچي ٿو:

День 2: {"a": null}

اچو ته اسان وٽ ٻه مختلف ڀاڱا آهن، هر هڪ هڪ لائين سان.
جڏهن اسان سڄو ماخذ ڊيٽا پڙهون ٿا، اسپارڪ قسم جو تعين ڪرڻ جي قابل ٿي ويندو، ۽ سمجهي سگهندو ته "a" قسم جي "ڍانچي" جو هڪ فيلڊ آهي، جنهن ۾ INT قسم جي هڪ nested فيلڊ "b" آهي. پر، جيڪڏهن هر ورهاڱي کي الڳ الڳ محفوظ ڪيو ويو، ته پوء اسان هڪ پارڪر حاصل ڪريون ٿا جنهن سان غير مطابقت واري ورهاڱي اسڪيمن سان:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

اها صورتحال چڱي طرح ڄاڻايل آهي، تنهنڪري هڪ اختيار خاص طور تي شامل ڪيو ويو آهي - جڏهن ماخذ ڊيٽا کي پارس ڪندي، خالي خانن کي هٽايو:

df = spark.read.json("...", dropFieldIfAllNull=True)

انهي حالت ۾، پارڪ ۾ ورهاڱي تي مشتمل هوندي جيڪا گڏ پڙهي سگهجي ٿي.
جيتوڻيڪ جن عملي طور اهو ڪيو آهي، اهي هتي بيحد مسڪرائيندا. ڇو؟ ها، ڇاڪاڻ ته اتي ٻه وڌيڪ حالتون ٿيڻ جو امڪان آهي. يا ٽي. يا چار. پهريون، جيڪو لڳ ڀڳ ضرور ٿيندو، اهو آهي ته عددي قسم جا مختلف JSON فائلن ۾ مختلف نظر ايندا. مثال طور، {intField: 1} ۽ {intField: 1.1}. جيڪڏهن اهڙا شعبا هڪ ورهاڱي ۾ مليا آهن، ته پوءِ اسڪيما مرج هر شي کي صحيح طريقي سان پڙهندو، سڀ کان وڌيڪ صحيح قسم ڏانهن. پر جيڪڏهن مختلف ۾، پوء هڪ وٽ هوندو intField: int، ۽ ٻئي وٽ هوندو intField: double.

ھن صورتحال کي سنڀالڻ لاء ھيٺ ڏنل پرچم آھي:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

ھاڻي اسان وٽ ھڪڙو فولڊر آھي جتي پارٽيشنون آھن جيڪي ھڪڙي ڊيٽا فريم ۾ پڙھي سگھجن ٿيون ۽ پوري شوڪيس جي صحيح پارڪ. ها؟ نه.

اسان کي ياد رکڻ گهرجي ته اسان Hive ۾ ٽيبل رجسٽرڊ ڪيو. Hive فيلڊ جي نالن ۾ ڪيس حساس نه آهي، جڏهن ته parquet ڪيس حساس آهي. تنهن ڪري، اسڪيما سان ورهاڱي: field1: int، ۽ Field1: int Hive لاءِ ساڳيا آهن، پر اسپارڪ لاءِ نه. فيلڊ جي نالن کي لوئر ڪيس ۾ تبديل ڪرڻ نه وساريو.

ان کان پوء، هر شيء ٺيڪ ٿي لڳي.

بهرحال، تمام سادو ناهي. اتي هڪ ٻيو، پڻ معروف مسئلو آهي. جيئن ته هر نئين ورهاڱي کي الڳ الڳ محفوظ ڪيو ويو آهي، ورهاڱي واري فولڊر ۾ اسپارڪ سروس فائلون شامل هونديون، مثال طور، _SUCCESS آپريشن ڪامياب پرچم. اهو هڪ غلطي جي نتيجي ۾ ٿيندو جڏهن پارڪ ڪرڻ جي ڪوشش ڪندي. هن کان بچڻ لاء، توهان کي اسپارڪ کي فولڊر ۾ سروس فائلون شامل ڪرڻ کان روڪڻ لاء ترتيب ترتيب ڏيڻ جي ضرورت آهي:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

اهو لڳي ٿو ته هاڻي هر روز هڪ نئون پارڪٽ ورهاڱي ٽارگيٽ شوڪيس فولڊر ۾ شامل ڪيو ويو آهي، جتي ڏينهن لاء پارس ٿيل ڊيٽا واقع آهي. اسان اڳ ۾ ئي خيال رکيو ته ڊيٽا جي قسم جي تڪرار سان ڪو به ورهاڱو نه هو.

پر، اسان وٽ ٽيون مسئلو آهي. هاڻي عام اسڪيما معلوم نه آهي، ان کان علاوه، Hive ۾ ٽيبل هڪ غلط اسڪيما آهي، ڇاڪاڻ ته هر نئين ورهاڱي گهڻو ڪري اسڪيما ۾ هڪ تحريف متعارف ڪرايو آهي.

توهان کي ٽيبل کي ٻيهر رجسٽر ڪرڻ جي ضرورت آهي. اهو آساني سان ڪري سگهجي ٿو: اسٽور فرنٽ جي پارڪ کي ٻيهر پڙهو، اسڪيما وٺو ۽ ان جي بنياد تي هڪ ڊي ڊي ايل ٺاهيو، جنهن سان فولڊر کي ٻيهر رجسٽر ڪرڻ لاءِ Hive ۾ هڪ خارجي ٽيبل جي طور تي، ٽارگيٽ اسٽور فرنٽ جي اسڪيما کي اپڊيٽ ڪندي.

اسان وٽ چوٿون مسئلو آهي. جڏهن اسان پهريون ڀيرو ٽيبل رجسٽرڊ ڪيو، اسان اسپارڪ تي ڀروسو ڪيو. هاڻي اسان اهو پاڻ ڪندا آهيون، ۽ اسان کي ياد رکڻ جي ضرورت آهي ته پارڪٽ فيلڊ ڪردارن سان شروع ٿي سگهن ٿيون جيڪي Hive لاء اجازت نه آهن. مثال طور، اسپارڪ لڪير ڪڍي ٿو ته اهو "corrupt_record" فيلڊ ۾ پارس نٿو ڪري سگهي. اهڙي فيلڊ کي هٽائڻ کان سواءِ Hive ۾ رجسٽر نٿو ڪري سگهجي.

اهو ڄاڻڻ، اسان کي اسڪيم ملي ٿو:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

ڪوڊ ("_corrupt_record", "` _corrupt_record`") + "" + f[1]. بدلايو(":", "`:").متبادل("<", "<`").تبديل(",", "،`"). بدلايو(" array<`"، "array<") محفوظ ڊي ڊي ايل ٺاهي ٿو، يعني بدران:

create table tname (_field1 string, 1field string)

فيلڊ نالن سان "_field1، 1field"، محفوظ DDL ٺاهيو ويندو آهي جتي فيلڊ جا نالا بچيا ويندا آهن: ٽيبل ٺاهيو `tname` (`_field1` string، `1field` string).

سوال پيدا ٿئي ٿو: مڪمل اسڪيما سان (پي ايف ڪوڊ ۾) ڊيٽا فريم کي صحيح طريقي سان ڪيئن حاصل ڪجي؟ هي پي ايف ڪيئن حاصل ڪجي؟ هي پنجون مسئلو آهي. فولڊر مان سڀني ورهاڱي جي اسڪيم کي ٻيهر پڙهو parquet فائلن سان ٽارگيٽ شوڪيس؟ اهو طريقو محفوظ آهي، پر ڏکيو.

اسڪيما اڳ ۾ ئي Hive ۾ آهي. توھان حاصل ڪري سگھو ٿا ھڪڙو نئون اسڪيما سڄي ٽيبل جي اسڪيما ۽ نئين ورهاڱي کي گڏ ڪندي. تنهن ڪري توهان کي گهرجي ته ٽيبل اسڪيما Hive مان وٺو ۽ ان کي نئين ورهاڱي جي اسڪيما سان گڏ ڪريو. اهو ٿي سگهي ٿو Hive مان ٽيسٽ ميٽاڊيٽا پڙهڻ، ان کي عارضي فولڊر ۾ محفوظ ڪرڻ، ۽ اسپارڪ استعمال ڪندي ٻنهي حصن کي هڪ ئي وقت پڙهڻ لاءِ.

حقيقت ۾، اتي هر شيء آهي جنهن جي توهان کي ضرورت آهي: Hive ۾ اصل ٽيبل اسڪيما ۽ نئين ورهاڱي. اسان وٽ پڻ ڊيٽا آهي. اهو صرف هڪ نئون اسڪيما حاصل ڪرڻ لاءِ رهي ٿو جيڪو ٺهيل ورهاڱي مان اسٽور فرنٽ اسڪيما ۽ نئين فيلڊ کي گڏ ڪري ٿو:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

اڳيون، اسان ٺاھيون ٿا ٽيبل رجسٽريشن DDL، جيئن اڳئين ٽڪڙي ۾.
جيڪڏهن سڄي زنجير صحيح طريقي سان ڪم ڪري ٿي، يعني، اتي هڪ شروعاتي لوڊ هو، ۽ ٽيبل صحيح طور تي Hive ۾ ٺاهي وئي، پوء اسان هڪ تازه ڪاري ٽيبل اسڪيما حاصل ڪندا آهيون.

۽ آخري مسئلو اهو آهي ته توهان صرف هڪ ورهاڱي کي Hive ٽيبل ۾ شامل نه ٿا ڪري سگهو، ڇاڪاڻ ته اهو ڀڄي ويندو. توهان کي ان جي ورهاڱي جي جوڙجڪ کي درست ڪرڻ لاء Hive کي مجبور ڪرڻو پوندو:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON پڙهڻ جو سادو ڪم ۽ ان جي بنياد تي هڪ اسٽور فرنٽ ٺاهڻ جو نتيجو ڪيترن ئي ضمني مشڪلاتن تي غالب اچي ٿو، حل جن لاءِ توهان کي الڳ الڳ ڳولڻو پوندو. ۽ جيتوڻيڪ اهي حل سادو آهن، اهو انهن کي ڳولڻ لاء گهڻو وقت وٺندو آهي.

شوڪيس جي تعمير کي لاڳو ڪرڻ لاء، مون کي ڪرڻو پوندو:

  • شوڪيس ۾ پارٽيشن شامل ڪريو، سروس فائلن کان نجات حاصل ڪرڻ
  • ماخذ ڊيٽا ۾ خالي فيلڊن سان ڊيل ڪريو جيڪي اسپارڪ ٽائپ ڪيا آهن
  • سادي قسمن کي اسٽرنگ ڏانهن وڌايو
  • فيلڊ جي نالن کي ننڍي ۾ تبديل ڪريو
  • Hive ۾ الڳ ڊيٽا اپ لوڊ ۽ ٽيبل رجسٽريشن (DDL نسل)
  • فيلڊ نالن کان بچڻ نه وساريو جيڪي شايد Hive سان مطابقت نه رکن
  • Hive ۾ ٽيبل جي رجسٽريشن کي ڪيئن تازه ڪاري ڪجي سکو

خلاصو، اسان ياد رکون ٿا ته دڪان جي ونڊوز ٺاهڻ جو فيصلو ڪيترن ئي نقصانن سان ڀريل آهي. تنهن ڪري، عمل ۾ مشڪلاتن جي صورت ۾، اهو بهتر آهي ته ڪامياب ماهر سان هڪ تجربيڪار پارٽنر سان رابطو ڪريو.

هن مضمون پڙهڻ لاء توهان جي مهرباني، اسان کي اميد آهي ته توهان کي معلومات مفيد ملندي.

جو ذريعو: www.habr.com

تبصرو شامل ڪريو