Apache Kafka und Daten-Streaming mit Spark Streaming
Hey Habr! Heute werden wir ein System aufbauen, das Apache Kafka-Nachrichtenströme mithilfe von Spark Streaming verarbeitet und das Verarbeitungsergebnis in die AWS RDS-Clouddatenbank schreibt.
Stellen wir uns vor, ein bestimmtes Kreditinstitut stellt uns vor die Aufgabe, eingehende Transaktionen „on the fly“ für alle seine Filialen abzuwickeln. Dies kann zum Zweck der schnellen Berechnung der offenen Währungsposition für das Treasury, von Limits oder des Finanzergebnisses von Transaktionen usw. erfolgen.
Wie Sie diesen Fall ohne den Einsatz von Magie und Zaubersprüchen umsetzen können – lesen Sie unter dem Schnitt! Gehen!
Natürlich bietet die Echtzeitverarbeitung großer Datenmengen zahlreiche Möglichkeiten für den Einsatz in modernen Systemen. Eine der beliebtesten Kombinationen hierfür ist das Tandem aus Apache Kafka und Spark Streaming, bei dem Kafka einen Strom eingehender Nachrichtenpakete erstellt und Spark Streaming diese Pakete in einem bestimmten Zeitintervall verarbeitet.
Um die Fehlertoleranz der Anwendung zu verbessern, verwenden wir Checkpoints – Checkpoints. Wenn das Spark-Streaming-Modul mit diesem Mechanismus verlorene Daten wiederherstellen muss, muss es nur zum letzten Prüfpunkt zurückkehren und die Berechnungen von dort aus fortsetzen.
Die Architektur des entwickelten Systems
Verwendete Komponenten:
Apache Kafka ist ein verteiltes Publish-and-Subscribe-Nachrichtensystem. Geeignet für den Offline- und Online-Nachrichtenkonsum. Um Datenverlust zu verhindern, werden Kafka-Nachrichten auf der Festplatte gespeichert und innerhalb des Clusters repliziert. Das Kafka-System basiert auf dem ZooKeeper-Synchronisierungsdienst.
Apache Spark-Streaming – eine Spark-Komponente zur Verarbeitung von Streaming-Daten. Das Spark-Streaming-Modul basiert auf einer Mikrobatch-Architektur, bei der ein Datenstrom als kontinuierliche Folge kleiner Datenpakete interpretiert wird. Spark Streaming nimmt Daten aus verschiedenen Quellen und kombiniert sie in kleinen Stapeln. In regelmäßigen Abständen werden neue Pakete erstellt. Zu Beginn jedes Zeitintervalls wird ein neues Paket erstellt und alle während dieses Intervalls empfangenen Daten werden in das Paket aufgenommen. Am Ende des Intervalls stoppt das Paketwachstum. Die Größe des Intervalls wird durch einen Parameter namens Batch-Intervall bestimmt;
Apache Spark-SQL - Kombiniert relationale Verarbeitung mit Spark-Funktionsprogrammierung. Strukturierte Daten beziehen sich auf Daten, die ein Schema haben, d. h. einen einzigen Satz von Feldern für alle Datensätze. Spark SQL unterstützt die Eingabe aus einer Vielzahl strukturierter Datenquellen und kann aufgrund der vorhandenen Schemainformationen effizient nur die erforderlichen Datensatzfelder abrufen und stellt außerdem DataFrame-APIs bereit.
AWS-RDS ist eine relativ kostengünstige cloudbasierte relationale Datenbank, ein Webdienst, der die Einrichtung, den Betrieb und die Skalierung vereinfacht und direkt von Amazon verwaltet wird.
Installation und Ausführung des Kafka-Servers
Bevor Sie Kafka direkt verwenden, müssen Sie sicherstellen, dass Sie über Java verfügen, denn JVM wird für die Arbeit verwendet:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Der nächste Schritt ist optional. Tatsache ist, dass Sie mit den Standardeinstellungen nicht alle Funktionen von Apache Kafka vollständig nutzen können. Löschen Sie beispielsweise ein Thema, eine Kategorie oder eine Gruppe, zu der Nachrichten veröffentlicht werden können. Um dies zu ändern, bearbeiten wir die Konfigurationsdatei:
vim ~/kafka/config/server.properties
Fügen Sie am Ende der Datei Folgendes hinzu:
delete.topic.enable = true
Bevor Sie den Kafka-Server starten, müssen Sie den ZooKeeper-Server starten. Wir verwenden das Hilfsskript, das mit der Kafka-Distribution geliefert wird:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Nachdem ZooKeeper erfolgreich gestartet wurde, starten wir den Kafka-Server in einem separaten Terminal:
Verpassen wir die Momente, in denen wir Produzenten und Konsumenten auf das neu erstellte Thema testen. Weitere Details dazu, wie Sie das Senden und Empfangen von Nachrichten testen können, finden Sie in der offiziellen Dokumentation – Senden Sie einige Nachrichten. Nun, wir fahren mit dem Schreiben eines Produzenten in Python unter Verwendung der KafkaProducer-API fort.
Produzent schreibt
Der Produzent generiert zufällige Daten – 100 Nachrichten pro Sekunde. Mit Zufallsdaten meinen wir ein Wörterbuch, das aus drei Feldern besteht:
Filiale — Name der Verkaufsstelle des Kreditinstituts;
Währung — Transaktionswährung;
Summe - Transaktionshöhe. Der Betrag ist positiv, wenn es sich um einen Währungskauf durch die Bank handelt, und negativ, wenn es sich um einen Verkauf handelt.
Als nächstes senden wir mit der Sendemethode eine Nachricht im JSON-Format an den Server, an das von uns benötigte Thema:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Beim Ausführen des Skripts erhalten wir im Terminal folgende Meldungen:
Das bedeutet, dass alles so funktioniert, wie wir es wollten – der Produzent generiert und sendet Nachrichten zu dem von uns benötigten Thema.
Der nächste Schritt besteht darin, Spark zu installieren und diesen Nachrichtenfluss zu verarbeiten.
Apache Spark installieren
Apache Funken ist eine vielseitige und leistungsstarke Cluster-Computing-Plattform.
Spark übertrifft gängige Implementierungen des MapReduce-Modells hinsichtlich der Leistung und bietet gleichzeitig Unterstützung für ein breiteres Spektrum an Berechnungstypen, einschließlich interaktiver Abfragen und Streaming. Geschwindigkeit spielt bei der Verarbeitung großer Datenmengen eine wichtige Rolle, denn gerade diese Geschwindigkeit ermöglicht ein interaktives Arbeiten, ohne minuten- oder stundenlang warten zu müssen. Eine der größten Stärken von Spark bei der Bereitstellung dieser Geschwindigkeit ist die Fähigkeit, In-Memory-Berechnungen durchzuführen.
Dieses Framework ist in Scala geschrieben, daher müssen Sie es zuerst installieren:
sudo apt-get install scala
Laden Sie die Spark-Distribution von der offiziellen Website herunter:
Führen Sie den folgenden Befehl aus, nachdem Sie Änderungen an bashrc vorgenommen haben:
source ~/.bashrc
Bereitstellen von AWS PostgreSQL
Es bleibt die Datenbank zu erweitern, in die wir die verarbeiteten Informationen aus den Streams eintragen werden. Dazu nutzen wir den AWS RDS-Dienst.
Gehen Sie zur AWS-Konsole -> AWS RDS -> Datenbanken -> Datenbank erstellen:
Wählen Sie PostgreSQL aus und klicken Sie auf die Schaltfläche Weiter:
Weil Da dieses Beispiel ausschließlich zu Bildungszwecken analysiert wird, werden wir „mindestens“ einen kostenlosen Server (kostenloses Kontingent) verwenden:
Als nächstes überprüfen wir den Free Tier-Block, und danach wird uns automatisch eine Instanz der Klasse t2.micro angeboten – zwar schwach, aber kostenlos und für unsere Aufgabe durchaus geeignet:
Es folgen ganz wichtige Dinge: der Name der DB-Instanz, der Name des Master-Benutzers und sein Passwort. Nennen wir die Instanz: myHabrTest, Hauptbenutzer: habr, Passwort: habr12345 und klicken Sie auf die Schaltfläche Weiter:
Auf der nächsten Seite finden Sie die Parameter, die für die Erreichbarkeit unseres Datenbankservers von außen (öffentliche Erreichbarkeit) und die Verfügbarkeit von Ports verantwortlich sind:
Erstellen wir eine neue Einstellung für die VPC-Sicherheitsgruppe, die den externen Zugriff auf unseren Datenbankserver über Port 5432 (PostgreSQL) ermöglicht.
Gehen wir in einem separaten Browserfenster zur AWS-Konsole im Abschnitt VPC-Dashboard -> Sicherheitsgruppen -> Sicherheitsgruppe erstellen:
Wir legen den Namen für die Sicherheitsgruppe fest – PostgreSQL, eine Beschreibung, geben an, mit welcher VPC diese Gruppe verknüpft werden soll und klicken auf die Schaltfläche „Erstellen“:
Wir füllen für die neu erstellte Gruppe Inbound-Regeln für Port 5432 aus, wie im Bild unten gezeigt. Sie können den Port nicht manuell angeben, sondern wählen PostgreSQL aus der Dropdown-Liste Typ aus.
Streng genommen bedeutet der Wert ::/0 die Verfügbarkeit von eingehendem Datenverkehr für den Server aus der ganzen Welt, was kanonisch nicht ganz stimmt, aber um das Beispiel zu analysieren, verwenden wir diesen Ansatz:
Wir kehren zur Browserseite zurück, wo wir „Erweiterte Einstellungen konfigurieren“ geöffnet haben und wählen VPC-Sicherheitsgruppen -> Vorhandene VPC-Sicherheitsgruppen auswählen -> PostgreSQL im Abschnitt:
Als nächstes im Abschnitt Datenbankoptionen -> Datenbankname -> Namen festlegen - habrDB.
Wir können die restlichen Parameter beibehalten, mit Ausnahme der standardmäßigen Deaktivierung der Sicherung (Aufbewahrungsdauer der Sicherung – 0 Tage), der Überwachung und der Leistungseinblicke. Klicken Sie auf die Schaltfläche Datenbank erstellen:
Stream-Handler
Der letzte Schritt wird die Entwicklung eines Spark-Jobs sein, der alle zwei Sekunden neue Daten von Kafka verarbeitet und das Ergebnis in die Datenbank einträgt.
Wie oben erwähnt, sind Prüfpunkte der Hauptmechanismus in SparkStreaming, der konfiguriert werden muss, um Fehlertoleranz bereitzustellen. Wir werden Prüfpunkte verwenden und im Falle eines Verfahrensfehlers muss das Spark-Streaming-Modul nur zum letzten Prüfpunkt zurückkehren und die Berechnungen von dort aus fortsetzen, um die verlorenen Daten wiederherzustellen.
Ein Prüfpunkt kann aktiviert werden, indem ein Verzeichnis auf einem fehlertoleranten, zuverlässigen Dateisystem (z. B. HDFS, S3 usw.) festgelegt wird, in dem die Prüfpunktinformationen gespeichert werden. Dies geschieht zum Beispiel mit:
streamingContext.checkpoint(checkpointDirectory)
In unserem Beispiel verwenden wir den folgenden Ansatz: Wenn das checkpointDirectory vorhanden ist, wird der Kontext aus den Checkpoint-Daten neu erstellt. Wenn das Verzeichnis nicht existiert (d. h. es wird zum ersten Mal ausgeführt), wird die Funktion „functionToCreateContext“ aufgerufen, um einen neuen Kontext zu erstellen und DStreams einzurichten:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Wir erstellen ein DirectStream-Objekt, um mit der Methode „createDirectStream“ der KafkaUtils-Bibliothek eine Verbindung zum Thema „Transaktion“ herzustellen:
Mit Spark SQL führen wir eine einfache Gruppierung durch und geben das Ergebnis an die Konsole aus:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Und dann speichern wir die empfangenen aggregierten Daten in einer Tabelle in AWS RDS. Um die Ergebnisse der Aggregation in einer Datenbanktabelle zu speichern, verwenden wir die Schreibmethode des DataFrame-Objekts:
Ein paar Worte zum Einrichten einer Verbindung zu AWS RDS. Wir haben den Benutzer und das Passwort dafür im Schritt „AWS PostgreSQL bereitstellen“ erstellt. Als URL des Datenbankservers sollten Sie den Endpunkt verwenden, der im Abschnitt Konnektivität & Sicherheit angezeigt wird:
Um Spark und Kafka korrekt zu verbinden, sollten Sie den Job über smark-submit unter Verwendung des Artefakts ausführen spark-streaming-kafka-0-8_2.11. Darüber hinaus werden wir auch ein Artefakt für die Interaktion mit der PostgreSQL-Datenbank verwenden und diese über --packages weiterleiten.
Zur Flexibilität des Skripts nehmen wir auch den Namen des Nachrichtenservers und das Thema, von dem wir Daten erhalten möchten, als Eingabeparameter heraus.
Es ist also an der Zeit, das System auszuführen und zu testen:
Es hat alles geklappt! Wie Sie im Bild unten sehen können, werden während der Ausführung der Anwendung alle 2 Sekunden neue Aggregationsergebnisse angezeigt, da wir das Bündelungsintervall beim Erstellen des StreamingContext-Objekts auf 2 Sekunden festgelegt haben:
Als Nächstes führen wir eine einfache Abfrage an die Datenbank durch, um nach Datensätzen in der Tabelle zu suchen transaktionsfluss:
Abschluss
In diesem Artikel wurde ein Beispiel für die Verarbeitung von Streaming-Informationen mithilfe von Spark Streaming in Verbindung mit Apache Kafka und PostgreSQL betrachtet. Angesichts der wachsenden Datenmengen aus verschiedenen Quellen kann der praktische Wert von Spark Streaming für die Erstellung von Echtzeit- und Streaming-Anwendungen kaum überschätzt werden.
Den vollständigen Quellcode finden Sie in meinem Repository unter GitHub.
Ich diskutiere diesen Artikel gerne, freue mich auf Ihre Kommentare und hoffe auf konstruktive Kritik aller interessierten Leser.
Ich wünsche Ihnen viel Erfolg!
Ps. Ursprünglich war geplant, eine lokale PostgreSQL-Datenbank zu verwenden, aber aufgrund meiner Liebe zu AWS entschied ich mich, die Datenbank in die Cloud zu verlagern. Im nächsten Artikel zu diesem Thema zeige ich Ihnen, wie Sie das oben beschriebene Gesamtsystem mithilfe von AWS Kinesis und AWS EMR in AWS implementieren. Verfolgen Sie die Nachrichten!