Spark schemaEvolution katika mazoezi

Wasomaji wapendwa, siku njema!

Katika makala haya, mshauri mkuu wa eneo la biashara la Neoflex's Big Data Solutions anaelezea kwa kina chaguo za kujenga maonyesho ya muundo tofauti kwa kutumia Apache Spark.

Kama sehemu ya mradi wa uchanganuzi wa data, kazi ya kujenga mbele ya duka kulingana na muundo wa data uliolegea mara nyingi hutokea.

Kawaida hizi ni kumbukumbu, au majibu kutoka kwa mifumo mbalimbali, iliyohifadhiwa kama JSON au XML. Data imepakiwa kwa Hadoop, basi unahitaji kujenga mbele ya duka kutoka kwao. Tunaweza kupanga ufikiaji wa onyesho iliyoundwa, kwa mfano, kupitia Impala.

Katika kesi hii, schema ya mbele ya duka inayolengwa haijulikani hapo awali. Zaidi ya hayo, mpango huo pia hauwezi kutayarishwa mapema, kwa kuwa inategemea data, na tunashughulika na data hizi zilizopangwa kwa urahisi sana.

Kwa mfano, leo jibu lifuatalo limeingia:

{source: "app1", error_code: ""}

na kesho kutoka kwa mfumo huo huo huja jibu lifuatalo:

{source: "app1", error_code: "error", description: "Network error"}

Kama matokeo, uwanja mmoja zaidi unapaswa kuongezwa kwenye onyesho - maelezo, na hakuna anayejua ikiwa itakuja au la.

Kazi ya kuunda mbele ya duka kwenye data kama hiyo ni kiwango kizuri, na Spark ina zana kadhaa za hii. Kwa kuchanganua data ya chanzo, kuna usaidizi kwa JSON na XML, na kwa schema isiyojulikana hapo awali, msaada wa schemaEvolution hutolewa.

Kwa mtazamo wa kwanza, suluhisho inaonekana rahisi. Unahitaji kuchukua folda na JSON na kuisoma kwenye mfumo wa data. Spark itaunda schema, kugeuza data iliyoorodheshwa kuwa miundo. Zaidi ya hayo, kila kitu kinahitaji kuokolewa katika parquet, ambayo pia inasaidiwa katika Impala, kwa kusajili mbele ya duka katika metastore ya Hive.

Kila kitu kinaonekana kuwa rahisi.

Hata hivyo, si wazi kutoka kwa mifano fupi katika nyaraka nini cha kufanya na idadi ya matatizo katika mazoezi.

Nyaraka zinaelezea mbinu ya kutounda mbele ya duka, lakini kusoma JSON au XML kwenye mfumo wa data.

Yaani, inaonyesha tu jinsi ya kusoma na kuchambua JSON:

df = spark.read.json(path...)

Hii inatosha kufanya data ipatikane kwa Spark.

Kwa mazoezi, hati ni ngumu zaidi kuliko kusoma faili za JSON kutoka kwa folda na kuunda mfumo wa data. Hali inaonekana kama hii: tayari kuna mbele ya duka, data mpya inakuja kila siku, zinahitaji kuongezwa kwenye mbele ya duka, bila kusahau kuwa schema inaweza kutofautiana.

Mpango wa kawaida wa kuunda maonyesho ni kama ifuatavyo.

Hatua 1. Data inapakiwa kwenye Hadoop na kupakiwa upya kila siku na kuongezwa kwa kizigeu kipya. Inageuka folda iliyo na data ya awali iliyogawanywa kwa siku.

Hatua 2. Wakati wa upakiaji wa awali, folda hii inasomwa na kuchanganuliwa na Spark. Dataframe inayotokana imehifadhiwa katika umbizo linaloweza kueleweka, kwa mfano, kwenye parquet, ambayo inaweza kuingizwa kwenye Impala. Hii inaunda onyesho lengwa na data yote ambayo imekusanywa hadi hatua hii.

Hatua 3. Upakuaji umeundwa ambao utasasisha mbele ya duka kila siku.
Kuna swali la upakiaji wa ziada, hitaji la kugawanya onyesho, na swali la kudumisha mpango wa jumla wa onyesho.

Hebu tuchukue mfano. Wacha tuseme kwamba hatua ya kwanza ya kujenga hazina imetekelezwa, na faili za JSON hupakiwa kwenye folda.

Kuunda mfumo wa data kutoka kwao, kisha kuihifadhi kama onyesho, sio shida. Hii ni hatua ya kwanza kabisa ambayo inaweza kupatikana kwa urahisi katika hati za Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Kila kitu kinaonekana kuwa sawa.

Tulisoma na kuchanganua JSON, kisha tunahifadhi mfumo wa data kama pakiti, tukisajili katika Hive kwa njia yoyote inayofaa:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Tunapata dirisha.

Lakini, siku iliyofuata, data mpya kutoka kwa chanzo iliongezwa. Tuna folda iliyo na JSON, na onyesho limeundwa kutoka kwa folda hii. Baada ya kupakia kundi linalofuata la data kutoka kwa chanzo, data mart inakosa data ya thamani ya siku moja.

Suluhisho la kimantiki litakuwa kugawanya mbele ya duka kwa siku, ambayo itaruhusu kuongeza kizigeu kipya kila siku inayofuata. Utaratibu wa hii pia unajulikana, Spark hukuruhusu kuandika kizigeu kando.

Kwanza, tunafanya mzigo wa awali, kuhifadhi data kama ilivyoelezwa hapo juu, na kuongeza tu kugawa. Kitendo hiki kinaitwa uanzishaji wa mbele ya duka na hufanywa mara moja tu:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Siku iliyofuata, tunapakia kizigeu kipya tu:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Kilichobaki ni kujiandikisha tena katika Hive ili kusasisha schema.
Hata hivyo, hapa ndipo matatizo hutokea.

Tatizo la kwanza. Hivi karibuni au baadaye, parquet inayosababisha haitasomeka. Hii ni kutokana na jinsi parquet na JSON huchukulia uga tupu kwa njia tofauti.

Hebu fikiria hali ya kawaida. Kwa mfano, jana JSON inafika:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

na leo JSON yule yule anaonekana kama hii:

Π”Π΅Π½ΡŒ 2: {"a": null}

Wacha tuseme tuna sehemu mbili tofauti, kila moja ikiwa na mstari mmoja.
Tunaposoma data yote ya chanzo, Spark itaweza kubainisha aina, na itaelewa kuwa "a" ni sehemu ya aina ya "muundo", yenye sehemu iliyopachikwa "b" ya aina ya INT. Lakini, ikiwa kila kizigeu kilihifadhiwa kando, basi tunapata parquet na miradi isiyoendana ya kizigeu:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Hali hii inajulikana sana, kwa hivyo chaguo limeongezwa maalum - wakati wa kuchanganua data ya chanzo, ondoa sehemu tupu:

df = spark.read.json("...", dropFieldIfAllNull=True)

Katika kesi hii, parquet itakuwa na sehemu ambazo zinaweza kusomwa pamoja.
Ingawa wale ambao wamefanya hivi kwa mazoezi watatabasamu kwa uchungu hapa. Kwa nini? Ndiyo, kwa sababu kuna uwezekano wa kuwa na hali mbili zaidi. Au tatu. Au nne. Ya kwanza, ambayo karibu itatokea, ni kwamba aina za nambari zitaonekana tofauti katika faili tofauti za JSON. Kwa mfano, {intField: 1} na {intField: 1.1}. Ikiwa sehemu kama hizo zinapatikana katika kizigeu kimoja, basi uunganisho wa schema utasoma kila kitu kwa usahihi, na kusababisha aina sahihi zaidi. Lakini ikiwa katika tofauti, basi moja itakuwa na intField: int, na nyingine itakuwa na intField: double.

Kuna bendera ifuatayo kushughulikia hali hii:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Sasa tunayo folda ambapo kuna sehemu zinazoweza kusomwa katika mfumo mmoja wa data na parquet halali ya onyesho zima. Ndiyo? Hapana.

Lazima tukumbuke kwamba tulisajili meza katika Hive. Mzinga sio nyeti katika majina ya sehemu, ilhali parquet ni nyeti kwa ukubwa. Kwa hivyo, partitions zilizo na schemas: field1: int, na Field1: int ni sawa kwa Hive, lakini si kwa Spark. Usisahau kubadilisha majina ya sehemu kuwa herufi ndogo.

Baada ya hayo, kila kitu kinaonekana kuwa sawa.

Walakini, sio zote rahisi sana. Kuna shida ya pili, inayojulikana pia. Kwa kuwa kila kizigeu kipya kimehifadhiwa kando, folda ya kizigeu itakuwa na faili za huduma ya Spark, kwa mfano, bendera ya mafanikio ya operesheni _SUCCESS. Hii itasababisha kosa wakati wa kujaribu kuweka parquet. Ili kuepusha hili, unahitaji kusanidi usanidi ili kuzuia Spark kuongeza faili za huduma kwenye folda:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Inaonekana kwamba sasa kila siku kizigeu kipya cha parquet kinaongezwa kwenye folda ya onyesho la lengo, ambapo data iliyochanganuliwa ya siku iko. Tulitunza mapema kwamba hakukuwa na sehemu zilizo na mzozo wa aina ya data.

Lakini, tuna tatizo la tatu. Sasa schema ya jumla haijulikani, zaidi ya hayo, jedwali kwenye Hive ina schema isiyo sahihi, kwani kila kizigeu kipya kilianzisha upotoshaji kwenye schema.

Unahitaji kusajili upya meza. Hii inaweza kufanywa kwa urahisi: soma parquet ya mbele ya duka tena, chukua schema na uunde DDL kulingana nayo, ambayo unaweza kusajili tena folda kwenye Hive kama jedwali la nje, kusasisha schema ya mbele ya duka inayolengwa.

Tuna tatizo la nne. Tuliposajili meza kwa mara ya kwanza, tulitegemea Spark. Sasa tunaifanya sisi wenyewe, na tunahitaji kukumbuka kuwa mashamba ya parquet yanaweza kuanza na wahusika ambao hawaruhusiwi kwa Hive. Kwa mfano, Spark hutupa mistari ambayo haikuweza kuchanganua katika sehemu ya "corrupt_record". Sehemu kama hiyo haiwezi kusajiliwa katika Mzinga bila kutoroka.

Kujua hili, tunapata mpango:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kanuni ("_corrupt_record", "`_corrupt_record`") + " " + f[1].badilisha(":", "`:").badilisha("<", "<`").badilisha(",",, ",`").badilisha("safu<`", "safu<"). hufanya DDL salama, yaani badala ya:

create table tname (_field1 string, 1field string)

Kwa majina ya sehemu kama vile "_field1, 1field", DDL salama inafanywa ambapo majina ya sehemu yametoroshwa: unda jedwali `tname` (`_field1` string, `1field` string).

Swali linatokea: jinsi ya kupata dataframe vizuri na schema kamili (katika nambari ya pf)? Jinsi ya kupata hii pf? Hili ni tatizo la tano. Soma tena mpango wa sehemu zote kutoka kwa folda iliyo na faili za parquet za onyesho la lengo? Njia hii ni salama zaidi, lakini ngumu.

Ratiba tayari iko kwenye Hive. Unaweza kupata schema mpya kwa kuchanganya schema ya jedwali zima na kizigeu kipya. Kwa hivyo unahitaji kuchukua schema ya meza kutoka kwa Hive na kuichanganya na schema ya kizigeu kipya. Hii inaweza kufanywa kwa kusoma metadata ya majaribio kutoka kwa Hive, kuihifadhi kwenye folda ya muda, na kutumia Spark kusoma sehemu zote mbili mara moja.

Kwa kweli, kuna kila kitu unachohitaji: schema ya asili ya jedwali kwenye Hive na kizigeu kipya. Pia tuna data. Inabakia tu kupata schema mpya inayochanganya schema ya mbele ya duka na sehemu mpya kutoka kwa kizigeu iliyoundwa:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ifuatayo, tunaunda DDL ya usajili wa jedwali, kama ilivyo kwenye snippet iliyopita.
Ikiwa mlolongo mzima unafanya kazi kwa usahihi, yaani, kulikuwa na mzigo wa kuanzisha, na meza iliundwa kwa usahihi katika Hive, basi tunapata schema ya meza iliyosasishwa.

Na shida ya mwisho ni kwamba huwezi kuongeza tu kizigeu kwenye meza ya Hive, kwa sababu itavunjwa. Unahitaji kulazimisha Hive kurekebisha muundo wake wa kizigeu:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Kazi rahisi ya kusoma JSON na kuunda mbele ya duka kwa msingi wake husababisha kushinda shida kadhaa, suluhisho ambazo unapaswa kutafuta kando. Na ingawa suluhisho hizi ni rahisi, inachukua muda mwingi kuzipata.

Ili kutekeleza ujenzi wa maonyesho, ilibidi:

  • Ongeza sehemu kwenye onyesho, ukiondoa faili za huduma
  • Shughulikia sehemu tupu katika data chanzo ambayo Spark ameandika
  • Tuma aina rahisi kwa kamba
  • Badilisha majina ya sehemu kuwa herufi ndogo
  • Tenganisha upakiaji wa data na usajili wa jedwali katika Hive (kizazi cha DDL)
  • Usisahau kuepuka majina ya sehemu ambayo huenda yasioani na Hive
  • Jifunze jinsi ya kusasisha usajili wa jedwali katika Hive

Kwa muhtasari, tunaona kwamba uamuzi wa kujenga madirisha ya duka umejaa mitego mingi. Kwa hiyo, katika kesi ya matatizo katika utekelezaji, ni bora kuwasiliana na mpenzi mwenye ujuzi na ujuzi wa mafanikio.

Asante kwa kusoma nakala hii, tunatumai utapata habari kuwa muhimu.

Chanzo: mapenzi.com

Kuongeza maoni