Sipaki schemaEvolution ni iwa

Eyin onkawe, o dara ọjọ!

Ninu àpilẹkọ yii, oludamọran oludari ti agbegbe iṣowo Awọn solusan Nla ti Neoflex ṣe apejuwe ni awọn alaye awọn aṣayan fun kikọ awọn iṣafihan igbekalẹ oniyipada nipa lilo Apache Spark.

Gẹ́gẹ́ bí ara iṣẹ́ àtúpalẹ̀ dátà, iṣẹ́ kíkọ́ àwọn ibi ìtajà tó dá lórí dátà tí a ti ṣètò àìnífẹ̀ẹ́ sábà máa ń wáyé.

Nigbagbogbo iwọnyi jẹ awọn akọọlẹ, tabi awọn idahun lati oriṣiriṣi awọn ọna ṣiṣe, ti o fipamọ bi JSON tabi XML. Ti gbe data naa si Hadoop, lẹhinna o nilo lati kọ iwaju ile itaja kan lati ọdọ wọn. A le ṣeto iraye si iṣafihan ti a ṣẹda, fun apẹẹrẹ, nipasẹ Impala.

Ni idi eyi, ero ti ibi-itaja ibi-itaja ibi-afẹde ni a ko mọ tẹlẹ. Pẹlupẹlu, ero naa tun ko le ṣe agbekalẹ ni ilosiwaju, nitori pe o da lori data naa, ati pe a n ṣe pẹlu awọn data ti a ti ṣeto ti o rọrun pupọ.

Fun apẹẹrẹ, loni idahun wọnyi ti wọle:

{source: "app1", error_code: ""}

ati ọla lati eto kanna wa idahun wọnyi:

{source: "app1", error_code: "error", description: "Network error"}

Bi abajade, aaye kan diẹ sii yẹ ki o ṣafikun si iṣafihan - apejuwe, ko si si ẹnikan ti o mọ boya yoo wa tabi rara.

Awọn iṣẹ-ṣiṣe ti ṣiṣẹda kan itaja lori iru data jẹ lẹwa boṣewa, ati Spark ni o ni awọn nọmba kan ti irinṣẹ fun yi. Fun sisọ awọn data orisun, atilẹyin wa fun mejeeji JSON ati XML, ati fun ero aimọ tẹlẹ, atilẹyin fun schemaEvolution ti pese.

Ni wiwo akọkọ, ojutu naa dabi irọrun. O nilo lati mu folda kan pẹlu JSON ki o ka sinu dataframe kan. Sipaki yoo ṣẹda ero kan, yi data itẹle sinu awọn ẹya. Siwaju sii, ohun gbogbo nilo lati wa ni fipamọ ni parquet, eyiti o tun ṣe atilẹyin ni Impala, nipa fiforukọṣilẹ ile itaja ni metastore Hive.

Ohun gbogbo dabi pe o rọrun.

Sibẹsibẹ, ko ṣe kedere lati awọn apẹẹrẹ kukuru ninu iwe ohun ti o le ṣe pẹlu nọmba awọn iṣoro ni iṣe.

Iwe naa ṣe apejuwe ọna kan kii ṣe lati ṣẹda iwaju ile itaja, ṣugbọn lati ka JSON tabi XML sinu aaye data kan.

Eyun, o kan fihan bi o ṣe le ka ati ṣe itupalẹ JSON:

df = spark.read.json(path...)

Eyi to lati jẹ ki data wa si Spark.

Ni iṣe, iwe afọwọkọ naa jẹ idiju pupọ ju kika awọn faili JSON lati folda kan ati ṣiṣẹda fireemu data kan. Ipo naa dabi eyi: ile itaja kan wa tẹlẹ, data tuntun wa ni gbogbo ọjọ, wọn nilo lati ṣafikun si iwaju ile itaja, laisi gbagbe pe ero naa le yatọ.

Ilana deede fun kikọ iṣafihan jẹ bi atẹle:

Igbese 1. Awọn data ti wa ni ti kojọpọ sinu Hadoop pẹlu ọwọ ojojumọ atunko ati fi kun si titun kan ipin. O wa jade folda kan pẹlu data akọkọ ti o pin nipasẹ ọjọ.

Igbese 2. Lakoko fifuye akọkọ, folda yii jẹ kika ati itupalẹ nipasẹ Spark. Abajade dataframe ti wa ni fipamọ ni ọna kika apere, fun apẹẹrẹ, ni parquet, eyiti o le ṣe gbe wọle sinu Impala. Eyi ṣẹda iṣafihan ibi-afẹde pẹlu gbogbo data ti o ti ṣajọpọ titi di aaye yii.

Igbese 3. A ṣe igbasilẹ igbasilẹ ti yoo ṣe imudojuiwọn iwaju ile itaja ni gbogbo ọjọ.
Ibeere kan wa ti ikojọpọ afikun, iwulo lati pin ifihan, ati ibeere ti mimu eto gbogbogbo ti iṣafihan naa wa.

Jẹ́ ká gbé àpẹẹrẹ kan yẹ̀ wò. Jẹ ki a sọ pe igbesẹ akọkọ ti kikọ ibi ipamọ kan ti ni imuse, ati pe awọn faili JSON ti gbe si folda kan.

Ṣiṣẹda fireemu data lati ọdọ wọn, lẹhinna fifipamọ bi iṣafihan, kii ṣe iṣoro. Eyi ni igbesẹ akọkọ ti o le rii ni irọrun ninu iwe Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Ohun gbogbo dabi pe o dara.

A ka ati ṣe atuntumọ JSON, lẹhinna a ṣafipamọ dataframe bi parquet, fiforukọṣilẹ rẹ ni Ile Agbon ni ọna irọrun:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

A gba ferese kan.

Ṣugbọn, ni ọjọ keji, data tuntun lati orisun ti ṣafikun. A ni folda kan pẹlu JSON, ati iṣafihan ti a ṣẹda lati folda yii. Lẹhin ikojọpọ ipele data atẹle lati orisun, ọja data n sonu iye data ọjọ kan.

Ojutu ọgbọn yoo jẹ lati pin iwaju ile itaja ni ọjọ kan, eyiti yoo gba laaye lati ṣafikun ipin tuntun ni gbogbo ọjọ ti n bọ. Ilana fun eyi tun mọ daradara, Spark gba ọ laaye lati kọ awọn ipin lọtọ.

Ni akọkọ, a ṣe fifuye akọkọ, fifipamọ data bi a ti salaye loke, fifi ipin nikan kun. Iṣe yii ni a pe ni ibẹrẹ ile itaja ati pe o ṣe ni ẹẹkan:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Ni ọjọ keji, a kojọpọ ipin tuntun nikan:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Gbogbo ohun ti o ku ni lati tun forukọsilẹ ni Ile-iṣọ lati ṣe imudojuiwọn ero-ọrọ naa.
Sibẹsibẹ, eyi ni ibi ti awọn iṣoro dide.

Iṣoro akọkọ. Pẹ tabi ya, awọn Abajade parquet yoo jẹ unreadable. Eyi jẹ nitori bii parquet ati JSON ṣe tọju awọn aaye ofo ni oriṣiriṣi.

Jẹ ki ká ro kan aṣoju ipo. Fun apẹẹrẹ, lana JSON de:

День 1: {"a": {"b": 1}},

ati loni JSON kanna dabi eyi:

День 2: {"a": null}

Jẹ ki a sọ pe a ni awọn ipin oriṣiriṣi meji, ọkọọkan pẹlu laini kan.
Nigba ti a ba ka gbogbo data orisun, Spark yoo ni anfani lati pinnu iru, ati pe yoo loye pe "a" jẹ aaye ti iru "igbekalẹ", pẹlu aaye itẹ-ẹiyẹ "b" ti iru INT. Ṣugbọn, ti ipin kọọkan ba ti fipamọ lọtọ, lẹhinna a gba parquet pẹlu awọn ero ipin ti ko ni ibamu:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Ipo yii jẹ mimọ daradara, nitorinaa a ti ṣafikun aṣayan ni pataki - nigbati o ba n ṣe itupalẹ data orisun, yọ awọn aaye ofo kuro:

df = spark.read.json("...", dropFieldIfAllNull=True)

Ni idi eyi, parquet yoo ni awọn ipin ti a le ka papọ.
Botilẹjẹpe awọn ti o ṣe eyi ni iṣe yoo rẹrin kikoro nibi. Kí nìdí? Bẹẹni, nitori o ṣee ṣe pe awọn ipo meji yoo wa. Tabi mẹta. Tabi mẹrin. Ni igba akọkọ ti, eyi ti yoo fẹrẹ waye, ni pe awọn oriṣi nọmba yoo yatọ ni oriṣiriṣi awọn faili JSON. Fun apẹẹrẹ, {intField: 1} ati {intField: 1.1}. Ti iru awọn aaye ba wa ni ipin kan, lẹhinna iṣopọ ero yoo ka ohun gbogbo ni deede, ti o yori si iru deede julọ. Ṣugbọn ti o ba wa ni awọn oriṣiriṣi, lẹhinna ọkan yoo ni intField: int, ati ekeji yoo ni intField: ilọpo.

Asia atẹle wa lati mu ipo yii:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Bayi a ni folda kan nibiti awọn ipin wa ti o le ka sinu dataframe kan ati parquet ti o wulo ti gbogbo iṣafihan. Bẹẹni? Rara.

A gbọdọ ranti pe a forukọsilẹ tabili ni Ile Agbon. Ile Agbon kii ṣe pataki ni awọn orukọ aaye, lakoko ti parquet jẹ ifura ọran. Nitorina, awọn ipin pẹlu awọn eto: field1: int, ati Field1: int jẹ kanna fun Hive, ṣugbọn kii ṣe fun Spark. Maṣe gbagbe lati yi awọn orukọ aaye pada si kekere.

Lẹhin iyẹn, ohun gbogbo dabi pe o dara.

Sibẹsibẹ, ko gbogbo ki o rọrun. O wa keji, tun daradara-mọ isoro. Niwọn igba ti ipin tuntun kọọkan ti wa ni ipamọ lọtọ, folda ipin yoo ni awọn faili iṣẹ Spark ninu, fun apẹẹrẹ, asia aṣeyọri iṣẹ ṣiṣe _SUCCESS. Eleyi yoo ja si ni ohun ašiše nigba ti gbiyanju lati parquet. Lati yago fun eyi, o nilo lati tunto iṣeto naa lati ṣe idiwọ Spark lati ṣafikun awọn faili iṣẹ si folda naa:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

O dabi pe ni bayi lojoojumọ ni a ṣafikun ipin parquet tuntun si folda iṣafihan ibi-afẹde, nibiti data ti a ti ṣagbekalẹ fun ọjọ naa wa. A ṣe itọju ni ilosiwaju pe ko si awọn ipin pẹlu ariyanjiyan iru data kan.

Ṣugbọn, a ni iṣoro kẹta. Bayi a ko mọ ero gbogbogbo, pẹlupẹlu, tabili ni Hive ni ero ti ko tọ, niwọn igba ti ipin tuntun kọọkan ṣe afihan ipalọlọ sinu ero.

O nilo lati tun-forukọsilẹ tabili. Eyi le ṣee ṣe ni irọrun: ka parquet ti iwaju ile itaja lẹẹkansi, mu ero naa ki o ṣẹda DDL kan ti o da lori rẹ, pẹlu eyiti o le tun forukọsilẹ folda ninu Ile Agbon bi tabili itagbangba, ṣe imudojuiwọn ero ti ibi-itaja ibi-itaja ibi-afẹde.

A ni a kẹrin isoro. Nigba ti a ba forukọsilẹ tabili fun igba akọkọ, a da lori Spark. Bayi a ṣe funrararẹ, ati pe a nilo lati ranti pe awọn aaye parquet le bẹrẹ pẹlu awọn kikọ ti ko gba laaye fun Ile Agbon. Fun apẹẹrẹ, Spark ju awọn laini jade ti ko le ṣe itupalẹ ni aaye “corrupt_record”. Iru aaye yii ko le forukọsilẹ ni Ile Agbon laisi salọ.

Ti o mọ eyi, a gba eto naa:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Koodu ("_corrupt_record", "`_corrupt_record`") + "" + f[1].ropo(":", "`:").ropo("<", "<`").ropo(",", ",`").ropo("orin<`", "orin<") ṣe DDL ailewu, ie dipo:

create table tname (_field1 string, 1field string)

Pẹlu awọn orukọ aaye bii "_field1, 1field", DDL ailewu ni a ṣe nibiti awọn orukọ aaye ti salọ: ṣẹda tabili `tname` (okun `_field1`, okun `1field`).

Ibeere naa waye: bawo ni a ṣe le gba dataframe daradara pẹlu eto pipe (ni koodu PF)? Bawo ni lati gba pf yii? Eyi ni iṣoro karun. Ṣe atunto ero ti gbogbo awọn ipin lati folda pẹlu awọn faili parquet ti iṣafihan ibi-afẹde? Ọna yii jẹ ailewu julọ, ṣugbọn o nira.

Eto naa ti wa tẹlẹ ni Ile Agbon. O le gba ero tuntun nipa apapọ ero ti gbogbo tabili ati ipin tuntun. Nitorinaa o nilo lati mu ero tabili lati Ile Agbon ki o darapọ pẹlu ero ti ipin tuntun. Eyi le ṣee ṣe nipa kika metadata idanwo lati Ile Agbon, fifipamọ si folda igba diẹ, ati lilo Spark lati ka awọn ipin mejeeji ni ẹẹkan.

Ni otitọ, ohun gbogbo wa ti o nilo: ero tabili atilẹba ni Ile Agbon ati ipin tuntun. A tun ni data. O wa nikan lati gba ero tuntun ti o ṣajọpọ ero iwaju ile itaja ati awọn aaye tuntun lati ipin ti a ṣẹda:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Nigbamii, a ṣẹda iforukọsilẹ tabili tabili DDL, bi ninu snippet ti tẹlẹ.
Ti gbogbo pq ba ṣiṣẹ ni deede, eyun, fifuye ipilẹṣẹ kan wa, ati pe a ṣẹda tabili ni deede ni Ile Agbon, lẹhinna a gba ero tabili imudojuiwọn.

Ati awọn ti o kẹhin isoro ni wipe o ko ba le kan fi kan ipin to a Ile Agbon tabili, nitori o ti yoo wa ni dà. O nilo lati fi ipa mu Hive lati ṣatunṣe eto ipin rẹ:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Iṣẹ-ṣiṣe ti o rọrun ti kika JSON ati ṣiṣẹda iwaju itaja ti o da lori rẹ ni abajade ni bibori nọmba kan ti awọn iṣoro ti ko tọ, awọn solusan fun eyiti o ni lati wa lọtọ. Ati pe botilẹjẹpe awọn solusan wọnyi rọrun, o gba akoko pupọ lati wa wọn.

Lati ṣe imuse ikole ti iṣafihan, Mo ni lati:

  • Ṣafikun awọn ipin si iṣafihan, yiyọ awọn faili iṣẹ kuro
  • Ṣe pẹlu awọn aaye ofo ni data orisun ti Spark ti tẹ
  • Sọ awọn iru ti o rọrun si okun kan
  • Yipada awọn orukọ aaye si kekere
  • Ikojọpọ data lọtọ ati iforukọsilẹ tabili ni Hive (iran DDL)
  • Maṣe gbagbe lati sa fun awọn orukọ aaye ti o le jẹ ibamu pẹlu Hive
  • Kọ ẹkọ bi o ṣe le ṣe imudojuiwọn iforukọsilẹ tabili ni Ile Agbon

Ni akojọpọ, a ṣe akiyesi pe ipinnu lati kọ awọn window itaja jẹ pẹlu ọpọlọpọ awọn ọfin. Nitorinaa, ni ọran ti awọn iṣoro ni imuse, o dara lati kan si alabaṣepọ ti o ni iriri pẹlu oye aṣeyọri.

O ṣeun fun kika nkan yii, a nireti pe alaye naa wulo.

orisun: www.habr.com

Fi ọrọìwòye kun