Spark schemaEvolution in pratica

Cari lettori, bona sera !

In questu articulu, u cunsultore principali per l'area di cummerciale di Big Data Solutions di Neoflex descrive in dettaglio l'opzioni per a custruzzione di vetrina di struttura variabile cù Apache Spark.

Cum'è parte di un prughjettu di analisi di dati, u compitu di custruisce vitrine basate nantu à dati strutturati sò spessu.

Di genere, questi sò logs, o risposti da diversi sistemi, salvati in forma di JSON o XML. I dati sò caricati in Hadoop, allora un storefront deve esse custruitu da ellu. Pudemu urganizà l'accessu à a vetrina creata, per esempiu, attraversu Impala.

In questu casu, u layout di a vetrina di destinazione hè scunnisciutu in anticipu. Inoltre, u schema ùn pò esse elaboratu in anticipu, postu chì dipende di e dati, è avemu trattatu di sti dati strutturati assai debbuli.

Per esempiu, oghje a seguente risposta hè registrata:

{source: "app1", error_code: ""}

è dumane a seguente risposta vene da u listessu sistema:

{source: "app1", error_code: "error", description: "Network error"}

In u risultatu, un altru campu deve esse aghjuntu à a vetrina - descrizzione, è nimu sà s'ellu vene o micca.

U compitu di creà un mart nantu à tali dati hè abbastanza standard, è Spark hà una quantità di strumenti per questu. Per l'analisi di dati fonte, ci hè supportu per JSON è XML, è per un schema precedentemente scunnisciutu, u supportu schemaEvolution hè furnitu.

À u primu sguardu, a suluzione pare simplice. Avete bisognu di piglià u cartulare cù JSON è leghje in u dataframe. Spark hà da creà un schema è trasfurmà i dati nidificati in strutture. In seguitu, tuttu deve esse salvatu in parquet, chì hè ancu supportatu in Impala, registrendu a vitrina in u metastore Hive.

Tuttu pare esse simplice.

Tuttavia, da l'esempii brevi in ​​a documentazione ùn hè micca chjaru chì fà cù una quantità di prublemi in pratica.

A documentazione descrive un approcciu micca per creà una vetrina, ma per leghje JSON o XML in un dataframe.

Vale à dì, mostra solu cumu leghje è analizà JSON:

df = spark.read.json(path...)

Questu hè abbastanza per rende i dati dispunibuli à Spark.

In pratica, u scenariu hè assai più cumplessu cà solu leghje i fugliali JSON da un cartulare è creà un dataframe. A situazione s'assumiglia cusì: ci hè digià una certa vitrina, novi dati arrivanu ogni ghjornu, anu da esse aghjuntu à a vitrina, senza scurdà chì u schema pò differisce.

U schema abituale per a custruzzione di una vetrina hè u seguitu:

Step 1. I dati sò caricati in Hadoop, seguitu da una carica addiziale ogni ghjornu è aghjuntu à una nova partizione. U risultatu hè un cartulare cù dati fonte, spartutu da ghjornu.

Step 2. Durante a carica iniziale, stu cartulare hè lettu è analizatu cù Spark. U dataframe resultanti hè salvatu in un furmatu chì pò esse analizatu, per esempiu, in parquet, chì pò esse impurtatu in Impala. Questu crea una vetrina di destinazione cù tutte e dati chì anu accumulatu finu à questu puntu.

Step 3. Un scaricamentu hè creatu chì aghjurnà a vetrina ogni ghjornu.
A quistione di a carica incrementale si sviluppa, a necessità di particionà a vetrina, è a quistione di sustene u layout generale di a vetrina.

Demu un esempiu. Diciamu chì u primu passu di custruisce un repository hè statu implementatu, è a carica di i fugliali JSON in un cartulare hè cunfigurata.

Ùn hè micca prublema per creà un dataframe da elli è poi salvà cum'è una vitrina. Questu hè u primu passu chì pò esse facilmente truvatu in a documentazione di Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tuttu pare esse bè.

Avemu lettu è analizatu u JSON, allora salvemu u dataframe cum'è parquet, registrendu in Hive in ogni modu cunvene:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Avemu una vetrina.

Ma, u ghjornu dopu, novi dati sò stati aghjuntu da a fonte. Avemu un cartulare cù JSON, è un storefront creatu basatu annantu à questu cartulare. Dopu avè caricatu a prossima parte di dati da a fonte, a vetrina ùn hà micca abbastanza dati per un ghjornu.

Una suluzione logica seria di particionà a vetrina per ghjornu, chì vi permetterà di aghjunghje una nova partizione ogni ghjornu dopu. U mecanismu per questu hè ancu cunnisciutu; Spark permette di arregistrà partizioni separatamente.

Prima, facemu a carica iniziale, salvendu i dati cum'è descrittu sopra, aghjunghjendu solu partitioning. Questa azione hè chjamata inizializazione di vetrina è hè fatta solu una volta:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

U ghjornu dopu scarichemu solu a nova partizione:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Tuttu ciò chì resta hè di riregistrà in Hive per aghjurnà u schema.
Tuttavia, hè quì chì i prublemi sorgi.

Primu prublema. Prima o dopu, u parquet resultanti ùn sarà più leghjite. Questu hè dovutu à cumu u parquet è JSON trattanu i campi vacanti in modu diversu.

Fighjemu una situazione tipica. Per esempiu, ieri JSON arriva:

День 1: {"a": {"b": 1}},

è oghje u listessu JSON pare cusì:

День 2: {"a": null}

Diciamu chì avemu dui partizioni diffirenti, ognunu cù una linea.
Quandu avemu lettu tutta a fonte di dati, Spark hà da pudè determinà u tipu, è capisce chì "a" hè un campu di tipu "struttura", cù un campu nidificatu "b" di tipu INT. Ma, se ogni partizione hè stata salvata separatamente, u risultatu hè un parquet cù schemi di partizioni incompatibili:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Questa situazione hè ben cunnisciuta, cusì una opzione hè stata appositamente aghjunta per sguassà i campi vacanti quandu analizzanu i dati di fonte:

df = spark.read.json("...", dropFieldIfAllNull=True)

In questu casu, u parquet serà custituitu di partizioni chì ponu esse leghje inseme.
Ancu s'è quelli chì anu fattu questu in pratica, surrisanu amaramente. Perchè? Iè, perchè più prubabilmente ci saranu duie altre situazioni. O trè. O quattru. U primu, chì hè quasi sicuru, hè chì i tipi numerichi pareranu diffirenti in diversi schedarii JSON. Per esempiu, {intField: 1} è {intField: 1.1}. Se tali campi appariscenu in un batch, allora u schema merge leghje tuttu bè, purtendu à u tipu più precisu. Ma s'ellu hè in diverse, allora unu averà intField: int, è l'altru averà intField: double.

Per trattà sta situazione, ci hè a seguente bandiera:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Avà avemu un cartulare induve si trovanu e partizioni, chì ponu esse leghje in un unicu dataframe è un parquet validu di tuttu u magazinu. Iè? Innò.

Avemu da ricurdà chì avemu registratu a tavola in Hive. Hive ùn hè micca sensible à u casu in i nomi di campu, ma u parquet hè. Dunque, e partizioni cù schemi: field1: int, è Field1: int sò listessi per Hive, ma micca per Spark. Ùn vi scurdate di cambià i nomi di campi in minuscule.

Dopu questu, tuttu pare esse bè.

Tuttavia, micca tutti cusì sèmplice. Un secondu prublema, ancu cunnisciuta, sorge. Siccomu ogni nova particione hè salvatu separatamente, u cartulare di partizioni cuntene i schedarii di serviziu Spark, per esempiu, a bandiera di successu di l'operazione _SUCCESS. Questu hà da esse un errore quandu pruvate di parquet. Per evitari questu, avete bisognu di cunfigurà a cunfigurazione impediscendu à Spark di aghjunghje i schedarii di serviziu à u cartulare:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Sembra chì avà ogni ghjornu una nova partizione di parquet hè aghjuntu à u cartulare di u magazinu di destinazione, induve si trovanu i dati analizati per u ghjornu. Avemu cura in anticipu per assicurà chì ùn ci era micca partizioni cù cunflitti di tipu di dati.

Ma simu di fronte à un terzu prublema. Avà u schema generale ùn hè micca cunnisciutu, in più, in Hive, a tavula hà u schema sbagliatu, postu chì ogni nova partizione hà prubabilmente introduttu una distorsione in u schema.

A tavula deve esse riregistrata. Questu pò esse fattu simplicemente: leghjite u parquet di a vitrina di novu, pigliate u schema è creanu un DDL basatu annantu à questu, cù quale pudete re-registrà u cartulare in Hive cum'è una tavola esterna, aghjurnà u schema di a vetrina di destinazione.

Facemu un quartu prublema. Quandu avemu registratu a tavula per a prima volta, avemu cunfidendu Spark. Avà u facemu noi stessi, è avemu bisognu di ricurdà chì i campi di parquet ponu principià cù caratteri chì ùn sò micca permessi da Hive. Per esempiu, Spark lancia linee chì ùn pudia micca analizà in u campu "corrupt_record". Un tali campu ùn pò esse registratu in Hive senza scappà.

Sapemu questu, avemu u schema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

codice ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace ("<", "<`").replace(",", ",`"). replace ("array<`", "array<") faci DDL sicuru, vale à dì invece di:

create table tname (_field1 string, 1field string)

Cù nomi di campu cum'è "_field1, 1field", un DDL sicuru hè fattu induve i nomi di campu sò scappati: creanu a table `tname` (`_field1` string, `1field` string).

A quistione nasce: cumu uttene currettamente un dataframe cù un schema cumpletu (in codice pf)? Cumu uttene questu pf? Questu hè u quintu prublema. Rileghjite u schema di tutte e partizioni da u cartulare cù i schedarii di parquet di a vetrina di destinazione? Stu metudu hè u più sicuru, ma difficiule.

U schema hè digià in Hive. Pudete ottene un novu schema cumminendu u schema di a tavula sana è a nova partizione. Questu significa chì avete bisognu di piglià l'schema di a tavula da Hive è combina cù u schema di a nova partizione. Questu pò esse fattu per leghje i metadati di prova da Hive, salvendu in un cartulare tempuranee, è leghje e duie partizioni in una volta cù Spark.

Essenzialmente, ci hè tuttu ciò chì avete bisognu: u schema di tabella originale in Hive è una nova partizione. Avemu ancu dati. Tuttu ciò chì resta hè di ottene un novu schema chì combina u schema di vetrina è novi campi da a partizione creata:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

In seguitu, creamu a tavula di registrazione DDL, cum'è in u fragmentu precedente.
Se tutta a catena funziona bè, vale à dì, ci era una carica iniziale, è a tavola hè stata creata currettamente in Hive, allora avemu un schema di tabella aghjurnatu.

L'ultimu prublema hè chì ùn pudete micca facilmente aghjunghje una partizione à una tavola Hive, perchè si romperà. Avete bisognu di furzà Hive à riparà a so struttura di partizione:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

U compitu simplice di leghje JSON è di creà un storefront basatu annantu à questu risultatu in superà una quantità di difficultà implicite, e soluzioni per quale deve esse truvate separatamente. E ancu s'è sti suluzioni sò simplici, ci vole assai tempu per truvà.

Per implementà a custruzzione di una vetrina, avemu avutu à:

  • Aghjunghjite partizioni à a vetrina, sguassate i schedarii di serviziu
  • Trattate cù i campi vacanti in i dati fonte chì Spark hà scrittu
  • Cast tipi simplici à stringa
  • Cunvertite i nomi di campi in minuscule
  • Carica di dati separata è registrazione di a tavola in Hive (creazione DDL)
  • Ùn vi scurdate di scappà nomi di campu chì pò esse micca cumpatibili cù Hive
  • Amparate à aghjurnà a registrazione di a tavola in Hive

Per sintetizà, avemu nutatu chì a decisione di custruisce vetri di magazzini hè chjosa di parechje trappule. Per quessa, se nascenu difficultà in l'implementazione, hè megliu vultà à un associu espertu cun sapè fà successu.

Ti ringraziu per leghje stu articulu, speremu chì truvate l'infurmazioni utili.

Source: www.habr.com

Add a comment