Apache Kafka und Daten-Streaming mit Spark Streaming

Hey Habr! Heute werden wir ein System aufbauen, das Apache Kafka-Nachrichtenströme mithilfe von Spark Streaming verarbeitet und das Verarbeitungsergebnis in die AWS RDS-Clouddatenbank schreibt.

Stellen wir uns vor, ein bestimmtes Kreditinstitut stellt uns vor die Aufgabe, eingehende Transaktionen „on the fly“ für alle seine Filialen abzuwickeln. Dies kann zum Zweck der schnellen Berechnung der offenen Währungsposition für das Treasury, von Limits oder des Finanzergebnisses von Transaktionen usw. erfolgen.

Wie Sie diesen Fall ohne den Einsatz von Magie und Zaubersprüchen umsetzen können – lesen Sie unter dem Schnitt! Gehen!

Apache Kafka und Daten-Streaming mit Spark Streaming
(Bildquelle)

Einführung

Natürlich bietet die Echtzeitverarbeitung großer Datenmengen zahlreiche Möglichkeiten für den Einsatz in modernen Systemen. Eine der beliebtesten Kombinationen hierfür ist das Tandem aus Apache Kafka und Spark Streaming, bei dem Kafka einen Strom eingehender Nachrichtenpakete erstellt und Spark Streaming diese Pakete in einem bestimmten Zeitintervall verarbeitet.

Um die Fehlertoleranz der Anwendung zu verbessern, verwenden wir Checkpoints – Checkpoints. Wenn das Spark-Streaming-Modul mit diesem Mechanismus verlorene Daten wiederherstellen muss, muss es nur zum letzten Prüfpunkt zurückkehren und die Berechnungen von dort aus fortsetzen.

Die Architektur des entwickelten Systems

Apache Kafka und Daten-Streaming mit Spark Streaming

Verwendete Komponenten:

  • Apache Kafka ist ein verteiltes Publish-and-Subscribe-Nachrichtensystem. Geeignet für den Offline- und Online-Nachrichtenkonsum. Um Datenverlust zu verhindern, werden Kafka-Nachrichten auf der Festplatte gespeichert und innerhalb des Clusters repliziert. Das Kafka-System basiert auf dem ZooKeeper-Synchronisierungsdienst.
  • Apache Spark-Streaming – eine Spark-Komponente zur Verarbeitung von Streaming-Daten. Das Spark-Streaming-Modul basiert auf einer Mikrobatch-Architektur, bei der ein Datenstrom als kontinuierliche Folge kleiner Datenpakete interpretiert wird. Spark Streaming nimmt Daten aus verschiedenen Quellen und kombiniert sie in kleinen Stapeln. In regelmäßigen Abständen werden neue Pakete erstellt. Zu Beginn jedes Zeitintervalls wird ein neues Paket erstellt und alle während dieses Intervalls empfangenen Daten werden in das Paket aufgenommen. Am Ende des Intervalls stoppt das Paketwachstum. Die Größe des Intervalls wird durch einen Parameter namens Batch-Intervall bestimmt;
  • Apache Spark-SQL - Kombiniert relationale Verarbeitung mit Spark-Funktionsprogrammierung. Strukturierte Daten beziehen sich auf Daten, die ein Schema haben, d. h. einen einzigen Satz von Feldern für alle Datensätze. Spark SQL unterstützt die Eingabe aus einer Vielzahl strukturierter Datenquellen und kann aufgrund der vorhandenen Schemainformationen effizient nur die erforderlichen Datensatzfelder abrufen und stellt außerdem DataFrame-APIs bereit.
  • AWS-RDS ist eine relativ kostengünstige cloudbasierte relationale Datenbank, ein Webdienst, der die Einrichtung, den Betrieb und die Skalierung vereinfacht und direkt von Amazon verwaltet wird.

Installation und Ausführung des Kafka-Servers

Bevor Sie Kafka direkt verwenden, müssen Sie sicherstellen, dass Sie über Java verfügen, denn JVM wird für die Arbeit verwendet:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Erstellen wir einen neuen Benutzer für die Arbeit mit Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Laden Sie als Nächstes das Distributionskit von der offiziellen Apache Kafka-Website herunter:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Entpacken Sie das heruntergeladene Archiv:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Der nächste Schritt ist optional. Tatsache ist, dass Sie mit den Standardeinstellungen nicht alle Funktionen von Apache Kafka vollständig nutzen können. Löschen Sie beispielsweise ein Thema, eine Kategorie oder eine Gruppe, zu der Nachrichten veröffentlicht werden können. Um dies zu ändern, bearbeiten wir die Konfigurationsdatei:

vim ~/kafka/config/server.properties

Fügen Sie am Ende der Datei Folgendes hinzu:

delete.topic.enable = true

Bevor Sie den Kafka-Server starten, müssen Sie den ZooKeeper-Server starten. Wir verwenden das Hilfsskript, das mit der Kafka-Distribution geliefert wird:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Nachdem ZooKeeper erfolgreich gestartet wurde, starten wir den Kafka-Server in einem separaten Terminal:

bin/kafka-server-start.sh config/server.properties

Erstellen wir ein neues Thema namens „Transaktion“:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Stellen wir sicher, dass ein Thema mit der erforderlichen Anzahl an Partitionen und Replikationen erstellt wurde:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka und Daten-Streaming mit Spark Streaming

Verpassen wir die Momente, in denen wir Produzenten und Konsumenten auf das neu erstellte Thema testen. Weitere Details dazu, wie Sie das Senden und Empfangen von Nachrichten testen können, finden Sie in der offiziellen Dokumentation – Senden Sie einige Nachrichten. Nun, wir fahren mit dem Schreiben eines Produzenten in Python unter Verwendung der KafkaProducer-API fort.

Produzent schreibt

Der Produzent generiert zufällige Daten – 100 Nachrichten pro Sekunde. Mit Zufallsdaten meinen wir ein Wörterbuch, das aus drei Feldern besteht:

  • Filiale — Name der Verkaufsstelle des Kreditinstituts;
  • Währung — Transaktionswährung;
  • Summe - Transaktionshöhe. Der Betrag ist positiv, wenn es sich um einen Währungskauf durch die Bank handelt, und negativ, wenn es sich um einen Verkauf handelt.

Der Code für den Produzenten sieht so aus:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Als nächstes senden wir mit der Sendemethode eine Nachricht im JSON-Format an den Server, an das von uns benötigte Thema:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Beim Ausführen des Skripts erhalten wir im Terminal folgende Meldungen:

Apache Kafka und Daten-Streaming mit Spark Streaming

Das bedeutet, dass alles so funktioniert, wie wir es wollten – der Produzent generiert und sendet Nachrichten zu dem von uns benötigten Thema.
Der nächste Schritt besteht darin, Spark zu installieren und diesen Nachrichtenfluss zu verarbeiten.

Apache Spark installieren

Apache Funken ist eine vielseitige und leistungsstarke Cluster-Computing-Plattform.

Spark übertrifft gängige Implementierungen des MapReduce-Modells hinsichtlich der Leistung und bietet gleichzeitig Unterstützung für ein breiteres Spektrum an Berechnungstypen, einschließlich interaktiver Abfragen und Streaming. Geschwindigkeit spielt bei der Verarbeitung großer Datenmengen eine wichtige Rolle, denn gerade diese Geschwindigkeit ermöglicht ein interaktives Arbeiten, ohne minuten- oder stundenlang warten zu müssen. Eine der größten Stärken von Spark bei der Bereitstellung dieser Geschwindigkeit ist die Fähigkeit, In-Memory-Berechnungen durchzuführen.

Dieses Framework ist in Scala geschrieben, daher müssen Sie es zuerst installieren:

sudo apt-get install scala

Laden Sie die Spark-Distribution von der offiziellen Website herunter:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Archiv entpacken:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Fügen Sie den Pfad zu Spark zur Bash-Datei hinzu:

vim ~/.bashrc

Fügen Sie über den Editor die folgenden Zeilen hinzu:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Führen Sie den folgenden Befehl aus, nachdem Sie Änderungen an bashrc vorgenommen haben:

source ~/.bashrc

Bereitstellen von AWS PostgreSQL

Es bleibt die Datenbank zu erweitern, in die wir die verarbeiteten Informationen aus den Streams eintragen werden. Dazu nutzen wir den AWS RDS-Dienst.

Gehen Sie zur AWS-Konsole -> AWS RDS -> Datenbanken -> Datenbank erstellen:
Apache Kafka und Daten-Streaming mit Spark Streaming

Wählen Sie PostgreSQL aus und klicken Sie auf die Schaltfläche Weiter:
Apache Kafka und Daten-Streaming mit Spark Streaming

Weil Da dieses Beispiel ausschließlich zu Bildungszwecken analysiert wird, werden wir „mindestens“ einen kostenlosen Server (kostenloses Kontingent) verwenden:
Apache Kafka und Daten-Streaming mit Spark Streaming

Als nächstes überprüfen wir den Free Tier-Block, und danach wird uns automatisch eine Instanz der Klasse t2.micro angeboten – zwar schwach, aber kostenlos und für unsere Aufgabe durchaus geeignet:
Apache Kafka und Daten-Streaming mit Spark Streaming

Es folgen ganz wichtige Dinge: der Name der DB-Instanz, der Name des Master-Benutzers und sein Passwort. Nennen wir die Instanz: myHabrTest, Hauptbenutzer: habr, Passwort: habr12345 und klicken Sie auf die Schaltfläche Weiter:
Apache Kafka und Daten-Streaming mit Spark Streaming

Auf der nächsten Seite finden Sie die Parameter, die für die Erreichbarkeit unseres Datenbankservers von außen (öffentliche Erreichbarkeit) und die Verfügbarkeit von Ports verantwortlich sind:

Apache Kafka und Daten-Streaming mit Spark Streaming

Erstellen wir eine neue Einstellung für die VPC-Sicherheitsgruppe, die den externen Zugriff auf unseren Datenbankserver über Port 5432 (PostgreSQL) ermöglicht.
Gehen wir in einem separaten Browserfenster zur AWS-Konsole im Abschnitt VPC-Dashboard -> Sicherheitsgruppen -> Sicherheitsgruppe erstellen:
Apache Kafka und Daten-Streaming mit Spark Streaming

Wir legen den Namen für die Sicherheitsgruppe fest – PostgreSQL, eine Beschreibung, geben an, mit welcher VPC diese Gruppe verknüpft werden soll und klicken auf die Schaltfläche „Erstellen“:
Apache Kafka und Daten-Streaming mit Spark Streaming

Wir füllen für die neu erstellte Gruppe Inbound-Regeln für Port 5432 aus, wie im Bild unten gezeigt. Sie können den Port nicht manuell angeben, sondern wählen PostgreSQL aus der Dropdown-Liste Typ aus.

Streng genommen bedeutet der Wert ::/0 die Verfügbarkeit von eingehendem Datenverkehr für den Server aus der ganzen Welt, was kanonisch nicht ganz stimmt, aber um das Beispiel zu analysieren, verwenden wir diesen Ansatz:
Apache Kafka und Daten-Streaming mit Spark Streaming

Wir kehren zur Browserseite zurück, wo wir „Erweiterte Einstellungen konfigurieren“ geöffnet haben und wählen VPC-Sicherheitsgruppen -> Vorhandene VPC-Sicherheitsgruppen auswählen -> PostgreSQL im Abschnitt:
Apache Kafka und Daten-Streaming mit Spark Streaming

Als nächstes im Abschnitt Datenbankoptionen -> Datenbankname -> Namen festlegen - habrDB.

Wir können die restlichen Parameter beibehalten, mit Ausnahme der standardmäßigen Deaktivierung der Sicherung (Aufbewahrungsdauer der Sicherung – 0 Tage), der Überwachung und der Leistungseinblicke. Klicken Sie auf die Schaltfläche Datenbank erstellen:
Apache Kafka und Daten-Streaming mit Spark Streaming

Stream-Handler

Der letzte Schritt wird die Entwicklung eines Spark-Jobs sein, der alle zwei Sekunden neue Daten von Kafka verarbeitet und das Ergebnis in die Datenbank einträgt.

Wie oben erwähnt, sind Prüfpunkte der Hauptmechanismus in SparkStreaming, der konfiguriert werden muss, um Fehlertoleranz bereitzustellen. Wir werden Prüfpunkte verwenden und im Falle eines Verfahrensfehlers muss das Spark-Streaming-Modul nur zum letzten Prüfpunkt zurückkehren und die Berechnungen von dort aus fortsetzen, um die verlorenen Daten wiederherzustellen.

Ein Prüfpunkt kann aktiviert werden, indem ein Verzeichnis auf einem fehlertoleranten, zuverlässigen Dateisystem (z. B. HDFS, S3 usw.) festgelegt wird, in dem die Prüfpunktinformationen gespeichert werden. Dies geschieht zum Beispiel mit:

streamingContext.checkpoint(checkpointDirectory)

In unserem Beispiel verwenden wir den folgenden Ansatz: Wenn das checkpointDirectory vorhanden ist, wird der Kontext aus den Checkpoint-Daten neu erstellt. Wenn das Verzeichnis nicht existiert (d. h. es wird zum ersten Mal ausgeführt), wird die Funktion „functionToCreateContext“ aufgerufen, um einen neuen Kontext zu erstellen und DStreams einzurichten:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Wir erstellen ein DirectStream-Objekt, um mit der Methode „createDirectStream“ der KafkaUtils-Bibliothek eine Verbindung zum Thema „Transaktion“ herzustellen:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Eingehende Daten im JSON-Format analysieren:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Mit Spark SQL führen wir eine einfache Gruppierung durch und geben das Ergebnis an die Konsole aus:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Abfragetext abrufen und über Spark SQL ausführen:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Und dann speichern wir die empfangenen aggregierten Daten in einer Tabelle in AWS RDS. Um die Ergebnisse der Aggregation in einer Datenbanktabelle zu speichern, verwenden wir die Schreibmethode des DataFrame-Objekts:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Ein paar Worte zum Einrichten einer Verbindung zu AWS RDS. Wir haben den Benutzer und das Passwort dafür im Schritt „AWS PostgreSQL bereitstellen“ erstellt. Als URL des Datenbankservers sollten Sie den Endpunkt verwenden, der im Abschnitt Konnektivität & Sicherheit angezeigt wird:

Apache Kafka und Daten-Streaming mit Spark Streaming

Um Spark und Kafka korrekt zu verbinden, sollten Sie den Job über smark-submit unter Verwendung des Artefakts ausführen spark-streaming-kafka-0-8_2.11. Darüber hinaus werden wir auch ein Artefakt für die Interaktion mit der PostgreSQL-Datenbank verwenden und diese über --packages weiterleiten.

Zur Flexibilität des Skripts nehmen wir auch den Namen des Nachrichtenservers und das Thema, von dem wir Daten erhalten möchten, als Eingabeparameter heraus.

Es ist also an der Zeit, das System auszuführen und zu testen:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Es hat alles geklappt! Wie Sie im Bild unten sehen können, werden während der Ausführung der Anwendung alle 2 Sekunden neue Aggregationsergebnisse angezeigt, da wir das Bündelungsintervall beim Erstellen des StreamingContext-Objekts auf 2 Sekunden festgelegt haben:

Apache Kafka und Daten-Streaming mit Spark Streaming

Als Nächstes führen wir eine einfache Abfrage an die Datenbank durch, um nach Datensätzen in der Tabelle zu suchen transaktionsfluss:

Apache Kafka und Daten-Streaming mit Spark Streaming

Abschluss

In diesem Artikel wurde ein Beispiel für die Verarbeitung von Streaming-Informationen mithilfe von Spark Streaming in Verbindung mit Apache Kafka und PostgreSQL betrachtet. Angesichts der wachsenden Datenmengen aus verschiedenen Quellen kann der praktische Wert von Spark Streaming für die Erstellung von Echtzeit- und Streaming-Anwendungen kaum überschätzt werden.

Den vollständigen Quellcode finden Sie in meinem Repository unter GitHub.

Ich diskutiere diesen Artikel gerne, freue mich auf Ihre Kommentare und hoffe auf konstruktive Kritik aller interessierten Leser.

Ich wünsche Ihnen viel Erfolg!

Ps. Ursprünglich war geplant, eine lokale PostgreSQL-Datenbank zu verwenden, aber aufgrund meiner Liebe zu AWS entschied ich mich, die Datenbank in die Cloud zu verlagern. Im nächsten Artikel zu diesem Thema zeige ich Ihnen, wie Sie das oben beschriebene Gesamtsystem mithilfe von AWS Kinesis und AWS EMR in AWS implementieren. Verfolgen Sie die Nachrichten!

Source: habr.com

Kommentar hinzufügen