Esquema d'espurnaL'evolució a la pràctica

Benvolguts lectors, bon dia!

En aquest article, el consultor líder de l'àrea de negoci de Solucions de Big Data de Neoflex descriu amb detall les opcions per crear aparadors d'estructura variable amb Apache Spark.

Com a part d'un projecte d'anàlisi de dades, sovint sorgeix la tasca de construir aparadors basats en dades poc estructurades.

Normalment es tracta de registres, o respostes de diversos sistemes, desats com a JSON o XML. Les dades es pengen a Hadoop i, a continuació, heu de crear una botiga a partir d'elles. Podem organitzar l'accés a l'aparador creat, per exemple, a través d'Impala.

En aquest cas, l'esquema de l'aparador de destinació no es coneix per endavant. A més, l'esquema tampoc no es pot elaborar amb antelació, ja que depèn de les dades, i estem tractant aquestes dades molt poc estructurades.

Per exemple, avui es registra la resposta següent:

{source: "app1", error_code: ""}

i demà del mateix sistema surt la següent resposta:

{source: "app1", error_code: "error", description: "Network error"}

Com a resultat, s'hauria d'afegir un camp més a l'aparador: descripció, i ningú sap si arribarà o no.

La tasca de crear un aparador amb aquestes dades és bastant estàndard i Spark té diverses eines per a això. Per analitzar les dades d'origen, hi ha suport tant per a JSON com per a XML, i per a un esquema desconegut anteriorment, es proporciona suport per a schemaEvolution.

A primera vista, la solució sembla senzilla. Heu d'agafar una carpeta amb JSON i llegir-la en un marc de dades. Spark crearà un esquema, convertirà les dades imbricades en estructures. A més, tot s'ha de desar al parquet, que també és compatible amb Impala, registrant l'aparador a la metastore Hive.

Tot sembla ser senzill.

Tanmateix, no queda clar a partir dels exemples breus de la documentació què fer amb una sèrie de problemes a la pràctica.

La documentació descriu un enfocament no per crear un aparador, sinó per llegir JSON o XML en un marc de dades.

És a dir, simplement mostra com llegir i analitzar JSON:

df = spark.read.json(path...)

Això és suficient per posar les dades a disposició de Spark.

A la pràctica, l'script és molt més complicat que només llegir fitxers JSON d'una carpeta i crear un marc de dades. La situació es veu així: ja hi ha un cert aparador, cada dia arriben noves dades, s'han d'afegir a l'aparador, sense oblidar que l'esquema pot ser diferent.

L'esquema habitual per construir un aparador és el següent:

Pas 1. Les dades es carreguen a Hadoop amb una recàrrega diària posterior i s'afegeixen a una nova partició. Resulta una carpeta amb dades inicials particionades per dia.

Pas 2. Durant la càrrega inicial, Spark llegeix i analitza aquesta carpeta. El marc de dades resultant es desa en un format analitzable, per exemple, en parquet, que després es pot importar a Impala. Això crea un aparador objectiu amb totes les dades que s'han acumulat fins a aquest moment.

Pas 3. Es crea una descàrrega que actualitzarà l'aparador cada dia.
Hi ha una qüestió de càrrega incremental, la necessitat de dividir l'aparador i la qüestió de mantenir l'esquema general de l'aparador.

Prenguem un exemple. Suposem que s'ha implementat el primer pas de la creació d'un dipòsit i que els fitxers JSON es pengen a una carpeta.

Crear un marc de dades a partir d'ells i desar-lo com a aparador no és cap problema. Aquest és el primer pas que es pot trobar fàcilment a la documentació de Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tot sembla estar bé.

Llegim i analitzem JSON, després desem el marc de dades com a parquet, registrant-lo a Hive de qualsevol manera convenient:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Tenim una finestra.

Però, l'endemà, es van afegir noves dades de la font. Tenim una carpeta amb JSON i un aparador creat a partir d'aquesta carpeta. Després de carregar el següent lot de dades de la font, al data mart li falten dades d'un dia.

La solució lògica seria dividir l'aparador per dia, cosa que permetrà afegir una nova partició cada dia següent. El mecanisme d'això també és ben conegut, Spark us permet escriure particions per separat.

Primer, fem una càrrega inicial, desant les dades tal com es descriu anteriorment, afegint només particions. Aquesta acció s'anomena inicialització de l'aparador i només es fa una vegada:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

L'endemà, només carreguem una partició nova:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Tot el que queda és tornar a registrar-se a Hive per actualitzar l'esquema.
No obstant això, aquí és on sorgeixen els problemes.

Primer problema. Tard o d'hora, el parquet resultant serà il·legible. Això es deu a com el parquet i el JSON tracten els camps buits de manera diferent.

Considerem una situació típica. Per exemple, ahir arriba JSON:

День 1: {"a": {"b": 1}},

i avui el mateix JSON té aquest aspecte:

День 2: {"a": null}

Suposem que tenim dues particions diferents, cadascuna amb una línia.
Quan llegim totes les dades d'origen, Spark podrà determinar-ne el tipus i entendrà que "a" és un camp de tipus "estructura", amb un camp imbricat "b" de tipus INT. Però, si cada partició es va desar per separat, obtenim un parquet amb esquemes de particions incompatibles:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Aquesta situació és ben coneguda, per la qual cosa s'ha afegit especialment una opció: en analitzar les dades d'origen, elimineu els camps buits:

df = spark.read.json("...", dropFieldIfAllNull=True)

En aquest cas, el parquet estarà format per envans que es podran llegir conjuntament.
Encara que els que ho han fet a la pràctica somriuran amargament aquí. Per què? Sí, perquè és probable que hi hagi dues situacions més. O tres. O quatre. El primer, que gairebé segur que es produirà, és que els tipus numèrics tindran un aspecte diferent en diferents fitxers JSON. Per exemple, {intField: 1} i {intField: 1.1}. Si aquests camps es troben en una partició, la combinació d'esquemes ho llegirà tot correctament, donant lloc al tipus més precís. Però si hi ha diferents, llavors un tindrà intField: int, i l'altre tindrà intField: double.

Hi ha la següent bandera per gestionar aquesta situació:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Ara tenim una carpeta on hi ha particions que es poden llegir en un únic dataframe i un parquet vàlid de tot l'aparador. Sí? No.

Hem de recordar que vam registrar la taula a Hive. Hive no distingeix entre majúscules i minúscules als noms de camp, mentre que el parquet distingeix entre majúscules i minúscules. Per tant, les particions amb esquemes: field1: int i Field1: int són iguals per a Hive, però no per a Spark. No oblideu convertir els noms dels camps a minúscules.

Després d'això, sembla que tot va bé.

Tanmateix, no tot és tan senzill. Hi ha un segon problema, també conegut. Com que cada partició nova es desa per separat, la carpeta de particions contindrà fitxers de servei Spark, per exemple, el senyalador d'èxit de l'operació _SUCCESS. Això donarà lloc a un error en intentar parquet. Per evitar-ho, heu de configurar la configuració per evitar que Spark afegeixi fitxers de servei a la carpeta:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Sembla que ara cada dia s'afegeix una nova partició de parquet a la carpeta d'aparador de destinació, on es troben les dades analitzades del dia. Hem tingut cura per endavant que no hi hagués particions amb un conflicte de tipus de dades.

Però tenim un tercer problema. Ara l'esquema general no es coneix, a més, la taula a Hive té un esquema incorrecte, ja que cada nova partició probablement introduïa una distorsió a l'esquema.

Heu de tornar a registrar la taula. Això es pot fer simplement: tornar a llegir el parquet de l'aparador, agafar l'esquema i crear un DDL basat en ell, amb el qual tornar a registrar la carpeta a Hive com a taula externa, actualitzant l'esquema de l'aparador de destinació.

Tenim un quart problema. Quan vam registrar la taula per primera vegada, vam confiar en Spark. Ara ho fem nosaltres mateixos, i hem de recordar que els camps de parquet poden començar amb caràcters que no estan permesos per Hive. Per exemple, Spark llança línies que no ha pogut analitzar al camp "corrupt_record". Aquest camp no es pot registrar a Hive sense escapar.

Sabent això, obtenim l'esquema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Codi ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("matriu<`", "matriu<") fa DDL segur, és a dir, en lloc de:

create table tname (_field1 string, 1field string)

Amb noms de camp com "_field1, 1field", es crea un DDL segur on els noms dels camps s'escapa: creeu la taula `tname` (cadena `_camp1`, cadena `1camp`).

Sorgeix la pregunta: com obtenir correctament un marc de dades amb un esquema complet (en codi pf)? Com aconseguir aquest pf? Aquest és el cinquè problema. Rellegiu l'esquema de totes les particions de la carpeta amb fitxers de parquet de l'aparador de destinació? Aquest mètode és el més segur, però difícil.

L'esquema ja és a Hive. Podeu obtenir un esquema nou combinant l'esquema de tota la taula i la nova partició. Per tant, cal agafar l'esquema de la taula de Hive i combinar-lo amb l'esquema de la nova partició. Això es pot fer llegint les metadades de prova de Hive, desant-les en una carpeta temporal i utilitzant Spark per llegir les dues particions alhora.

De fet, hi ha tot el que necessiteu: l'esquema de taula original a Hive i la nova partició. També tenim dades. Només queda obtenir un nou esquema que combini l'esquema d'aparador i nous camps de la partició creada:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

A continuació, creem el DDL de registre de taula, com en el fragment anterior.
Si tota la cadena funciona correctament, és a dir, hi va haver una càrrega d'inicialització i la taula es va crear correctament a Hive, obtenim un esquema de taula actualitzat.

I l'últim problema és que no podeu afegir només una partició a una taula Hive, perquè es trencarà. Heu de forçar Hive a arreglar la seva estructura de partició:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

La senzilla tasca de llegir JSON i crear un aparador basat en ell comporta la superació d'una sèrie de dificultats implícites, solucions per a les quals cal buscar per separat. I encara que aquestes solucions són senzilles, es necessita molt de temps per trobar-les.

Per implementar la construcció de l'aparador, vaig haver de:

  • Afegiu particions a l'aparador, desfer-vos dels fitxers de servei
  • Tracteu els camps buits de les dades font que Spark ha escrit
  • Emet tipus simples a una cadena
  • Converteix els noms dels camps en minúscules
  • Càrrega de dades i registre de taula independents a Hive (generació DDL)
  • No us oblideu d'escapar els noms de camp que puguin ser incompatibles amb Hive
  • Obteniu informació sobre com actualitzar el registre de la taula a Hive

En resum, observem que la decisió de construir aparadors està plena de molts esculls. Per tant, en cas de dificultats en la implementació, és millor contactar amb un soci experimentat amb experiència d'èxit.

Gràcies per llegir aquest article, esperem que la informació us sigui útil.

Font: www.habr.com

Afegeix comentari