ప్రియమైన పాఠకులారా, మంచి రోజు!
ఈ కథనంలో, నియోఫ్లెక్స్ యొక్క బిగ్ డేటా సొల్యూషన్స్ బిజినెస్ ఏరియా యొక్క ప్రముఖ కన్సల్టెంట్ అపాచీ స్పార్క్ ఉపయోగించి వేరియబుల్ స్ట్రక్చర్ షోకేస్లను నిర్మించే ఎంపికలను వివరంగా వివరిస్తుంది.
డేటా విశ్లేషణ ప్రాజెక్ట్లో భాగంగా, వదులుగా నిర్మాణాత్మక డేటా ఆధారంగా స్టోర్ ముందరిని నిర్మించే పని తరచుగా తలెత్తుతుంది.
సాధారణంగా ఇవి లాగ్లు లేదా వివిధ సిస్టమ్ల నుండి వచ్చిన ప్రతిస్పందనలు, JSON లేదా XMLగా సేవ్ చేయబడతాయి. డేటా హడూప్కి అప్లోడ్ చేయబడింది, ఆపై మీరు వాటి నుండి దుకాణం ముందరిని నిర్మించాలి. మేము సృష్టించిన షోకేస్కి యాక్సెస్ని నిర్వహించవచ్చు, ఉదాహరణకు, ఇంపాలా ద్వారా.
ఈ సందర్భంలో, టార్గెట్ స్టోర్ ఫ్రంట్ యొక్క స్కీమా ముందుగా తెలియదు. అంతేకాకుండా, పథకం కూడా ముందుగానే రూపొందించబడదు, ఎందుకంటే ఇది డేటాపై ఆధారపడి ఉంటుంది మరియు మేము ఈ చాలా వదులుగా నిర్మాణాత్మక డేటాతో వ్యవహరిస్తున్నాము.
ఉదాహరణకు, ఈ రోజు కింది ప్రతిస్పందన లాగ్ చేయబడింది:
{source: "app1", error_code: ""}
మరియు రేపు అదే సిస్టమ్ నుండి క్రింది సమాధానం వస్తుంది:
{source: "app1", error_code: "error", description: "Network error"}
ఫలితంగా, షోకేస్ - వివరణకు మరో ఫీల్డ్ జోడించబడాలి మరియు అది వస్తుందో లేదో ఎవరికీ తెలియదు.
అటువంటి డేటాపై స్టోర్ ముందరిని సృష్టించే పని చాలా ప్రామాణికమైనది మరియు స్పార్క్ దీని కోసం అనేక సాధనాలను కలిగి ఉంది. మూల డేటాను అన్వయించడం కోసం, JSON మరియు XML రెండింటికీ మద్దతు ఉంది మరియు గతంలో తెలియని స్కీమా కోసం, స్కీమాఎవల్యూషన్కు మద్దతు అందించబడుతుంది.
మొదటి చూపులో, పరిష్కారం చాలా సులభం. మీరు JSONతో ఫోల్డర్ని తీసుకొని దానిని డేటాఫ్రేమ్లో చదవాలి. స్పార్క్ స్కీమాను సృష్టిస్తుంది, సమూహ డేటాను నిర్మాణాలుగా మారుస్తుంది. ఇంకా, హైవ్ మెటాస్టోర్లో స్టోర్ ఫ్రంట్ను రిజిస్టర్ చేయడం ద్వారా ఇంపాలాలో కూడా సపోర్ట్ చేసే పార్కెట్లో ప్రతిదీ సేవ్ చేయాలి.
అంతా సింపుల్గా ఉన్నట్లు అనిపిస్తుంది.
అయితే, ఆచరణలో ఉన్న అనేక సమస్యలతో ఏమి చేయాలో డాక్యుమెంటేషన్లోని చిన్న ఉదాహరణల నుండి స్పష్టంగా లేదు.
డాక్యుమెంటేషన్ స్టోర్ ఫ్రంట్ని సృష్టించడం కాదు, JSON లేదా XMLని డేటాఫ్రేమ్లో చదవడం గురించి వివరిస్తుంది.
అవి, ఇది కేవలం JSONని ఎలా చదవాలో మరియు అన్వయించాలో చూపిస్తుంది:
df = spark.read.json(path...)
స్పార్క్కి డేటా అందుబాటులో ఉంచడానికి ఇది సరిపోతుంది.
ఆచరణలో, ఫోల్డర్ నుండి JSON ఫైల్లను చదవడం మరియు డేటాఫ్రేమ్ను సృష్టించడం కంటే స్క్రిప్ట్ చాలా క్లిష్టంగా ఉంటుంది. పరిస్థితి ఇలా కనిపిస్తుంది: ఇప్పటికే ఒక నిర్దిష్ట స్టోర్ ఫ్రంట్ ఉంది, ప్రతిరోజూ కొత్త డేటా వస్తుంది, వాటిని స్టోర్ ఫ్రంట్కు జోడించాల్సిన అవసరం ఉంది, పథకం భిన్నంగా ఉండవచ్చని మర్చిపోకూడదు.
ప్రదర్శనశాలను నిర్మించడానికి సాధారణ పథకం క్రింది విధంగా ఉంటుంది:
1 దశ. తదుపరి రోజువారీ రీలోడింగ్తో డేటా హడూప్లోకి లోడ్ చేయబడుతుంది మరియు కొత్త విభజనకు జోడించబడుతుంది. ఇది రోజు వారీగా విభజించబడిన ప్రారంభ డేటాతో ఫోల్డర్గా మారుతుంది.
2 దశ. ప్రారంభ లోడ్ సమయంలో, ఈ ఫోల్డర్ స్పార్క్ ద్వారా చదవబడుతుంది మరియు అన్వయించబడుతుంది. ఫలితంగా డేటాఫ్రేమ్ పార్సేబుల్ ఫార్మాట్లో సేవ్ చేయబడుతుంది, ఉదాహరణకు, పార్కెట్లో, దానిని ఇంపాలాలోకి దిగుమతి చేసుకోవచ్చు. ఇది ఇప్పటివరకు సేకరించిన మొత్తం డేటాతో టార్గెట్ షోకేస్ను సృష్టిస్తుంది.
3 దశ. ప్రతిరోజు స్టోర్ ఫ్రంట్ను అప్డేట్ చేసే డౌన్లోడ్ సృష్టించబడింది.
ఇంక్రిమెంటల్ లోడింగ్, షోకేస్ను విభజించాల్సిన అవసరం మరియు షోకేస్ యొక్క సాధారణ స్కీమ్ను నిర్వహించడం అనే ప్రశ్న ఉంది.
ఒక ఉదాహరణ తీసుకుందాం. రిపోజిటరీని నిర్మించే మొదటి దశ అమలు చేయబడిందని మరియు JSON ఫైల్లు ఫోల్డర్కి అప్లోడ్ చేయబడిందని చెప్పండి.
వాటి నుండి డేటాఫ్రేమ్ను రూపొందించడం, ఆపై దానిని షోకేస్గా సేవ్ చేయడం సమస్య కాదు. స్పార్క్ డాక్యుమెంటేషన్లో సులభంగా కనుగొనగలిగే మొదటి దశ ఇది:
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
అంతా బాగానే ఉన్నట్లుంది.
మేము JSONని చదివాము మరియు అన్వయించాము, ఆపై మేము డేటాఫ్రేమ్ను పార్కెట్గా సేవ్ చేస్తాము, ఏదైనా అనుకూలమైన మార్గంలో హైవ్లో నమోదు చేస్తాము:
df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')
మాకు విండో వస్తుంది.
కానీ, మరుసటి రోజు, మూలం నుండి కొత్త డేటా జోడించబడింది. మేము JSONతో ఫోల్డర్ని కలిగి ఉన్నాము మరియు ఈ ఫోల్డర్ నుండి సృష్టించబడిన షోకేస్. మూలాధారం నుండి తదుపరి బ్యాచ్ డేటాను లోడ్ చేసిన తర్వాత, డేటా మార్ట్లో ఒక రోజు విలువైన డేటా లేదు.
తార్కిక పరిష్కారం రోజు వారీగా స్టోర్ ఫ్రంట్ను విభజించడం, ఇది ప్రతి మరుసటి రోజు కొత్త విభజనను జోడించడాన్ని అనుమతిస్తుంది. దీని కోసం మెకానిజం కూడా బాగా తెలుసు, స్పార్క్ విడిగా విభజనలను వ్రాయడానికి మిమ్మల్ని అనుమతిస్తుంది.
మొదట, మేము ప్రారంభ లోడ్ చేస్తాము, పైన వివరించిన విధంగా డేటాను సేవ్ చేస్తాము, విభజనను మాత్రమే జోడిస్తాము. ఈ చర్యను స్టోర్ ఫ్రంట్ ఇనిషియలైజేషన్ అంటారు మరియు ఒకసారి మాత్రమే చేయబడుతుంది:
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
మరుసటి రోజు, మేము కొత్త విభజనను మాత్రమే లోడ్ చేస్తాము:
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
స్కీమాను అప్డేట్ చేయడానికి హైవ్లో మళ్లీ నమోదు చేసుకోవడం మాత్రమే మిగిలి ఉంది.
అయితే, ఇక్కడే సమస్యలు తలెత్తుతున్నాయి.
మొదటి సమస్య. ముందుగానే లేదా తరువాత, ఫలితంగా పారేకెట్ చదవబడదు. పార్కెట్ మరియు JSON ఖాళీ ఫీల్డ్లను ఎలా విభిన్నంగా పరిగణిస్తాయో దీనికి కారణం.
ఒక సాధారణ పరిస్థితిని పరిశీలిద్దాం. ఉదాహరణకు, నిన్న JSON వస్తుంది:
День 1: {"a": {"b": 1}},
మరియు నేడు అదే JSON ఇలా కనిపిస్తుంది:
День 2: {"a": null}
మనకు రెండు వేర్వేరు విభజనలు ఉన్నాయని అనుకుందాం, ఒక్కొక్కటి ఒక పంక్తితో.
మేము మొత్తం సోర్స్ డేటాను చదివినప్పుడు, స్పార్క్ రకాన్ని గుర్తించగలదు మరియు "a" అనేది INT రకం "b" యొక్క సమూహ ఫీల్డ్తో "నిర్మాణం" రకం ఫీల్డ్ అని అర్థం చేసుకుంటుంది. కానీ, ప్రతి విభజన విడిగా సేవ్ చేయబడితే, మేము అననుకూల విభజన పథకాలతో ఒక పారేకెట్ను పొందుతాము:
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
ఈ పరిస్థితి బాగా తెలుసు, కాబట్టి ఒక ఎంపిక ప్రత్యేకంగా జోడించబడింది - సోర్స్ డేటాను అన్వయించేటప్పుడు, ఖాళీ ఫీల్డ్లను తీసివేయండి:
df = spark.read.json("...", dropFieldIfAllNull=True)
ఈ సందర్భంలో, పారేకెట్ కలిసి చదవగలిగే విభజనలను కలిగి ఉంటుంది.
ఆచరణలో ఇలా చేసిన వారు ఇక్కడ మాత్రం ఘాటుగా నవ్వుతారు. ఎందుకు? అవును, ఎందుకంటే మరో రెండు పరిస్థితులు ఉండే అవకాశం ఉంది. లేదా మూడు. లేదా నాలుగు. మొదటిది, దాదాపు ఖచ్చితంగా సంభవించేది, వివిధ JSON ఫైల్లలో సంఖ్యా రకాలు భిన్నంగా కనిపిస్తాయి. ఉదాహరణకు, {intField: 1} మరియు {intField: 1.1}. అటువంటి ఫీల్డ్లు ఒక విభజనలో కనుగొనబడితే, స్కీమా విలీనం ప్రతిదీ సరిగ్గా చదవబడుతుంది, ఇది అత్యంత ఖచ్చితమైన రకానికి దారి తీస్తుంది. కానీ వేర్వేరు వాటిలో ఉంటే, ఒకదానిలో intField: int ఉంటుంది, మరియు మరొకటి intField: డబుల్ ఉంటుంది.
ఈ పరిస్థితిని నిర్వహించడానికి క్రింది జెండా ఉంది:
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
ఇప్పుడు మనకు ఒకే డేటాఫ్రేమ్లో చదవగలిగే విభజనలు మరియు మొత్తం షోకేస్ యొక్క చెల్లుబాటు అయ్యే పార్కెట్ ఉన్న ఫోల్డర్ ఉంది. అవునా? సంఖ్య
మేము అందులో నివశించే తేనెటీగలు లో పట్టిక నమోదు గుర్తుంచుకోవాలి. ఫీల్డ్ పేర్లలో హైవ్ కేస్ సెన్సిటివ్ కాదు, పార్కెట్ అనేది కేస్ సెన్సిటివ్. కాబట్టి, స్కీమాలతో కూడిన విభజనలు: field1: int, మరియు Field1: int హైవ్కి ఒకేలా ఉంటాయి, కానీ Spark కోసం కాదు. ఫీల్డ్ పేర్లను చిన్న అక్షరానికి మార్చడం మర్చిపోవద్దు.
ఆ తర్వాత అంతా సవ్యంగానే ఉన్నట్లుంది.
అయితే, ప్రతిదీ అంత సులభం కాదు. రెండవది, బాగా తెలిసిన సమస్య కూడా ఉంది. ప్రతి కొత్త విభజన విడిగా సేవ్ చేయబడినందున, విభజన ఫోల్డర్ Spark సర్వీస్ ఫైల్లను కలిగి ఉంటుంది, ఉదాహరణకు, _SUCCESS ఆపరేషన్ సక్సెస్ ఫ్లాగ్. ఇది పార్కెట్ చేయడానికి ప్రయత్నిస్తున్నప్పుడు లోపం ఏర్పడుతుంది. దీన్ని నివారించడానికి, ఫోల్డర్కు సర్వీస్ ఫైల్లను జోడించకుండా స్పార్క్ నిరోధించడానికి మీరు కాన్ఫిగరేషన్ను కాన్ఫిగర్ చేయాలి:
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
రోజు కోసం అన్వయించిన డేటా ఉన్న టార్గెట్ షోకేస్ ఫోల్డర్కు ఇప్పుడు ప్రతిరోజూ కొత్త పార్కెట్ విభజన జోడించబడుతోంది. డేటా రకం వైరుధ్యంతో విభజనలు ఏవీ లేవని మేము ముందుగానే చూసుకున్నాము.
కానీ, మనకు మూడో సమస్య ఉంది. ఇప్పుడు సాధారణ స్కీమా తెలియదు, అంతేకాకుండా, హైవ్లోని పట్టిక తప్పు స్కీమాను కలిగి ఉంది, ఎందుకంటే ప్రతి కొత్త విభజన స్కీమాలో వక్రీకరణను ప్రవేశపెట్టింది.
మీరు పట్టికను మళ్లీ నమోదు చేసుకోవాలి. ఇది చాలా సరళంగా చేయవచ్చు: స్టోర్ ఫ్రంట్ యొక్క పార్కెట్ను మళ్లీ చదవండి, స్కీమాను తీసుకోండి మరియు దాని ఆధారంగా DDLని సృష్టించండి, దానితో హైవ్లోని ఫోల్డర్ను బాహ్య పట్టికగా మళ్లీ నమోదు చేయండి, టార్గెట్ స్టోర్ ఫ్రంట్ యొక్క స్కీమాను నవీకరిస్తుంది.
మనకు నాల్గవ సమస్య ఉంది. మేము మొదటిసారి పట్టికను నమోదు చేసినప్పుడు, మేము స్పార్క్పై ఆధారపడ్డాము. ఇప్పుడు మనం దీన్ని మనమే చేస్తాము మరియు పారేకెట్ ఫీల్డ్లు అందులో నివశించే తేనెటీగలు అనుమతించబడని అక్షరాలతో ప్రారంభించవచ్చని గుర్తుంచుకోవాలి. ఉదాహరణకు, స్పార్క్ "corrupt_record" ఫీల్డ్లో అన్వయించలేని పంక్తులను విసురుతుంది. అటువంటి ఫీల్డ్ తప్పించుకోకుండా హైవ్లో నమోదు చేయబడదు.
ఇది తెలుసుకోవడం, మేము పథకం పొందుతాము:
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
కోడ్ ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") సురక్షితమైన DDLని చేస్తుంది, అంటే బదులుగా:
create table tname (_field1 string, 1field string)
"_field1, 1field" వంటి ఫీల్డ్ పేర్లతో, ఫీల్డ్ పేర్లు తప్పించుకున్న చోట సురక్షితమైన DDL చేయబడుతుంది: టేబుల్ `tname` (`_field1` స్ట్రింగ్, `1field` స్ట్రింగ్) సృష్టించండి.
ప్రశ్న తలెత్తుతుంది: పూర్తి స్కీమాతో (pf కోడ్లో) డేటాఫ్రేమ్ను ఎలా సరిగ్గా పొందాలి? ఈ pf ఎలా పొందాలి? ఇది ఐదవ సమస్య. టార్గెట్ షోకేస్ యొక్క పార్కెట్ ఫైల్లతో ఫోల్డర్ నుండి అన్ని విభజనల పథకాన్ని మళ్లీ చదవాలా? ఈ పద్ధతి సురక్షితమైనది, కానీ కష్టం.
స్కీమా ఇప్పటికే హైవ్లో ఉంది. మీరు మొత్తం పట్టిక మరియు కొత్త విభజన యొక్క స్కీమాను కలపడం ద్వారా కొత్త స్కీమాను పొందవచ్చు. కాబట్టి మీరు హైవ్ నుండి టేబుల్ స్కీమాను తీసుకోవాలి మరియు దానిని కొత్త విభజన యొక్క స్కీమాతో కలపాలి. ఇది హైవ్ నుండి పరీక్ష మెటాడేటాను చదవడం, తాత్కాలిక ఫోల్డర్లో సేవ్ చేయడం మరియు రెండు విభజనలను ఒకేసారి చదవడానికి స్పార్క్ని ఉపయోగించడం ద్వారా చేయవచ్చు.
వాస్తవానికి, మీకు కావాల్సినవన్నీ ఉన్నాయి: హైవ్లోని అసలు టేబుల్ స్కీమా మరియు కొత్త విభజన. మా దగ్గర డేటా కూడా ఉంది. సృష్టించిన విభజన నుండి స్టోర్ ఫ్రంట్ స్కీమా మరియు కొత్త ఫీల్డ్లను మిళితం చేసే కొత్త స్కీమాను పొందడానికి మాత్రమే ఇది మిగిలి ఉంది:
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
తరువాత, మేము మునుపటి స్నిప్పెట్లో వలె టేబుల్ రిజిస్ట్రేషన్ DDLని సృష్టిస్తాము.
మొత్తం గొలుసు సరిగ్గా పనిచేస్తే, అనగా, ప్రారంభించే లోడ్ ఉంది మరియు హైవ్లో పట్టిక సరిగ్గా సృష్టించబడితే, మేము నవీకరించబడిన టేబుల్ స్కీమాను పొందుతాము.
మరియు చివరి సమస్య ఏమిటంటే, మీరు హైవ్ టేబుల్కి విభజనను జోడించలేరు, ఎందుకంటే అది విచ్ఛిన్నమవుతుంది. దాని విభజన నిర్మాణాన్ని సరిచేయడానికి మీరు హైవ్ని బలవంతం చేయాలి:
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
JSONని చదవడం మరియు దాని ఆధారంగా దుకాణం ముందరిని సృష్టించడం అనే సులభమైన పని అనేక అవ్యక్తమైన ఇబ్బందులను అధిగమించడానికి దారితీస్తుంది, వాటి కోసం మీరు విడిగా వెతకాలి. మరియు ఈ పరిష్కారాలు సరళమైనవి అయినప్పటికీ, వాటిని కనుగొనడానికి చాలా సమయం పడుతుంది.
షోకేస్ నిర్మాణాన్ని అమలు చేయడానికి, నేను వీటిని చేయాల్సి వచ్చింది:
- షోకేస్కు విభజనలను జోడించండి, సర్వీస్ ఫైల్లను వదిలించుకోండి
- స్పార్క్ టైప్ చేసిన సోర్స్ డేటాలోని ఖాళీ ఫీల్డ్లతో వ్యవహరించండి
- సరళమైన రకాలను స్ట్రింగ్కు ప్రసారం చేయండి
- ఫీల్డ్ పేర్లను చిన్న అక్షరానికి మార్చండి
- హైవ్లో ప్రత్యేక డేటా అప్లోడ్ మరియు టేబుల్ నమోదు (DDL జనరేషన్)
- హైవ్తో అననుకూలంగా ఉండే ఫీల్డ్ పేర్ల నుండి తప్పించుకోవడం మర్చిపోవద్దు
- హైవ్లో టేబుల్ రిజిస్ట్రేషన్ని ఎలా అప్డేట్ చేయాలో తెలుసుకోండి
సంగ్రహంగా, దుకాణ కిటికీలను నిర్మించాలనే నిర్ణయం అనేక ఆపదలతో నిండి ఉందని మేము గమనించాము. అందువల్ల, అమలులో ఇబ్బందులు ఉన్నట్లయితే, విజయవంతమైన నైపుణ్యంతో అనుభవజ్ఞుడైన భాగస్వామిని సంప్రదించడం మంచిది.
ఈ కథనాన్ని చదివినందుకు ధన్యవాదాలు, మీకు సమాచారం ఉపయోగకరంగా ఉంటుందని మేము ఆశిస్తున్నాము.
మూలం: www.habr.com