Apache Kafka και Data Streaming με Spark Streaming
Γεια σου Χαμπρ! Σήμερα θα δημιουργήσουμε ένα σύστημα που θα επεξεργάζεται τις ροές μηνυμάτων του Apache Kafka χρησιμοποιώντας το Spark Streaming και θα γράφει το αποτέλεσμα επεξεργασίας στη βάση δεδομένων cloud AWS RDS.
Φανταστείτε ότι ένα συγκεκριμένο πιστωτικό ίδρυμα μάς θέτει ως καθήκον την επεξεργασία των εισερχόμενων συναλλαγών "on the fly" για όλα τα υποκαταστήματά του. Αυτό μπορεί να γίνει για τον γρήγορο υπολογισμό της ανοιχτής συναλλαγματικής θέσης για το δημόσιο ταμείο, των ορίων ή του οικονομικού αποτελέσματος συναλλαγών κ.λπ.
Πώς να εφαρμόσετε αυτήν την περίπτωση χωρίς τη χρήση μαγικών και μαγικών ξόρκων - διαβάστε κάτω από το κόψιμο! Πηγαίνω!
Φυσικά, η επεξεργασία μεγάλου όγκου δεδομένων σε πραγματικό χρόνο παρέχει άφθονες ευκαιρίες για χρήση σε σύγχρονα συστήματα. Ένας από τους πιο δημοφιλείς συνδυασμούς για αυτό είναι ο συνδυασμός Apache Kafka και Spark Streaming, όπου ο Kafka δημιουργεί μια ροή πακέτων εισερχόμενων μηνυμάτων και το Spark Streaming επεξεργάζεται αυτά τα πακέτα σε ένα καθορισμένο χρονικό διάστημα.
Για να βελτιώσουμε την ανοχή σφαλμάτων της εφαρμογής, θα χρησιμοποιήσουμε σημεία ελέγχου - σημεία ελέγχου. Με αυτόν τον μηχανισμό, όταν η μονάδα Spark Streaming χρειάζεται να ανακτήσει χαμένα δεδομένα, χρειάζεται μόνο να επιστρέψει στο τελευταίο σημείο ελέγχου και να συνεχίσει τους υπολογισμούς από εκεί.
Η αρχιτεκτονική του ανεπτυγμένου συστήματος
Μεταχειρισμένα εξαρτήματα:
Apache Kafka είναι ένα κατανεμημένο σύστημα μηνυμάτων δημοσίευσης και εγγραφής. Κατάλληλο για κατανάλωση μηνυμάτων εκτός σύνδεσης και online. Για να αποφευχθεί η απώλεια δεδομένων, τα μηνύματα Kafka αποθηκεύονται στο δίσκο και αναπαράγονται μέσα στο σύμπλεγμα. Το σύστημα Kafka είναι χτισμένο πάνω από την υπηρεσία συγχρονισμού ZooKeeper.
Apache Spark Streaming - ένα στοιχείο Spark για την επεξεργασία δεδομένων ροής. Η μονάδα Spark Streaming έχει κατασκευαστεί χρησιμοποιώντας μια αρχιτεκτονική μικρο-παρτίδας, όταν μια ροή δεδομένων ερμηνεύεται ως μια συνεχής ακολουθία μικρών πακέτων δεδομένων. Το Spark Streaming λαμβάνει δεδομένα από διαφορετικές πηγές και τα συνδυάζει σε μικρές παρτίδες. Νέα πακέτα δημιουργούνται σε τακτά χρονικά διαστήματα. Στην αρχή κάθε χρονικού διαστήματος, δημιουργείται ένα νέο πακέτο και όλα τα δεδομένα που λαμβάνονται κατά τη διάρκεια αυτού του διαστήματος περιλαμβάνονται στο πακέτο. Στο τέλος του διαστήματος, η ανάπτυξη πακέτων σταματά. Το μέγεθος του διαστήματος καθορίζεται από μια παράμετρο που ονομάζεται διάστημα παρτίδας.
Apache Spark SQL - Συνδυάζει τη σχεσιακή επεξεργασία με τον λειτουργικό προγραμματισμό Spark. Τα δομημένα δεδομένα αναφέρονται σε δεδομένα που έχουν ένα σχήμα, δηλαδή ένα ενιαίο σύνολο πεδίων για όλες τις εγγραφές. Το Spark SQL υποστηρίζει είσοδο από μια ποικιλία δομημένων πηγών δεδομένων και, λόγω της παρουσίας πληροφοριών σχήματος, μπορεί να ανακτήσει αποτελεσματικά μόνο τα απαιτούμενα πεδία εγγραφών και επίσης παρέχει DataFrame API.
AWS RDS είναι μια σχετικά φθηνή σχεσιακή βάση δεδομένων που βασίζεται σε σύννεφο, μια υπηρεσία ιστού που απλοποιεί τη ρύθμιση, τη λειτουργία και την κλιμάκωση, που διαχειρίζεται απευθείας η Amazon.
Εγκατάσταση και εκτέλεση του διακομιστή Kafka
Πριν χρησιμοποιήσετε απευθείας το Kafka, πρέπει να βεβαιωθείτε ότι έχετε Java, γιατί Το JVM χρησιμοποιείται για εργασίες:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Το επόμενο βήμα είναι προαιρετικό. Το γεγονός είναι ότι οι προεπιλεγμένες ρυθμίσεις δεν σας επιτρέπουν να χρησιμοποιήσετε πλήρως όλες τις δυνατότητες του Apache Kafka. Για παράδειγμα, διαγράψτε ένα θέμα, κατηγορία, ομάδα, στην οποία μπορούν να δημοσιεύονται μηνύματα. Για να το αλλάξετε αυτό, ας επεξεργαστούμε το αρχείο ρυθμίσεων:
vim ~/kafka/config/server.properties
Προσθέστε τα ακόλουθα στο τέλος του αρχείου:
delete.topic.enable = true
Πριν ξεκινήσετε τον διακομιστή Kafka, πρέπει να ξεκινήσετε τον διακομιστή ZooKeeper, θα χρησιμοποιήσουμε το βοηθητικό σενάριο που συνοδεύει τη διανομή Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Αφού ξεκινήσει με επιτυχία το ZooKeeper, ξεκινάμε τον διακομιστή Kafka σε ξεχωριστό τερματικό:
Ας χάσουμε τις στιγμές δοκιμής του παραγωγού και του καταναλωτή για το νεοδημιουργημένο θέμα. Περισσότερες λεπτομέρειες σχετικά με το πώς μπορείτε να δοκιμάσετε την αποστολή και τη λήψη μηνυμάτων αναγράφονται στην επίσημη τεκμηρίωση - Στείλτε μερικά μηνύματα. Λοιπόν, προχωράμε στη σύνταξη ενός παραγωγού στην Python χρησιμοποιώντας το KafkaProducer API.
Συγγραφή παραγωγού
Ο παραγωγός θα δημιουργήσει τυχαία δεδομένα - 100 μηνύματα κάθε δευτερόλεπτο. Με τον όρο τυχαία δεδομένα εννοούμε ένα λεξικό που αποτελείται από τρία πεδία:
Υποκατάστημα — όνομα του σημείου πώλησης του πιστωτικού ιδρύματος·
Νόμισμα — νόμισμα συναλλαγής·
Ποσό - Ποσό Συναλλαγής. Το ποσό θα είναι θετικό εάν πρόκειται για αγορά συναλλάγματος από την Τράπεζα και αρνητικό εάν πρόκειται για πώληση.
Στη συνέχεια, χρησιμοποιώντας τη μέθοδο αποστολής, στέλνουμε ένα μήνυμα στον διακομιστή, στο θέμα που χρειαζόμαστε, σε μορφή JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Κατά την εκτέλεση του σεναρίου, λαμβάνουμε τα ακόλουθα μηνύματα στο τερματικό:
Αυτό σημαίνει ότι όλα λειτουργούν όπως θέλαμε - ο παραγωγός δημιουργεί και στέλνει μηνύματα στο θέμα που χρειαζόμαστε.
Το επόμενο βήμα είναι να εγκαταστήσετε το Spark και να επεξεργαστείτε αυτήν τη ροή μηνυμάτων.
Εγκατάσταση του Apache Spark
Apache Spark είναι μια ευέλικτη και υψηλής απόδοσης υπολογιστική πλατφόρμα συμπλέγματος.
Το Spark ξεπερνά τις δημοφιλείς υλοποιήσεις του μοντέλου MapReduce όσον αφορά την απόδοση, ενώ παρέχει υποστήριξη για ένα ευρύτερο φάσμα τύπων υπολογισμών, συμπεριλαμβανομένων των διαδραστικών ερωτημάτων και της ροής. Η ταχύτητα παίζει σημαντικό ρόλο κατά την επεξεργασία μεγάλων ποσοτήτων δεδομένων, καθώς είναι η ταχύτητα που σας επιτρέπει να εργάζεστε διαδραστικά χωρίς να ξοδεύετε λεπτά ή ώρες αναμονής. Ένα από τα μεγαλύτερα δυνατά σημεία του Spark για την παροχή αυτής της ταχύτητας είναι η ικανότητά του να εκτελεί υπολογισμούς στη μνήμη.
Αυτό το πλαίσιο είναι γραμμένο στο Scala, επομένως πρέπει να το εγκαταστήσετε πρώτα:
sudo apt-get install scala
Κατεβάστε τη διανομή Spark από την επίσημη ιστοσελίδα:
Εκτελέστε την παρακάτω εντολή αφού κάνετε αλλαγές στο bashrc:
source ~/.bashrc
Ανάπτυξη του AWS PostgreSQL
Απομένει να επεκταθεί η βάση δεδομένων, όπου θα συμπληρώσουμε τις επεξεργασμένες πληροφορίες από τις ροές. Για να το κάνουμε αυτό, θα χρησιμοποιήσουμε την υπηρεσία AWS RDS.
Μεταβείτε στην κονσόλα AWS -> AWS RDS -> Βάσεις δεδομένων -> Δημιουργία βάσης δεδομένων:
Επιλέξτε PostgreSQL και κάντε κλικ στο κουμπί Επόμενο:
Επειδή αυτό το παράδειγμα αναλύεται αποκλειστικά για εκπαιδευτικούς σκοπούς, θα χρησιμοποιήσουμε έναν δωρεάν διακομιστή "τουλάχιστον" (Free Tier):
Στη συνέχεια, ελέγξτε το μπλοκ Free Tier και μετά από αυτό θα μας προσφερθεί αυτόματα μια παρουσία της κλάσης t2.micro - αν και αδύναμη, αλλά δωρεάν και αρκετά κατάλληλη για την εργασία μας:
Ακολουθούν πολύ σημαντικά πράγματα: το όνομα της παρουσίας του DB, το όνομα του κύριου χρήστη και ο κωδικός πρόσβασής του. Ας ονομάσουμε το παράδειγμα: myHabrTest, κύριος χρήστης: habr, Κωδικός πρόσβασης: habr12345 και κάντε κλικ στο κουμπί Επόμενο:
Η επόμενη σελίδα περιέχει τις παραμέτρους που είναι υπεύθυνες για την προσβασιμότητα του διακομιστή της βάσης δεδομένων μας από το εξωτερικό (Public Accessibility) και τη διαθεσιμότητα των θυρών:
Ας δημιουργήσουμε μια νέα ρύθμιση για την ομάδα ασφαλείας VPC, η οποία θα επιτρέπει εξωτερική πρόσβαση στον διακομιστή της βάσης δεδομένων μας μέσω της θύρας 5432 (PostgreSQL).
Ας πάμε σε ένα ξεχωριστό παράθυρο του προγράμματος περιήγησης στην κονσόλα AWS στην ενότητα Πίνακας ελέγχου VPC -> Ομάδες ασφαλείας -> Δημιουργία ομάδας ασφαλείας:
Ορίζουμε το όνομα για την ομάδα Security - PostgreSQL, μια περιγραφή, καθορίζουμε με ποιο VPC πρέπει να συσχετιστεί αυτή η ομάδα και κάνουμε κλικ στο κουμπί Δημιουργία:
Συμπληρώνουμε τους νέους κανόνες ομάδας εισερχόμενων για τη θύρα 5432, όπως φαίνεται στην παρακάτω εικόνα. Δεν μπορείτε να καθορίσετε τη θύρα με μη αυτόματο τρόπο, αλλά επιλέξτε PostgreSQL από την αναπτυσσόμενη λίστα Τύπος.
Αυστηρά μιλώντας, η τιμή ::/0 σημαίνει τη διαθεσιμότητα εισερχόμενης κίνησης για τον διακομιστή από όλο τον κόσμο, κάτι που δεν είναι απολύτως αληθές κανονικά, αλλά για να αναλύσουμε το παράδειγμα, ας χρησιμοποιήσουμε αυτήν την προσέγγιση:
Επιστρέφουμε στη σελίδα του προγράμματος περιήγησης, όπου έχουμε ανοιχτό το "Διαμόρφωση σύνθετων ρυθμίσεων" και επιλέγουμε ομάδες ασφαλείας VPC -> Επιλέξτε υπάρχουσες ομάδες ασφαλείας VPC -> PostgreSQL στην ενότητα:
Στη συνέχεια, στην ενότητα Επιλογές βάσης δεδομένων -> Όνομα βάσης δεδομένων -> ορίστε το όνομα - habrDB.
Μπορούμε να αφήσουμε τις υπόλοιπες παραμέτρους, με εξαίρεση την απενεργοποίηση της δημιουργίας αντιγράφων ασφαλείας (περίοδος διατήρησης αντιγράφων ασφαλείας - 0 ημέρες), της παρακολούθησης και των Insights απόδοσης, από προεπιλογή. Κάντε κλικ στο κουμπί Δημιουργία βάσης δεδομένων:
Διαχειριστής ροής
Το τελικό στάδιο θα είναι η ανάπτυξη μιας εργασίας Spark, η οποία θα επεξεργάζεται νέα δεδομένα από τον Κάφκα κάθε δύο δευτερόλεπτα και θα εισάγει το αποτέλεσμα στη βάση δεδομένων.
Όπως σημειώθηκε παραπάνω, τα σημεία ελέγχου είναι ο κύριος μηχανισμός στο SparkStreaming που πρέπει να ρυθμιστεί ώστε να παρέχει ανοχή σφαλμάτων. Θα χρησιμοποιήσουμε σημεία ελέγχου και, σε περίπτωση αποτυχίας της διαδικασίας, η λειτουργική μονάδα Spark Streaming θα χρειαστεί μόνο να επιστρέψει στο τελευταίο σημείο ελέγχου και να συνεχίσει τους υπολογισμούς από αυτό για να ανακτήσει τα χαμένα δεδομένα.
Ένα σημείο ελέγχου μπορεί να ενεργοποιηθεί ορίζοντας έναν κατάλογο σε ένα ανεκτικό σε σφάλματα, αξιόπιστο σύστημα αρχείων (π.χ. HDFS, S3, κ.λπ.) όπου θα αποθηκευτούν οι πληροφορίες του σημείου ελέγχου. Αυτό γίνεται για παράδειγμα με:
streamingContext.checkpoint(checkpointDirectory)
Στο παράδειγμά μας, θα χρησιμοποιήσουμε την ακόλουθη προσέγγιση, δηλαδή, εάν υπάρχει το checkpointDirectory, τότε το πλαίσιο θα αναδημιουργηθεί από τα δεδομένα του σημείου ελέγχου. Εάν ο κατάλογος δεν υπάρχει (δηλαδή εκτελείται για πρώτη φορά), τότε καλείται η συνάρτηση functionToCreateContext για να δημιουργήσει ένα νέο περιβάλλον και να ρυθμίσει το DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Δημιουργούμε ένα αντικείμενο DirectStream για σύνδεση με το θέμα "συναλλαγή" χρησιμοποιώντας τη μέθοδο createDirectStream της βιβλιοθήκης KafkaUtils:
Χρησιμοποιώντας το Spark SQL, κάνουμε μια απλή ομαδοποίηση και βγάζουμε το αποτέλεσμα στην κονσόλα:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Λήψη του σώματος του ερωτήματος και εκτέλεση μέσω του Spark SQL:
Και στη συνέχεια αποθηκεύουμε τα λαμβανόμενα συγκεντρωτικά δεδομένα σε έναν πίνακα στο AWS RDS. Για να αποθηκεύσουμε τα αποτελέσματα της συνάθροισης σε έναν πίνακα βάσης δεδομένων, θα χρησιμοποιήσουμε τη μέθοδο εγγραφής του αντικειμένου DataFrame:
Λίγα λόγια για τη ρύθμιση μιας σύνδεσης με το AWS RDS. Δημιουργήσαμε τον χρήστη και τον κωδικό πρόσβασης για αυτό στο βήμα «Ανάπτυξη AWS PostgreSQL». Ως url του διακομιστή βάσης δεδομένων, θα πρέπει να χρησιμοποιήσετε το Endpoint, το οποίο εμφανίζεται στην ενότητα Συνδεσιμότητα και ασφάλεια:
Για να συνδέσετε σωστά το Spark και τον Kafka, θα πρέπει να εκτελέσετε την εργασία μέσω smark-submit χρησιμοποιώντας το artifact spark-streaming-kafka-0-8_2.11. Επιπλέον, θα χρησιμοποιήσουμε επίσης ένα τεχνούργημα για αλληλεπίδραση με τη βάση δεδομένων PostgreSQL, θα τα περάσουμε μέσα από πακέτα --.
Για ευελιξία του σεναρίου, θα αφαιρέσουμε επίσης το όνομα του διακομιστή μηνυμάτων και το θέμα από το οποίο θέλουμε να λάβουμε δεδομένα ως παραμέτρους εισόδου.
Λοιπόν, ήρθε η ώρα να εκτελέσετε και να δοκιμάσετε το σύστημα:
Όλα λειτούργησαν! Όπως μπορείτε να δείτε στην παρακάτω εικόνα, ενώ η εφαρμογή εκτελείται, νέα αποτελέσματα συνάθροισης εμφανίζονται κάθε 2 δευτερόλεπτα, επειδή ορίσαμε το διάστημα ομαδοποίησης σε 2 δευτερόλεπτα όταν δημιουργήσαμε το αντικείμενο StreamingContext:
Στη συνέχεια, κάνουμε ένα απλό ερώτημα στη βάση δεδομένων για να ελέγξουμε για εγγραφές στον πίνακα συναλλαγή_ροής:
Συμπέρασμα
Σε αυτό το άρθρο, εξετάστηκε ένα παράδειγμα επεξεργασίας πληροφοριών ροής χρησιμοποιώντας το Spark Streaming σε συνδυασμό με τους Apache Kafka και PostgreSQL. Με την αύξηση του όγκου δεδομένων από διάφορες πηγές, είναι δύσκολο να υπερεκτιμηθεί η πρακτική αξία του Spark Streaming για τη δημιουργία εφαρμογών σε πραγματικό χρόνο και ροής.
Μπορείτε να βρείτε τον πλήρη πηγαίο κώδικα στο αποθετήριο μου στη διεύθυνση GitHub.
Είμαι στην ευχάριστη θέση να συζητήσω αυτό το άρθρο, περιμένω τα σχόλιά σας και, επίσης, ελπίζω σε εποικοδομητική κριτική από όλους τους ενδιαφερόμενους αναγνώστες.
Σας εύχομαι καλή επιτυχία!
Ps. Αρχικά σχεδιαζόταν να χρησιμοποιήσω μια τοπική βάση δεδομένων PostgreSQL, αλλά λόγω της αγάπης μου για το AWS, αποφάσισα να μεταφέρω τη βάση δεδομένων στο cloud. Στο επόμενο άρθρο σχετικά με αυτό το θέμα, θα σας δείξω πώς να εφαρμόσετε ολόκληρο το σύστημα που περιγράφεται παραπάνω στο AWS χρησιμοποιώντας AWS Kinesis και AWS EMR. Ακολουθήστε τα νέα!