په عمل کې د سپارک سکیما ارتقا

ګرانو لوستونکو، ښه ورځ!

پدې مقاله کې ، د Neoflex لوی ډیټا حلونو سوداګرۍ ساحې لپاره مخکښ مشاور د اپاچي سپارک په کارولو سره د متغیر جوړښت پلورنځي رامینځته کولو لپاره توضیحي اختیارونه بیانوي.

د ډیټا تحلیلي پروژې د یوې برخې په توګه، ډیری وختونه د منظم جوړښت شوي معلوماتو پراساس د نمایشونو رامینځته کولو دنده رامینځته کیږي.

عموما دا لاګونه دي، یا د مختلفو سیسټمونو ځوابونه، د JSON یا XML په بڼه خوندي شوي. ډاټا هډوپ ته اپلوډ کیږي، بیا د پلورنځي فرنټ ته اړتیا لري چې له هغې څخه جوړ شي. موږ کولی شو جوړ شوي پلورنځي ته لاسرسی تنظیم کړو ، د مثال په توګه ، د امپالا له لارې.

پدې حالت کې ، د هدف پلورنځي سکیما دمخه نه پیژندل کیږي. سربیره پردې، سکیم هم مخکې له مخکې نه جوړیږي، ځکه چې دا په ډیټا پورې اړه لري، او موږ د دې خورا نرم جوړښت شوي ډیټا سره معامله کوو.

د مثال په توګه، نن ورځ لاندې ځواب ثبت شوی دی:

{source: "app1", error_code: ""}

او سبا د ورته سیسټم څخه لاندې ځواب راځي:

{source: "app1", error_code: "error", description: "Network error"}

د پایلې په توګه، یو بل ساحه باید په نندارتون کې اضافه شي - توضیحات، او هیڅوک نه پوهیږي چې دا به راشي یا نه.

په داسې معلوماتو کې د مارټ رامینځته کولو دنده خورا معیاري ده ، او سپارک د دې لپاره یو شمیر وسیلې لري. د سرچینې ډیټا پارس کولو لپاره ، د JSON او XML دواړو لپاره ملاتړ شتون لري ، او د پخوانۍ نامعلومې سکیما لپاره ، د سکیما ارتقا ملاتړ چمتو شوی.

په لومړي نظر کې، حل ساده ښکاري. تاسو اړتیا لرئ فولډر د JSON سره واخلئ او په ډیټا فریم کې یې ولولئ. سپارک به یو سکیما رامینځته کړي او د ځړول شوي ډاټا جوړښتونو ته واړوي. بیا ، هرڅه باید په پارکیټ کې خوندي شي ، کوم چې په امپالا کې هم ملاتړ کیږي ، په Hive میټاسټور کې د پلورنځي فرنټ ثبتولو سره.

هر څه ساده ښکاري.

په هرصورت، په اسنادو کې د لنډو مثالونو څخه دا روښانه نده چې په عمل کې د یو شمیر ستونزو سره څه وکړي.

اسناد یوه طریقه بیانوي چې نه د پلورنځي رامینځته کولو لپاره ، مګر په ډیټا فریم کې د JSON یا XML لوستلو لپاره.

د مثال په توګه ، دا په ساده ډول ښیې چې څنګه د JSON لوستل او تجزیه کول:

df = spark.read.json(path...)

دا د سپارک لپاره د معلوماتو چمتو کولو لپاره کافي دي.

په عمل کې، سناریو یوازې د فولډر څخه د JSON فایلونو لوستلو او د ډیټا فریم جوړولو په پرتله خورا پیچلې ده. وضعیت داسې ښکاري: دلته دمخه یو ټاکلی نندارتون شتون لري ، هره ورځ نوي معلومات راځي ، دوی باید په نندارتون کې اضافه شي ، دا مه هیروئ چې سکیم ممکن توپیر ولري.

د پلورنځي د جوړولو لپاره معمول سکیم په لاندې ډول دی:

1 پړاو. ډیټا په هاډوپ کې د ورځني بیا بارولو سره بار شوي او نوي برخې ته اضافه کیږي. دا د ورځې لخوا ویشل شوي لومړني ډیټا سره فولډر رامینځته کوي.

2 پړاو. د لومړني بار په جریان کې، دا فولډر د سپارک لخوا لوستل کیږي او تجزیه کیږي. پایله لرونکی ډیټا فریم په پارس وړ شکل کې خوندي شوی ، د مثال په توګه ، په پارکیټ کې ، کوم چې بیا امپالا ته وارد کیدی شي. دا د ټولو معلوماتو سره د هدف نمایش رامینځته کوي چې تر دې وخته پورې راټول شوي.

3 پړاو. یو ډاونلوډ رامینځته شوی چې هره ورځ به د پلورنځي تازه کوي.
د زیاتیدونکي بار کولو پوښتنه راپورته کیږي ، د سټور فرنټ ویشلو اړتیا ، او د پلورنځي عمومي ترتیب ملاتړ کولو پوښتنه.

راځئ چې یو مثال ورکړو. راځئ چې ووایو د ذخیره جوړولو لومړی ګام پلي شوی ، او فولډر ته د JSON فایلونو اپلوډ کول تنظیم شوي.

له دوی څخه د ډیټا فریم رامینځته کول ، بیا یې د نندارې په توګه خوندي کول ، کومه ستونزه نده. دا خورا لومړی ګام دی چې په اسانۍ سره د سپارک اسنادو کې موندل کیدی شي:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

هرڅه سم ښکاري.

موږ JSON لوستل او تجزیه کوو ، بیا موږ ډیټا فریم د پارکیټ په توګه خوندي کوو ، په Hive کې یې په هر مناسب ډول ثبت کوو:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

موږ یوه کړکۍ ترلاسه کوو.

مګر، بله ورځ، د سرچینې څخه نوي معلومات اضافه شوي. موږ د JSON سره یو فولډر لرو، او د دې فولډر څخه جوړ شوی شوکیس. د سرچینې څخه د ډیټا راتلونکي بسته کولو وروسته ، د ډیټا مارټ د یوې ورځې ارزښت لرونکی ډیټا له لاسه ورکوي.

یو منطقي حل به د ورځې لخوا د پلورنځي ویشل وي، کوم چې تاسو ته اجازه درکوي چې هره ورځ یوه نوې برخه اضافه کړئ. د دې لپاره میکانیزم هم ښه پیژندل شوی؛ سپارک تاسو ته اجازه درکوي په جلا توګه ویشونه ثبت کړئ.

لومړی، موږ ابتدايي بار کول ترسره کوو، د معلوماتو خوندي کول لکه څنګه چې پورته بیان شوي، یوازې د ویشلو اضافه کول. دې عمل ته د پلورنځي ابتدایي ویل کیږي او یوازې یو ځل ترسره کیږي:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

بله ورځ، موږ یوازې یو نوی ویش پورته کوو:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

ټول هغه څه چې پاتې دي د سکیما تازه کولو لپاره په Hive کې بیا راجستر کول دي.
په هرصورت، دا هغه ځای دی چې ستونزې رامنځته کیږي.

لومړۍ ستونزه. ژر یا وروسته، پایله لرونکی پارکیټ به د لوستلو وړ نه وي. دا د دې له امله دی چې څنګه پارکیټ او JSON خالي ساحې په مختلف ډول چلند کوي.

راځئ چې یو عادي حالت په پام کې ونیسو. د مثال په توګه، پرون JSON راځي:

День 1: {"a": {"b": 1}},

او نن ورځ ورته JSON داسې ښکاري:

День 2: {"a": null}

راځئ چې ووایو موږ دوه مختلف برخې لرو، هر یو د یوې کرښې سره.
کله چې موږ د سرچینې ټول معلومات ولولو، سپارک به د دې وړتیا ولري چې ډول وټاکي، او به پوه شي چې "a" د ډول "ساختمان" ساحه ده، د INT ډوله "b" د ځړول شوي ساحې سره. مګر، که هر ویش په جلا توګه خوندي شوی وي، نو پایله یې د برخې برخې سره د نامناسب سکیمونو سره یو پارک دی:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

دا وضعیت ښه پیژندل شوی، نو یو اختیار په ځانګړې توګه د خالي ساحو لرې کولو لپاره اضافه شوی کله چې د سرچینې ډاټا تجزیه کوي:

df = spark.read.json("...", dropFieldIfAllNull=True)

په دې حالت کې، پارکیټ به د برخو څخه جوړ وي چې یوځای لوستل کیدی شي.
که څه هم هغه چا چې په عمل کې دا کار کړی وي په خندا به خندا کوي. ولې؟ هو، ځکه چې ډیری احتمال به دوه نور حالتونه رامینځته شي. یا درې. یا څلور. لومړی، کوم چې تقریبا ډاډه دی، دا دی چې شمیرې ډولونه به په مختلفو JSON فایلونو کې مختلف ښکاري. د مثال په توګه، {intField: 1} او {intField: 1.1}. که چیرې دا ډول ساحې په یوه بسته کې ښکاره شي، نو د سکیما ضمیمه به هر څه په سمه توګه ولولي، چې خورا دقیق ډول ته الرښوونه کوي. مګر که په مختلفو کې وي، نو یو به intField: int ولري، او بل به intField: double ولري.

د دې وضعیت اداره کولو لپاره لاندې بیرغ شتون لري:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

اوس موږ یو فولډر لرو چیرې چې پارټیشنونه شتون لري چې په یو واحد ډیټا فریم کې لوستل کیدی شي او د ټول شوکیس یو معتبر پارکیټ. هو؟ نه.

موږ باید په یاد ولرو چې موږ میز په Hive کې ثبت کړی. Hive د ساحې په نومونو کې قضیه حساس نه ده، مګر پارکیټ دی. نو ځکه، د سکیما سره ویشونه: field1: int، او Field1: int د Hive لپاره یو شان دي، مګر د سپارک لپاره نه. د ساحې نومونه په ټیټ کیس کې بدلول مه هیروئ.

له هغې وروسته، هرڅه سم ښکاري.

په هرصورت، ټول دومره ساده ندي. بله، هم پیژندل شوې ستونزه رامنځته کیږي. څرنګه چې هر نوی ویش په جلا توګه خوندي شوی، د ویش فولډر به د سپارک خدماتو فایلونه ولري، د بیلګې په توګه، د _SUCCESS عملیات بریالیتوب بیرغ. دا به د یوې تېروتنې پایله وي کله چې د پارکیټ هڅه کول. د دې څخه د مخنیوي لپاره، تاسو اړتیا لرئ چې فولډر ته د خدماتو فایلونو اضافه کولو څخه د سپارک مخه ونیسئ:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

داسې بریښي چې اوس هره ورځ د هدف پلورنځي فرنټ فولډر کې د پارکیټ نوې برخه اضافه کیږي ، چیرې چې د ورځې لپاره پارس شوي ډاټا موقعیت لري. موږ دمخه پاملرنه وکړه ترڅو ډاډ ترلاسه کړو چې د ډیټا ډول شخړو سره هیڅ ډول ویش شتون نلري.

مګر، موږ دریمه ستونزه لرو. اوس عمومي سکیما نه ده معلومه، برسېره پردې، په Hive کې جدول غلط سکیما لري، ځکه چې هره نوې برخه ډیری احتمال په سکیما کې تحریف معرفي کوي.

تاسو اړتیا لرئ چې میز بیا ثبت کړئ. دا په ساده ډول ترسره کیدی شي: د پلورنځي پارکیټ بیا ولولئ ، سکیما واخلئ او د هغې پراساس DDL رامینځته کړئ ، له دې سره په Hive کې فولډر د بهرني میز په توګه راجستر کړئ ، د هدف پلورنځي سکیما تازه کول.

موږ څلورمه ستونزه لرو. کله چې موږ د لومړي ځل لپاره میز ثبت کړ، موږ په سپارک تکیه وکړه. اوس موږ دا پخپله کوو، او موږ باید په یاد ولرو چې د پارکیټ ساحې د هغو حروفونو سره پیل کیدی شي چې د Hive لپاره اجازه نلري. د مثال په توګه، سپارک لینونه غورځوي چې دا نشي کولی د "کرپټ_ریکارډ" ساحه کې تجزیه کړي. دا ډول ساحه پرته له تیښتې په Hive کې راجستر کیدی نشي.

د دې په پوهیدو سره، موږ سکیم ترلاسه کوو:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

کوډ ("_corrupt_record", "`_corrupt_record`") + "" + f[1]. ځای په ځای کول(":", "`:"). ځای په ځای کول("<", "<`").بدلول(",", ",`") بدل کړئ(" array<`", "array<") خوندي DDL کوي، دا د دې پرځای:

create table tname (_field1 string, 1field string)

د ساحې نومونو لکه "_field1، 1field" سره، یو خوندي DDL رامینځته کیږي چیرې چې د ساحې نومونه تښتیدلي دي: جدول `tname` (`_field1` string, `1field` string) جوړ کړئ.

پوښتنه راپورته کیږي: څنګه په سمه توګه د بشپړ سکیما سره ډیټا فریم ترلاسه کړئ (په pf کوډ کې)؟ دا پی ایف څنګه ترلاسه کړو؟ دا پنځمه ستونزه ده. د هدف پلورنځي فرنټ د پارکیټ فایلونو سره د فولډر څخه د ټولو برخو ډیاګرام بیا ولولئ؟ دا طریقه ترټولو خوندي، مګر ستونزمنه ده.

سکیما لا دمخه په Hive کې ده. تاسو کولی شئ د ټول جدول سکیما او نوي ویش سره یوځای کولو سره نوې سکیما ترلاسه کړئ. دا پدې مانا ده چې تاسو اړتیا لرئ د میز سکیما له Hive څخه واخلئ او د نوي برخې سکیما سره یې یوځای کړئ. دا د Hive څخه د ټیسټ میټاډاټا لوستلو سره ترسره کیدی شي ، په لنډمهاله فولډر کې یې خوندي کړئ ، او د سپارک په کارولو سره دواړه برخې په یوځل لوستلو سره.

په لازمي ډول ، دلته هرڅه شتون لري چې تاسو ورته اړتیا لرئ: په Hive کې اصلي میز سکیما او یو نوی تقسیم. موږ هم معلومات لرو. ټول هغه څه چې پاتې دي د نوي سکیما ترلاسه کول دي چې د پلورنځي سکیما او نوي ساحې له رامینځته شوي برخې څخه ترکیب کوي:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

بیا، موږ د جدول ثبتولو DDL جوړوو، لکه څنګه چې په تیرو ټوټو کې.
که ټوله سلسله په سمه توګه کار وکړي، یعنې، یو ابتدايي بار و، او جدول په سمه توګه په Hive کې جوړ شوی و، نو موږ د تازه جدول سکیما ترلاسه کوو.

او وروستۍ ستونزه دا ده چې تاسو نشئ کولی یوازې د Hive میز کې برخه اضافه کړئ، ځکه چې دا به مات شي. تاسو اړتیا لرئ Hive مجبور کړئ چې د هغې د ویش جوړښت تنظیم کړئ:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

د JSON لوستلو ساده دنده او د دې پراساس د پلورنځي رامینځته کول د یو شمیر ضمني ستونزو د بریالي کیدو پایله کوي ، هغه حلونه چې تاسو یې په جلا توګه ګورئ. او که څه هم دا حلونه ساده دي، دا د دوی موندلو لپاره ډیر وخت نیسي.

د نندارې د جوړولو د پلي کولو لپاره، ما باید:

  • د پلورنځي فرنټ ته برخې اضافه کړئ ، د خدماتو فایلونو څخه خلاصیدل
  • د سرچینې ډیټا کې د خالي ساحو سره معامله وکړئ چې سپارک ټایپ کړی
  • ساده ډولونه تار ته واچوئ
  • د ساحې نومونه کوچني ته بدل کړئ
  • په Hive کې د ډیټا جلا کول او د میز ثبت کول (DDL نسل)
  • د ساحې نومونو څخه تښتیدل مه هیروئ کوم چې ممکن د Hive سره مطابقت نلري
  • په Hive کې د میز راجسټریشن تازه کول زده کړئ

په لنډیز سره، موږ یادونه کوو چې د هټۍ د کړکیو جوړولو پریکړه د ډیری زیانونو څخه ډکه ده. له همدې امله، په پلي کولو کې د ستونزو په صورت کې، دا غوره ده چې د بریالي متخصص سره د تجربه لرونکي ملګري سره اړیکه ونیسئ.

د دې مقالې لوستلو لپاره مننه ، موږ امید لرو چې تاسو معلومات ګټور ومومئ.

سرچینه: www.habr.com

Add a comment