ఆచరణలో స్కీమా ఎవల్యూషన్‌ను ప్రేరేపించండి

ప్రియమైన పాఠకులారా, మంచి రోజు!

ఈ కథనంలో, నియోఫ్లెక్స్ యొక్క బిగ్ డేటా సొల్యూషన్స్ బిజినెస్ ఏరియా యొక్క ప్రముఖ కన్సల్టెంట్ అపాచీ స్పార్క్ ఉపయోగించి వేరియబుల్ స్ట్రక్చర్ షోకేస్‌లను నిర్మించే ఎంపికలను వివరంగా వివరిస్తుంది.

డేటా విశ్లేషణ ప్రాజెక్ట్‌లో భాగంగా, వదులుగా నిర్మాణాత్మక డేటా ఆధారంగా స్టోర్ ముందరిని నిర్మించే పని తరచుగా తలెత్తుతుంది.

సాధారణంగా ఇవి లాగ్‌లు లేదా వివిధ సిస్టమ్‌ల నుండి వచ్చిన ప్రతిస్పందనలు, JSON లేదా XMLగా సేవ్ చేయబడతాయి. డేటా హడూప్‌కి అప్‌లోడ్ చేయబడింది, ఆపై మీరు వాటి నుండి దుకాణం ముందరిని నిర్మించాలి. మేము సృష్టించిన షోకేస్‌కి యాక్సెస్‌ని నిర్వహించవచ్చు, ఉదాహరణకు, ఇంపాలా ద్వారా.

ఈ సందర్భంలో, టార్గెట్ స్టోర్ ఫ్రంట్ యొక్క స్కీమా ముందుగా తెలియదు. అంతేకాకుండా, పథకం కూడా ముందుగానే రూపొందించబడదు, ఎందుకంటే ఇది డేటాపై ఆధారపడి ఉంటుంది మరియు మేము ఈ చాలా వదులుగా నిర్మాణాత్మక డేటాతో వ్యవహరిస్తున్నాము.

ఉదాహరణకు, ఈ రోజు కింది ప్రతిస్పందన లాగ్ చేయబడింది:

{source: "app1", error_code: ""}

మరియు రేపు అదే సిస్టమ్ నుండి క్రింది సమాధానం వస్తుంది:

{source: "app1", error_code: "error", description: "Network error"}

ఫలితంగా, షోకేస్ - వివరణకు మరో ఫీల్డ్ జోడించబడాలి మరియు అది వస్తుందో లేదో ఎవరికీ తెలియదు.

అటువంటి డేటాపై స్టోర్ ముందరిని సృష్టించే పని చాలా ప్రామాణికమైనది మరియు స్పార్క్ దీని కోసం అనేక సాధనాలను కలిగి ఉంది. మూల డేటాను అన్వయించడం కోసం, JSON మరియు XML రెండింటికీ మద్దతు ఉంది మరియు గతంలో తెలియని స్కీమా కోసం, స్కీమాఎవల్యూషన్‌కు మద్దతు అందించబడుతుంది.

మొదటి చూపులో, పరిష్కారం చాలా సులభం. మీరు JSONతో ఫోల్డర్‌ని తీసుకొని దానిని డేటాఫ్రేమ్‌లో చదవాలి. స్పార్క్ స్కీమాను సృష్టిస్తుంది, సమూహ డేటాను నిర్మాణాలుగా మారుస్తుంది. ఇంకా, హైవ్ మెటాస్టోర్‌లో స్టోర్ ఫ్రంట్‌ను రిజిస్టర్ చేయడం ద్వారా ఇంపాలాలో కూడా సపోర్ట్ చేసే పార్కెట్‌లో ప్రతిదీ సేవ్ చేయాలి.

అంతా సింపుల్‌గా ఉన్నట్లు అనిపిస్తుంది.

అయితే, ఆచరణలో ఉన్న అనేక సమస్యలతో ఏమి చేయాలో డాక్యుమెంటేషన్‌లోని చిన్న ఉదాహరణల నుండి స్పష్టంగా లేదు.

డాక్యుమెంటేషన్ స్టోర్ ఫ్రంట్‌ని సృష్టించడం కాదు, JSON లేదా XMLని డేటాఫ్రేమ్‌లో చదవడం గురించి వివరిస్తుంది.

అవి, ఇది కేవలం JSONని ఎలా చదవాలో మరియు అన్వయించాలో చూపిస్తుంది:

df = spark.read.json(path...)

స్పార్క్‌కి డేటా అందుబాటులో ఉంచడానికి ఇది సరిపోతుంది.

ఆచరణలో, ఫోల్డర్ నుండి JSON ఫైల్‌లను చదవడం మరియు డేటాఫ్రేమ్‌ను సృష్టించడం కంటే స్క్రిప్ట్ చాలా క్లిష్టంగా ఉంటుంది. పరిస్థితి ఇలా కనిపిస్తుంది: ఇప్పటికే ఒక నిర్దిష్ట స్టోర్ ఫ్రంట్ ఉంది, ప్రతిరోజూ కొత్త డేటా వస్తుంది, వాటిని స్టోర్ ఫ్రంట్‌కు జోడించాల్సిన అవసరం ఉంది, పథకం భిన్నంగా ఉండవచ్చని మర్చిపోకూడదు.

ప్రదర్శనశాలను నిర్మించడానికి సాధారణ పథకం క్రింది విధంగా ఉంటుంది:

1 దశ. తదుపరి రోజువారీ రీలోడింగ్‌తో డేటా హడూప్‌లోకి లోడ్ చేయబడుతుంది మరియు కొత్త విభజనకు జోడించబడుతుంది. ఇది రోజు వారీగా విభజించబడిన ప్రారంభ డేటాతో ఫోల్డర్‌గా మారుతుంది.

2 దశ. ప్రారంభ లోడ్ సమయంలో, ఈ ఫోల్డర్ స్పార్క్ ద్వారా చదవబడుతుంది మరియు అన్వయించబడుతుంది. ఫలితంగా డేటాఫ్రేమ్ పార్సేబుల్ ఫార్మాట్‌లో సేవ్ చేయబడుతుంది, ఉదాహరణకు, పార్కెట్‌లో, దానిని ఇంపాలాలోకి దిగుమతి చేసుకోవచ్చు. ఇది ఇప్పటివరకు సేకరించిన మొత్తం డేటాతో టార్గెట్ షోకేస్‌ను సృష్టిస్తుంది.

3 దశ. ప్రతిరోజు స్టోర్ ఫ్రంట్‌ను అప్‌డేట్ చేసే డౌన్‌లోడ్ సృష్టించబడింది.
ఇంక్రిమెంటల్ లోడింగ్, షోకేస్‌ను విభజించాల్సిన అవసరం మరియు షోకేస్ యొక్క సాధారణ స్కీమ్‌ను నిర్వహించడం అనే ప్రశ్న ఉంది.

ఒక ఉదాహరణ తీసుకుందాం. రిపోజిటరీని నిర్మించే మొదటి దశ అమలు చేయబడిందని మరియు JSON ఫైల్‌లు ఫోల్డర్‌కి అప్‌లోడ్ చేయబడిందని చెప్పండి.

వాటి నుండి డేటాఫ్రేమ్‌ను రూపొందించడం, ఆపై దానిని షోకేస్‌గా సేవ్ చేయడం సమస్య కాదు. స్పార్క్ డాక్యుమెంటేషన్‌లో సులభంగా కనుగొనగలిగే మొదటి దశ ఇది:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

అంతా బాగానే ఉన్నట్లుంది.

మేము JSONని చదివాము మరియు అన్వయించాము, ఆపై మేము డేటాఫ్రేమ్‌ను పార్కెట్‌గా సేవ్ చేస్తాము, ఏదైనా అనుకూలమైన మార్గంలో హైవ్‌లో నమోదు చేస్తాము:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

మాకు విండో వస్తుంది.

కానీ, మరుసటి రోజు, మూలం నుండి కొత్త డేటా జోడించబడింది. మేము JSONతో ఫోల్డర్‌ని కలిగి ఉన్నాము మరియు ఈ ఫోల్డర్ నుండి సృష్టించబడిన షోకేస్. మూలాధారం నుండి తదుపరి బ్యాచ్ డేటాను లోడ్ చేసిన తర్వాత, డేటా మార్ట్‌లో ఒక రోజు విలువైన డేటా లేదు.

తార్కిక పరిష్కారం రోజు వారీగా స్టోర్ ఫ్రంట్‌ను విభజించడం, ఇది ప్రతి మరుసటి రోజు కొత్త విభజనను జోడించడాన్ని అనుమతిస్తుంది. దీని కోసం మెకానిజం కూడా బాగా తెలుసు, స్పార్క్ విడిగా విభజనలను వ్రాయడానికి మిమ్మల్ని అనుమతిస్తుంది.

మొదట, మేము ప్రారంభ లోడ్ చేస్తాము, పైన వివరించిన విధంగా డేటాను సేవ్ చేస్తాము, విభజనను మాత్రమే జోడిస్తాము. ఈ చర్యను స్టోర్ ఫ్రంట్ ఇనిషియలైజేషన్ అంటారు మరియు ఒకసారి మాత్రమే చేయబడుతుంది:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

మరుసటి రోజు, మేము కొత్త విభజనను మాత్రమే లోడ్ చేస్తాము:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

స్కీమాను అప్‌డేట్ చేయడానికి హైవ్‌లో మళ్లీ నమోదు చేసుకోవడం మాత్రమే మిగిలి ఉంది.
అయితే, ఇక్కడే సమస్యలు తలెత్తుతున్నాయి.

మొదటి సమస్య. ముందుగానే లేదా తరువాత, ఫలితంగా పారేకెట్ చదవబడదు. పార్కెట్ మరియు JSON ఖాళీ ఫీల్డ్‌లను ఎలా విభిన్నంగా పరిగణిస్తాయో దీనికి కారణం.

ఒక సాధారణ పరిస్థితిని పరిశీలిద్దాం. ఉదాహరణకు, నిన్న JSON వస్తుంది:

День 1: {"a": {"b": 1}},

మరియు నేడు అదే JSON ఇలా కనిపిస్తుంది:

День 2: {"a": null}

మనకు రెండు వేర్వేరు విభజనలు ఉన్నాయని అనుకుందాం, ఒక్కొక్కటి ఒక పంక్తితో.
మేము మొత్తం సోర్స్ డేటాను చదివినప్పుడు, స్పార్క్ రకాన్ని గుర్తించగలదు మరియు "a" అనేది INT రకం "b" యొక్క సమూహ ఫీల్డ్‌తో "నిర్మాణం" రకం ఫీల్డ్ అని అర్థం చేసుకుంటుంది. కానీ, ప్రతి విభజన విడిగా సేవ్ చేయబడితే, మేము అననుకూల విభజన పథకాలతో ఒక పారేకెట్ను పొందుతాము:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

ఈ పరిస్థితి బాగా తెలుసు, కాబట్టి ఒక ఎంపిక ప్రత్యేకంగా జోడించబడింది - సోర్స్ డేటాను అన్వయించేటప్పుడు, ఖాళీ ఫీల్డ్‌లను తీసివేయండి:

df = spark.read.json("...", dropFieldIfAllNull=True)

ఈ సందర్భంలో, పారేకెట్ కలిసి చదవగలిగే విభజనలను కలిగి ఉంటుంది.
ఆచరణలో ఇలా చేసిన వారు ఇక్కడ మాత్రం ఘాటుగా నవ్వుతారు. ఎందుకు? అవును, ఎందుకంటే మరో రెండు పరిస్థితులు ఉండే అవకాశం ఉంది. లేదా మూడు. లేదా నాలుగు. మొదటిది, దాదాపు ఖచ్చితంగా సంభవించేది, వివిధ JSON ఫైల్‌లలో సంఖ్యా రకాలు భిన్నంగా కనిపిస్తాయి. ఉదాహరణకు, {intField: 1} మరియు {intField: 1.1}. అటువంటి ఫీల్డ్‌లు ఒక విభజనలో కనుగొనబడితే, స్కీమా విలీనం ప్రతిదీ సరిగ్గా చదవబడుతుంది, ఇది అత్యంత ఖచ్చితమైన రకానికి దారి తీస్తుంది. కానీ వేర్వేరు వాటిలో ఉంటే, ఒకదానిలో intField: int ఉంటుంది, మరియు మరొకటి intField: డబుల్ ఉంటుంది.

ఈ పరిస్థితిని నిర్వహించడానికి క్రింది జెండా ఉంది:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

ఇప్పుడు మనకు ఒకే డేటాఫ్రేమ్‌లో చదవగలిగే విభజనలు మరియు మొత్తం షోకేస్ యొక్క చెల్లుబాటు అయ్యే పార్కెట్ ఉన్న ఫోల్డర్ ఉంది. అవునా? సంఖ్య

మేము అందులో నివశించే తేనెటీగలు లో పట్టిక నమోదు గుర్తుంచుకోవాలి. ఫీల్డ్ పేర్లలో హైవ్ కేస్ సెన్సిటివ్ కాదు, పార్కెట్ అనేది కేస్ సెన్సిటివ్. కాబట్టి, స్కీమాలతో కూడిన విభజనలు: field1: int, మరియు Field1: int హైవ్‌కి ఒకేలా ఉంటాయి, కానీ Spark కోసం కాదు. ఫీల్డ్ పేర్లను చిన్న అక్షరానికి మార్చడం మర్చిపోవద్దు.

ఆ తర్వాత అంతా సవ్యంగానే ఉన్నట్లుంది.

అయితే, ప్రతిదీ అంత సులభం కాదు. రెండవది, బాగా తెలిసిన సమస్య కూడా ఉంది. ప్రతి కొత్త విభజన విడిగా సేవ్ చేయబడినందున, విభజన ఫోల్డర్ Spark సర్వీస్ ఫైల్‌లను కలిగి ఉంటుంది, ఉదాహరణకు, _SUCCESS ఆపరేషన్ సక్సెస్ ఫ్లాగ్. ఇది పార్కెట్ చేయడానికి ప్రయత్నిస్తున్నప్పుడు లోపం ఏర్పడుతుంది. దీన్ని నివారించడానికి, ఫోల్డర్‌కు సర్వీస్ ఫైల్‌లను జోడించకుండా స్పార్క్ నిరోధించడానికి మీరు కాన్ఫిగరేషన్‌ను కాన్ఫిగర్ చేయాలి:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

రోజు కోసం అన్వయించిన డేటా ఉన్న టార్గెట్ షోకేస్ ఫోల్డర్‌కు ఇప్పుడు ప్రతిరోజూ కొత్త పార్కెట్ విభజన జోడించబడుతోంది. డేటా రకం వైరుధ్యంతో విభజనలు ఏవీ లేవని మేము ముందుగానే చూసుకున్నాము.

కానీ, మనకు మూడో సమస్య ఉంది. ఇప్పుడు సాధారణ స్కీమా తెలియదు, అంతేకాకుండా, హైవ్‌లోని పట్టిక తప్పు స్కీమాను కలిగి ఉంది, ఎందుకంటే ప్రతి కొత్త విభజన స్కీమాలో వక్రీకరణను ప్రవేశపెట్టింది.

మీరు పట్టికను మళ్లీ నమోదు చేసుకోవాలి. ఇది చాలా సరళంగా చేయవచ్చు: స్టోర్ ఫ్రంట్ యొక్క పార్కెట్‌ను మళ్లీ చదవండి, స్కీమాను తీసుకోండి మరియు దాని ఆధారంగా DDLని సృష్టించండి, దానితో హైవ్‌లోని ఫోల్డర్‌ను బాహ్య పట్టికగా మళ్లీ నమోదు చేయండి, టార్గెట్ స్టోర్ ఫ్రంట్ యొక్క స్కీమాను నవీకరిస్తుంది.

మనకు నాల్గవ సమస్య ఉంది. మేము మొదటిసారి పట్టికను నమోదు చేసినప్పుడు, మేము స్పార్క్‌పై ఆధారపడ్డాము. ఇప్పుడు మనం దీన్ని మనమే చేస్తాము మరియు పారేకెట్ ఫీల్డ్‌లు అందులో నివశించే తేనెటీగలు అనుమతించబడని అక్షరాలతో ప్రారంభించవచ్చని గుర్తుంచుకోవాలి. ఉదాహరణకు, స్పార్క్ "corrupt_record" ఫీల్డ్‌లో అన్వయించలేని పంక్తులను విసురుతుంది. అటువంటి ఫీల్డ్ తప్పించుకోకుండా హైవ్‌లో నమోదు చేయబడదు.

ఇది తెలుసుకోవడం, మేము పథకం పొందుతాము:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

కోడ్ ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") సురక్షితమైన DDLని చేస్తుంది, అంటే బదులుగా:

create table tname (_field1 string, 1field string)

"_field1, 1field" వంటి ఫీల్డ్ పేర్లతో, ఫీల్డ్ పేర్లు తప్పించుకున్న చోట సురక్షితమైన DDL చేయబడుతుంది: టేబుల్ `tname` (`_field1` స్ట్రింగ్, `1field` స్ట్రింగ్) సృష్టించండి.

ప్రశ్న తలెత్తుతుంది: పూర్తి స్కీమాతో (pf కోడ్‌లో) డేటాఫ్రేమ్‌ను ఎలా సరిగ్గా పొందాలి? ఈ pf ఎలా పొందాలి? ఇది ఐదవ సమస్య. టార్గెట్ షోకేస్ యొక్క పార్కెట్ ఫైల్‌లతో ఫోల్డర్ నుండి అన్ని విభజనల పథకాన్ని మళ్లీ చదవాలా? ఈ పద్ధతి సురక్షితమైనది, కానీ కష్టం.

స్కీమా ఇప్పటికే హైవ్‌లో ఉంది. మీరు మొత్తం పట్టిక మరియు కొత్త విభజన యొక్క స్కీమాను కలపడం ద్వారా కొత్త స్కీమాను పొందవచ్చు. కాబట్టి మీరు హైవ్ నుండి టేబుల్ స్కీమాను తీసుకోవాలి మరియు దానిని కొత్త విభజన యొక్క స్కీమాతో కలపాలి. ఇది హైవ్ నుండి పరీక్ష మెటాడేటాను చదవడం, తాత్కాలిక ఫోల్డర్‌లో సేవ్ చేయడం మరియు రెండు విభజనలను ఒకేసారి చదవడానికి స్పార్క్‌ని ఉపయోగించడం ద్వారా చేయవచ్చు.

వాస్తవానికి, మీకు కావాల్సినవన్నీ ఉన్నాయి: హైవ్‌లోని అసలు టేబుల్ స్కీమా మరియు కొత్త విభజన. మా దగ్గర డేటా కూడా ఉంది. సృష్టించిన విభజన నుండి స్టోర్ ఫ్రంట్ స్కీమా మరియు కొత్త ఫీల్డ్‌లను మిళితం చేసే కొత్త స్కీమాను పొందడానికి మాత్రమే ఇది మిగిలి ఉంది:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

తరువాత, మేము మునుపటి స్నిప్పెట్‌లో వలె టేబుల్ రిజిస్ట్రేషన్ DDLని సృష్టిస్తాము.
మొత్తం గొలుసు సరిగ్గా పనిచేస్తే, అనగా, ప్రారంభించే లోడ్ ఉంది మరియు హైవ్‌లో పట్టిక సరిగ్గా సృష్టించబడితే, మేము నవీకరించబడిన టేబుల్ స్కీమాను పొందుతాము.

మరియు చివరి సమస్య ఏమిటంటే, మీరు హైవ్ టేబుల్‌కి విభజనను జోడించలేరు, ఎందుకంటే అది విచ్ఛిన్నమవుతుంది. దాని విభజన నిర్మాణాన్ని సరిచేయడానికి మీరు హైవ్‌ని బలవంతం చేయాలి:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSONని చదవడం మరియు దాని ఆధారంగా దుకాణం ముందరిని సృష్టించడం అనే సులభమైన పని అనేక అవ్యక్తమైన ఇబ్బందులను అధిగమించడానికి దారితీస్తుంది, వాటి కోసం మీరు విడిగా వెతకాలి. మరియు ఈ పరిష్కారాలు సరళమైనవి అయినప్పటికీ, వాటిని కనుగొనడానికి చాలా సమయం పడుతుంది.

షోకేస్ నిర్మాణాన్ని అమలు చేయడానికి, నేను వీటిని చేయాల్సి వచ్చింది:

  • షోకేస్‌కు విభజనలను జోడించండి, సర్వీస్ ఫైల్‌లను వదిలించుకోండి
  • స్పార్క్ టైప్ చేసిన సోర్స్ డేటాలోని ఖాళీ ఫీల్డ్‌లతో వ్యవహరించండి
  • సరళమైన రకాలను స్ట్రింగ్‌కు ప్రసారం చేయండి
  • ఫీల్డ్ పేర్లను చిన్న అక్షరానికి మార్చండి
  • హైవ్‌లో ప్రత్యేక డేటా అప్‌లోడ్ మరియు టేబుల్ నమోదు (DDL జనరేషన్)
  • హైవ్‌తో అననుకూలంగా ఉండే ఫీల్డ్ పేర్ల నుండి తప్పించుకోవడం మర్చిపోవద్దు
  • హైవ్‌లో టేబుల్ రిజిస్ట్రేషన్‌ని ఎలా అప్‌డేట్ చేయాలో తెలుసుకోండి

సంగ్రహంగా, దుకాణ కిటికీలను నిర్మించాలనే నిర్ణయం అనేక ఆపదలతో నిండి ఉందని మేము గమనించాము. అందువల్ల, అమలులో ఇబ్బందులు ఉన్నట్లయితే, విజయవంతమైన నైపుణ్యంతో అనుభవజ్ఞుడైన భాగస్వామిని సంప్రదించడం మంచిది.

ఈ కథనాన్ని చదివినందుకు ధన్యవాదాలు, మీకు సమాచారం ఉపయోగకరంగా ఉంటుందని మేము ఆశిస్తున్నాము.

మూలం: www.habr.com

ఒక వ్యాఖ్యను జోడించండి