Spark schemaA evolución na práctica

Queridos lectores, bo día!

Neste artigo, o consultor líder da área de negocio de Solucións de Big Data de Neoflex describe en detalle as opcións para construír vitrinas de estrutura variable usando Apache Spark.

Como parte dun proxecto de análise de datos, a miúdo xorde a tarefa de construír escaparates baseados en datos pouco estruturados.

Normalmente son rexistros, ou respostas de varios sistemas, gardadas como JSON ou XML. Os datos cárganse en Hadoop, entón cómpre crear un escaparate a partir deles. Podemos organizar o acceso ao escaparate creado, por exemplo, a través de Impala.

Neste caso, o esquema do escaparate de destino non se coñece de antemán. Ademais, o esquema tampouco se pode elaborar con antelación, xa que depende dos datos, e estamos a tratar con estes datos moi pouco estruturados.

Por exemplo, hoxe rexístrase a seguinte resposta:

{source: "app1", error_code: ""}

e mañá do mesmo sistema sae a seguinte resposta:

{source: "app1", error_code: "error", description: "Network error"}

Como resultado, debería engadirse un campo máis ao escaparate: descrición, e ninguén sabe se chegará ou non.

A tarefa de crear unha tenda con tales datos é bastante estándar e Spark ten varias ferramentas para iso. Para analizar os datos de orixe, hai soporte para JSON e XML, e para un esquema previamente descoñecido, ofrécese soporte para schemaEvolution.

A primeira vista, a solución parece sinxela. Debe coller un cartafol con JSON e lelo nun marco de datos. Spark creará un esquema, converterá os datos anidados en estruturas. Ademais, hai que gardar todo en parquet, que tamén se admite en Impala, rexistrando o escaparate na metatenda Hive.

Todo parece ser sinxelo.

Non obstante, a partir dos pequenos exemplos da documentación non queda claro que facer con unha serie de problemas na práctica.

A documentación describe un enfoque non para crear un escaparate, senón para ler JSON ou XML nun marco de datos.

É dicir, simplemente mostra como ler e analizar JSON:

df = spark.read.json(path...)

Isto é suficiente para que os datos estean dispoñibles para Spark.

Na práctica, o script é moito máis complicado que só ler ficheiros JSON dun cartafol e crear un marco de datos. A situación é así: xa hai un escaparate determinado, cada día chegan novos datos, hai que engadirlos ao escaparate, sen esquecer que o esquema pode diferir.

O esquema habitual para construír un escaparate é o seguinte:

Paso 1. Os datos cárganse en Hadoop coa posterior recarga diaria e engádense a unha nova partición. Resulta un cartafol cos datos iniciais divididos por día.

Paso 2. Durante a carga inicial, este cartafol é lido e analizado por Spark. O marco de datos resultante gárdase nun formato analizable, por exemplo, en parquet, que logo se pode importar a Impala. Isto crea un escaparate de destino con todos os datos acumulados ata este momento.

Paso 3. Créase unha descarga que actualizará o escaparate todos os días.
Hai unha cuestión de carga incremental, a necesidade de particionar o escaparate e a cuestión de manter o esquema xeral do escaparate.

Poñamos un exemplo. Digamos que se implementou o primeiro paso para construír un repositorio e que os ficheiros JSON cárganse nun cartafol.

Crear un marco de datos a partir deles e gardalo como escaparate non é un problema. Este é o primeiro paso que se pode atopar facilmente na documentación de Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Todo parece estar ben.

Lemos e analizamos JSON, despois gardamos o marco de datos como un parquet, rexistrándoo en Hive de calquera forma conveniente:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Temos unha fiestra.

Pero, ao día seguinte, engadíronse novos datos da fonte. Temos un cartafol con JSON e un escaparate creado a partir deste cartafol. Despois de cargar o seguinte lote de datos da fonte, ao data mart faltan os datos dun día.

A solución lóxica sería particionar o escaparate por día, o que permitirá engadir unha nova partición cada día seguinte. O mecanismo para iso tamén é ben coñecido, Spark permítelle escribir particións por separado.

En primeiro lugar, facemos unha carga inicial, gardando os datos como se describe anteriormente, engadindo só particións. Esta acción chámase inicialización de escaparate e realízase só unha vez:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Ao día seguinte, cargamos só unha nova partición:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Todo o que queda é rexistrarse de novo en Hive para actualizar o esquema.
Non obstante, aquí é onde xorden os problemas.

Primeiro problema. Tarde ou cedo, o parqué resultante será ilexible. Isto débese a que o parquet e o JSON abordan os campos baleiros de forma diferente.

Consideremos unha situación típica. Por exemplo, onte chegou JSON:

День 1: {"a": {"b": 1}},

e hoxe o mesmo JSON ten o seguinte aspecto:

День 2: {"a": null}

Digamos que temos dúas particións diferentes, cada unha cunha liña.
Cando leamos todos os datos de orixe, Spark poderá determinar o tipo, e entenderá que "a" é un campo de tipo "estructura", cun campo aniñado "b" de tipo INT. Pero, se cada partición se gardou por separado, obtemos un parquet con esquemas de partición incompatibles:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Esta situación é coñecida, polo que se engadiu unha opción especialmente: ao analizar os datos de orixe, elimine os campos baleiros:

df = spark.read.json("...", dropFieldIfAllNull=True)

Neste caso, o parqué estará formado por tabiques que se poden ler en conxunto.
Aínda que os que fixeron isto na práctica sorrirán amargamente aquí. Por que? Si, porque é probable que haxa dúas situacións máis. Ou tres. Ou catro. O primeiro, que case con toda seguridade ocorrerá, é que os tipos numéricos terán un aspecto diferente en diferentes ficheiros JSON. Por exemplo, {intField: 1} e {intField: 1.1}. Se tales campos se atopan nunha partición, a combinación do esquema lerá todo correctamente, o que levará ao tipo máis preciso. Pero se hai diferentes, entón un terá intField: int, e o outro terá intField: double.

Existe a seguinte bandeira para xestionar esta situación:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Agora temos un cartafol onde hai particións que se poden ler nun único dataframe e un parquet válido de toda a vitrina. Si? Non.

Debemos lembrar que rexistramos a mesa en Hive. Hive non distingue entre maiúsculas e minúsculas nos nomes de campo, mentres que o parquet distingue entre maiúsculas e minúsculas. Polo tanto, as particións con esquemas: field1: int e Field1: int son iguais para Hive, pero non para Spark. Non esquezas converter os nomes dos campos en minúsculas.

Despois diso, todo parece estar ben.

Non obstante, non todo é tan sinxelo. Hai un segundo problema, tamén coñecido. Dado que cada nova partición gárdase por separado, o cartafol da partición conterá ficheiros de servizo Spark, por exemplo, a marca de éxito da operación _SUCCESS. Isto dará lugar a un erro ao tentar facer parquet. Para evitar isto, cómpre configurar a configuración para evitar que Spark engada ficheiros de servizo ao cartafol:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Parece que agora cada día engádese unha nova partición de parquet ao cartafol do escaparate de destino, onde se atopan os datos analizados para o día. Coidamos de antemán que non houbese particións cun conflito de tipo de datos.

Pero, temos un terceiro problema. Agora non se coñece o esquema xeral, ademais, a táboa de Hive ten un esquema incorrecto, xa que cada nova partición probablemente introduciu unha distorsión no esquema.

Debes volver rexistrar a táboa. Isto pódese facer de forma sinxela: ler de novo o parquet do escaparate, coller o esquema e crear un DDL baseado nel, co que volver rexistrar o cartafol en Hive como unha táboa externa, actualizando o esquema do escaparate de destino.

Temos un cuarto problema. Cando rexistramos a táboa por primeira vez, confiamos en Spark. Agora facémolo nós mesmos e hai que lembrar que os campos de parquet poden comezar con caracteres que non están permitidos para Hive. Por exemplo, Spark lanza liñas que non puido analizar no campo "corrupt_record". Tal campo non se pode rexistrar en Hive sen escapar.

Sabendo isto, obtemos o esquema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Código ("_rexistro_corrupto", "`_rexistro_corrupto`") + " " + f[1].substituír(":", "`:").substituír("<", "<`").substituír(",", ",`").replace("matriz<`", "matriz<") fai DDL seguro, é dicir, en lugar de:

create table tname (_field1 string, 1field string)

Con nomes de campo como "_field1, 1field", realízase un DDL seguro onde se escapan os nomes dos campos: cree a táboa `tname` (cadea `_field1`, cadea `1field`).

Xorde a pregunta: como obter correctamente un marco de datos cun esquema completo (en código pf)? Como conseguir este pf? Este é o quinto problema. Volve ler o esquema de todas as particións do cartafol con ficheiros de parquet do escaparate de destino? Este método é o máis seguro, pero difícil.

O esquema xa está en Hive. Podes obter un novo esquema combinando o esquema de toda a táboa e a nova partición. Polo tanto, cómpre coller o esquema da táboa de Hive e combinalo co esquema da nova partición. Isto pódese facer lendo os metadatos de proba de Hive, gardándoos nun cartafol temporal e usando Spark para ler as dúas particións á vez.

De feito, hai todo o que necesitas: o esquema de táboa orixinal en Hive e a nova partición. Tamén temos datos. Só queda obter un novo esquema que combine o esquema de escaparate e novos campos da partición creada:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

A continuación, creamos o DDL de rexistro da táboa, como no fragmento anterior.
Se toda a cadea funciona correctamente, é dicir, houbo unha carga de inicialización e a táboa foi creada correctamente en Hive, obtemos un esquema de táboa actualizado.

E o último problema é que non pode simplemente engadir unha partición a unha táboa Hive, porque estará rota. Debes forzar a Hive a corrixir a súa estrutura de partición:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

A simple tarefa de ler JSON e crear un escaparate baseado nela ten como resultado a superación dunha serie de dificultades implícitas, solucións para as que hai que buscar por separado. E aínda que estas solucións son sinxelas, leva moito tempo atopalas.

Para implementar a construción do escaparate, tiven que:

  • Engade particións ao escaparate, desfacendo os ficheiros de servizo
  • Tratar os campos baleiros dos datos de orixe que escribiu Spark
  • Transmite tipos sinxelos a unha cadea
  • Converte os nomes dos campos en minúsculas
  • Carga de datos e rexistro de táboas separados en Hive (xeración DDL)
  • Non esquezas escapar dos nomes de campo que poidan ser incompatibles con Hive
  • Aprende a actualizar o rexistro da táboa en Hive

En resumo, observamos que a decisión de construír escaparates está chea de moitas trampas. Polo tanto, en caso de dificultades na implementación, é mellor contactar cun socio experimentado con experiencia exitosa.

Grazas por ler este artigo, esperamos que che resulte útil a información.

Fonte: www.habr.com

Engadir un comentario