Dhiibida schemaEvolution ficil ahaan

Akhristayaasheena sharafta lahow, maalin wanaagsan!

Maqaalkan, lataliyaha ugu horreeya ee Neoflex's Big Data Solutions aagga ganacsiga wuxuu si faahfaahsan u qeexayaa fursadaha lagu dhisayo qaab dhismeedka doorsoomayaasha ah iyadoo la adeegsanayo Apache Spark.

Iyada oo qayb ka ah mashruuca falanqaynta xogta, hawsha dhisidda meelaha dukaamada ee ku salaysan xogta habaysan ee habaysan ayaa badanaa soo baxda.

Caadiyan kuwani waa qoraallo, ama jawaabo ka yimid habab kala duwan, oo loo kaydiyay JSON ama XML. Xogta waxaa lagu shubaa Hadoop, ka dib waxaad u baahan tahay inaad ka dhisto meel bakhaar ah iyaga. Waxaan abaabuli karnaa gelitaanka bandhigga la abuuray, tusaale ahaan, iyada oo loo marayo Impala.

Xaaladdan oo kale, qorshaha bakhaarka bartilmaameedka ah lama yaqaan ka hor. Intaa waxaa dheer, qorshaha sidoo kale hore looma diyaarin karo, maadaama ay ku xiran tahay xogta, waxaanan wax ka qabaneynaa xogtan habaysan ee aadka u dabacsan.

Tusaale ahaan, maanta jawaabta soo socota ayaa la qoray:

{source: "app1", error_code: ""}

berrina isla nidaamkaas waxaa ka imanaysa jawaabta soo socota:

{source: "app1", error_code: "error", description: "Network error"}

Natiijo ahaan, hal goob oo kale waa in lagu daraa bandhigga - sharaxaad, qofna ma oga inuu iman doono iyo in kale.

Hawsha abuurista bakhaarka xogtan oo kale waa heer aad u qurux badan, Spark-na waxa ay haysataa tiro qalab ah oo tan ah. Si loo kala saaro xogta isha, waxaa jira taageero JSON iyo XML labadaba, iyo qorshe aan hore loo aqoon, taageerada schemaEvolution ayaa la bixiyaa.

Jaleecada hore, xalku wuxuu u muuqdaa mid fudud. Waxaad u baahan tahay inaad qaadato JSON gal oo aad ku akhrido kaydka xogta. Spark waxay abuuri doontaa schema, xogta buulka leh waxay u rogi doontaa dhismayaal. Dheeraad ah, wax walba waxay u baahan yihiin in lagu keydiyo parquet, kaas oo sidoo kale lagu taageeray Impala, adoo diiwaangelinaya bakhaarka hore ee Hive metastore.

Wax walba waxay u muuqdaan kuwo fudud.

Si kastaba ha ahaatee, kama cadda tusaalayaasha gaagaaban ee dukumentiga waxa lagu sameeyo tiro dhibaatooyin ah oo ficil ahaan ah.

Dukumeentigu wuxuu qeexayaa habka aan loo abuurin bakhaarka hortiisa, laakiin in lagu akhriyo JSON ama XML gudaha xogta.

Magac ahaan, waxay si fudud u tusinaysaa sida loo akhriyo oo loo kala saaro JSON:

df = spark.read.json(path...)

Tani waa ku filan tahay in xogta laga dhigo Spark.

Ficil ahaan, qoraalku aad ayuu uga dhib badan yahay kaliya akhrinta faylasha JSON ee galka iyo abuurista xog-frame. Xaaladdu waxay u egtahay sidan: waxaa horayba u jiray dukaamo gaar ah, xog cusub ayaa soo gala maalin kasta, waxay u baahan yihiin in lagu daro bakhaarka, iyada oo aan la iloobin in nidaamku ka duwanaan karo.

Habka caadiga ah ee lagu dhisayo bandhig bandhigeedka waa sida soo socota:

Tallaabada 1. Xogta waxa lagu shubay Hadoop iyada oo maalin kasta dib loo soo raro waxaana lagu daraa qayb cusub. Waxay soo saartaa gal leh xogta bilowga ah oo qaybsan maalintii.

Tallaabada 2. Inta lagu jiro culeyska bilowga ah, galkan waxaa akhrinaya oo kala saaraya Spark. Qaabka xogta ee ka soo baxay waxa lagu kaydiyaa qaab la qiyaasi karo, tusaale ahaan, parquet, ka dibna la soo dejin karo Impala. Tani waxay abuurtaa bandhig bartilmaameed leh dhammaan xogta la ururiyey ilaa heerkan.

Tallaabada 3. Soodejin ayaa la sameeyay kaas oo cusboonaysiin doona bakhaarka maalin kasta.
Waxaa jira su'aal ku saabsan kordhinta kordhinta, baahida loo qabo in la qaybiyo bandhigga, iyo su'aasha ah ilaalinta nidaamka guud ee bandhigga.

Aan tusaale usoo qaadano. Aynu nidhaahno talaabadii ugu horaysay ee dhisidda kaydka waa la fuliyay, faylasha JSONna waxa lagu shubaa gal.

Abuuritaanka database-ka iyaga, ka dibna u xafidaya bandhig ahaan, dhib maaha. Tani waa tillaabada ugu horreysa ee si fudud looga heli karo dukumeentiga Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Wax walba waxay u muuqdaan inay fiican yihiin.

Waanu akhrinay oo kala saarnay JSON, ka dib waxaanu u kaydinay xogta qaab-dhismeed ahaan, anagoo ku diiwaan galinayna rugta hab kasta oo ku haboon:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Waxaan helnaa daaqad.

Laakiin, maalintii xigtay, xog cusub oo laga helay isha ayaa lagu daray. Waxaan haynaa gal JSON ah, iyo bandhig laga sameeyay galkan. Ka dib markii la soo dejiyo xogta soo socota ee isha, mareegta xogta ayaa ka maqan qiimaha hal maalin ee xogta.

Xalka macquulka ah wuxuu noqon lahaa in la qaybiyo wajiga hore ee bakhaarka maalintii, taas oo u oggolaan doonta in lagu daro qayb cusub maalin kasta oo xigta. Habka tan ayaa sidoo kale si fiican loo yaqaan, Spark wuxuu kuu ogolaanayaa inaad si gooni ah u qorto qaybo.

Marka hore, waxaan sameyneynaa culeys bilow ah, keydinta xogta sida kor lagu sharaxay, ku dar kaliya qaybinta. Ficilkan waxaa loo yaqaan bilowga dukaanka waxaana la sameeyaa hal mar oo keliya:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Maalinta ku xigta, waxaan soo rarnaa kaliya qayb cusub:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Waxa kaliya ee hadhay waa in dib-u-diiwaangelinta Hive si loo cusboonaysiiyo qorshaha.
Si kastaba ha ahaatee, tani waa meesha dhibaatooyinka ka soo baxaan.

Dhibka koowaad. Mar dhow ama dambe, parquet ka soo baxay waxay noqon doontaa mid aan la akhrin karin. Tan waxa u sabab ah sida parquet iyo JSON u wajahaan beeraha madhan si kala duwan.

Aynu tixgelinno xaalad caadi ah. Tusaale ahaan, shalay JSON ayaa timid:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

maantana isla JSON ayaa u eeg sidan:

Π”Π΅Π½ΡŒ 2: {"a": null}

Aynu nidhaahno waxaynu leenahay laba qaybood oo kala duwan, mid walbana wuxuu leeyahay hal xariiq.
Markaan akhrino dhammaan xogta isha, Spark waxay awood u yeelan doontaa inay go'aamiso nooca, oo waxay fahmi doontaa in "a" ay tahay qayb ka mid ah "qaab-dhismeedka", oo leh beer buul leh "b" oo ah nooca INT. Laakiin, haddii qayb kasta si gaar ah loo badbaadiyey, markaas waxaan helnaa parquet leh nidaamyada qaybinta aan ku habboonayn:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Xaaladdan si fiican ayaa loo yaqaan, markaa ikhtiyaar ayaa si gaar ah loogu daray - marka la falanqeynayo xogta isha, ka saar meelaha madhan:

df = spark.read.json("...", dropFieldIfAllNull=True)

Xaaladdan oo kale, parquet wuxuu ka koobnaan doonaa qaybo la wada akhriyi karo.
In kasta oo kuwa sidan ku sameeyay ficil ahaan ay halkan dhoola cadeynayaan. Waa maxay sababtu? Haa, sababtoo ah waxay u badan tahay inay jiraan laba xaaladood oo kale. Ama saddex. Ama afar. Midka ugu horreeya, oo runtii dhici doona, waa in noocyada tirooyinka ay u eegi doonaan noocyo kala duwan oo faylasha JSON ah. Tusaale ahaan, {intField: 1} iyo {intField: 1.1}. Haddii goobahan oo kale laga helo hal qayb, ka dibna isku-dhafka schema ayaa wax walba si sax ah u akhrin doona, taasoo horseedaysa nooca ugu saxsan. Laakiin haddii kuwa kala duwan, markaas mid ayaa yeelan doona intField: int, kan kalena wuxuu yeelan doonaa intField: double.

Waxaa jira calanka soo socda si loo xalliyo xaaladdan:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Hadda waxaan haynaa gal halkaas oo ay jiraan qaybo lagu akhriyi karo hal dataframe iyo parquet sax ah oo ka mid ah bandhigga oo dhan. Haa? Maya

Waa in aan xasuusannaa in aan ka diiwaan gashannay miiska Hive. rugtu maaha mid xasaasi u ah magacyada goobta, halka parquet ay tahay kiis xasaasi ah. Sidaa darteed, qaybo leh schemas: field1: int, iyo Field1: int waxay la mid yihiin Hive, laakiin maaha Spark. Ha iloobin inaad u bedesho magacyada goobta kuwa yaryar.

Taas ka dib, wax walba waxay u muuqdaan inay fiican yihiin.

Si kastaba ha ahaatee, dhammaan ma fududa. Waxaa jirta dhibaato labaad, sidoo kale si fiican loo yaqaan. Maadaama qayb kasta oo cusub si gaar ah loo kaydiyay, galka qaybinta waxa ku jiri doona faylalka adeegga Spark, tusaale ahaan, calanka guusha hawlgalka _SUCCESS. Tani waxay keeni doontaa qalad marka la isku dayayo in parquet. Si taas looga fogaado, waxaad u baahan tahay inaad habayso qaabaynta si aad uga ilaaliso Spark inay ku darto faylalka adeegga galka:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Waxay u muuqataa in hadda maalin kasta qayb cusub oo parquet ah lagu daro galka bandhiga bartilmaameedka, halkaas oo xogta la soo bandhigay ee maalinta ay ku taal. Waxaan horay uga taxaddarnay in aysan jirin qaybo leh isku dhac nooca xogta ah.

Laakiin, dhib saddexaad ayaa na haysata. Hadda qorshaha guud lama yaqaan, sidoo kale, miiska Hive wuxuu leeyahay qorshe khaldan, maadaama qayb kasta oo cusubi ay u badan tahay inay soo bandhigtay qallooc ku yimid qorshaha.

Waxaad u baahan tahay inaad dib-u-diiwaangeliso miiska. Tan waxaa loo samayn karaa si fudud: mar labaad akhri bakhaarka hore ee dukaanka, qaado schema oo samee DDL iyada oo ku saleysan, kaas oo dib loogu diiwaan geliyo galka Hive sida miis dibadda ah, cusboonaysiinta schema ee dukaanka bartilmaameedka.

Dhib afraad ayaa na haysata. Markii ugu horeysay ee aan diiwaan gelinay miiska, waxaan ku tiirsanahay Spark. Hadda waxaan samaynaa nafteena, waxaanan u baahanahay inaan xasuusano in beeraha parquet ay ka bilaabi karaan jilayaasha aan loo ogolayn Hive. Tusaale ahaan, Spark waxa ay tuurtaa xariiqyo aanay ku kala saari karin gudaha "corrupt_record". Goobtan oo kale laguma diwaan gashan karo rugta iyada oo aan la baxsan.

Anagoo og tan, waxaan helnaa nidaamka:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Code (" _corrupt_record", "`_dikoorka_corrupt_") + "" + f[1]. bedel (":", "`:"). bedel("<", "<`"). bedel(",",", ",`"). bedel ("array<`", "array"). waxay samaysaa badbaado DDL, ie. halkii:

create table tname (_field1 string, 1field string)

Magacyada goobta sida "_field1, 1field", DDL ammaan ah ayaa lagu sameeyay halka magacyada goobta ay ka baxsadeen: samee shaxda `tname` (`_field1` string, `1 field` string).

Su'aashu waxay soo baxaysaa: sida saxda ah ee loo helo xogta qaabaysan oo leh schema dhamaystiran (ee pf code)? Sidee lagu helaa PF-kan? Tani waa dhibkii shanaad. Dib u akhri nidaamka dhammaan qaybaha galka galka oo wata faylalka parquet ee bandhiga bartilmaameedka? Habkani waa kan ugu badbaado badan, laakiin adag.

Nidaamku wuxuu ku yaal Hive. Waxaad heli kartaa schema cusub adiga oo isku dara qorshaha miiska oo dhan iyo qaybta cusub. Markaa waxaad u baahan tahay inaad ka soo qaadato shaxda miiska Hive oo aad ku darsato qorshaha qaybta cusub. Tan waxa lagu samayn karaa iyadoo la akhriyo xogta badan ee tijaabada ee Hive, lagu kaydiyo gal ku meel gaadh ah, oo la isticmaalo Spark si loo akhriyo labada qayboodba hal mar.

Dhab ahaantii, waxaa jira wax kasta oo aad u baahan tahay: shaxda miiska asalka ah ee Hive iyo qaybta cusub. Waxaan sidoo kale haynaa xog. Waxa ay hadhaysaa oo kaliya in la helo schema cusub oo isku daraysa shaxda hore ee dukaanka iyo goobo cusub oo ka soo jeeda qaybta la abuuray:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Marka xigta, waxaan abuurnaa diiwaan gelinta miiska DDL, sida ku qoran jeexdii hore.
Haddii silsiladda oo dhami ay si sax ah u shaqeyso, taas oo ah, waxaa jiray culeys bilow ah, miiskana si sax ah ayaa loo sameeyay Hive, ka dibna waxaan helnaa qorshe miiska la cusbooneysiiyay.

Dhibaatada ugu dambeysana waa inaadan ku dari karin qayb ka mid ah miiska Hive, sababtoo ah waa la jebin doonaa. Waxaad u baahan tahay inaad ku qasabto Hive inay hagaajiso qaab dhismeedka qaybinteeda:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Hawsha fudud ee akhrinta JSON iyo abuurista goob dukaamo ah oo ku salaysan waxay keenaysaa in laga gudbo tiro dhibaatooyin aan qarsoonayn, xalal kuwaas oo ay tahay inaad si gaar ah u raadiso. Inkasta oo xalalkani ay yihiin kuwo fudud, waxay qaadataa wakhti badan si loo helo.

Si aan u hirgeliyo dhismaha bandhigga, waxaan ku khasbanaaday:

  • Ku dar qaybo ka mid ah bandhigga, ka takhalusida faylasha adeegga
  • Wax ka qabso meelaha madhan ee xogta isha ee Spark qortay
  • Noocyo fudud ku dheji xadhig
  • U beddelo magacyada goobta xarfo yar
  • Soo dejinta xogta iyo diiwaangelinta miiska ee Hive (jiilka DDL)
  • Ha iloobin inaad ka baxsato magacyada goobta ee laga yaabo inay la jaan qaadi karin Hive
  • Baro sida loo cusboonaysiiyo diiwaangelinta miiska Hive

Isku soo wada duuboo, waxaan ogaanay in go'aanka lagu dhisayo daaqadaha dukaamada ay ka buuxaan godad badan. Sidaa darteed, haddii ay dhacdo dhibaatooyin xagga fulinta, waxaa fiican inaad la xiriirto lammaane khibrad leh oo leh khibrad guul leh.

Waad ku mahadsan tahay akhrinta maqaalkan, waxaan rajeyneynaa inaad u hesho macluumaadka faa'iido leh.

Source: www.habr.com

Add a comment