Spark schemaEvolution στην πράξη

Αγαπητοί αναγνώστες, καλημέρα!

Σε αυτό το άρθρο, ο κορυφαίος σύμβουλος της επιχειρηματικής περιοχής Big Data Solutions της Neoflex περιγράφει λεπτομερώς τις επιλογές για την κατασκευή βιτρινών μεταβλητών δομών χρησιμοποιώντας το Apache Spark.

Ως μέρος ενός έργου ανάλυσης δεδομένων, συχνά προκύπτει το έργο της κατασκευής βιτρινών με βάση τα χαλαρά δομημένα δεδομένα.

Συνήθως πρόκειται για αρχεία καταγραφής ή απαντήσεις από διάφορα συστήματα, που αποθηκεύονται ως JSON ή XML. Τα δεδομένα μεταφορτώνονται στο Hadoop και, στη συνέχεια, πρέπει να δημιουργήσετε μια βιτρίνα από αυτά. Μπορούμε να οργανώσουμε την πρόσβαση στη δημιουργημένη βιτρίνα, για παράδειγμα, μέσω της Impala.

Σε αυτήν την περίπτωση, το σχήμα της βιτρίνας-στόχου δεν είναι γνωστό εκ των προτέρων. Επιπλέον, το σχήμα δεν μπορεί επίσης να καταρτιστεί εκ των προτέρων, καθώς εξαρτάται από τα δεδομένα, και έχουμε να κάνουμε με αυτά τα πολύ χαλαρά δομημένα δεδομένα.

Για παράδειγμα, σήμερα καταγράφεται η ακόλουθη απάντηση:

{source: "app1", error_code: ""}

και αύριο από το ίδιο σύστημα έρχεται η εξής απάντηση:

{source: "app1", error_code: "error", description: "Network error"}

Ως αποτέλεσμα, θα πρέπει να προστεθεί ένα ακόμη πεδίο στη βιτρίνα - περιγραφή, και κανείς δεν ξέρει αν θα έρθει ή όχι.

Το έργο της δημιουργίας μιας βιτρίνας σε τέτοια δεδομένα είναι αρκετά τυπικό και το Spark έχει μια σειρά από εργαλεία για αυτό. Για την ανάλυση των δεδομένων προέλευσης, υπάρχει υποστήριξη τόσο για JSON όσο και για XML και για ένα προηγουμένως άγνωστο σχήμα, παρέχεται υποστήριξη για το schemaEvolution.

Με την πρώτη ματιά, η λύση φαίνεται απλή. Πρέπει να πάρετε έναν φάκελο με JSON και να τον διαβάσετε σε ένα πλαίσιο δεδομένων. Το Spark θα δημιουργήσει ένα σχήμα, θα μετατρέψει τα ένθετα δεδομένα σε δομές. Επιπλέον, όλα πρέπει να αποθηκευτούν σε παρκέ, το οποίο υποστηρίζεται επίσης στο Impala, καταχωρώντας τη βιτρίνα στο Hive metastore.

Όλα δείχνουν να είναι απλά.

Ωστόσο, δεν είναι σαφές από τα σύντομα παραδείγματα στην τεκμηρίωση τι πρέπει να κάνετε με μια σειρά προβλημάτων στην πράξη.

Η τεκμηρίωση περιγράφει μια προσέγγιση όχι για τη δημιουργία βιτρίνας, αλλά για την ανάγνωση JSON ή XML σε ένα πλαίσιο δεδομένων.

Δηλαδή, δείχνει απλώς πώς να διαβάζετε και να αναλύετε το JSON:

df = spark.read.json(path...)

Αυτό είναι αρκετό για να διατεθούν τα δεδομένα στο Spark.

Στην πράξη, το σενάριο είναι πολύ πιο περίπλοκο από την απλή ανάγνωση αρχείων JSON από έναν φάκελο και τη δημιουργία ενός πλαισίου δεδομένων. Η κατάσταση μοιάζει με αυτό: υπάρχει ήδη μια συγκεκριμένη βιτρίνα, νέα δεδομένα έρχονται καθημερινά, πρέπει να προστεθούν στη βιτρίνα, χωρίς να ξεχνάμε ότι το σχέδιο μπορεί να διαφέρει.

Το συνηθισμένο σχέδιο για την κατασκευή μιας βιτρίνας έχει ως εξής:

Βήμα 1. Τα δεδομένα φορτώνονται στο Hadoop με επακόλουθη καθημερινή επαναφόρτωση και προστίθενται σε ένα νέο διαμέρισμα. Αποδεικνύεται ένας φάκελος με αρχικά δεδομένα χωρισμένα ανά ημέρα.

Βήμα 2. Κατά την αρχική φόρτωση, αυτός ο φάκελος διαβάζεται και αναλύεται από το Spark. Το προκύπτον πλαίσιο δεδομένων αποθηκεύεται σε μορφή αναλύσιμη, για παράδειγμα, σε παρκέ, το οποίο μπορεί στη συνέχεια να εισαχθεί στο Impala. Αυτό δημιουργεί μια βιτρίνα στόχου με όλα τα δεδομένα που έχουν συσσωρευτεί μέχρι αυτό το σημείο.

Βήμα 3. Δημιουργείται μια λήψη που θα ενημερώνει τη βιτρίνα κάθε μέρα.
Υπάρχει θέμα σταδιακής φόρτωσης, της ανάγκης διαχωρισμού της βιτρίνας και της διατήρησης του γενικού σχήματος της βιτρίνας.

Ας πάρουμε ένα παράδειγμα. Ας πούμε ότι το πρώτο βήμα της δημιουργίας ενός αποθετηρίου έχει υλοποιηθεί και τα αρχεία JSON μεταφορτώνονται σε έναν φάκελο.

Η δημιουργία ενός πλαισίου δεδομένων από αυτά και, στη συνέχεια, η αποθήκευση του ως βιτρίνα, δεν είναι πρόβλημα. Αυτό είναι το πρώτο βήμα που μπορεί να βρεθεί εύκολα στην τεκμηρίωση του Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Όλα φαίνονται καλά.

Διαβάζουμε και αναλύουμε το JSON και, στη συνέχεια, αποθηκεύουμε το πλαίσιο δεδομένων ως παρκέ, καταχωρώντας το στο Hive με οποιονδήποτε βολικό τρόπο:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Παίρνουμε ένα παράθυρο.

Όμως, την επόμενη μέρα, προστέθηκαν νέα δεδομένα από την πηγή. Έχουμε έναν φάκελο με JSON και μια βιτρίνα που δημιουργήθηκε από αυτόν τον φάκελο. Μετά τη φόρτωση της επόμενης παρτίδας δεδομένων από την πηγή, λείπουν δεδομένα αξίας μιας ημέρας από τη μάρκα δεδομένων.

Η λογική λύση θα ήταν η κατάτμηση της βιτρίνας την ημέρα, κάτι που θα επιτρέψει την προσθήκη ενός νέου διαμερίσματος κάθε επόμενη μέρα. Ο μηχανισμός για αυτό είναι επίσης γνωστός, το Spark σας επιτρέπει να γράφετε χωριστά διαμερίσματα.

Αρχικά, κάνουμε μια αρχική φόρτωση, αποθηκεύοντας τα δεδομένα όπως περιγράφεται παραπάνω, προσθέτοντας μόνο κατάτμηση. Αυτή η ενέργεια ονομάζεται προετοιμασία βιτρίνας και γίνεται μόνο μία φορά:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Την επόμενη μέρα, φορτώνουμε μόνο ένα νέο διαμέρισμα:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Το μόνο που μένει είναι να εγγραφείτε ξανά στο Hive για να ενημερώσετε το σχήμα.
Ωστόσο, εδώ δημιουργούνται προβλήματα.

Πρώτο πρόβλημα. Αργά ή γρήγορα, το παρκέ που θα προκύψει θα είναι δυσανάγνωστο. Αυτό οφείλεται στο πώς το παρκέ και το JSON αντιμετωπίζουν διαφορετικά τα κενά πεδία.

Ας εξετάσουμε μια τυπική κατάσταση. Για παράδειγμα, χθες φτάνει το JSON:

День 1: {"a": {"b": 1}},

και σήμερα το ίδιο JSON μοιάζει με αυτό:

День 2: {"a": null}

Ας υποθέσουμε ότι έχουμε δύο διαφορετικά διαμερίσματα, το καθένα με μία γραμμή.
Όταν διαβάσουμε όλα τα δεδομένα πηγής, το Spark θα μπορεί να προσδιορίσει τον τύπο και θα καταλάβει ότι το "a" είναι ένα πεδίο τύπου "structure", με ένα ένθετο πεδίο "b" τύπου INT. Αλλά, εάν κάθε διαμέρισμα αποθηκεύτηκε ξεχωριστά, τότε παίρνουμε ένα παρκέ με ασύμβατα σχήματα διαμερισμάτων:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Αυτή η κατάσταση είναι γνωστή, επομένως έχει προστεθεί μια επιλογή ειδικά - κατά την ανάλυση των δεδομένων προέλευσης, αφαιρέστε τα κενά πεδία:

df = spark.read.json("...", dropFieldIfAllNull=True)

Σε αυτή την περίπτωση, το παρκέ θα αποτελείται από χωρίσματα που μπορούν να διαβαστούν μαζί.
Αν και όσοι το έχουν κάνει στην πράξη θα χαμογελάσουν πικρά εδώ. Γιατί; Ναι, γιατί είναι πιθανό να υπάρξουν άλλες δύο καταστάσεις. Ή τρεις. Ή τέσσερις. Το πρώτο, το οποίο σχεδόν σίγουρα θα συμβεί, είναι ότι οι αριθμητικοί τύποι θα φαίνονται διαφορετικοί σε διαφορετικά αρχεία JSON. Για παράδειγμα, {intField: 1} και {intField: 1.1}. Εάν τέτοια πεδία βρεθούν σε ένα διαμέρισμα, τότε η συγχώνευση σχήματος θα διαβάσει τα πάντα σωστά, οδηγώντας στον πιο ακριβή τύπο. Αλλά αν σε διαφορετικά, τότε το ένα θα έχει intField: int και το άλλο θα έχει intField: double.

Υπάρχει η ακόλουθη σημαία για τη διαχείριση αυτής της κατάστασης:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Τώρα έχουμε έναν φάκελο όπου υπάρχουν διαμερίσματα που μπορούν να διαβαστούν σε ένα ενιαίο πλαίσιο δεδομένων και ένα έγκυρο παρκέ ολόκληρης της βιτρίνας. Ναί? Οχι.

Πρέπει να θυμόμαστε ότι καταχωρήσαμε τον πίνακα στο Hive. Το Hive δεν κάνει διάκριση πεζών-κεφαλαίων στα ονόματα πεδίων, ενώ το παρκέ έχει διάκριση πεζών-κεφαλαίων. Επομένως, οι κατατμήσεις με σχήματα: field1: int και Field1: int είναι ίδιες για το Hive, αλλά όχι για το Spark. Μην ξεχάσετε να μετατρέψετε τα ονόματα των πεδίων σε πεζά.

Μετά από αυτό, όλα δείχνουν να είναι καλά.

Ωστόσο, δεν είναι όλα τόσο απλά. Υπάρχει ένα δεύτερο, επίσης γνωστό πρόβλημα. Εφόσον κάθε νέο διαμέρισμα αποθηκεύεται ξεχωριστά, ο φάκελος του διαμερίσματος θα περιέχει αρχεία υπηρεσίας Spark, για παράδειγμα, τη σημαία επιτυχίας λειτουργίας _SUCCESS. Αυτό θα οδηγήσει σε σφάλμα κατά την προσπάθεια παρκέ. Για να αποφευχθεί αυτό, πρέπει να διαμορφώσετε τις παραμέτρους για να αποτρέψετε το Spark από την προσθήκη αρχείων υπηρεσίας στο φάκελο:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Φαίνεται ότι τώρα κάθε μέρα προστίθεται ένα νέο διαμέρισμα παρκέ στον φάκελο προθήκης προορισμού, όπου βρίσκονται τα αναλυμένα δεδομένα για την ημέρα. Φροντίσαμε εκ των προτέρων να μην υπάρχουν κατατμήσεις με διένεξη τύπου δεδομένων.

Όμως, έχουμε ένα τρίτο πρόβλημα. Τώρα το γενικό σχήμα δεν είναι γνωστό, επιπλέον, ο πίνακας στο Hive έχει ένα λανθασμένο σχήμα, καθώς κάθε νέο διαμέρισμα πιθανότατα εισήγαγε μια παραμόρφωση στο σχήμα.

Πρέπει να καταχωρήσετε ξανά τον πίνακα. Αυτό μπορεί να γίνει απλά: διαβάστε ξανά το παρκέ της βιτρίνας, πάρτε το σχήμα και δημιουργήστε ένα DDL με βάση αυτό, με το οποίο θα καταχωρήσετε ξανά τον φάκελο στο Hive ως εξωτερικό πίνακα, ενημερώνοντας το σχήμα της βιτρίνας-στόχου.

Έχουμε ένα τέταρτο πρόβλημα. Όταν καταχωρήσαμε τον πίνακα για πρώτη φορά, βασιστήκαμε στο Spark. Τώρα το κάνουμε μόνοι μας και πρέπει να θυμόμαστε ότι τα πεδία παρκέ μπορούν να ξεκινούν με χαρακτήρες που δεν επιτρέπονται για το Hive. Για παράδειγμα, το Spark εκτοξεύει γραμμές που δεν μπορούσε να αναλύσει στο πεδίο "corrupt_record". Ένα τέτοιο πεδίο δεν μπορεί να καταχωρηθεί στο Hive χωρίς να γίνει διαφυγή.

Γνωρίζοντας αυτό, παίρνουμε το σχήμα:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Κώδικας ("_corrupt_record", "`_corrupt_record") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") κάνει ασφαλές το DDL, δηλαδή αντί για:

create table tname (_field1 string, 1field string)

Με ονόματα πεδίων όπως "_field1, 1field", δημιουργείται ασφαλής DDL όπου διαφεύγουν τα ονόματα των πεδίων: δημιουργία πίνακα `tname` (συμβολοσειρά `_field1`, συμβολοσειρά `1field`).

Τίθεται το ερώτημα: πώς να αποκτήσετε σωστά ένα πλαίσιο δεδομένων με πλήρες σχήμα (σε κώδικα pf); Πώς να πάρετε αυτό το pf; Αυτό είναι το πέμπτο πρόβλημα. Ξαναδιάβασε το σχήμα όλων των κατατμήσεων από το φάκελο με τα αρχεία παρκέ της προθήκης προορισμού; Αυτή η μέθοδος είναι η πιο ασφαλής, αλλά δύσκολη.

Το σχήμα βρίσκεται ήδη στο Hive. Μπορείτε να αποκτήσετε ένα νέο σχήμα συνδυάζοντας το σχήμα ολόκληρου του πίνακα και το νέο διαμέρισμα. Επομένως, πρέπει να πάρετε το σχήμα πίνακα από το Hive και να το συνδυάσετε με το σχήμα του νέου διαμερίσματος. Αυτό μπορεί να γίνει διαβάζοντας τα δοκιμαστικά μεταδεδομένα από το Hive, αποθηκεύοντάς τα σε έναν προσωρινό φάκελο και χρησιμοποιώντας το Spark για να διαβάσετε και τα δύο διαμερίσματα ταυτόχρονα.

Στην πραγματικότητα, υπάρχουν όλα όσα χρειάζεστε: το αρχικό σχήμα πίνακα στο Hive και το νέο διαμέρισμα. Έχουμε και στοιχεία. Απομένει μόνο να αποκτήσετε ένα νέο σχήμα που συνδυάζει το σχήμα βιτρίνας και νέα πεδία από το διαμέρισμα που δημιουργήθηκε:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Στη συνέχεια, δημιουργούμε την εγγραφή πίνακα DDL, όπως στο προηγούμενο απόσπασμα.
Εάν ολόκληρη η αλυσίδα λειτουργεί σωστά, δηλαδή, υπήρχε ένα φορτίο προετοιμασίας και ο πίνακας δημιουργήθηκε σωστά στο Hive, τότε λαμβάνουμε ένα ενημερωμένο σχήμα πίνακα.

Και το τελευταίο πρόβλημα είναι ότι δεν μπορείτε απλώς να προσθέσετε ένα διαμέρισμα σε έναν πίνακα Hive, γιατί θα χαλάσει. Πρέπει να αναγκάσετε το Hive να διορθώσει τη δομή διαμερισμάτων του:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Το απλό έργο της ανάγνωσης του JSON και της δημιουργίας μιας βιτρίνας με βάση αυτό έχει ως αποτέλεσμα να ξεπεραστούν μια σειρά από σιωπηρές δυσκολίες, λύσεις για τις οποίες πρέπει να αναζητήσετε ξεχωριστά. Και παρόλο που αυτές οι λύσεις είναι απλές, χρειάζεται πολύς χρόνος για να τις βρούμε.

Για να υλοποιήσω την κατασκευή της βιτρίνας έπρεπε:

  • Προσθέστε διαμερίσματα στη βιτρίνα, απαλλαγείτε από αρχεία υπηρεσιών
  • Αντιμετωπίστε τα κενά πεδία στα δεδομένα προέλευσης που έχει πληκτρολογήσει το Spark
  • Ρίξτε απλούς τύπους σε μια χορδή
  • Μετατροπή ονομάτων πεδίων σε πεζά
  • Ξεχωριστή μεταφόρτωση δεδομένων και εγγραφή πίνακα στο Hive (δημιουργία DDL)
  • Μην ξεχάσετε να αποφύγετε τα ονόματα πεδίων που μπορεί να είναι ασύμβατα με το Hive
  • Μάθετε πώς να ενημερώνετε την εγγραφή πίνακα στο Hive

Συνοψίζοντας, σημειώνουμε ότι η απόφαση για την κατασκευή βιτρινών είναι γεμάτη με πολλές παγίδες. Επομένως, σε περίπτωση δυσκολιών στην εφαρμογή, είναι καλύτερο να επικοινωνήσετε με έναν έμπειρο συνεργάτη με επιτυχημένη τεχνογνωσία.

Σας ευχαριστούμε που διαβάσατε αυτό το άρθρο, ελπίζουμε να σας φανούν χρήσιμες οι πληροφορίες.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο