Spark schemaEvolution en la práctica

Estimados lectores, ¡buen día!

En este artículo, el consultor líder del área de negocio Big Data Solutions de Neoflex describe en detalle las opciones para construir escaparates de estructura variable utilizando Apache Spark.

Como parte de un proyecto de análisis de datos, a menudo surge la tarea de construir escaparates basados ​​en datos poco estructurados.

Por lo general, estos son registros o respuestas de varios sistemas, guardados como JSON o XML. Los datos se cargan en Hadoop, luego debe crear una tienda a partir de ellos. Podemos organizar el acceso al escaparate creado, por ejemplo, a través de Impala.

En este caso, el esquema del escaparate de destino no se conoce de antemano. Además, el esquema tampoco se puede elaborar de antemano, ya que depende de los datos, y estamos tratando con estos datos muy vagamente estructurados.

Por ejemplo, hoy se registra la siguiente respuesta:

{source: "app1", error_code: ""}

y mañana desde el mismo sistema sale la siguiente respuesta:

{source: "app1", error_code: "error", description: "Network error"}

Como resultado, se debe agregar un campo más al escaparate: descripción, y nadie sabe si vendrá o no.

La tarea de crear un escaparate con dichos datos es bastante estándar y Spark tiene una serie de herramientas para esto. Para analizar los datos de origen, hay soporte para JSON y XML, y para un esquema previamente desconocido, se proporciona soporte para schemaEvolution.

A primera vista, la solución parece simple. Debe tomar una carpeta con JSON y leerla en un marco de datos. Spark creará un esquema, convertirá los datos anidados en estructuras. Además, todo debe guardarse en parquet, que también es compatible con Impala, registrando el escaparate en Hive metastore.

Todo parece ser simple.

Sin embargo, no queda claro a partir de los breves ejemplos de la documentación qué hacer con una serie de problemas en la práctica.

La documentación describe un enfoque para no crear un escaparate, sino para leer JSON o XML en un marco de datos.

Es decir, simplemente muestra cómo leer y analizar JSON:

df = spark.read.json(path...)

Esto es suficiente para que los datos estén disponibles para Spark.

En la práctica, el script es mucho más complicado que simplemente leer archivos JSON de una carpeta y crear un marco de datos. La situación se ve así: ya existe un escaparate determinado, todos los días ingresan nuevos datos, deben agregarse al escaparate, sin olvidar que el esquema puede diferir.

El esquema habitual para construir una vitrina es el siguiente:

Paso 1. Los datos se cargan en Hadoop con una recarga diaria posterior y se agregan a una nueva partición. Resulta una carpeta con datos iniciales particionados por día.

Paso 2. Durante la carga inicial, Spark lee y analiza esta carpeta. El marco de datos resultante se guarda en un formato analizable, por ejemplo, en parquet, que luego se puede importar a Impala. Esto crea un escaparate de destino con todos los datos que se han acumulado hasta este punto.

Paso 3. Se crea una descarga que actualizará el escaparate todos los días.
Hay una cuestión de carga incremental, la necesidad de dividir la vitrina y la cuestión de mantener el esquema general de la vitrina.

Tomemos un ejemplo. Digamos que se ha implementado el primer paso para crear un repositorio y los archivos JSON se cargan en una carpeta.

Crear un marco de datos a partir de ellos y luego guardarlo como escaparate no es un problema. Este es el primer paso que se puede encontrar fácilmente en la documentación de Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Todo parece estar bien,

Leemos y analizamos JSON, luego guardamos el marco de datos como un parquet, registrándolo en Hive de cualquier manera conveniente:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Obtenemos una ventana.

Pero, al día siguiente, se agregaron nuevos datos de la fuente. Tenemos una carpeta con JSON y un escaparate creado a partir de esta carpeta. Después de cargar el siguiente lote de datos desde el origen, al data mart le faltan los datos de un día.

La solución lógica sería particionar el escaparate por día, lo que permitirá agregar una nueva partición cada día siguiente. El mecanismo para esto también es bien conocido, Spark le permite escribir particiones por separado.

Primero, hacemos una carga inicial, guardando los datos como se describió anteriormente, agregando solo particiones. Esta acción se denomina inicialización del escaparate y se realiza solo una vez:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Al día siguiente, cargamos solo una nueva partición:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Todo lo que queda es volver a registrarse en Hive para actualizar el esquema.
Sin embargo, aquí es donde surgen los problemas.

Primer problema. Tarde o temprano, el parquet resultante será ilegible. Esto se debe a cómo parquet y JSON tratan los campos vacíos de manera diferente.

Consideremos una situación típica. Por ejemplo, ayer llega JSON:

День 1: {"a": {"b": 1}},

y hoy el mismo JSON se ve así:

День 2: {"a": null}

Digamos que tenemos dos particiones diferentes, cada una con una línea.
Cuando leamos todos los datos de origen, Spark podrá determinar el tipo y comprenderá que "a" es un campo de tipo "estructura", con un campo anidado "b" de tipo INT. Pero, si cada partición se guardó por separado, obtenemos un parquet con esquemas de partición incompatibles:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Esta situación es bien conocida, por lo que se ha agregado especialmente una opción: al analizar los datos de origen, eliminar los campos vacíos:

df = spark.read.json("...", dropFieldIfAllNull=True)

En este caso, el parquet constará de tabiques que se pueden leer juntos.
Aunque aquellos que han hecho esto en la práctica sonreirán amargamente aquí. ¿Por qué? Sí, porque es probable que haya dos situaciones más. O tres. O cuatro. La primera, que es casi seguro que ocurrirá, es que los tipos numéricos se verán diferentes en diferentes archivos JSON. Por ejemplo, {intField: 1} y {intField: 1.1}. Si dichos campos se encuentran en una partición, la combinación de esquemas leerá todo correctamente, lo que conducirá al tipo más preciso. Pero si en diferentes, uno tendrá intField: int y el otro tendrá intField: double.

Hay la siguiente bandera para manejar esta situación:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Ahora tenemos una carpeta donde hay particiones que se pueden leer en un solo marco de datos y un parquet válido de todo el escaparate. ¿Sí? No.

Debemos recordar que registramos la tabla en Hive. Hive no distingue entre mayúsculas y minúsculas en los nombres de campo, mientras que parquet distingue entre mayúsculas y minúsculas. Por lo tanto, las particiones con esquemas: field1: int y Field1: int son iguales para Hive, pero no para Spark. No olvide convertir los nombres de los campos a minúsculas.

Después de eso, todo parece estar bien.

Sin embargo, no todo es tan simple. Hay un segundo problema, también conocido. Dado que cada nueva partición se guarda por separado, la carpeta de partición contendrá archivos de servicio de Spark, por ejemplo, el indicador de éxito de la operación _SUCCESS. Esto dará como resultado un error al intentar entarimar. Para evitar esto, debe configurar la configuración para evitar que Spark agregue archivos de servicio a la carpeta:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Parece que ahora todos los días se agrega una nueva partición de parquet a la carpeta de exhibición de destino, donde se encuentran los datos analizados para el día. Cuidamos de antemano que no hubiera particiones con un conflicto de tipos de datos.

Pero, tenemos un tercer problema. Ahora el esquema general no se conoce, además, la tabla en Hive tiene un esquema incorrecto, ya que cada nueva partición probablemente introdujo una distorsión en el esquema.

Debe volver a registrar la tabla. Esto se puede hacer simplemente: lea nuevamente el parquet del escaparate, tome el esquema y cree un DDL basado en él, con el cual volver a registrar la carpeta en Hive como una tabla externa, actualizando el esquema del escaparate de destino.

Tenemos un cuarto problema. Cuando registramos la mesa por primera vez, confiamos en Spark. Ahora lo hacemos nosotros mismos, y debemos recordar que los campos de parquet pueden comenzar con caracteres que no están permitidos para Hive. Por ejemplo, Spark arroja líneas que no pudo analizar en el campo "corrupt_record". Dicho campo no se puede registrar en Hive sin escaparse.

Sabiendo esto, obtenemos el esquema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

código ("_registro_corrupto", "`_registro_corrupto`") + " " + f[1].reemplazar(":", "`:").reemplazar("<", "<`").reemplazar(",", ",`").replace("matriz<`", "matriz<") hace DDL seguro, es decir, en lugar de:

create table tname (_field1 string, 1field string)

Con nombres de campo como "_field1, 1field", se crea DDL seguro donde los nombres de campo se escapan: cree la tabla `tname` (cadena `_field1`, cadena `1field`).

Surge la pregunta: ¿cómo obtener correctamente un marco de datos con un esquema completo (en código pf)? ¿Cómo obtener este pf? Este es el quinto problema. ¿Volver a leer el esquema de todas las particiones de la carpeta con archivos de parquet del escaparate de destino? Este método es el más seguro, pero difícil.

El esquema ya está en Hive. Puede obtener un nuevo esquema combinando el esquema de toda la tabla y la nueva partición. Por lo tanto, debe tomar el esquema de la tabla de Hive y combinarlo con el esquema de la nueva partición. Esto se puede hacer leyendo los metadatos de prueba de Hive, guardándolos en una carpeta temporal y usando Spark para leer ambas particiones a la vez.

De hecho, hay todo lo que necesita: el esquema de tabla original en Hive y la nueva partición. También tenemos datos. Solo queda obtener un nuevo esquema que combine el esquema del escaparate y los nuevos campos de la partición creada:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

A continuación, creamos el DDL de registro de la tabla, como en el fragmento anterior.
Si toda la cadena funciona correctamente, es decir, hubo una carga de inicialización y la tabla se creó correctamente en Hive, entonces obtenemos un esquema de tabla actualizado.

Y el último problema es que no puede simplemente agregar una partición a una tabla de Hive, porque se romperá. Debe obligar a Hive a arreglar su estructura de partición:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

La simple tarea de leer JSON y crear un escaparate basado en él da como resultado la superación de una serie de dificultades implícitas, cuyas soluciones hay que buscar por separado. Y aunque estas soluciones son simples, lleva mucho tiempo encontrarlas.

Para implementar la construcción de la vitrina, tuve que:

  • Agregue particiones al escaparate, deshaciéndose de los archivos de servicio
  • Tratar con campos vacíos en los datos de origen que Spark ha escrito
  • Transmitir tipos simples a una cadena
  • Convertir nombres de campo a minúsculas
  • Carga de datos y registro de tablas separados en Hive (generación DDL)
  • No olvide escapar los nombres de campo que podrían ser incompatibles con Hive
  • Aprenda a actualizar el registro de tablas en Hive

En resumen, notamos que la decisión de construir escaparates está plagada de muchas trampas. Por lo tanto, en caso de dificultades en la implementación, es mejor contactar a un socio experimentado con experiencia exitosa.

Gracias por leer este artículo, esperamos que encuentre útil la información.

Fuente: habr.com

Añadir un comentario