Spark schemaEvolution գործնականում

Հարգելի ընթերցողներ, բարի օր:

Այս հոդվածում Neoflex-ի Big Data Solutions բիզնես տարածքի առաջատար խորհրդատուն մանրամասն նկարագրում է Apache Spark-ի միջոցով փոփոխական կառուցվածքի ցուցափեղկեր կառուցելու տարբերակները:

Որպես տվյալների վերլուծության նախագծի մի մաս, հաճախ առաջանում է ցուցափեղկեր կառուցելու խնդիրը, որը հիմնված է թույլ կառուցվածքային տվյալների վրա:

Սովորաբար դրանք տեղեկամատյաններ են կամ պատասխաններ տարբեր համակարգերից, որոնք պահվում են որպես JSON կամ XML: Տվյալները վերբեռնվում են Hadoop-ում, այնուհետև դուք պետք է դրանցից ցուցափեղկ կառուցեք: Մենք կարող ենք կազմակերպել մուտք դեպի ստեղծված ցուցափեղկ, օրինակ՝ Impala-ի միջոցով։

Այս դեպքում թիրախային ցուցափեղկի սխեման նախապես հայտնի չէ: Ավելին, սխեման չի կարող նախապես կազմվել, քանի որ դա կախված է տվյալներից, և մենք գործ ունենք այս շատ թույլ կառուցվածքի տվյալների հետ։

Օրինակ, այսօր գրանցված է հետևյալ պատասխանը.

{source: "app1", error_code: ""}

իսկ վաղը նույն համակարգից գալիս է հետևյալ պատասխանը.

{source: "app1", error_code: "error", description: "Network error"}

Արդյունքում ցուցափեղկին պետք է ավելացվի ևս մեկ դաշտ՝ նկարագրություն, և ոչ ոք չգիտի՝ կգա, թե ոչ։

Նման տվյալների վրա խանութի ցուցափեղկեր ստեղծելու խնդիրը բավականին ստանդարտ է, և Spark-ը դրա համար ունի մի շարք գործիքներ: Աղբյուրի տվյալների վերլուծության համար կա աջակցություն և՛ JSON, և՛ XML, իսկ նախկինում անհայտ սխեմայի համար տրամադրվում է աջակցություն schemaEvolution-ին:

Առաջին հայացքից լուծումը պարզ է թվում. Ձեզ անհրաժեշտ է թղթապանակ վերցնել JSON-ով և կարդալ այն տվյալների շրջանակում: Spark-ը կստեղծի սխեմա, տեղադրվող տվյալները կվերածի կառուցվածքների: Ավելին, ամեն ինչ պետք է պահպանվի մանրահատակի մեջ, որը նույնպես աջակցվում է Impala-ում՝ գրանցելով խանութի ցուցափեղկը Hive metastore-ում:

Ամեն ինչ կարծես պարզ է.

Այնուամենայնիվ, փաստաթղթերի կարճ օրինակներից պարզ չէ, թե ինչ անել գործնականում մի շարք խնդիրների հետ:

Փաստաթղթերը նկարագրում են ոչ թե ցուցափեղկեր ստեղծելու մոտեցում, այլ JSON կամ XML տվյալների շրջանակում կարդալու համար:

Մասնավորապես, այն պարզապես ցույց է տալիս, թե ինչպես կարդալ և վերլուծել JSON-ը.

df = spark.read.json(path...)

Սա բավական է, որպեսզի տվյալները հասանելի լինեն Spark-ին։

Գործնականում սկրիպտը շատ ավելի բարդ է, քան պարզապես JSON ֆայլերը թղթապանակից կարդալը և տվյալների շրջանակ ստեղծելը: Իրավիճակն այսպիսին է՝ արդեն կա որոշակի ցուցափեղկ, ամեն օր նոր տվյալներ են գալիս, դրանք պետք է ավելացնել ցուցափեղկին՝ չմոռանալով, որ սխեման կարող է տարբերվել։

Ցուցափեղկի կառուցման սովորական սխեման հետևյալն է.

Քայլ 1. Տվյալները բեռնվում են Hadoop-ում՝ հետագա ամենօրյա վերաբեռնմամբ և ավելացվում նոր բաժանման մեջ: Ստացվում է թղթապանակ նախնական տվյալներով՝ բաժանված ըստ օրվա։

Քայլ 2. Սկզբնական բեռնման ժամանակ այս թղթապանակը կարդացվում և վերլուծվում է Spark-ի կողմից: Ստացված տվյալների շրջանակը պահվում է վերլուծելի ձևաչափով, օրինակ՝ մանրահատակի մեջ, որն այնուհետև կարող է ներմուծվել Իմպալա: Սա ստեղծում է թիրախային ցուցափեղկ՝ մինչ այս պահը կուտակված բոլոր տվյալների հետ:

Քայլ 3. Ստեղծվել է ներբեռնում, որն ամեն օր կթարմացնի խանութի ցուցափեղկը:
Կա աստիճանական բեռնման, ցուցափեղկի բաժանման անհրաժեշտության և ցուցափեղկի ընդհանուր սխեմայի պահպանման հարց:

Օրինակ բերենք. Ենթադրենք, որ պահոց կառուցելու առաջին քայլն իրականացվել է, և JSON ֆայլերը թղթապանակ վերբեռնելը կազմաձևված է:

Նրանցից տվյալների շրջանակ ստեղծելը, այնուհետև որպես ցուցափեղկի պահպանում, խնդիր չէ: Սա առաջին քայլն է, որը հեշտությամբ կարելի է գտնել Spark-ի փաստաթղթերում.

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Ամեն ինչ կարծես թե լավ է։

Մենք կարդացել և վերլուծել ենք JSON-ը, այնուհետև պահում ենք տվյալների շրջանակը որպես մանրահատակ՝ գրանցելով այն Hive-ում ցանկացած հարմար եղանակով.

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Մենք պատուհան ենք ստանում:

Բայց, հաջորդ օրը աղբյուրից նոր տվյալներ են ավելացվել։ Մենք ունենք JSON-ով թղթապանակ և այս պանակից ստեղծված ցուցափեղկ: Աղբյուրից տվյալների հաջորդ խմբաքանակը բեռնելուց հետո տվյալների շուկան բացակայում է մեկ օրվա արժեքի տվյալները:

Տրամաբանական լուծումը կլինի խանութի ցուցափեղկի բաժանումն օրեցօր, ինչը թույլ կտա ամեն հաջորդ օրը նոր բաժանմունք ավելացնել։ Դրա մեխանիզմը նույնպես հայտնի է, Spark-ը թույլ է տալիս առանձին գրել միջնորմներ։

Նախ, մենք կատարում ենք նախնական բեռնում, պահպանելով տվյալները, ինչպես նկարագրված է վերևում, ավելացնելով միայն բաժանումը: Այս գործողությունը կոչվում է խանութի սկզբնավորում և կատարվում է միայն մեկ անգամ.

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Հաջորդ օրը մենք բեռնում ենք միայն նոր բաժին.

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Մնում է միայն վերագրանցվել Hive-ում՝ սխեման թարմացնելու համար:
Այնուամենայնիվ, այստեղ է, որ խնդիրներ են առաջանում:

Առաջին խնդիր. Վաղ թե ուշ ստացված մանրահատակը անընթեռնելի կլինի։ Դա պայմանավորված է նրանով, թե ինչպես են մանրահատակը և JSON-ը տարբեր կերպ են վերաբերվում դատարկ դաշտերին:

Դիտարկենք բնորոշ իրավիճակ. Օրինակ, երեկ JSON ժամանում է.

День 1: {"a": {"b": 1}},

և այսօր նույն JSON-ն այսպիսի տեսք ունի.

День 2: {"a": null}

Ենթադրենք, մենք ունենք երկու տարբեր բաժանումներ, որոնցից յուրաքանչյուրը ունի մեկ տող:
Երբ մենք կարդում ենք աղբյուրի ամբողջ տվյալները, Spark-ը կկարողանա որոշել տեսակը և կհասկանա, որ «a»-ն «կառույց» տիպի դաշտ է, որի մեջ կա «b» տիպի INT դաշտ: Բայց, եթե յուրաքանչյուր միջնորմ պահվել է առանձին, ապա արդյունքը մանրահատակ է՝ անհամատեղելի բաժանման սխեմաներով.

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Այս իրավիճակը լավ հայտնի է, ուստի հատուկ ավելացվել է մի տարբերակ՝ աղբյուրի տվյալները վերլուծելիս հեռացնել դատարկ դաշտերը.

df = spark.read.json("...", dropFieldIfAllNull=True)

Այս դեպքում մանրահատակը բաղկացած կլինի միջնորմներից, որոնք կարելի է միասին կարդալ:
Չնայած նրանք, ովքեր դա արել են գործնականում, այստեղ դառնորեն կժպտան։ Ինչո՞ւ։ Այո, քանի որ հնարավոր է ևս երկու իրավիճակ լինի: Կամ երեք. Կամ չորս. Առաջինը, որը գրեթե անկասկած տեղի կունենա, այն է, որ թվային տեսակները տարբեր JSON ֆայլերում տարբեր տեսք կունենան: Օրինակ՝ {intField: 1} և {intField: 1.1}: Եթե ​​նման դաշտերը հայտնաբերվեն մեկ բաժանման մեջ, ապա սխեմայի միաձուլումը կկարդա ամեն ինչ ճիշտ՝ հանգեցնելով ամենաճշգրիտ տեսակին: Բայց եթե տարբեր են, ապա մեկը կունենա intField՝ int, իսկ մյուսը կունենա intField՝ double:

Այս իրավիճակը կարգավորելու համար կա հետևյալ դրոշը.

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Այժմ մենք ունենք մի թղթապանակ, որտեղ կան միջնորմներ, որոնք կարելի է կարդալ մեկ տվյալների շրջանակում և ողջ ցուցափեղկի վավեր մանրահատակի մեջ: Այո? Ոչ

Պետք է հիշել, որ աղյուսակը գրանցել ենք Փեթակում։ Փեթակը մեծատառերի զգայուն չէ դաշտերի անուններում, մինչդեռ մանրահատակը մեծատառերի զգայուն է: Հետևաբար, սխեմաներով բաժանումները՝ field1: int և Field1: int նույնն են Hive-ի համար, բայց ոչ Spark-ի համար: Մի մոռացեք դաշտերի անունները վերածել փոքրատառերի:

Դրանից հետո կարծես ամեն ինչ լավ է։

Այնուամենայնիվ, ամեն ինչ այնքան էլ պարզ չէ: Կա երկրորդ, նույնպես հայտնի խնդիր. Քանի որ յուրաքանչյուր նոր բաժանմունք պահվում է առանձին, բաժանման պանակը կպարունակի Spark ծառայության ֆայլեր, օրինակ՝ _SUCCESS գործողության հաջողության դրոշը: Սա կհանգեցնի սխալի, երբ փորձում եք մանրահատակ դնել: Սրանից խուսափելու համար դուք պետք է կազմաձևեք կոնֆիգուրացիան, որպեսզի Spark-ը չավելացնի ծառայության ֆայլերը թղթապանակում.

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Թվում է, թե այժմ ամեն օր նոր մանրահատակի միջնորմ է ավելացվում թիրախային ցուցափեղկի թղթապանակում, որտեղ գտնվում են օրվա վերլուծված տվյալները։ Մենք նախապես հոգացել ենք, որ տվյալների տիպի կոնֆլիկտով բաժանումներ չլինեն։

Բայց մենք երրորդ խնդիրն ունենք. Այժմ ընդհանուր սխեման հայտնի չէ, ավելին, Hive-ի աղյուսակը սխալ սխեմա ունի, քանի որ յուրաքանչյուր նոր բաժին, ամենայն հավանականությամբ, խեղաթյուրում է մտցրել սխեմայի մեջ:

Դուք պետք է վերագրանցեք աղյուսակը: Դա կարելի է անել պարզապես՝ նորից կարդալ ցուցափեղկի մանրահատակը, վերցնել սխեման և դրա հիման վրա ստեղծել DDL, որով վերագրանցել թղթապանակը Hive-ում որպես արտաքին աղյուսակ՝ թարմացնելով թիրախային ցուցափեղկի սխեման։

Մենք չորրորդ խնդիր ունենք. Երբ մենք առաջին անգամ գրանցեցինք աղյուսակը, ապավինեցինք Spark-ին։ Այժմ մենք դա անում ենք ինքներս, և մենք պետք է հիշենք, որ մանրահատակի դաշտերը կարող են սկսվել այնպիսի նիշերով, որոնք անթույլատրելի են Hive-ի համար: Օրինակ, Spark-ը դուրս է նետում տողերը, որոնք չի կարող վերլուծել «corrupt_record» դաշտում: Նման դաշտը չի կարող գրանցվել Փեթակում առանց փախուստի:

Իմանալով սա, մենք ստանում ենք սխեման.

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Code ("_corrupt_record", "`_corrupt_record") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ","").replace("զանգված<`", "զանգված<") դարձնում է անվտանգ DDL, այսինքն՝ փոխարենը՝

create table tname (_field1 string, 1field string)

Դաշտերի անուններով, ինչպիսիք են «_field1, 1field», ապահով DDL-ն արվում է այնտեղ, որտեղ դաշտերի անունները փախչում են. ստեղծել «tname» աղյուսակը («_field1» տող, «1field» տող):

Հարց է առաջանում՝ ինչպե՞ս ճիշտ ստանալ տվյալների շրջանակ՝ ամբողջական սխեմայով (pf կոդով): Ինչպե՞ս ստանալ այս pf-ը: Սա հինգերորդ խնդիրն է։ Վերընթերցե՞լ եք նպատակային ցուցափեղկի մանրահատակի ֆայլերով թղթապանակից բոլոր բաժանումների սխեման: Այս մեթոդը ամենաանվտանգն է, բայց դժվար:

Սխեման արդեն Hive-ում է: Դուք կարող եք ստանալ նոր սխեման՝ համադրելով ամբողջ աղյուսակի և նոր բաժանման սխեման: Այսպիսով, դուք պետք է վերցնեք աղյուսակի սխեման Hive-ից և համատեղեք այն նոր բաժանման սխեմայի հետ: Դա կարելի է անել՝ կարդալով թեստային մետատվյալները Hive-ից, պահելով այն ժամանակավոր թղթապանակում և օգտագործելով Spark-ը՝ երկու բաժանմունքները միանգամից կարդալու համար:

Փաստորեն, կա այն ամենը, ինչ ձեզ հարկավոր է՝ սկզբնական սեղանի սխեման Hive-ում և նոր բաժանումը: Մենք էլ ունենք տվյալներ. Մնում է միայն նոր սխեմա ստանալ, որը միավորում է խանութի ցուցափեղկի սխեման և ստեղծված բաժանման նոր դաշտերը.

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Հաջորդը, մենք ստեղծում ենք աղյուսակի գրանցման DDL, ինչպես նախորդ հատվածում:
Եթե ​​ամբողջ շղթան ճիշտ է աշխատում, այսինքն՝ եղել է սկզբնավորման բեռ, և աղյուսակը ճիշտ է ստեղծվել Hive-ում, ապա մենք ստանում ենք թարմացված աղյուսակի սխեման։

Վերջին խնդիրն այն է, որ դուք չեք կարող հեշտությամբ բաժանում ավելացնել Hive սեղանին, քանի որ այն կփչանա: Դուք պետք է ստիպեք Hive-ին շտկել իր բաժանման կառուցվածքը.

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON-ը կարդալու և դրա հիման վրա ցուցափեղկի ստեղծման պարզ առաջադրանքը հանգեցնում է մի շարք անուղղակի դժվարությունների հաղթահարմանը, որոնց լուծումները պետք է առանձին փնտրել: Եվ չնայած այս լուծումները պարզ են, դրանք գտնելու համար շատ ժամանակ է պահանջվում:

Ցուցափեղկի կառուցումն իրականացնելու համար ես ստիպված էի.

  • Ցուցափեղկին ավելացրեք միջնորմներ՝ ազատվելով սպասարկման ֆայլերից
  • Գործեք Spark-ի մուտքագրած աղբյուրի տվյալների դատարկ դաշտերի հետ
  • Պարզ տիպեր նետեք լարին
  • Փոխարկեք դաշտերի անունները փոքրատառերի
  • Առանձին տվյալների վերբեռնում և աղյուսակի գրանցում Hive-ում (DDL սերունդ)
  • Մի մոռացեք խուսափել դաշտերի անուններից, որոնք կարող են անհամատեղելի լինել Hive-ի հետ
  • Իմացեք, թե ինչպես թարմացնել աղյուսակի գրանցումը Hive-ում

Ամփոփելով՝ նշում ենք, որ ցուցափեղկեր կառուցելու որոշումը հղի է բազմաթիվ թակարդներով։ Հետևաբար, իրականացման դժվարությունների դեպքում ավելի լավ է կապ հաստատել փորձառու գործընկերոջ հետ, որն ունի հաջող փորձաքննություն:

Շնորհակալություն այս հոդվածը կարդալու համար, հուսով ենք, որ տեղեկատվությունը օգտակար է ձեզ համար:

Source: www.habr.com

Добавить комментарий