Spark schemaEvolution በተግባር

ውድ አንባቢዎች, መልካም ቀን!

በዚህ ጽሑፍ ውስጥ፣ የNeoflex's Big Data Solutions የንግድ አካባቢ ዋና አማካሪ Apache Spark ን በመጠቀም ተለዋዋጭ መዋቅራዊ ማሳያዎችን የመገንባት አማራጮችን በዝርዝር ይገልፃል።

እንደ የውሂብ መመርመሪያ ፕሮጀክት አካል፣ ልቅ በሆነ የተዋቀረ መረጃ ላይ በመመስረት የመደብር ፊት የመገንባት ተግባር ብዙውን ጊዜ ይነሳል።

ብዙውን ጊዜ እነዚህ እንደ JSON ወይም XML የተቀመጡ ምዝግብ ማስታወሻዎች ወይም ከተለያዩ ስርዓቶች የተሰጡ ምላሾች ናቸው። ውሂቡ ወደ Hadoop ተሰቅሏል፣ ከዚያ የሱቅ ፊት ለፊት ከነሱ መገንባት ያስፈልግዎታል። የተፈጠረውን ማሳያ መዳረሻ ለምሳሌ በኢምፓላ በኩል ማደራጀት እንችላለን።

በዚህ ሁኔታ የዒላማው የመደብር ፊት ንድፍ አስቀድሞ አይታወቅም. በተጨማሪም ፣ መርሃግብሩ እንዲሁ አስቀድሞ ሊዘጋጅ አይችልም ፣ ምክንያቱም በመረጃው ላይ የተመሠረተ ነው ፣ እና እኛ በጣም ልቅ በሆነ የተዋቀረ መረጃ ላይ እንገናኛለን።

ለምሳሌ፣ ዛሬ የሚከተለው ምላሽ ተመዝግቧል።

{source: "app1", error_code: ""}

እና ነገ ከተመሳሳይ ስርዓት የሚከተለው መልስ ይመጣል.

{source: "app1", error_code: "error", description: "Network error"}

በውጤቱም, አንድ ተጨማሪ መስክ ወደ ማሳያው - መግለጫው መጨመር አለበት, እና ይምጣ ወይም አይመጣም ማንም አያውቅም.

በእንደዚህ ዓይነት መረጃ ላይ የመደብር ፊት የመፍጠር ተግባር በጣም መደበኛ ነው, እና ስፓርክ ለዚህ በርካታ መሳሪያዎች አሉት. የምንጩን መረጃ ለመተንተን፣ ለሁለቱም JSON እና XML ድጋፍ አለ፣ እና ከዚህ ቀደም ላልታወቀ እቅድ፣ የschemaEvolution ድጋፍ ቀርቧል።

በመጀመሪያ ሲታይ, መፍትሄው ቀላል ይመስላል. በJSON አቃፊ ወስደህ በዳታ ፍሬም ውስጥ ማንበብ አለብህ። ስፓርክ ንድፍ ይፈጥራል፣ የጎጆ ውሂብን ወደ መዋቅሮች ይለውጣል። በተጨማሪም ፣ በሂቭ ሜታስቶር ውስጥ ያለውን የመደብር ፊት በመመዝገብ በኢምፓላ ውስጥ በሚደገፈው ፓርክ ውስጥ ሁሉም ነገር መቀመጥ አለበት።

ሁሉም ነገር ቀላል ይመስላል.

ይሁን እንጂ በሰነዱ ውስጥ ካሉት አጫጭር ምሳሌዎች በተግባር ላይ ካሉ በርካታ ችግሮች ጋር ምን እንደሚደረግ ግልጽ አይደለም.

ሰነዱ የመደብር ፊት ላለመፍጠር፣ ነገር ግን JSON ወይም XML በዳታ ፍሬም ውስጥ የማንበብ አካሄድን ይገልጻል።

ይኸውም፣ በቀላሉ JSON እንዴት ማንበብ እና መተንተን እንደሚቻል ያሳያል፡-

df = spark.read.json(path...)

ይህ መረጃ ለስፓርክ እንዲገኝ ለማድረግ በቂ ነው።

በተግባር፣ ስክሪፕቱ የJSON ፋይሎችን ከአቃፊ ከማንበብ እና የውሂብ ፍሬም ከመፍጠር የበለጠ የተወሳሰበ ነው። ሁኔታው እንደዚህ ይመስላል: ቀድሞውኑ የተወሰነ የሱቅ ፊት አለ, አዲስ ውሂብ በየቀኑ ይመጣል, ወደ መደብሩ ፊት መጨመር ያስፈልጋቸዋል, መርሃግብሩ ሊለያይ እንደሚችል መርሳት የለብዎትም.

ማሳያውን ለመገንባት የተለመደው እቅድ እንደሚከተለው ነው.

1 ደረጃ. ውሂቡ ወደ Hadoop ተጭኗል በቀጣይ ዕለታዊ ዳግም መጫን እና ወደ አዲስ ክፍልፍል ታክሏል። በቀን የተከፋፈለ የመጀመሪያ ውሂብ ያለው አቃፊ ይወጣል።

2 ደረጃ. በመጀመሪያው ጭነት ጊዜ ይህ አቃፊ በስፓርክ ይነበባል እና ይተነተናል። የተገኘው የውሂብ ፍሬም በምሳሌያዊ ቅርጸት ይቀመጣል ፣ ለምሳሌ ፣ በፓኬት ውስጥ ፣ ከዚያ ወደ ኢምፓላ ሊገባ ይችላል። ይህ እስከዚህ ነጥብ ድረስ ከተጠራቀመው መረጃ ሁሉ ጋር የታለመ ማሳያን ይፈጥራል።

3 ደረጃ. የመደብር ፊትን በየቀኑ የሚያዘምን ውርድ ተፈጥሯል።
የመጨመሪያ ጭነት, የማሳያውን ክፍል የመከፋፈል አስፈላጊነት እና የአጠቃላይ የአጠቃላይ እቅድን የመጠበቅ ጥያቄ አለ.

አንድ ምሳሌ እንውሰድ። የማከማቻ ቦታን ለመገንባት የመጀመሪያው እርምጃ ተተግብሯል እና የJSON ፋይሎች ወደ ማህደር ተሰቅለዋል እንበል።

ከነሱ የውሂብ ፍሬም መፍጠር እና እንደ ማሳያ ማስቀመጥ ችግር አይደለም። ይህ በስፓርክ ሰነድ ውስጥ በቀላሉ ሊገኝ የሚችል የመጀመሪያው እርምጃ ነው፡-

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

ሁሉም ነገር ጥሩ ይመስላል።

JSON አንብበን ተንትነናል፣ከዚያም የመረጃ ክፈፉን እንደ ፓርኬት እናስቀምጠዋለን፣በቀፎ ውስጥ በማንኛውም ምቹ መንገድ አስመዘገብነው፡

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

መስኮት እናገኛለን.

ነገር ግን፣ በማግስቱ፣ ከምንጩ አዲስ መረጃ ታክሏል። ከJSON ጋር አቃፊ እና ከዚህ አቃፊ የተፈጠረ ማሳያ አለን። የሚቀጥለውን የውሂብ ስብስብ ከምንጩ ከጫኑ በኋላ የውሂብ ማርት የአንድ ቀን ዋጋ ይጎድላል።

አመክንዮአዊው መፍትሄ የመደብሩን ፊት በቀን መከፋፈል ነው፣ ይህም በየሚቀጥለው ቀን አዲስ ክፍልፍል ለመጨመር ያስችላል። የዚህ ዘዴ ዘዴም ይታወቃል, ስፓርክ ክፍሎችን በተናጠል እንዲጽፉ ይፈቅድልዎታል.

በመጀመሪያ, የመጀመሪያውን ጭነት እንሰራለን, ከላይ እንደተገለፀው መረጃን በማስቀመጥ, መከፋፈልን ብቻ እንጨምራለን. ይህ እርምጃ የመደብር የፊት ማስጀመሪያ ተብሎ ይጠራል እና አንድ ጊዜ ብቻ ይከናወናል፡

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

በሚቀጥለው ቀን አዲስ ክፍልፋይ ብቻ እንጭናለን፡-

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

የቀረው ሁሉ ንድፉን ለማዘመን በ Hive ውስጥ እንደገና መመዝገብ ነው።
ይሁን እንጂ ችግሮች የሚነሱበት ቦታ ይህ ነው.

የመጀመሪያው ችግር. ፈጥኖም ይሁን ዘግይቶ የተገኘው ፓርኬት የማይነበብ ይሆናል. ይህ የሆነው parquet እና JSON ባዶ ሜዳዎችን እንዴት እንደሚይዙ በምክንያት ነው።

አንድ የተለመደ ሁኔታን እንመልከት. ለምሳሌ፣ ትላንትና JSON ደርሷል፡-

День 1: {"a": {"b": 1}},

እና ዛሬ ያው JSON ይህን ይመስላል፡-

День 2: {"a": null}

እያንዳንዳቸው አንድ መስመር ያላቸው ሁለት የተለያዩ ክፍሎች አሉን እንበል.
ሙሉውን የምንጭ መረጃ ስናነብ ስፓርክ አይነቱን ሊወስን ይችላል እና “a” የአይነት “መዋቅር” መስክ መሆኑን እንረዳለን፣ ከጎጆው መስክ “ለ” አይነት INT። ግን እያንዳንዱ ክፍልፍል ለብቻው ከተቀመጠ ፣ ከዚያ ተኳሃኝ ያልሆኑ የክፍሎች እቅዶች ያለው ፓርኬት እናገኛለን።

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

ይህ ሁኔታ በጣም የታወቀ ነው ፣ ስለሆነም አንድ አማራጭ በልዩ ሁኔታ ተጨምሯል - የምንጭ ውሂቡን ሲተነተን ባዶ መስኮችን ያስወግዱ ።

df = spark.read.json("...", dropFieldIfAllNull=True)

በዚህ ሁኔታ, ፓርኩ አንድ ላይ ሊነበቡ የሚችሉ ክፍሎችን ያካትታል.
ምንም እንኳን በተግባር ይህን ያደረጉ ሰዎች እዚህ መራራ ፈገግ ይላሉ. ለምን? አዎ፣ ምክንያቱም ሁለት ተጨማሪ ሁኔታዎች ሊኖሩ ይችላሉ። ወይም ሶስት. ወይም አራት. የመጀመሪያው፣ በእርግጠኝነት የሚከሰት፣ የቁጥር አይነቶች በተለያዩ የJSON ፋይሎች ውስጥ የሚለያዩ መሆናቸው ነው። ለምሳሌ፣ {intField፡ 1} እና {intField፡ 1.1}። እንደነዚህ ያሉ መስኮች በአንድ ክፍል ውስጥ ከተገኙ, የመርሃግብሩ ውህደት ሁሉንም ነገር በትክክል ያነባል, ይህም ወደ ትክክለኛው አይነት ይመራል. ነገር ግን በተለያዩ ውስጥ ከሆነ, ከዚያም አንዱ intField: int ይኖረዋል, እና ሌሎች intField: ድርብ ይኖረዋል.

ይህንን ሁኔታ ለመቆጣጠር የሚከተለው ባንዲራ አለ.

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

አሁን ወደ አንድ የውሂብ ፍሬም ውስጥ ሊነበቡ የሚችሉ ክፍልፋዮች ያሉበት አቃፊ እና የጠቅላላው ማሳያ ክፍል ትክክለኛ ፓርክ አለን ። አዎ? አይ.

በሂቭ ውስጥ ጠረጴዛውን እንደመዘገብን ማስታወስ አለብን. ቀፎ በመስክ ስም ጉዳዩን ሚስጥራዊነት ያለው አይደለም፣ parquet ግን ጉዳዩን የሚነካ ነው። ስለዚህ፣ ክፍልፋዮች ከመርሃግብር ጋር፡ field1፡ int እና Field1፡ int ለቀፎ ተመሳሳይ ናቸው፣ ግን ለስፓርክ አይደለም። የመስክ ስሞችን ወደ ንዑስ ሆሄ መቀየርን አይርሱ።

ከዚያ በኋላ ሁሉም ነገር ጥሩ ይመስላል.

ሆኖም ግን, ሁሉም በጣም ቀላል አይደሉም. ሁለተኛም የታወቀ ችግርም አለ። እያንዳንዱ አዲስ ክፋይ ለብቻው ስለሚቀመጥ፣ የክፋይ አቃፊው የስፓርክ አገልግሎት ፋይሎችን ይይዛል፣ ለምሳሌ የ_SUCCESS ኦፕሬሽን ስኬት ባንዲራ። ይህ parquet ለማድረግ ሲሞክር ስህተት ያስከትላል. ይህንን ለማስቀረት ስፓርክ የአገልግሎት ፋይሎችን ወደ አቃፊው እንዳይጨምር ለመከላከል አወቃቀሩን ማዋቀር ያስፈልግዎታል፡-

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

አሁን በየቀኑ አዲስ የፓርኬት ክፍልፍል ወደ ዒላማው ማሳያ አቃፊ የሚጨመር ይመስላል፣ የእለቱ የተተነተነው መረጃ የሚገኝበት። የውሂብ አይነት ግጭት ያለባቸው ክፍልፋዮች አለመኖራቸውን አስቀድመን እንጠነቀቅ ነበር።

ግን, ሦስተኛው ችግር አለብን. አሁን አጠቃላይ መርሃግብሩ አይታወቅም ፣ በተጨማሪም ፣ በሂቭ ውስጥ ያለው ሠንጠረዥ ትክክል ያልሆነ ንድፍ አለው ፣ ምክንያቱም እያንዳንዱ አዲስ ክፍልፍል በእቅዱ ውስጥ መዛባትን ስላስተዋወቀ።

ጠረጴዛውን እንደገና መመዝገብ ያስፈልግዎታል. ይህ በቀላሉ ሊከናወን ይችላል-የመደብሩን ፊት ለፊት ያለውን ፓርክ እንደገና ያንብቡ ፣ መርሃግብሩን ይውሰዱ እና በእሱ ላይ በመመስረት ዲዲኤል ይፍጠሩ ፣ በእሱ ላይ በ Hive ውስጥ ያለውን አቃፊ እንደ ውጫዊ ሠንጠረዥ እንደገና ለማስመዝገብ ፣ የታለመውን የመደብር የፊት ገጽታ ንድፍ በማዘመን።

አራተኛ ችግር አለብን። ጠረጴዛውን ለመጀመሪያ ጊዜ ስንመዘግብ በስፓርክ ላይ ተመስርተናል. አሁን እኛ እራሳችንን እናደርገዋለን, እና የፓርኬት ማሳዎች ለቀፎ የማይፈቀዱ ቁምፊዎች ሊጀምሩ እንደሚችሉ ማስታወስ አለብን. ለምሳሌ ስፓርክ በ"corrupt_record" መስክ ውስጥ ሊተነተን ያልቻለውን መስመሮችን ይጥላል። እንዲህ ዓይነቱ መስክ ሳያመልጥ በቀፎ ውስጥ መመዝገብ አይቻልም.

ይህንን በማወቅ መርሃግብሩን እናገኛለን-

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

ኮድ ("_የተበላሸ_መዝገብ"፣ "`_የተበላሸ_መዝገብ`") + "" + ረ[1] ተካ(":"":")።ተካ("<"""")።ተካ(""፣ ","").ተካ ("ድርድር<`", "ድርድር"). ደህንነቱ የተጠበቀ DDL ያደርጋል፣ ማለትም በምትኩ፡-

create table tname (_field1 string, 1field string)

እንደ "_field1፣ 1field" ባሉ የመስክ ስሞች ደህንነቱ የተጠበቀ DDL የተሰራው የመስክ ስሞች ያመለጡበት ነው፡ ሠንጠረዥ `tname` (`_field1` string፣ `1field` string) ፍጠር።

ጥያቄው የሚነሳው-የዳታ ፍሬም እንዴት በተሟላ ንድፍ (በፒኤፍ ኮድ) እንዴት በትክክል ማግኘት እንደሚቻል? ይህንን ፒኤፍ እንዴት ማግኘት ይቻላል? ይህ አምስተኛው ችግር ነው። የሁሉንም ክፍልፋዮች እቅድ ከአቃፊው ውስጥ በታለመው ማሳያ ክፍልፋይ ፋይሎች እንደገና አንብብ? ይህ ዘዴ በጣም አስተማማኝ ነው, ግን አስቸጋሪ ነው.

መርሃግብሩ አስቀድሞ በቀፎ ውስጥ ነው። የጠቅላላውን ጠረጴዛ እና አዲሱን ክፍልፋይ በማጣመር አዲስ እቅድ ማግኘት ይችላሉ. ስለዚህ የጠረጴዛውን ንድፍ ከቀፎ መውሰድ እና ከአዲሱ ክፍልፍል ንድፍ ጋር ማጣመር ያስፈልግዎታል። ይህ የፈተናውን ሜታዳታ ከ Hive በማንበብ፣ ወደ ጊዜያዊ ማህደር በማስቀመጥ እና ሁለቱንም ክፍልፋዮች በአንድ ጊዜ ለማንበብ ስፓርክን በመጠቀም ሊከናወን ይችላል።

እንደ እውነቱ ከሆነ, የሚያስፈልግዎ ነገር አለ-የመጀመሪያው የሠንጠረዥ ንድፍ በቀፎ እና አዲሱ ክፍልፍል. መረጃም አለን። ከተፈጠረው ክፍልፋዮች የመደብር ፊት ንድፍ እና አዲስ መስኮችን የሚያጣምር አዲስ ንድፍ ለማግኘት ብቻ ይቀራል።

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

በመቀጠል, ልክ እንደ ቀድሞው ቅንጭብ, የጠረጴዛ ምዝገባን DDL እንፈጥራለን.
አጠቃላይ ሰንሰለቱ በትክክል ከሰራ ፣ ማለትም ፣ የማስጀመሪያ ጭነት ነበር ፣ እና ሰንጠረዡ በትክክል በ Hive ውስጥ ተፈጠረ ፣ ከዚያ የተሻሻለ የጠረጴዛ ንድፍ እናገኛለን።

እና የመጨረሻው ችግር አንድ ክፍልፋይ ወደ ቀፎ ጠረጴዛ ላይ ብቻ ማከል አይችሉም, ምክንያቱም ይሰበራል. Hive የክፍፍል አወቃቀሩን እንዲያስተካክል ማስገደድ አለቦት፡-

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON ን የማንበብ ቀላል ስራ እና በእሱ ላይ የተመሰረተ የመደብር ፊት መፍጠር ብዙ ስውር ችግሮችን ፣ መፍትሄዎችን ለብቻዎ መፈለግን ያስከትላል። እና እነዚህ መፍትሄዎች ቀላል ቢሆኑም, እነሱን ለማግኘት ብዙ ጊዜ ይወስዳል.

የማሳያውን ግንባታ ተግባራዊ ለማድረግ የሚከተሉትን ማድረግ ነበረብኝ፡-

  • የአገልግሎት ፋይሎችን በማስወገድ ወደ ትርኢቱ ክፍልፋዮችን ያክሉ
  • Spark በከተተባቸው የምንጭ ውሂብ ውስጥ ባዶ መስኮችን ይስሩ
  • ቀላል ዓይነቶችን ወደ ሕብረቁምፊ ውሰድ
  • የመስክ ስሞችን ወደ ንዑስ ሆሄ ቀይር
  • በ Hive (DDL ትውልድ) ውስጥ የተለየ የውሂብ ሰቀላ እና የጠረጴዛ ምዝገባ
  • ከቀፎ ጋር የማይጣጣሙ የመስክ ስሞችን ማምለጥዎን አይርሱ
  • በ Hive ውስጥ የጠረጴዛ ምዝገባን እንዴት ማዘመን እንደሚችሉ ይወቁ

ለማጠቃለል ያህል, የሱቅ መስኮቶችን የመገንባት ውሳኔ በብዙ ወጥመዶች የተሞላ መሆኑን እናስተውላለን. ስለዚህ, በአተገባበር ላይ ችግሮች ካጋጠሙ, የተሳካ ልምድ ያለው ልምድ ያለው አጋር ማነጋገር የተሻለ ነው.

ይህን ጽሑፍ ስላነበቡ እናመሰግናለን፣ መረጃው ጠቃሚ ሆኖ እንዳገኙት ተስፋ እናደርጋለን።

ምንጭ: hab.com

አስተያየት ያክሉ