Schéma SparkEvolution en pratique

Chers lecteurs, bonne journée !

Dans cet article, le consultant leader du secteur d'activité Big Data Solutions de Neoflex décrit en détail les options de construction de vitrines à structure variable à l'aide d'Apache Spark.

Dans le cadre d'un projet d'analyse de données, la tâche de construire des vitrines basées sur des données peu structurées se pose souvent.

Il s'agit généralement de journaux ou de réponses de divers systèmes, enregistrés au format JSON ou XML. Les données sont téléchargées sur Hadoop, puis vous devez créer une vitrine à partir d'elles. Nous pouvons organiser l'accès à la vitrine créée, par exemple, via Impala.

Dans ce cas, le schéma de la devanture cible n'est pas connu a priori. De plus, le schéma ne peut pas non plus être établi à l'avance, car il dépend des données, et nous avons affaire à ces données très peu structurées.

Par exemple, aujourd'hui, la réponse suivante est enregistrée :

{source: "app1", error_code: ""}

et demain du même système vient la réponse suivante :

{source: "app1", error_code: "error", description: "Network error"}

En conséquence, un champ supplémentaire doit être ajouté à la vitrine - description, et personne ne sait s'il viendra ou non.

La tâche de créer une vitrine sur de telles données est assez standard, et Spark dispose d'un certain nombre d'outils pour cela. Pour l'analyse des données source, il existe une prise en charge de JSON et XML, et pour un schéma jusque-là inconnu, la prise en charge de schemaEvolution est fournie.

À première vue, la solution semble simple. Vous devez prendre un dossier avec JSON et le lire dans une trame de données. Spark créera un schéma, transformera les données imbriquées en structures. De plus, tout doit être enregistré dans le parquet, qui est également pris en charge dans Impala, en enregistrant la vitrine dans le metastore Hive.

Tout semble simple.

Cependant, les courts exemples de la documentation ne permettent pas de savoir quoi faire avec un certain nombre de problèmes dans la pratique.

La documentation décrit une approche non pas pour créer une vitrine, mais pour lire JSON ou XML dans une trame de données.

À savoir, il montre simplement comment lire et analyser JSON :

df = spark.read.json(path...)

Cela suffit pour rendre les données disponibles pour Spark.

En pratique, le script est beaucoup plus compliqué que de simplement lire des fichiers JSON à partir d'un dossier et de créer une trame de données. La situation ressemble à ceci: il existe déjà une certaine vitrine, de nouvelles données arrivent chaque jour, elles doivent être ajoutées à la vitrine, sans oublier que le schéma peut différer.

Le schéma habituel de construction d'une vitrine est le suivant:

Étape 1. Les données sont chargées dans Hadoop avec un rechargement quotidien ultérieur et ajoutées à une nouvelle partition. Il s'avère un dossier avec des données initiales partitionnées par jour.

Étape 2. Lors du chargement initial, ce dossier est lu et analysé par Spark. La trame de données résultante est enregistrée dans un format analysable, par exemple dans parquet, qui peut ensuite être importé dans Impala. Cela crée une vitrine cible avec toutes les données accumulées jusqu'à présent.

Étape 3. Un téléchargement est créé qui mettra à jour la vitrine tous les jours.
Il y a une question de chargement incrémental, la nécessité de cloisonner la vitrine, et la question du maintien du schéma général de la vitrine.

Prenons un exemple. Supposons que la première étape de création d'un référentiel a été implémentée et que les fichiers JSON sont téléchargés dans un dossier.

Créer une trame de données à partir d'eux, puis l'enregistrer en tant que vitrine, n'est pas un problème. C'est la toute première étape qui peut être facilement trouvée dans la documentation de Spark :

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tout semble aller bien.

Nous lisons et analysons JSON, puis nous enregistrons la trame de données en tant que parquet, en l'enregistrant dans Hive de la manière la plus pratique :

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Nous obtenons une fenêtre.

Mais, le lendemain, de nouvelles données de la source ont été ajoutées. Nous avons un dossier avec JSON, et une vitrine créée à partir de ce dossier. Après avoir chargé le prochain lot de données à partir de la source, il manque au magasin de données l'équivalent d'une journée de données.

La solution logique serait de partitionner la vitrine par jour, ce qui permettra d'ajouter une nouvelle partition chaque jour suivant. Le mécanisme pour cela est également bien connu, Spark vous permet d'écrire des partitions séparément.

Tout d'abord, nous effectuons un chargement initial, en sauvegardant les données comme décrit ci-dessus, en ajoutant uniquement le partitionnement. Cette action est appelée initialisation de la vitrine et n'est effectuée qu'une seule fois :

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Le lendemain, nous chargeons uniquement une nouvelle partition :

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Il ne reste plus qu'à se réinscrire dans Hive pour mettre à jour le schéma.
Cependant, c'est là que les problèmes surgissent.

Premier problème. Tôt ou tard, le parquet résultant sera illisible. Cela est dû à la façon dont parquet et JSON traitent différemment les champs vides.

Considérons une situation typique. Par exemple, hier JSON arrive :

День 1: {"a": {"b": 1}},

et aujourd'hui le même JSON ressemble à ceci :

День 2: {"a": null}

Disons que nous avons deux partitions différentes, chacune avec une ligne.
Lorsque nous lirons l'intégralité des données source, Spark pourra déterminer le type, et comprendra que "a" est un champ de type "structure", avec un champ imbriqué "b" de type INT. Mais, si chaque partition a été enregistrée séparément, nous obtenons alors un parquet avec des schémas de partition incompatibles :

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Cette situation est bien connue, c'est pourquoi une option a été spécialement ajoutée - lors de l'analyse des données source, supprimez les champs vides :

df = spark.read.json("...", dropFieldIfAllNull=True)

Dans ce cas, le parquet sera composé de cloisons pouvant être lues ensemble.
Bien que ceux qui l'ont fait dans la pratique souriront amèrement ici. Pourquoi? Oui, car il y aura probablement deux autres situations. Ou trois. Ou quatre. Le premier, qui se produira presque certainement, est que les types numériques auront un aspect différent dans différents fichiers JSON. Par exemple, {intField : 1} et {intField : 1.1}. Si de tels champs sont trouvés dans une partition, la fusion de schéma lira tout correctement, conduisant au type le plus précis. Mais si dans différents, alors l'un aura intField: int, et l'autre aura intField: double.

Il existe le drapeau suivant pour gérer cette situation :

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nous avons maintenant un dossier où il y a des partitions qui peuvent être lues dans une seule trame de données et un parquet valide de toute la vitrine. Oui? Non.

Nous devons nous rappeler que nous avons enregistré la table dans Hive. Hive n'est pas sensible à la casse dans les noms de champs, tandis que parquet est sensible à la casse. Par conséquent, les partitions avec les schémas : field1 : int et Field1 : int sont les mêmes pour Hive, mais pas pour Spark. N'oubliez pas de convertir les noms de champs en minuscules.

Après cela, tout semble aller bien.

Cependant, tout n'est pas si simple. Il y a un deuxième problème, également bien connu. Étant donné que chaque nouvelle partition est enregistrée séparément, le dossier de partition contiendra les fichiers de service Spark, par exemple, l'indicateur de réussite de l'opération _SUCCESS. Cela entraînera une erreur lors de la tentative de parquet. Pour éviter cela, vous devez configurer la configuration pour empêcher Spark d'ajouter des fichiers de service au dossier :

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Il semble que maintenant, chaque jour, une nouvelle partition de parquet est ajoutée au dossier de vitrine cible, où se trouvent les données analysées pour la journée. Nous avons pris soin à l'avance qu'il n'y ait pas de partitions avec un conflit de type de données.

Mais, nous avons un troisième problème. Maintenant, le schéma général n'est pas connu, de plus, la table dans Hive a un schéma incorrect, car chaque nouvelle partition a très probablement introduit une distorsion dans le schéma.

Vous devez réenregistrer la table. Cela peut être fait simplement : relisez le parquet de la vitrine, prenez le schéma et créez un DDL basé sur celui-ci, avec lequel réenregistrer le dossier dans Hive en tant que table externe, en mettant à jour le schéma de la vitrine cible.

Nous avons un quatrième problème. Lorsque nous avons enregistré la table pour la première fois, nous nous sommes appuyés sur Spark. Maintenant, nous le faisons nous-mêmes et nous devons nous rappeler que les champs de parquet peuvent commencer par des caractères qui ne sont pas autorisés pour Hive. Par exemple, Spark rejette des lignes qu'il n'a pas pu analyser dans le champ "corrupt_record". Un tel champ ne peut pas être enregistré dans Hive sans être échappé.

Sachant cela, on obtient le schéma :

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Code ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("tableau<`", "tableau<") rend le DDL sûr, c'est-à-dire au lieu de :

create table tname (_field1 string, 1field string)

Avec des noms de champ comme "_field1, 1field", un DDL sûr est créé là où les noms de champ sont échappés : créez la table `tname` (chaîne `_field1`, chaîne `1field`).

La question se pose : comment obtenir correctement une dataframe avec un schéma complet (en code pf) ? Comment obtenir ce pf ? C'est le cinquième problème. Relire le schéma de toutes les partitions du dossier avec les fichiers parquet de la vitrine cible ? Cette méthode est la plus sûre, mais difficile.

Le schéma est déjà dans Hive. Vous pouvez obtenir un nouveau schéma en combinant le schéma de la table entière et la nouvelle partition. Vous devez donc prendre le schéma de table de Hive et le combiner avec le schéma de la nouvelle partition. Cela peut être fait en lisant les métadonnées de test de Hive, en les enregistrant dans un dossier temporaire et en utilisant Spark pour lire les deux partitions à la fois.

En fait, il y a tout ce dont vous avez besoin : le schéma de table d'origine dans Hive et la nouvelle partition. Nous avons aussi des données. Il ne reste plus qu'à obtenir un nouveau schéma qui combine le schéma vitrine et les nouveaux champs de la partition créée :

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Ensuite, nous créons le DDL d'enregistrement de table, comme dans l'extrait précédent.
Si toute la chaîne fonctionne correctement, à savoir s'il y a eu une charge d'initialisation et que la table a été créée correctement dans Hive, nous obtenons un schéma de table mis à jour.

Et le dernier problème est que vous ne pouvez pas simplement ajouter une partition à une table Hive, car elle sera cassée. Vous devez forcer Hive à corriger sa structure de partition :

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

La simple tâche de lire JSON et de créer une vitrine basée sur celui-ci permet de surmonter un certain nombre de difficultés implicites, des solutions pour lesquelles vous devez rechercher séparément. Et bien que ces solutions soient simples, il faut beaucoup de temps pour les trouver.

Pour mettre en œuvre la construction de la vitrine, j'ai dû :

  • Ajoutez des partitions à la vitrine, en vous débarrassant des fichiers de service
  • Traiter les champs vides dans les données sources que Spark a tapées
  • Convertir des types simples en chaîne
  • Convertir les noms de champs en minuscules
  • Téléchargement de données et enregistrement de table séparés dans Hive (génération DDL)
  • N'oubliez pas d'échapper les noms de champs qui pourraient être incompatibles avec Hive
  • Découvrez comment mettre à jour l'enregistrement des tables dans Hive

En résumé, nous constatons que la décision de construire des vitrines est semée d'embûches. Dès lors, en cas de difficultés de mise en œuvre, mieux vaut s'adresser à un partenaire expérimenté disposant d'une expertise aboutie.

Merci d'avoir lu cet article, nous espérons que vous trouverez les informations utiles.

Source: habr.com

Ajouter un commentaire