Spark schemaEvolution in practice

Dear readers, good day!

In this article, a lead consultant of the Big Data Solutions business unit at Neoflex describes in detail the options for building variable-structure data marts (showcases) with Apache Spark.

In data analysis projects, the task of building data marts on top of loosely structured data comes up often.

Usually these are logs or responses from various systems, stored as JSON or XML. The data is loaded into Hadoop, and a data mart has to be built from it. Access to the resulting data mart can be organized, for example, through Impala.

In this case, the schema of the target data mart is not known in advance. Moreover, it cannot be drawn up in advance either, because it depends on the data, and we are dealing with exactly this kind of loosely structured data.

For example, today the following response is logged:

{"source": "app1", "error_code": ""}

and tomorrow the same system returns this response:

{"source": "app1", "error_code": "error", "description": "Network error"}

As a result, the data mart needs one more field, description, and nobody knows whether it will arrive or not.

Building a data mart on such data is a fairly standard task, and Spark has a number of tools for it. For parsing the source data, both JSON and XML are supported, and for a schema that is not known in advance, schemaEvolution support is provided.

At first glance, the solution looks simple. You take the folder with JSON and read it into a dataframe. Spark infers the schema and turns nested data into structs. Then everything is saved as parquet, which Impala also supports, and the data mart is registered in the Hive metastore.

Everything seems simple.

However, the short examples in the documentation do not make clear what to do about a number of problems that come up in practice.

The documentation describes not how to build a data mart, but how to read JSON or XML into a dataframe.

Namely, it simply shows how to read and parse JSON:

df = spark.read.json(path...)

This is enough to make the data available to Spark.

In practice, the script is much more complicated than reading JSON files from a folder and creating a dataframe. The situation looks like this: a data mart already exists, new data arrives every day and has to be added to it, and the schema may change along the way.

The usual scheme for building a data mart is as follows:

Step 1. The data is loaded into Hadoop, followed by daily incremental loads, each added to a new partition. The result is a folder with the source data partitioned by day.

Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in a format suitable for analysis, for example parquet, which can then be imported into Impala. This creates the target data mart with all the data accumulated up to that point.

Step 3. A load job is created that updates the data mart every day. This raises the questions of incremental loading, of partitioning the data mart, and of maintaining the overall schema of the data mart.

Let's take an example. Suppose the first step of building the repository has been done, and the JSON files have been loaded into a folder.

Creating a dataframe from them and saving it as a data mart is no problem. This is the very first step, and it is easy to find in the Spark documentation:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)

Everything seems fine.

We have read and parsed the JSON; next we save the dataframe as parquet and register it in Hive in any convenient way:

df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')

We get a data mart.

But the next day, new data arrives from the source. We have a folder with JSON and a data mart built from that folder. After the next batch of data is loaded from the source, the data mart is missing one day of data.

The logical solution is to partition the data mart by day, so that a new partition can be added each following day. The mechanism for this is also well known: Spark lets you write partitions separately.

First, we do the initial load, saving the data as described above but adding partitioning. This step is called data mart initialization and is done only once:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

The next day, we load only the new partition:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
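The partition path above is assembled by hand, so a missing "/" or a typo in "date_load=" silently writes the data somewhere that Spark and Hive will not treat as the intended partition. A small helper keeps the layout in one place. This is a sketch: partition_path is a hypothetical name, and the date string is passed through as-is, in whatever format your load uses.

```python
import posixpath


def partition_path(dbpath: str, db: str, dest_table: str, date_load: str) -> str:
    """Return the folder of one daily partition: <dbpath>/<db>/<table>/date_load=<date>/.

    posixpath is used because HDFS and S3-style paths always use '/'.
    """
    table_root = posixpath.join(dbpath, db, dest_table)
    # key=value folder names are what Spark and Hive partition discovery expect
    return posixpath.join(table_root, f"date_load={date_load}") + "/"


# Usage with the write call above (assumes df, dbpath, db, destTable, date_load exist):
# df.coalesce(1).write.mode("overwrite").parquet(partition_path(dbpath, db, destTable, date_load))
```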

All that remains is to re-register the table in Hive to update the schema.
However, this is where the problems begin.

Problem one. Sooner or later, the resulting parquet becomes unreadable. This is because parquet and JSON treat empty fields differently.

Let's consider a typical situation. For example, yesterday this JSON arrived:

Day 1: {"a": {"b": 1}}

and today the same JSON looks like this:

Day 2: {"a": null}

Suppose we have two different partitions with one line each.
If we read all the source data together, Spark can determine the type: it will understand that "a" is a struct field with a nested field "b" of type INT. But if each partition is saved separately, Day 2 gives Spark nothing to infer from (in it, "a" is null in every row), so "a" is typed as string, and we get parquet with incompatible partition schemas:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

This situation is well known, so a special option was added: drop empty fields when parsing the source data:

df = spark.read.json("...", dropFieldIfAllNull=True)

In this case, the parquet will consist of partitions that can be read together.
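The effect of the option on the two days above can be modeled in plain Python. The functions below are a hypothetical toy limited to top-level keys, not Spark's implementation: a key that is null in every record of a batch is dropped, so the Day 2 partition simply has no "a" column instead of a conflicting "a: string" one. With mergeSchema enabled, reading both partitions then yields one struct-typed "a", with nulls for Day 2. The real option, per the Spark documentation, also ignores empty arrays/structs.

```python
from typing import Any


def all_null_fields(batch: list[dict[str, Any]]) -> set[str]:
    """Top-level keys that are missing or null in every record of one day's batch."""
    keys = set().union(*(record.keys() for record in batch))
    return {k for k in keys if all(record.get(k) is None for record in batch)}


def drop_all_null_fields(batch: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Toy model of dropFieldIfAllNull: remove keys that carry no type information."""
    drop = all_null_fields(batch)
    return [{k: v for k, v in record.items() if k not in drop} for record in batch]


day1 = [{"a": {"b": 1}}]
day2 = [{"a": None}]
print(drop_all_null_fields(day1))  # [{'a': {'b': 1}}]
print(drop_all_null_fields(day2))  # [{}]
```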
Although those who have done this in practice will smile bitterly here. Why? Because there are likely to be two more situations. Or three. Or four. The first, which is almost certain to occur, is that numeric types look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such values appear in the same partition, schema inference reads everything correctly and settles on the wider type. But if they are in different partitions, one will have intField: long (Spark infers JSON integers as long) and the other intField: double.

The following flag handles this situation:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
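A toy model shows what the flag buys: inferred separately, {intField: 1} and {intField: 1.1} get long and double, while with primitivesAsString both sides are string. infer_primitive and partitions_agree are hypothetical helpers that mimic only this top-level behavior, not Spark's inference code.

```python
def infer_primitive(value: object, primitives_as_string: bool = False) -> str:
    """Toy version of how a JSON primitive is typed during schema inference."""
    if primitives_as_string:
        return "string"
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"


def partitions_agree(p1: dict, p2: dict, field: str, primitives_as_string: bool = False) -> bool:
    """True if two separately inferred partitions give `field` the same type."""
    return (infer_primitive(p1[field], primitives_as_string)
            == infer_primitive(p2[field], primitives_as_string))


print(partitions_agree({"intField": 1}, {"intField": 1.1}, "intField"))        # False
print(partitions_agree({"intField": 1}, {"intField": 1.1}, "intField", True))  # True
```

The trade-off is that numeric and boolean columns arrive as strings, so any typed view of the data mart has to cast them back downstream.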

Now we have a folder with partitions that can be read into a single dataframe, and valid parquet for the whole data mart. Right? No.

Remember that we registered the table in Hive. Hive is not case-sensitive in field names, while parquet is. Therefore, partitions with the schemas field1: int and Field1: int are the same for Hive, but not for Spark. Don't forget to convert field names to lowercase.
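One way to do this, assuming records are normalized before Spark parses them (for instance in a pre-processing step that rewrites the JSON files), is a recursive key rewrite that also refuses keys differing only in case, since Hive would see them as one column. lowercase_keys is a hypothetical helper; for the top-level columns of an existing dataframe, df.toDF(*[c.lower() for c in df.columns]) does the same renaming inside Spark.

```python
import json
from typing import Any


def lowercase_keys(value: Any) -> Any:
    """Recursively lowercase the keys of a parsed JSON value.

    Raises ValueError when two keys of the same object differ only in case,
    because Hive would map both to the same column.
    """
    if isinstance(value, dict):
        result = {}
        for key, item in value.items():
            lower = key.lower()
            if lower in result:
                raise ValueError(f"keys collide case-insensitively: {key!r}")
            result[lower] = lowercase_keys(item)
        return result
    if isinstance(value, list):
        return [lowercase_keys(item) for item in value]
    return value


line = '{"Source": "app1", "Error": {"Code": 1, "Items": [{"Name": "x"}]}}'
print(json.dumps(lowercase_keys(json.loads(line))))
# {"source": "app1", "error": {"code": 1, "items": [{"name": "x"}]}}
```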

After that, everything seems to be fine.

However, it is not that simple. There is a second, also well-known problem. Since each new partition is saved separately, the partition folder will contain Spark service files, for example the _SUCCESS flag that marks a successful job. This leads to an error when trying to read the parquet. To avoid it, configure Spark so that it does not add service files to the folder:

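# Stop Spark from writing _SUCCESS and parquet summary files next to the data
sc = spark.sparkContext
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")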
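To check that the settings took effect, or to find leftovers from loads made before they were set, it helps to list the non-data files in a partition folder. hidden_files is a hypothetical helper and assumes the folder is reachable as a local path (for example, a mounted file system); on HDFS the same check would go through hdfs dfs -ls.

```python
import os


def hidden_files(partition_dir: str) -> list[str]:
    """Return names in a partition folder that start with '_' or '.'.

    These are service files rather than data, e.g. _SUCCESS, _metadata,
    _common_metadata, or .crc checksums written on a local file system.
    """
    return sorted(name for name in os.listdir(partition_dir)
                  if name.startswith(("_", ".")))
```

An empty list after a daily write means only data files landed in the partition.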

Now, every day, a new parquet partition with that day's parsed data is added to the target data mart folder. We took care in advance that there are no partitions with conflicting data types.

But we have a third problem. Now the overall schema is unknown; moreover, the table in Hive has an incorrect schema, since each new partition has most likely introduced changes into the schema.

The table has to be re-registered. This can be done simply: read the data mart's parquet again, take its schema, build a DDL from it, and use that DDL to re-register the folder in Hive as an external table, updating the schema of the target data mart.

We have a fourth problem. When we registered the table the first time, we relied on Spark. Now we do it ourselves, and we must remember that parquet field names can start with characters that Hive does not allow. For example, Spark puts lines it could not parse into the "_corrupt_record" field. Such a field cannot be registered in Hive without escaping.

With this in mind, we get the following code:

# pf is the dataframe with the complete schema (obtaining it is discussed below)
f_def = ""
for f in pf.dtypes:
  # skip the partition column, it goes into PARTITIONED BY
  if f[0] != "date_load":
    # escape the top-level name and every nested struct field name with backticks
    f_def = f_def + ",`" + f[0] + "` " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
# hc is a Hive-enabled session; the table is external, so dropping it keeps the data
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

The backtick escaping in this code makes the DDL safe, i.e. instead of:

create table tname (_field1 string, 1field string)

where names such as "_field1" and "1field" are left unescaped, a safe DDL with escaped field names is produced:

create table `tname` (`_field1` string, `1field` string)
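The replace() chain is compact but fragile: it also inserts a backtick after the comma in decimal(10,2) and between the key and value types of a map. A slightly longer sketch parses the simpleString types that PySpark's DataFrame.dtypes returns and quotes only real field names. quote_type and build_ddl are hypothetical helpers, and the sketch assumes field names contain neither ':' nor '`'.

```python
def quote_type(t: str, i: int = 0) -> tuple[str, int]:
    """Parse one Spark simpleString type starting at t[i].

    Returns the type with every struct field name wrapped in backticks,
    plus the index just past the parsed type.
    """
    if t.startswith("struct<", i):
        i += len("struct<")
        fields = []
        while t[i] != ">":
            colon = t.index(":", i)  # assumes field names contain no ':'
            name = t[i:colon]
            ftype, i = quote_type(t, colon + 1)
            fields.append(f"`{name}`:{ftype}")
            if t[i] == ",":
                i += 1
        return "struct<" + ",".join(fields) + ">", i + 1
    if t.startswith("array<", i):
        inner, i = quote_type(t, i + len("array<"))
        return f"array<{inner}>", i + 1
    if t.startswith("map<", i):
        key, i = quote_type(t, i + len("map<"))
        value, i = quote_type(t, i + 1)  # i + 1 skips the ',' between key and value types
        return f"map<{key},{value}>", i + 1
    # Primitive type such as bigint or decimal(10,2): read up to the next
    # ',' or '>' that is not inside parentheses.
    start, depth = i, 0
    while i < len(t) and not (t[i] in ",>" and depth == 0):
        depth += {"(": 1, ")": -1}.get(t[i], 0)
        i += 1
    return t[start:i], i


def build_ddl(dtypes, table: str, location: str, partition_col: str = "date_load") -> str:
    """CREATE EXTERNAL TABLE statement from (name, type) pairs such as pf.dtypes."""
    cols = ", ".join(f"`{name}` {quote_type(dtype)[0]}"
                     for name, dtype in dtypes if name != partition_col)
    return (f"CREATE EXTERNAL TABLE `{table}` ({cols}) "
            f"PARTITIONED BY (`{partition_col}` string) "
            f"STORED AS PARQUET LOCATION '{location}'")


print(quote_type("map<string,struct<b:bigint,c:decimal(10,2)>>")[0])
# map<string,struct<`b`:bigint,`c`:decimal(10,2)>>
```

With pf and hc from the surrounding snippets, the statement would be run as hc.sql(build_ddl(pf.dtypes, "jsonevolvtable", "/user/admin/testJson/testSchemaEvolution/pq/")).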

The question arises: how do we obtain a dataframe with the complete schema (pf in the code)? How do we get this pf? This is the fifth problem. Re-read the schema of all partitions from the folder with the target data mart's parquet files? This is the safest method, but it is heavy.

The schema is already in Hive. A new schema can be obtained by combining the schema of the whole table with that of the new partition. So you need to take the table schema from Hive and merge it with the schema of the new partition. This can be done by reading a small sample (one row is enough) from the Hive table, saving it to a temporary folder, and using Spark to read both partitions at once.

In fact, we have everything we need: the original table schema in Hive and the new partition. We also have the data. All that remains is to obtain a new schema that combines the data mart schema with the new fields from the new partition:

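from pyspark.sql import SparkSession

# HiveContext is deprecated since Spark 2.0; a Hive-enabled SparkSession offers the same sql()
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
hc = spark
# Parse the new day and write it as a separate partition
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
# One row of the current table is enough to carry its schema into a temporary folder
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartition/")
# Read both folders together; mergeSchema unions the table schema with the new fields
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartition/*")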
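What mergeSchema computes here is essentially a union of the two field lists. A plain-Python model of that union, with schemas as hypothetical {field: type} dicts, shows why the one-row fake partition is enough: the table's existing columns come from it, new ones (like description in the opening example) come from the new partition, and a type conflict surfaces as an error instead of a silently wrong table. This is a model for intuition, not Spark's merge code.

```python
def merge_schemas(table: dict, partition: dict) -> dict:
    """Union two schemas given as {field: type}; a nested dict stands for a struct.

    Fields from both sides are kept, nested structs are merged recursively,
    and a field with two different primitive types is reported as a conflict.
    """
    merged = dict(table)
    for name, new_type in partition.items():
        old_type = merged.get(name)
        if old_type is None:
            merged[name] = new_type
        elif isinstance(old_type, dict) and isinstance(new_type, dict):
            merged[name] = merge_schemas(old_type, new_type)
        elif old_type != new_type:
            raise TypeError(f"conflicting types for {name!r}: {old_type} vs {new_type}")
    return merged


table_schema = {"source": "string", "error_code": "string"}
new_partition = {"source": "string", "error_code": "string", "description": "string"}
print(merge_schemas(table_schema, new_partition))
# {'source': 'string', 'error_code': 'string', 'description': 'string'}
```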

Next, we create the table registration DDL, as in the DDL snippet above.
If the whole chain works correctly, that is, there was an initialization load and the table was created correctly in Hive, we get an updated table schema.

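The last problem is that writing a partition folder does not by itself add the partition to the Hive table: the metastore does not know about the new folder (and, after the table is re-created, about any of the existing ones). You need to force Hive to repair its partition structure: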

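# spark is assumed to be a SparkSession built with enableHiveSupport()
# (HiveContext is deprecated since Spark 2.0)
# Rescan the table location so the metastore picks up the partition folders
spark.sql("MSCK REPAIR TABLE " + db + "." + destTable)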
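MSCK REPAIR TABLE rescans the whole table location, which gets slower as partitions accumulate. On days when the table was not re-created and only one partition is new, a narrower alternative (a different technique from the one above) is the Hive statement ALTER TABLE ... ADD IF NOT EXISTS PARTITION, which Spark SQL also accepts. add_partition_sql is a hypothetical helper; it assumes date_load is a string partition column and trusted input, since it does no quote escaping.

```python
def add_partition_sql(db: str, table: str, date_load: str, location: str) -> str:
    """Statement that registers a single date_load partition at an explicit location."""
    return (f"ALTER TABLE `{db}`.`{table}` ADD IF NOT EXISTS "
            f"PARTITION (date_load='{date_load}') LOCATION '{location}'")


print(add_partition_sql("db", "jsonevolvtable", "12-12-2019",
                        "/user/admin/testJson/testSchemaEvolution/pq/date_load=12-12-2019/"))
# Run it with a Hive-enabled session, e.g. spark.sql(add_partition_sql(...))
```

After a drop-and-recreate of the table, MSCK REPAIR TABLE is still needed, because all previously known partitions are gone from the metastore.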

The simple task of reading JSON and building a data mart on top of it turns into overcoming a number of non-obvious difficulties, and the solutions have to be found one by one. And although these solutions are simple, finding them takes a lot of time.

To implement the data mart, I had to:

  • Add partitions to the data mart, getting rid of service files
  • Deal with empty fields in the source data that Spark has typed
  • Cast simple types to string
  • Convert field names to lowercase
  • Separate data loading from table registration in Hive (DDL generation)
  • Remember to escape field names that may be incompatible with Hive
  • Learn how to update the table registration in Hive

To sum up, building data marts this way is full of pitfalls. So if difficulties arise during implementation, it is better to turn to an experienced partner with proven expertise.

Thank you for reading this article; we hope you find the information useful.

Source: www.habr.com
