ප්‍රායෝගිකව ක්‍රමපරිණාමය ඇති කරන්න

හිතවත් පාඨකයින්, සුභ දවසක්!

මෙම ලිපියෙන්, Neoflex's Big Data Solutions ව්‍යාපාරික ප්‍රදේශයේ ප්‍රමුඛ උපදේශකයා Apache Spark භාවිතයෙන් විචල්‍ය ව්‍යුහ ප්‍රදර්ශන ගොඩනැගීමේ විකල්පයන් විස්තරාත්මකව විස්තර කරයි.

දත්ත විශ්ලේෂණ ව්‍යාපෘතියක කොටසක් ලෙස, ලිහිල් ව්‍යුහගත දත්ත මත පදනම්ව ගබඩා ඉදිරිපස ගොඩනැගීමේ කාර්යය බොහෝ විට පැන නගී.

සාමාන්‍යයෙන් මේවා JSON හෝ XML ලෙස සුරකින ලඝු හෝ විවිධ පද්ධති වලින් ලැබෙන ප්‍රතිචාර වේ. දත්ත Hadoop වෙත උඩුගත කර ඇත, එවිට ඔබ ඔවුන්ගෙන් ගබඩා ඉදිරිපස ගොඩනගා ගත යුතුය. අපට නිර්මාණය කළ ප්‍රදර්ශනාගාරයට ප්‍රවේශය සංවිධානය කළ හැකිය, උදාහරණයක් ලෙස, Impala හරහා.

මෙම අවස්ථාවෙහිදී, ඉලක්කගත වෙළඳසැල් ඉදිරිපස සැලැස්ම කලින් නොදනී. එපමනක් නොව, මෙම යෝජනා ක්රමය ද කල්තියා සකස් කළ නොහැක, එය දත්ත මත රඳා පවතින අතර, අපි මෙම ඉතා ලිහිල් ව්යුහගත දත්ත සමඟ කටයුතු කරන්නෙමු.

උදාහරණයක් ලෙස, අද පහත ප්‍රතිචාරය ලොග් වී ඇත:

{source: "app1", error_code: ""}

හෙට එම පද්ධතියෙන් පහත පිළිතුර ලැබේ:

{source: "app1", error_code: "error", description: "Network error"}

එහි ප්‍රතිඵලයක් වශයෙන්, ප්‍රදර්ශනාගාරයට තවත් එක් ක්ෂේත්‍රයක් එකතු කළ යුතුය - විස්තරය, එය පැමිණෙන්නේද නැද්ද යන්න කිසිවෙකු දන්නේ නැත.

එවැනි දත්ත මත ගබඩා ඉදිරිපස නිර්මාණය කිරීමේ කාර්යය ඉතා සම්මත වන අතර, මේ සඳහා Spark සතුව මෙවලම් ගණනාවක් තිබේ. මූලාශ්‍ර දත්ත විග්‍රහ කිරීම සඳහා, JSON සහ XML යන දෙකටම සහය ඇති අතර, කලින් නොදන්නා schema සඳහා, schemaEvolution සඳහා සහය සපයා ඇත.

මුලින්ම බැලූ බැල්මට විසඳුම සරල බව පෙනේ. ඔබට JSON සමඟ ෆෝල්ඩරයක් ගෙන එය දත්ත රාමුවකට කියවිය යුතුය. Spark විසින් යෝජනා ක්‍රමයක් නිර්මාණය කරයි, කැදලි දත්ත ව්‍යුහයන් බවට පත් කරයි. තවද, Hive metastore හි ගබඩා ඉදිරිපස ලියාපදිංචි කිරීමෙන් Impala හි ද සහාය දක්වන පාකට් එකෙහි සියල්ල සුරැකිය යුතුය.

සෑම දෙයක්ම සරල බව පෙනේ.

කෙසේ වෙතත්, ප්රායෝගිකව ගැටළු ගණනාවක් සමඟ කළ යුතු දේ ලේඛනගත කිරීමේ කෙටි උදාහරණ වලින් පැහැදිලි නැත.

ප්‍රලේඛනය විස්තර කරන්නේ ගබඩා ඉදිරිපස නිර්මාණය කිරීමට නොව, JSON හෝ XML දත්ත රාමුවකට කියවීමටය.

එනම්, එය JSON කියවා විග්‍රහ කරන ආකාරය සරලව පෙන්වයි:

df = spark.read.json(path...)

Spark වෙත දත්ත ලබා ගැනීමට මෙය ප්‍රමාණවත් වේ.

ප්‍රායෝගිකව, ස්ක්‍රිප්ට් එක ෆෝල්ඩරයකින් JSON ගොනු කියවා දත්ත රාමුවක් සෑදීමට වඩා සංකීර්ණ වේ. තත්වය මේ ආකාරයෙන් පෙනේ: දැනටමත් නිශ්චිත වෙළඳසැලක් ඇත, සෑම දිනකම නව දත්ත පැමිණේ, ඒවා වෙළඳසැල ඉදිරිපිටට එකතු කළ යුතුය, යෝජනා ක්‍රමය වෙනස් විය හැකි බව අමතක නොකරන්න.

ප්‍රදර්ශනාගාරයක් තැනීමේ සාමාන්‍ය යෝජනා ක්‍රමය පහත පරිදි වේ:

1 පියවර. දත්ත පසුව දිනපතා නැවත පූරණය කිරීමත් සමඟ Hadoop වෙත පටවනු ලබන අතර නව කොටසකට එක් කෙරේ. එය දිනෙන් දින කොටස් කරන ලද මූලික දත්ත සහිත ෆෝල්ඩරයක් බවට පත් කරයි.

2 පියවර. ආරම්භක පැටවීමේදී, මෙම ෆෝල්ඩරය Spark මගින් කියවා විග්‍රහ කරයි. ප්‍රතිඵලයක් ලෙස ලැබෙන දත්ත රාමුව විග්‍රහ කළ හැකි ආකෘතියකින් සුරකිනු ලැබේ, නිදසුනක් ලෙස, පාර්කට් ආකාරයෙන්, එය ඉම්පාලා වෙත ආනයනය කළ හැකිය. මෙය මේ දක්වා එකතු වී ඇති සියලුම දත්ත සමඟ ඉලක්ක ප්‍රදර්ශනාගාරයක් නිර්මාණය කරයි.

3 පියවර. සෑම දිනකම වෙළඳසැල ඉදිරිපස යාවත්කාලීන කරන බාගැනීමක් සාදනු ලැබේ.
වර්ධක පැටවීම පිළිබඳ ප්‍රශ්නයක්, ප්‍රදර්ශනාගාරය කොටස් කිරීමේ අවශ්‍යතාවය සහ ප්‍රදර්ශනාගාරයේ සාමාන්‍ය යෝජනා ක්‍රමය පවත්වා ගැනීමේ ප්‍රශ්නය ඇත.

අපි උදාහරණයක් ගනිමු. ගබඩාවක් තැනීමේ පළමු පියවර ක්‍රියාත්මක කර ඇති බවත්, JSON ගොනු ෆෝල්ඩරයකට උඩුගත කරන බවත් කියමු.

ඔවුන්ගෙන් දත්ත රාමුවක් නිර්මාණය කිරීම, පසුව එය ප්‍රදර්ශනාගාරයක් ලෙස සුරැකීම ගැටළුවක් නොවේ. Spark ලේඛනයේ පහසුවෙන් සොයා ගත හැකි පළමු පියවර මෙයයි:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

හැම දෙයක්ම හොඳයි වගේ.

අපි JSON කියවා විග්‍රහ කළෙමු, පසුව අපි දත්ත රාමුව පාකට් එකක් ලෙස සුරකිමු, එය ඕනෑම පහසු ආකාරයකින් Hive හි ලියාපදිංචි කරමු:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

අපි කවුළුවක් ලබා ගනිමු.

නමුත්, ඊළඟ දවසේ, මූලාශ්රයෙන් නව දත්ත එකතු කරන ලදී. අපට JSON සමඟ ෆෝල්ඩරයක් සහ මෙම ෆෝල්ඩරයෙන් සාදන ලද ප්‍රදර්ශනාගාරයක් ඇත. මූලාශ්‍රයෙන් මීළඟ දත්ත තොගය පූරණය කිරීමෙන් පසු, data mart හි එක් දිනක් වටිනා දත්ත අතුරුදහන් වේ.

තාර්කික විසඳුම වනුයේ වෙළඳසැල් ඉදිරිපස කොටස දිනෙන් දින කොටස් කිරීමයි, එය සෑම ඊළඟ දිනකම නව කොටසක් එක් කිරීමට ඉඩ සලසයි. මේ සඳහා යාන්ත්රණය ද හොඳින් දන්නා කරුණකි, Spark ඔබට කොටස් වෙන් වෙන් වශයෙන් ලිවීමට ඉඩ සලසයි.

පළමුව, අපි මූලික පැටවීමක් කරන්නෙමු, ඉහත විස්තර කර ඇති පරිදි දත්ත සුරැකීම, කොටස් කිරීම පමණක් එකතු කිරීම. මෙම ක්‍රියාව storefront initialization ලෙස හඳුන්වන අතර එය සිදු කරනු ලබන්නේ එක් වරක් පමණි:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

ඊළඟ දවසේ, අපි නව කොටසක් පමණක් පූරණය කරමු:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

ඉතිරිව ඇත්තේ යෝජනා ක්‍රමය යාවත්කාලීන කිරීමට Hive හි නැවත ලියාපදිංචි වීම පමණි.
කෙසේ වෙතත්, ගැටළු පැන නගින්නේ මෙයයි.

පළමු ගැටළුව. ඉක්මනින් හෝ පසුව, ප්රතිඵලයක් ලෙස පාකට් කියවිය නොහැකි වනු ඇත. මෙයට හේතුව පාකට් සහ JSON හිස් ක්ෂේත්‍ර වෙනස් ලෙස සලකන ආකාරයයි.

සාමාන්ය තත්වයක් සලකා බලමු. උදාහරණයක් ලෙස, ඊයේ JSON පැමිණේ:

День 1: {"a": {"b": 1}},

අද එම JSON මේ ආකාරයට පෙනේ:

День 2: {"a": null}

අපි හිතමු අපිට විවිධ කොටස් දෙකක් තියෙනවා, එක පේළියක් එක්ක.
අපි සම්පූර්ණ මූලාශ්‍ර දත්ත කියවන විට, Spark හට වර්ගය තීරණය කිරීමට හැකි වනු ඇති අතර, "a" යනු INT වර්ගයේ "b" වර්ගයේ කැදැලි ක්ෂේත්‍රයක් සහිත "ව්‍යුහය" වර්ගයේ ක්ෂේත්‍රයක් බව තේරුම් ගනීවි. නමුත්, එක් එක් කොටස වෙන වෙනම සුරකින ලද්දේ නම්, අපට නොගැලපෙන කොටස් යෝජනා ක්‍රම සහිත පාකට් එකක් ලැබේ:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

මෙම තත්වය හොඳින් දන්නා බැවින් විකල්පයක් විශේෂයෙන් එකතු කර ඇත - මූලාශ්‍ර දත්ත විග්‍රහ කිරීමේදී හිස් ක්ෂේත්‍ර ඉවත් කරන්න:

df = spark.read.json("...", dropFieldIfAllNull=True)

මෙම අවස්ථාවේදී, පාකට් එකට කියවිය හැකි කොටස් වලින් සමන්විත වේ.
මේක ප්‍රායෝගිකව කරපු අය මෙතන කහට හිනා වෙනවා ඇති. ඇයි? ඔව්, තවත් අවස්ථා දෙකක් ඇති වීමට ඉඩ ඇති බැවිනි. නැත්නම් තුනක්. නැත්නම් හතරක්. පළමුවැන්න, නිසැකවම සිදුවනු ඇත, විවිධ JSON ගොනු වල සංඛ්‍යාත්මක වර්ග වෙනස් ලෙස පෙනෙනු ඇත. උදාහරණයක් ලෙස, {intField: 1} සහ {intField: 1.1}. එවැනි ක්ෂේත්‍ර එක් කොටසක දක්නට ලැබේ නම්, යෝජනා ක්‍රමය ඒකාබද්ධ කිරීම සියල්ල නිවැරදිව කියවා, වඩාත් නිවැරදි වර්ගයට යොමු කරයි. නමුත් විවිධ ඒවායේ නම්, එකකට intField: int ඇත, අනෙක intField: ද්විත්ව ඇත.

මෙම තත්වය හැසිරවීමට පහත ධජය ඇත:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

දැන් අපට තනි දත්ත රාමුවකට කියවිය හැකි කොටස් ඇති ෆෝල්ඩරයක් සහ සම්පූර්ණ ප්‍රදර්ශනාගාරයේ වලංගු පාර්ක් එකක් තිබේ. ඔව්? නැත.

අපි Hive හි මේසය ලියාපදිංචි කළ බව මතක තබා ගත යුතුය. ක්ෂේත්‍ර නාමවල දී කාර්යබහුල අවස්ථාව සංවේදී නොවන අතර පාකට් යනු සිද්ධි සංවේදී වේ. එබැවින්, ස්කීමා සහිත කොටස්: field1: int, සහ Field1: int Hive සඳහා සමාන වේ, නමුත් Spark සඳහා නොවේ. ක්ෂේත්‍ර නාම කුඩා අකුරට පරිවර්තනය කිරීමට අමතක නොකරන්න.

ඊට පස්සේ, හැම දෙයක්ම හොඳයි වගේ.

කෙසේ වෙතත්, සියල්ලම එතරම් සරල නැත. දෙවන, සුප්රසිද්ධ ගැටලුවක් ද තිබේ. සෑම නව කොටසක්ම වෙන වෙනම සුරකින බැවින්, කොටස් ෆෝල්ඩරයේ Spark සේවා ගොනු අඩංගු වේ, උදාහරණයක් ලෙස, _SUCCESS මෙහෙයුම් සාර්ථකත්ව ධජය. මෙය පාකට් කිරීමට උත්සාහ කිරීමේදී දෝෂයක් ඇති කරයි. මෙය වලක්වා ගැනීම සඳහා, Spark විසින් ෆෝල්ඩරයට සේවා ගොනු එකතු කිරීම වැළැක්වීම සඳහා ඔබ වින්‍යාසය වින්‍යාසගත කළ යුතුය:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

දවස සඳහා විග්‍රහ කළ දත්ත පිහිටා ඇති ඉලක්ක ප්‍රදර්ශන ෆෝල්ඩරයට දැන් සෑම දිනකම නව පාකට් කොටසක් එකතු වන බව පෙනේ. දත්ත ආකාරයේ ගැටුමක් සහිත කොටස් නොමැති බව අපි කල්තියා බලා ගත්තෙමු.

නමුත්, අපට තුන්වන ගැටලුවක් තිබේ. දැන් සාමාන්‍ය යෝජනා ක්‍රමය නොදනී, එපමනක් නොව, Hive හි ඇති වගුවේ වැරදි සැලැස්මක් ඇත, මන්ද සෑම නව කොටසක්ම බොහෝ විට යෝජනා ක්‍රමයට විකෘතියක් හඳුන්වා දී ඇත.

ඔබ මේසය නැවත ලියාපදිංචි කළ යුතුය. මෙය සරලව කළ හැකිය: ගබඩා ඉදිරිපස පාර්ක් එක නැවත කියවා, ස්කීමාව ගෙන එය මත පදනම්ව ඩීඩීඑල් එකක් සාදන්න, එමඟින් ෆෝල්ඩරය බාහිර වගුවක් ලෙස නැවත ලියාපදිංචි කරන්න, ඉලක්ක වෙළඳසැලේ ක්‍රමලේඛනය යාවත්කාලීන කරන්න.

අපට ඇත්තේ හතරවැනි ගැටලුවකි. අපි පළමු වරට මේසය ලියාපදිංචි කරන විට, අපි ස්පාර්ක් මත විශ්වාසය තැබුවෙමු. දැන් අපි එය අප විසින්ම කරන අතර, අපි මතක තබා ගත යුතුයි පාකට් ක්ෂේත්‍ර Hive සඳහා ඉඩ නොදෙන අක්ෂර වලින් ආරම්භ විය හැකිය. උදාහරණයක් ලෙස, Spark විසින් "corrupt_record" ක්ෂේත්‍රය තුළ විග්‍රහ කිරීමට නොහැකි වූ රේඛා ඉවතට විසි කරයි. එබඳු ක්ෂේත්‍රයක් පැන නොගොස් Hive හි ලියාපදිංචි කළ නොහැක.

මෙය දැන ගැනීමෙන්, අපි යෝජනා ක්රමය ලබා ගනිමු:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

කේතය ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`")replace(",", ",`").ප්‍රතිස්ථාපනය ("අරාව<`", "අරාව<") ආරක්ෂිත DDL කරයි, එනම් ඒ වෙනුවට:

create table tname (_field1 string, 1field string)

"_field1, 1field" වැනි ක්ෂේත්‍ර නාම සමඟින්, ක්ෂේත්‍ර නාම ගැලවී ගිය තැන ආරක්ෂිත DDL සාදනු ලැබේ: වගුව `tname` (`_field1` string, `1field` string) සාදන්න.

ප්රශ්නය පැනනගින්නේ: සම්පූර්ණ සැලැස්මක් සහිත දත්ත රාමුවක් (pf කේතයෙන්) නිසි ලෙස ලබා ගන්නේ කෙසේද? මෙම pf ලබා ගන්නේ කෙසේද? මෙය පස්වන ගැටලුවයි. ඉලක්ක ප්‍රදර්ශනාගාරයේ පාකට් ගොනු සහිත ෆෝල්ඩරයෙන් සියලුම කොටස්වල යෝජනා ක්‍රමය නැවත කියවන්නද? මෙම ක්රමය ආරක්ෂිතම, නමුත් අපහසු වේ.

යෝජනා ක්‍රමය දැනටමත් Hive හි ඇත. සම්පූර්ණ වගුව සහ නව කොටසෙහි සැලැස්ම ඒකාබද්ධ කිරීමෙන් ඔබට නව සැලැස්මක් ලබා ගත හැකිය. එබැවින් ඔබ Hive වෙතින් වගු යෝජනා ක්‍රමය ගෙන එය නව කොටසේ යෝජනා ක්‍රමය සමඟ ඒකාබද්ධ කළ යුතුය. මෙය Hive වෙතින් පරීක්ෂණ පාරදත්ත කියවීමෙන්, එය තාවකාලික ෆෝල්ඩරයකට සුරැකීමෙන් සහ Spark භාවිතයෙන් කොටස් දෙකම එකවර කියවීමෙන් සිදු කළ හැක.

ඇත්ත වශයෙන්ම, ඔබට අවශ්ය සියල්ල තිබේ: Hive හි මුල් වගු සැලැස්ම සහ නව කොටස. අපිටත් දත්ත තියෙනවා. එය ඉතිරිව ඇත්තේ සාදන ලද කොටසෙන් වෙළඳසැල් ඉදිරිපස යෝජනා ක්‍රමය සහ නව ක්ෂේත්‍ර ඒකාබද්ධ කරන නව යෝජනා ක්‍රමයක් ලබා ගැනීම සඳහා පමණි:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

මීලඟට, අපි පෙර ස්නිපටයේ මෙන් වගු ලියාපදිංචිය DDL නිර්මාණය කරමු.
සම්පූර්ණ දාමය නිවැරදිව ක්‍රියා කරන්නේ නම්, එනම්, ආරම්භක භාරයක් තිබුනේ නම්, සහ වගුව Hive හි නිවැරදිව නිර්මාණය කර ඇත්නම්, අපට යාවත්කාලීන වගු සැලැස්මක් ලැබේ.

අනික අන්තිම අවුල තමයි Hive table එකකට partition එකක් විතරක් දාගන්න බැරි නිසා ඒක කැඩිලා යන නිසා. එහි කොටස් ව්‍යුහය සවි කිරීමට ඔබ හයිව්ට බල කළ යුතුය:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON කියවීම සහ එය මත පදනම් වූ වෙළඳසැල් ඉදිරිපස නිර්මාණය කිරීමේ සරල කාර්යයේ ප්‍රතිඵලය වන්නේ ඔබට වෙන වෙනම සෙවිය යුතු ව්‍යංග දුෂ්කරතා ගණනාවක් ජය ගැනීමයි. තවද මෙම විසඳුම් සරල වුවද, ඒවා සොයා ගැනීමට බොහෝ කාලයක් ගත වේ.

ප්‍රදර්ශනාගාරය ඉදිකිරීම ක්‍රියාත්මක කිරීම සඳහා, මට සිදු වූයේ:

  • ප්‍රදර්ශනාගාරයට කොටස් එක් කරන්න, සේවා ගොනු ඉවත් කරන්න
  • Spark ටයිප් කර ඇති මූලාශ්‍ර දත්තවල හිස් ක්ෂේත්‍ර සමඟ කටයුතු කරන්න
  • සරල වර්ග තන්තුවකට දමන්න
  • ක්ෂේත්‍ර නාම කුඩා අකුරට පරිවර්තනය කරන්න
  • Hive හි වෙනම දත්ත උඩුගත කිරීම සහ වගු ලියාපදිංචි කිරීම (DDL උත්පාදනය)
  • Hive සමඟ නොගැලපෙන ක්ෂේත්‍ර නාමවලින් ගැලවීමට අමතක නොකරන්න
  • Hive හි වගු ලියාපදිංචිය යාවත්කාලීන කරන ආකාරය ඉගෙන ගන්න

සාරාංශගත කිරීම, සාප්පු කවුළු තැනීමේ තීරණය බොහෝ අන්තරායන්ගෙන් පිරී ඇති බව අපි සටහන් කරමු. එබැවින්, ක්රියාත්මක කිරීමේදී දුෂ්කරතා ඇති වුවහොත්, සාර්ථක විශේෂඥතාවයක් ඇති පළපුරුදු හවුල්කරුවෙකු සම්බන්ධ කර ගැනීම වඩා හොඳය.

මෙම ලිපිය කියවීමට ස්තූතියි, ඔබට තොරතුරු ප්‍රයෝජනවත් වනු ඇතැයි අපි බලාපොරොත්තු වෙමු.

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න