Spark schemaEvolution sa praktis

Minahal nga mga magbabasa, maayong adlaw!

Niini nga artikulo, ang nanguna nga consultant sa Neoflex's Big Data Solutions nga lugar sa negosyo naghulagway sa detalye sa mga kapilian alang sa pagtukod sa variable nga istruktura nga mga showcase gamit ang Apache Spark.

Isip kabahin sa usa ka proyekto sa pag-analisa sa datos, ang tahas sa pagtukod og mga storefront base sa mga loosely structured data kasagarang motungha.

Kasagaran kini mga log, o mga tubag gikan sa lainlaing mga sistema, gitipig ingon JSON o XML. Ang datos gi-upload sa Hadoop, nan kinahanglan nimo nga magtukod og storefront gikan kanila. Mahimo natong organisahon ang pag-access sa gibuhat nga showcase, pananglitan, pinaagi sa Impala.

Sa kini nga kaso, ang schema sa target nga storefront wala mahibal-an daan. Dugang pa, ang laraw dili usab mahimo nga abante, tungod kay kini nagdepende sa datos, ug kami nag-atubang niining mga loosely structured data.

Pananglitan, karon ang mosunod nga tubag gi-log:

{source: "app1", error_code: ""}

ug ugma gikan sa samang sistema moabut ang mosunod nga tubag:

{source: "app1", error_code: "error", description: "Network error"}

Ingon usa ka sangputanan, usa pa nga natad ang kinahanglan idugang sa showcase - paghulagway, ug wala’y nahibal-an kung kini moabut o dili.

Ang tahas sa paghimo sa usa ka storefront sa ingon nga datos medyo sukaranan, ug ang Spark adunay daghang mga himan alang niini. Alang sa pag-parse sa tinubdan nga datos, adunay suporta alang sa JSON ug XML, ug alang sa usa ka wala mailhi nga schema, suporta alang sa schemaEvolution gihatag.

Sa unang pagtan-aw, ang solusyon morag simple. Kinahanglan ka nga magkuha usa ka folder nga adunay JSON ug basahon kini sa usa ka dataframe. Ang Spark maghimo usa ka schema, himuon ang nested data nga mga istruktura. Dugang pa, ang tanan kinahanglan nga maluwas sa parquet, nga gisuportahan usab sa Impala, pinaagi sa pagrehistro sa storefront sa Hive metastore.

Morag simple ra ang tanan.

Bisan pa, dili klaro gikan sa mubo nga mga pananglitan sa dokumentasyon kung unsa ang buhaton sa daghang mga problema sa praktis.

Ang dokumentasyon naghulagway sa usa ka pamaagi nga dili sa paghimo sa usa ka storefront, apan sa pagbasa sa JSON o XML ngadto sa usa ka dataframe.

Sa ato pa, gipakita ra kung giunsa pagbasa ug pag-parse ang JSON:

df = spark.read.json(path...)

Igo na kini aron magamit ang datos sa Spark.

Sa praktis, ang script mas komplikado kay sa pagbasa lamang sa mga JSON file gikan sa usa ka folder ug paghimo og dataframe. Ang kahimtang ingon niini: adunay usa ka piho nga storefront, ang bag-ong datos moabut matag adlaw, kinahanglan nila nga idugang sa storefront, nga dili kalimtan nga ang laraw mahimong magkalainlain.

Ang naandan nga laraw alang sa pagtukod sa usa ka showcase mao ang mosunod:

Lakang 1. Ang datos gikarga sa Hadoop nga adunay sunod nga adlaw-adlaw nga pag-reload ug gidugang sa usa ka bag-ong partisyon. Kini nahimo nga usa ka folder nga adunay inisyal nga datos nga gibahin sa adlaw.

Lakang 2. Atol sa inisyal nga load, kini nga folder gibasa ug gi-parse ni Spark. Ang resulta nga dataframe gitipigan sa usa ka parsable nga pormat, pananglitan, sa parquet, nga mahimong ma-import ngadto sa Impala. Naghimo kini usa ka target nga showcase nga adunay tanan nga datos nga natipon hangtod niini nga punto.

Lakang 3. Gihimo ang usa ka pag-download nga mag-update sa storefront kada adlaw.
Adunay usa ka pangutana sa incremental loading, ang panginahanglan sa pagbahin sa showcase, ug ang pangutana sa pagmintinar sa kinatibuk-ang laraw sa showcase.

Atong tagdon ang usa ka pananglitan. Ingnon ta nga ang unang lakang sa pagtukod og repository kay gipatuman, ug ang JSON files gi-upload sa usa ka folder.

Ang paghimo og dataframe gikan kanila, unya i-save kini isip showcase, dili problema. Kini ang una nga lakang nga dali makit-an sa dokumentasyon sa Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Morag maayo ra ang tanan.

Among gibasa ug gi-parse ang JSON, unya among gitipigan ang dataframe isip parquet, nga nagparehistro niini sa Hive sa bisan unsang sayon ​​nga paagi:

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Nagkuha kami usa ka bintana.

Apan, pagkasunod adlaw, gidugang ang bag-ong datos gikan sa gigikanan. Kami adunay usa ka folder nga adunay JSON, ug usa ka showcase nga gihimo gikan niini nga folder. Human ma-load ang sunod nga batch sa datos gikan sa tinubdan, ang data mart kulang sa usa ka adlaw nga kantidad sa datos.

Ang lohikal nga solusyon mao ang pagbahin sa storefront sa adlaw, nga magtugot sa pagdugang usa ka bag-ong partisyon matag sunod nga adlaw. Ang mekanismo alang niini nahibal-an usab, ang Spark nagtugot kanimo sa pagsulat sa mga partisyon nga gilain.

Una, naghimo kami usa ka inisyal nga pagkarga, nga nagtipig sa datos sama sa gihulagway sa ibabaw, nagdugang lamang sa partitioning. Kini nga aksyon gitawag nga storefront initialization ug gihimo kausa ra:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Pagkasunod adlaw, bag-ong partisyon lang ang among gikarga:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Ang nahabilin mao ang pagrehistro pag-usab sa Hive aron ma-update ang schema.
Apan, dinhi mitungha ang mga problema.

Unang problema. Sa madugay o sa madali, ang resulta nga parquet dili na mabasa. Kini tungod sa lahi nga pagtagad sa parquet ug JSON sa walay sulod nga mga uma.

Atong tagdon ang usa ka tipikal nga sitwasyon. Pananglitan, kagahapon miabot si JSON:

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

ug karon ang parehas nga JSON ingon niini:

Π”Π΅Π½ΡŒ 2: {"a": null}

Ingnon ta nga kami adunay duha ka lainlaing partisyon, matag usa adunay usa ka linya.
Kung gibasa namon ang tibuuk nga datos sa gigikanan, mahibal-an ni Spark ang tipo, ug masabtan nga ang "a" usa ka natad sa tipo nga "istruktura", nga adunay usa ka nested field "b" sa tipo nga INT. Apan, kung ang matag partisyon gitipigan nga gilain, nan makakuha kami usa ka parquet nga adunay dili managsama nga mga laraw sa partisyon:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Kini nga sitwasyon nahibal-an pag-ayo, busa usa ka kapilian ang espesyal nga gidugang - kung gi-parse ang tinubdan nga datos, kuhaa ang walay sulod nga mga uma:

df = spark.read.json("...", dropFieldIfAllNull=True)

Sa kini nga kaso, ang parquet maglangkob sa mga partisyon nga mahimong basahon nga magkauban.
Bisan tuod ang mga nakahimo niini sa praktis mopahiyom og mapait dinhi. Ngano man? Oo, tungod kay lagmit adunay duha pa nga mga sitwasyon. O tulo. O upat. Ang una, nga hapit gyud mahitabo, mao nga ang mga tipo sa numero magkalainlain ang hitsura sa lainlaing mga file sa JSON. Pananglitan, {intField: 1} ug {intField: 1.1}. Kung ang ingon nga mga natad makit-an sa usa ka partisyon, nan ang schema merge magbasa sa tanan nga husto, nga mosangput sa labing tukma nga tipo. Apan kung sa lain-laing mga, nan ang usa adunay intField: int, ug ang lain adunay intField: doble.

Adunay mosunod nga bandila aron pagdumala niini nga sitwasyon:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Karon kami adunay usa ka folder diin adunay mga partisyon nga mabasa sa usa ka dataframe ug usa ka balido nga parquet sa tibuuk nga showcase. Oo? Dili.

Kinahanglan natong hinumdoman nga narehistro nato ang lamesa sa Hive. Ang hive dili case sensitive sa mga ngalan sa field, samtang ang parquet kay case sensitive. Busa, ang mga partisyon nga adunay mga eskema: field1: int, ug Field1: int parehas alang sa Hive, apan dili alang sa Spark. Ayaw kalimti ang pag-convert sa mga ngalan sa field ngadto sa lower case.

After ato, murag okay ra ang tanan.

Bisan pa, dili tanan yano ra. Adunay ikaduha, nailhan usab nga problema. Tungod kay ang matag bag-ong partisyon gitipigan nga gilain, ang partition folder adunay sulud nga mga file sa serbisyo sa Spark, pananglitan, ang bandila sa kalampusan sa operasyon nga _SUCCESS. Kini moresulta sa usa ka sayup sa pagsulay sa parquet. Aron malikayan kini, kinahanglan nimo nga i-configure ang pagsumpo aron mapugngan ang Spark sa pagdugang sa mga file sa serbisyo sa folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Morag karon matag adlaw usa ka bag-ong partisyon sa parquet ang idugang sa target nga showcase folder, diin nahimutang ang parsed data alang sa adlaw. Giampingan namon daan nga wala’y mga partisyon nga adunay panagbangi sa tipo sa datos.

Apan, aduna kitay ikatulo nga problema. Karon ang kinatibuk-ang schema wala mahibal-an, dugang pa, ang lamesa sa Hive adunay dili husto nga schema, tungod kay ang matag bag-ong partition lagmit nagpaila sa usa ka pagtuis sa schema.

Kinahanglan nimo nga irehistro pag-usab ang lamesa. Mahimo kini nga yano: basaha pag-usab ang parquet sa storefront, kuhaa ang schema ug paghimo usa ka DDL base niini, diin irehistro pag-usab ang folder sa Hive ingon usa ka eksternal nga lamesa, pag-update sa schema sa target nga storefront.

Naa mi ikaupat nga problema. Sa dihang narehistro na namo ang lamesa sa unang higayon, nagsalig kami sa Spark. Karon gibuhat namo kini sa among kaugalingon, ug kinahanglan namon nga hinumdoman nga ang mga parquet field mahimong magsugod sa mga karakter nga dili gitugotan sa Hive. Pananglitan, ang Spark naglabay sa mga linya nga dili kini ma-parse sa field nga "corrupt_record". Ang ingon nga natad dili marehistro sa Hive nga dili makaikyas.

Sa pagkahibalo niini, atong makuha ang laraw:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

code ("_corrupt_record", "`_corrupt_record`") + " " + f[1].ilisan(":", "`:").ilisan("<", "<`").ilisan(",", ",`").replace("array<`", "array<") naghimo sa luwas nga DDL, i.e. imbes sa:

create table tname (_field1 string, 1field string)

Uban sa mga ngalan sa field sama sa "_field1, 1field", ang luwas nga DDL gihimo diin ang mga ngalan sa field gi-eskapo: paghimo og table `tname` (`_field1` string, `1field` string).

Ang pangutana mitungha: sa unsa nga paagi sa husto nga paagi sa pagkuha sa usa ka dataframe uban sa usa ka kompleto nga schema (sa pf code)? Unsaon pagkuha ani nga pf? Kini ang ikalima nga problema. Basaha pag-usab ang laraw sa tanan nga mga partisyon gikan sa folder nga adunay mga parquet file sa target nga showcase? Kini nga pamaagi mao ang labing luwas, apan lisud.

Ang schema anaa na sa Hive. Makakuha ka og bag-ong schema pinaagi sa paghiusa sa schema sa tibuok lamesa ug sa bag-ong partition. Busa kinahanglan nimong kuhaon ang schema sa lamesa gikan sa Hive ug i-combine kini sa schema sa bag-ong partition. Mahimo kini pinaagi sa pagbasa sa metadata sa pagsulay gikan sa Hive, pagtipig niini sa usa ka temporaryo nga folder, ug paggamit sa Spark aron mabasa ang duha nga partisyon sa usa ka higayon.

Sa tinuud, adunay tanan nga imong kinahanglan: ang orihinal nga laraw sa lamesa sa Hive ug ang bag-ong partisyon. Naa pud mi data. Nagpabilin lamang nga makakuha og bag-ong schema nga naghiusa sa storefront schema ug bag-ong mga field gikan sa gibuhat nga partition:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Sunod, gihimo namon ang DDL sa pagrehistro sa lamesa, sama sa miaging snippet.
Kung ang tibuuk nga kadena molihok sa husto, nga mao, adunay usa ka pagsugod nga pagkarga, ug ang lamesa gihimo nga husto sa Hive, unya makakuha kami usa ka na-update nga laraw sa lamesa.

Ug ang katapusan nga problema mao nga dili ka makadugang usa ka partisyon sa usa ka lamesa sa Hive, tungod kay kini mabuak. Kinahanglan nimong pugson ang Hive nga ayohon ang istruktura sa partisyon niini:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Ang yano nga tahas sa pagbasa sa JSON ug paghimo sa usa ka storefront nga gibase niini moresulta sa pagbuntog sa daghang dili klaro nga mga kalisud, mga solusyon nga kinahanglan nimong pangitaon nga gilain. Ug bisan kung kini nga mga solusyon yano ra, nagkinahanglag daghang oras aron makit-an kini.

Aron mapatuman ang pagtukod sa showcase, kinahanglan nako:

  • Pagdugang mga partisyon sa showcase, pagtangtang sa mga file sa serbisyo
  • Pag-atubang sa walay sulod nga mga natad sa tinubdan nga datos nga gi-type ni Spark
  • Ihulog ang yano nga mga tipo sa usa ka hilo
  • I-convert ang mga ngalan sa field ngadto sa lowercase
  • Pagbulag sa pag-upload sa datos ug pagrehistro sa lamesa sa Hive (kaliwat sa DDL)
  • Ayaw kalimti ang pag-ikyas sa mga ngalan sa uma nga mahimong dili tugma sa Hive
  • Pagkat-on unsaon pag-update sa pagrehistro sa lamesa sa Hive

Sa pagsumaryo, among namatikdan nga ang desisyon sa pagtukod sa mga bintana sa tindahan puno sa daghang mga lit-ag. Busa, sa kaso sa mga kalisdanan sa pagpatuman, kini mao ang mas maayo sa pagkontak sa usa ka batid nga partner uban sa malampuson nga kahanas.

Salamat sa pagbasa niini nga artikulo, kami nanghinaut nga ang impormasyon makatabang kanimo.

Source: www.habr.com

Idugang sa usa ka comment