Spark eskemaEboluzioa praktikan

Irakurle maiteok, arratsalde on!

Artikulu honetan, Neoflex-en Big Data Solutions negozio-eremuko aholkulari nagusiak Apache Spark erabiliz egitura aldakorreko erakusleihoak eraikitzeko aukerak deskribatzen ditu.

Datuak aztertzeko proiektu baten barruan, egitura baxuko datuetan oinarritutako erakusleihoak eraikitzeko zeregina sortzen da maiz.

Normalean, erregistroak edo hainbat sistemetako erantzunak dira, JSON edo XML moduan gordeta. Datuak Hadoop-era kargatzen dira, gero erakusleiho bat eraiki behar da bertatik. Sortutako erakusleihorako sarbidea antola dezakegu, adibidez, Impalaren bitartez.

Kasu honetan, helburuko erakusleihoaren diseinua aldez aurretik ezezaguna da. Gainera, eskema ezin da aldez aurretik egin, datuen araberakoa baita, eta oso egitura ahuleko datu horietaz ari gara.

Adibidez, gaur erantzun hau erregistratzen da:

{source: "app1", error_code: ""}

eta bihar erantzun hau sistema beretik dator:

{source: "app1", error_code: "error", description: "Network error"}

Ondorioz, erakusleihoan beste eremu bat gehitu beharko litzateke - deskribapena, eta inork ez daki etorriko den ala ez.

Datu horiekin mart bat sortzeko zeregina nahiko estandarra da, eta Spark-ek hainbat tresna ditu horretarako. Iturburu-datuak analizatzeko, JSON eta XML-ren laguntza dago, eta aurretik ezezaguna den eskema baterako, schemaEvolution-en laguntza eskaintzen da.

Lehen begiratuan, irtenbidea erraza dirudi. Karpeta JSONrekin hartu eta datu-markoan irakurri behar duzu. Spark-ek eskema bat sortuko du eta habiatutako datuak egitura bihurtuko ditu. Ondoren, dena parketean gorde behar da, Impalan ere onartzen dena, erakusleihoa Hive metadendan erregistratuz.

Dena badirudi sinplea dela.

Hala ere, dokumentazioko adibide laburretatik ez dago argi zer egin praktikan hainbat arazorekin.

Dokumentazioak ez erakusleiho bat sortzeko ikuspegi bat deskribatzen du, JSON edo XML datu-marko batean irakurtzeko baizik.

Hots, JSON nola irakurri eta analizatu besterik ez du erakusten:

df = spark.read.json(path...)

Hau nahikoa da datuak Spark-en eskura jartzeko.

Praktikan, agertokia askoz konplexuagoa da karpeta batetik JSON fitxategiak irakurtzea eta datu-markoa sortzea baino. Egoerak honela ematen du: dagoeneko badago erakusleiho jakin bat, datu berriak iristen dira egunero, erakusleihora gehitu behar dira, eskema ezberdina izan daitekeela ahaztu gabe.

Denda bat eraikitzeko ohiko eskema hau da:

Urratsera 1. Datuak Hadoop-en kargatzen dira, ondoren egunero kargatzen dira eta partizio berri batera gehitzen dira. Emaitza iturri-datuak dituen karpeta bat da, egunaren arabera banatuta.

Urratsera 2. Hasierako kargatzean, karpeta hau Spark erabiliz irakurtzen eta analizatzen da. Sortutako datu-markoa azter daitekeen formatuan gordetzen da, adibidez, parketean, eta gero Impala-ra inportatu daiteke. Honek helburuko erakusleiho bat sortzen du ordura arte pilatutako datu guztiekin.

Urratsera 3. Deskarga bat sortzen da, erakusleihoa egunero eguneratuko duena.
Karga inkrementalaren galdera sortzen da, erakusleihoaren zatiketa beharra eta erakusleihoaren diseinu orokorrari eusteko galdera.

Eman dezagun adibide bat. Demagun biltegi bat eraikitzeko lehen urratsa ezarri dela eta JSON fitxategiak karpeta batera kargatzea konfiguratuta dagoela.

Ez da arazorik haietatik datu-markoa sortzea eta gero erakusleiho gisa gordetzea. Hau da Spark-en dokumentazioan erraz aurki daitekeen lehen urratsa:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Badirudi dena ondo dagoela.

JSON irakurri eta analizatu dugu, ondoren datu-markoa parket gisa gordetzen dugu, Hive-n erregistratuz edozein modu erosoan:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Erakusleiho bat lortzen dugu.

Baina, hurrengo egunean datu berriak gehitu ziren iturritik. JSON duen karpeta bat dugu, eta karpeta honetan oinarrituta sortutako erakusleiho bat. Iturburutik hurrengo datuen zatia kargatu ondoren, erakusleihoak ez du egun baterako datu nahikorik.

Irtenbide logiko bat erakusleihoa egunez banatzea izango litzateke, hurrengo egunean partizio berri bat gehitzeko aukera emango duena. Honen mekanismoa ere ezaguna da; Spark-ek partizioak bereizita grabatzeko aukera ematen du.

Lehenik eta behin, hasierako karga egiten dugu, datuak goian deskribatutako moduan gordez, partizioa bakarrik gehituz. Ekintza honi erakusleihoaren hasieratzea deitzen zaio eta behin bakarrik egiten da:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Hurrengo egunean partizio berria bakarrik deskargatuko dugu:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Eskema eguneratzeko Hive-n berriro erregistratzea besterik ez da geratzen.
Hala ere, hor sortzen dira arazoak.

Lehenengo arazoa. Lehenago edo beranduago, lortutako parketa ez da irakurgai izango. Hau parketak eta JSON-ek eremu hutsak modu ezberdinean tratatzen dituztelako gertatzen da.

Demagun egoera tipiko bat. Adibidez, atzo JSON iristen da:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

eta gaur egun JSON berdinak honela dauka:

Π”Π΅Π½ΡŒ 2: {"a": null}

Demagun bi partizio ezberdin ditugula, bakoitza lerro batekin.
Iturburu-datu guztiak irakurtzen ditugunean, Spark-ek mota zehaztu ahal izango du, eta "a" "egitura" motako eremu bat dela ulertuko du, INT motako "b" eremu habiaratu batekin. Baina, partizio bakoitza bereizita gorde bada, emaitza partizio-eskema bateraezinak dituen parketa bat da:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Egoera hau ezaguna da, beraz, aukera bat gehitu da iturburuko datuak aztertzerakoan eremu hutsak kentzeko:

df = spark.read.json("...", dropFieldIfAllNull=True)

Kasu honetan, parketak elkarrekin irakur daitezkeen tabikeez osatuta egongo da.
Praktikan hori egin dutenek irribarre mingotsa egingo duten arren. Zergatik? Bai, ziurrenik beste bi egoera sortuko direlako. Edo hiru. Edo lau. Lehenengoa, ia ziurra dena, zenbakizko motak itxura ezberdina izango dutela JSON fitxategi desberdinetan. Adibidez, {intField: 1} eta {intField: 1.1}. Horrelako eremuak sorta batean agertzen badira, orduan eskemak bateratzeak dena behar bezala irakurriko du, motarik zehatzena lortzeko. Baina desberdinetan bada, batek intField: int izango du, eta besteak intField: double.

Egoera honi aurre egiteko, bandera hau dago:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Orain partizioak kokatzen diren karpeta bat dugu, datu-marko bakarrean eta erakusleiho osoaren parket baliodun batean irakur daitekeena. Bai? Ez.

Gogoratu behar dugu mahaia Hive-n erregistratu genuela. Erlauntzak ez ditu maiuskulak eta minuskulak bereizten eremu-izenetan, baina parketak bai. Beraz, eskemak dituzten partizioak: field1: int eta Field1: int berdinak dira Hiverentzat, baina ez Sparkentzat. Ez ahaztu eremuen izenak minuskulaz aldatzea.

Horren ostean, dena ondo dagoela dirudi.

Hala ere, dena ez da hain sinplea. Bigarren arazo bat ere ezaguna sortzen da. Partizio berri bakoitza bereizita gordetzen denez, partizio-karpetak Spark zerbitzu-fitxategiak izango ditu, adibidez, _SUCCESS eragiketaren arrakastaren bandera. Honek akats bat eragingo du parketa egiten saiatzean. Hori ekiditeko, konfigurazioa konfiguratu behar duzu Spark-ek zerbitzu-fitxategiak karpetara gehitzea eragotziz:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Badirudi orain egunero parket partizio berri bat gehitzen dela xede erakusleihoko karpetan, non eguneko analizatutako datuak dauden. Aldez aurretik zaindu genuen datu-mota gatazkak dituzten partiziorik ez zegoela ziurtatzeko.

Baina hirugarren arazo baten aurrean gaude. Orain eskema orokorra ez da ezagutzen, gainera, Hive-n taulak eskema okerra du, partizio berri bakoitzak ziurrenik eskeman distortsio bat sartu baitzuen.

Mahaia berriro erregistratu behar da. Hau besterik gabe egin daiteke: irakur ezazu berriro erakusleihoko parketa, hartu eskema eta sortu bertan oinarritutako DDL bat, eta horrekin karpeta berriro erregistratu dezakezu Hive-n kanpoko taula gisa, helburuko erakusleihoaren eskema eguneratuz.

Laugarren arazo baten aurrean gaude. Mahaia lehen aldiz erregistratu genuenean, Spark-en oinarritu ginen. Orain guk geuk egiten dugu, eta gogoratu behar dugu parket-eremuak Hive-k onartzen ez dituen karaktereekin has daitezkeela. Adibidez, Spark-ek "corrupt_record" eremuan analizatu ezin dituen lerroak botatzen ditu. Horrelako eremu bat ezin da Hive-n erregistratu ihes egin gabe.

Hau jakinda, diagrama lortuko dugu:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Code ("_erregistro_hondatuta", "`_erregistro_hondatuta`") + " " + f[1].ordezkatu(":", "`:").ordezkatu("<", "<`").ordezkatu(",", ",`").replace ("array<`", "array<") DDL segurua egiten du, hau da, honen ordez:

create table tname (_field1 string, 1field string)

"_field1, 1field" bezalako eremu-izenekin, DDL seguru bat egiten da non eremuen izenak ihes egiten diren: sortu taula `tname` (`_field1` katea, `1eremua` katea).

Galdera sortzen da: nola lortu behar bezala datu-markoa eskema oso batekin (pf kodean)? Nola lortu pf hau? Hau da bosgarren arazoa. Berriro irakurri helburuko erakusleihoko parket fitxategiekin karpetako partizio guztien diagrama? Metodo hau seguruena da, baina zaila.

Eskema Hive-n dago jada. Eskema berri bat lor dezakezu taula osoaren eskema eta partizio berria konbinatuz. Horrek esan nahi du taula-eskema Hive-tik hartu eta partizio berriaren eskemarekin konbinatu behar duzula. Hau Hive-ko ​​probako metadatuak irakurriz, aldi baterako karpeta batean gordez eta bi partizioak aldi berean irakurriz Spark erabiliz egin daiteke.

Funtsean, behar duzun guztia dago: Hive-n jatorrizko taula-eskema eta partizio berri bat. Datuak ere baditugu. Sortutako partiziotik erakusleihoaren eskema eta eremu berriak konbinatzen dituen eskema berri bat lortzea besterik ez da geratzen:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Jarraian, taula erregistroko DDL sortuko dugu, aurreko zatian bezala.
Kate osoak behar bezala funtzionatzen badu, hots, hasierako karga bat bazegoen eta taula Hive-n behar bezala sortu zen, orduan taularen eskema eguneratua lortuko dugu.

Azken arazoa da ezin duzula erraz gehitu partizio bat Hive taula batean, hautsi egingo baita. Hive bere partizioaren egitura konpontzera behartu behar duzu:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON irakurtzeko eta horretan oinarritutako erakusleiho bat sortzearen zeregin sinpleak hainbat zailtasun inplizitu gainditzen ditu, eta horien irtenbideak bereizita aurkitu behar dira. Eta irtenbide hauek sinpleak diren arren, denbora asko behar da horiek aurkitzeko.

Erakusleiho baten eraikuntza gauzatzeko, hauek egin behar izan ditugu:

  • Gehitu partizioak erakusleihoan, zerbitzu-fitxategiak kenduz
  • Landu Sparkek idatzitako iturburuko datuetan hutsik dauden eremuak
  • Igorri mota sinpleak katera
  • Bihurtu eremuen izenak minuskulaz
  • Bereizi datuak kargatzea eta taula erregistratzea Hive-n (DDL sortzea)
  • Ez ahaztu Hive-rekin bateragarriak ez diren eremu-izenetatik ihes egitea
  • Ikasi Hive-n mahaiaren erregistroa eguneratzen

Laburbilduz, azpimarratzen dugu erakusleihoak eraikitzeko erabakiak tranpa asko dituela. Hori dela eta, ezarpenean zailtasunak sortzen badira, hobe da esperientzia arrakastatsua duen esperientziadun bazkide batengana jotzea.

Eskerrik asko artikulu hau irakurtzeagatik, informazioa erabilgarria izatea espero dugu.

Iturria: www.habr.com

Gehitu iruzkin berria