Skema e shkëndijësEvolucioni në praktikë

Të dashur lexues, ditë të mbarë!

Në këtë artikull, konsulenti kryesor i zonës së biznesit të Big Data Solutions të Neoflex përshkruan në detaje opsionet për ndërtimin e vitrinave të strukturave të ndryshueshme duke përdorur Apache Spark.

Si pjesë e një projekti të analizës së të dhënave, shpesh lind detyra e ndërtimit të vitrinave të bazuara në të dhëna të strukturuara lirshëm.

Zakonisht këto janë regjistra, ose përgjigje nga sisteme të ndryshme, të ruajtura si JSON ose XML. Të dhënat ngarkohen në Hadoop, atëherë duhet të ndërtoni një vitrinë prej tyre. Ne mund të organizojmë hyrjen në vitrinën e krijuar, për shembull, përmes Impala.

Në këtë rast, skema e vitrinës së synuar nuk dihet më parë. Për më tepër, skema gjithashtu nuk mund të hartohet paraprakisht, pasi varet nga të dhënat, dhe kemi të bëjmë me këto të dhëna të strukturuara shumë lirshëm.

Për shembull, sot është regjistruar përgjigja e mëposhtme:

{source: "app1", error_code: ""}

dhe nesër nga i njëjti sistem vjen përgjigjja e mëposhtme:

{source: "app1", error_code: "error", description: "Network error"}

Si rezultat, vitrinës duhet t'i shtohet një fushë tjetër - përshkrimi dhe askush nuk e di nëse do të vijë apo jo.

Detyra për të krijuar një vitrinë për të dhëna të tilla është mjaft standarde, dhe Spark ka një sërë mjetesh për këtë. Për analizimin e të dhënave burimore, ka mbështetje për JSON dhe XML, dhe për një skemë të panjohur më parë, ofrohet mbështetje për schemaEvolution.

Në pamje të parë, zgjidhja duket e thjeshtë. Duhet të marrësh një dosje me JSON dhe ta lexosh në një kornizë të dhënash. Spark do të krijojë një skemë, do t'i kthejë të dhënat e mbivendosura në struktura. Më tej, gjithçka duhet të ruhet në parket, i cili mbështetet edhe në Impala, duke regjistruar vitrinën në metastore Hive.

Gjithçka duket të jetë e thjeshtë.

Megjithatë, nga shembujt e shkurtër në dokumentacion nuk është e qartë se çfarë duhet bërë me një sërë problemesh në praktikë.

Dokumentacioni përshkruan një qasje jo për të krijuar një vitrinë, por për të lexuar JSON ose XML në një kornizë të dhënash.

Gjegjësisht, ai thjesht tregon se si të lexoni dhe analizoni JSON:

df = spark.read.json(path...)

Kjo është e mjaftueshme për t'i vënë të dhënat në dispozicion të Spark.

Në praktikë, skripti është shumë më i komplikuar sesa thjesht leximi i skedarëve JSON nga një dosje dhe krijimi i një kornize të dhënash. Situata duket kështu: ekziston tashmë një vitrinë e caktuar, të dhëna të reja vijnë çdo ditë, ato duhet të shtohen në vitrinë, duke mos harruar se skema mund të ndryshojë.

Skema e zakonshme për ndërtimin e një vitrinë është si më poshtë:

Hapi 1. Të dhënat ngarkohen në Hadoop me ringarkimin pasues ditor dhe shtohen në një ndarje të re. Rezulton një dosje me të dhënat fillestare të ndara sipas ditës.

Hapi 2. Gjatë ngarkimit fillestar, kjo dosje lexohet dhe analizohet nga Spark. Korniza e të dhënave që rezulton ruhet në një format të analizueshëm, për shembull, në parket, i cili më pas mund të importohet në Impala. Kjo krijon një vitrinë të synuar me të gjitha të dhënat që janë grumbulluar deri në këtë pikë.

Hapi 3. Është krijuar një shkarkim që do të përditësojë vitrinën e dyqanit çdo ditë.
Ekziston një çështje e ngarkimit në rritje, nevoja për ndarjen e vitrinës dhe çështja e ruajtjes së skemës së përgjithshme të vitrinës.

Le të marrim një shembull. Le të themi se hapi i parë i ndërtimit të një depoje është zbatuar dhe skedarët JSON ngarkohen në një dosje.

Krijimi i një kornize të dhënash prej tyre, pastaj ruajtja e tij si vitrinë, nuk është problem. Ky është hapi i parë që mund të gjendet lehtësisht në dokumentacionin e Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Gjithçka duket se është në rregull.

Ne lexojmë dhe analizojmë JSON, më pas ruajmë kornizën e të dhënave si parket, duke e regjistruar në Hive në çdo mënyrë të përshtatshme:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Ne marrim një dritare.

Por, të nesërmen u shtuan të dhëna të reja nga burimi. Ne kemi një dosje me JSON dhe një vitrinë të krijuar nga kjo dosje. Pas ngarkimit të grupit të ardhshëm të të dhënave nga burimi, të dhënave mart i mungojnë të dhënat me vlerë të një dite.

Zgjidhja logjike do të ishte ndarja e vitrinës në ditë, gjë që do të lejojë shtimin e një ndarje të re çdo ditë tjetër. Mekanizmi për këtë është gjithashtu i njohur, Spark ju lejon të shkruani ndarje veç e veç.

Së pari, ne bëjmë një ngarkesë fillestare, duke ruajtur të dhënat siç përshkruhet më sipër, duke shtuar vetëm ndarje. Ky veprim quhet inicializimi i dyqanit dhe kryhet vetëm një herë:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Të nesërmen, ne ngarkojmë vetëm një ndarje të re:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Gjithçka që mbetet është të riregjistroheni në Hive për të përditësuar skemën.
Megjithatë, këtu lindin problemet.

Problemi i parë. Herët a vonë, parketi që rezulton do të jetë i palexueshëm. Kjo është për shkak të mënyrës sesi parketi dhe JSON i trajtojnë ndryshe fushat boshe.

Le të shqyrtojmë një situatë tipike. Për shembull, dje mbërrin JSON:

День 1: {"a": {"b": 1}},

dhe sot i njëjti JSON duket kështu:

День 2: {"a": null}

Le të themi se kemi dy ndarje të ndryshme, secila me një rresht.
Kur të lexojmë të gjitha të dhënat e burimit, Spark do të jetë në gjendje të përcaktojë llojin dhe do të kuptojë se "a" është një fushë e tipit "strukturë", me një fushë të vendosur "b" të tipit INT. Por, nëse secila ndarje është ruajtur veçmas, atëherë marrim një parket me skema të ndarjeve të papajtueshme:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Kjo situatë është e njohur mirë, kështu që një opsion është shtuar posaçërisht - kur analizoni të dhënat e burimit, hiqni fushat boshe:

df = spark.read.json("...", dropFieldIfAllNull=True)

Në këtë rast, parketi do të përbëhet nga ndarje që mund të lexohen së bashku.
Edhe pse ata që e kanë bërë këtë në praktikë do të buzëqeshin hidhur këtu. Pse? Po, sepse ka të ngjarë të ketë edhe dy situata të tjera. Ose tre. Ose katër. E para, e cila pothuajse me siguri do të ndodhë, është se llojet numerike do të duken të ndryshme në skedarë të ndryshëm JSON. Për shembull, {intField: 1} dhe {intField: 1.1}. Nëse fusha të tilla gjenden në një ndarje, atëherë bashkimi i skemës do të lexojë gjithçka saktë, duke çuar në llojin më të saktë. Por nëse në të ndryshme, atëherë njëri do të ketë intField: int, dhe tjetri do të ketë intField: double.

Ekziston flamuri i mëposhtëm për të trajtuar këtë situatë:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Tani kemi një dosje ku ka ndarje që mund të lexohen në një kornizë të vetme të dhënash dhe një parket të vlefshëm të të gjithë vitrinës. Po? Nr.

Duhet të kujtojmë se ne kemi regjistruar tabelën në Hive. Hive nuk është e ndjeshme në emrat e fushave, ndërsa parketi është i ndjeshëm ndaj shkronjave të vogla. Prandaj, ndarjet me skema: field1: int dhe Field1: int janë të njëjta për Hive, por jo për Spark. Mos harroni të konvertoni emrat e fushave në shkronja të vogla.

Pas kësaj, gjithçka duket se është mirë.

Megjithatë, jo gjithçka kaq e thjeshtë. Ekziston një problem i dytë, gjithashtu i njohur. Meqenëse çdo ndarje e re ruhet veçmas, dosja e ndarjes do të përmbajë skedarët e shërbimit Spark, për shembull, flamurin e suksesit të operacionit _SUCCESS. Kjo do të rezultojë në një gabim kur përpiqeni të parketoni. Për të shmangur këtë, duhet të konfiguroni konfigurimin për të parandaluar që Spark të shtojë skedarë shërbimi në dosje:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Duket se tani çdo ditë një ndarje e re parketi shtohet në dosjen e vitrinës së synuar, ku ndodhen të dhënat e analizuara për ditën. Ne u kujdesëm paraprakisht që të mos kishte ndarje me konflikt të llojit të të dhënave.

Por, ne kemi një problem të tretë. Tani skema e përgjithshme nuk dihet, për më tepër, tabela në Hive ka një skemë të pasaktë, pasi çdo ndarje e re ka shumë të ngjarë të ketë futur një shtrembërim në skemë.

Duhet të riregjistroni tabelën. Kjo mund të bëhet thjesht: lexoni përsëri parketin e vitrinës, merrni skemën dhe krijoni një DDL bazuar në të, me të cilën do të riregjistroni dosjen në Hive si një tabelë të jashtme, duke përditësuar skemën e vitrinës së synuar.

Ne kemi një problem të katërt. Kur e regjistruam tabelën për herë të parë, u mbështetëm te Shkëndija. Tani e bëjmë vetë dhe duhet të kujtojmë se fushat e parketit mund të fillojnë me karaktere që nuk lejohen për Hive. Për shembull, Spark hedh rreshta që nuk mund t'i analizonte në fushën "corrupt_record". Një fushë e tillë nuk mund të regjistrohet në Hive pa u shpëtuar.

Duke e ditur këtë, marrim skemën:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Kod ("_regjistrimi_korruptuar", "`_regjistrimi_korruptuar") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") e bën të sigurt DDL, d.m.th. në vend të:

create table tname (_field1 string, 1field string)

Me emrat e fushave si "_field1, 1field", bëhet DDL e sigurt ku emrat e fushave janë ikje: krijoni tabelën `tname` (varg `_field1`, varg `1field`).

Shtrohet pyetja: si të merrni siç duhet një kornizë të dhënash me një skemë të plotë (në kodin pf)? Si ta merrni këtë pf? Ky është problemi i pestë. Rilexoni skemën e të gjitha ndarjeve nga dosja me skedarët e parketit të vitrinës së synuar? Kjo metodë është më e sigurta, por e vështirë.

Skema është tashmë në Hive. Ju mund të merrni një skemë të re duke kombinuar skemën e të gjithë tabelës dhe ndarjen e re. Kështu që ju duhet të merrni skemën e tabelës nga Hive dhe ta kombinoni atë me skemën e ndarjes së re. Kjo mund të bëhet duke lexuar metadatat e testimit nga Hive, duke i ruajtur në një dosje të përkohshme dhe duke përdorur Spark për të lexuar të dyja ndarjet në të njëjtën kohë.

Në fakt, ka gjithçka që ju nevojitet: skema origjinale e tabelës në Hive dhe ndarja e re. Kemi edhe të dhëna. Mbetet vetëm për të marrë një skemë të re që kombinon skemën e vitrinës dhe fushat e reja nga ndarja e krijuar:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Më pas, ne krijojmë regjistrimin e tabelës DDL, si në fragmentin e mëparshëm.
Nëse i gjithë zinxhiri funksionon si duhet, domethënë, ka pasur një ngarkesë inicializuese dhe tabela është krijuar saktë në Hive, atëherë marrim një skemë të përditësuar të tabelës.

Dhe problemi i fundit është se nuk mund të shtoni vetëm një ndarje në një tabelë Hive, sepse ajo do të prishet. Ju duhet të detyroni Hive të rregullojë strukturën e tij të ndarjes:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Detyra e thjeshtë e leximit të JSON dhe krijimit të një vitrinëje në bazë të tij rezulton në kapërcimin e një sërë vështirësish të nënkuptuara, zgjidhje për të cilat duhet të kërkoni veçmas. Dhe megjithëse këto zgjidhje janë të thjeshta, kërkon shumë kohë për t'i gjetur ato.

Për të zbatuar ndërtimin e vitrinës, më duhej:

  • Shtoni ndarje në vitrinë, duke hequr qafe skedarët e shërbimit
  • Merreni me fushat boshe në të dhënat burimore që ka shtypur Spark
  • Hidhni lloje të thjeshta në një varg
  • Konvertoni emrat e fushave në shkronja të vogla
  • Ngarkimi i veçantë i të dhënave dhe regjistrimi i tabelës në Hive (gjenerimi DDL)
  • Mos harroni të shmangni emrat e fushave që mund të jenë të papajtueshëm me Hive
  • Mësoni se si të përditësoni regjistrimin e tabelës në Hive

Duke përmbledhur, vërejmë se vendimi për të ndërtuar vitrina është i mbushur me shumë gracka. Prandaj, në rast vështirësish në zbatim, është më mirë të kontaktoni një partner me përvojë me ekspertizë të suksesshme.

Faleminderit që lexuat këtë artikull, shpresojmë që informacioni t'ju duket i dobishëm.

Burimi: www.habr.com

Shto një koment