Apache Kafka et le traitement des données en streaming avec Spark Streaming

Bonjour Habr! Aujourd'hui, nous allons créer un système qui traitera les flux de messages Apache Kafka à l'aide de Spark Streaming et écrira les résultats du traitement dans la base de données cloud AWS RDS.

Imaginons qu’un certain établissement de crédit nous confie la tâche de traiter les transactions entrantes « à la volée » dans toutes ses succursales. Cela peut être fait dans le but de calculer rapidement une position en devise ouverte pour la trésorerie, des limites ou des résultats financiers pour les transactions, etc.

Comment mettre en œuvre cette affaire sans utiliser de magie et de sorts magiques - lisez sous la coupe ! Aller!

Apache Kafka et le traitement des données en streaming avec Spark Streaming
(Source de l'image)

introduction

Bien entendu, le traitement d’une grande quantité de données en temps réel offre de nombreuses possibilités d’utilisation dans les systèmes modernes. L'une des combinaisons les plus populaires pour cela est le tandem Apache Kafka et Spark Streaming, dans lequel Kafka crée un flux de paquets de messages entrants et Spark Streaming traite ces paquets à un intervalle de temps donné.

Pour augmenter la tolérance aux pannes de l'application, nous utiliserons des points de contrôle. Avec ce mécanisme, lorsque le moteur Spark Streaming a besoin de récupérer des données perdues, il lui suffit de revenir au dernier point de contrôle et de reprendre les calculs à partir de là.

Architecture du système développé

Apache Kafka et le traitement des données en streaming avec Spark Streaming

Composants utilisés :

  • Apache Kafka est un système de messagerie distribué de publication-abonnement. Convient à la consommation de messages hors ligne et en ligne. Pour éviter la perte de données, les messages Kafka sont stockés sur disque et répliqués au sein du cluster. Le système Kafka est construit sur le service de synchronisation ZooKeeper ;
  • Apache Spark Diffusion - Composant Spark pour le traitement des données en streaming. Le module Spark Streaming est construit à l'aide d'une architecture micro-batch, où le flux de données est interprété comme une séquence continue de petits paquets de données. Spark Streaming récupère les données de différentes sources et les combine en petits packages. De nouveaux packages sont créés à intervalles réguliers. Au début de chaque intervalle de temps, un nouveau paquet est créé et toutes les données reçues pendant cet intervalle sont incluses dans le paquet. À la fin de l'intervalle, la croissance des paquets s'arrête. La taille de l'intervalle est déterminée par un paramètre appelé intervalle de lot ;
  • Apache SparkSQL - combine le traitement relationnel avec la programmation fonctionnelle Spark. Les données structurées désignent les données qui ont un schéma, c'est-à-dire un ensemble unique de champs pour tous les enregistrements. Spark SQL prend en charge la saisie d'une variété de sources de données structurées et, grâce à la disponibilité des informations de schéma, il peut récupérer efficacement uniquement les champs d'enregistrement requis et fournit également des API DataFrame ;
  • AWSRDS est une base de données relationnelle basée sur le cloud, un service Web relativement peu coûteux qui simplifie la configuration, le fonctionnement et la mise à l'échelle, et est administré directement par Amazon.

Installer et exécuter le serveur Kafka

Avant d'utiliser Kafka directement, vous devez vous assurer que vous disposez de Java, car... JVM est utilisé pour le travail :

sudo apt-get update 
sudo apt-get install default-jre
java -version

Créons un nouvel utilisateur pour travailler avec Kafka :

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ensuite, téléchargez la distribution depuis le site officiel d'Apache Kafka :

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Décompressez l'archive téléchargée :

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

L'étape suivante est facultative. Le fait est que les paramètres par défaut ne vous permettent pas d'utiliser pleinement toutes les capacités d'Apache Kafka. Par exemple, supprimez un sujet, une catégorie, un groupe dans lequel les messages peuvent être publiés. Pour changer cela, éditons le fichier de configuration :

vim ~/kafka/config/server.properties

Ajoutez ce qui suit à la fin du fichier :

delete.topic.enable = true

Avant de démarrer le serveur Kafka, vous devez démarrer le serveur ZooKeeper ; nous utiliserons le script auxiliaire fourni avec la distribution Kafka :

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Une fois ZooKeeper démarré avec succès, lancez le serveur Kafka dans un terminal séparé :

bin/kafka-server-start.sh config/server.properties

Créons un nouveau sujet appelé Transaction :

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Assurons-nous qu'un sujet avec le nombre de partitions et de réplication requis a été créé :

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka et le traitement des données en streaming avec Spark Streaming

Manquons les moments de test du producteur et du consommateur pour le sujet nouvellement créé. Plus de détails sur la façon dont vous pouvez tester l'envoi et la réception de messages sont écrits dans la documentation officielle - Envoyez des messages. Eh bien, passons à l'écriture d'un producteur en Python à l'aide de l'API KafkaProducer.

Écriture du producteur

Le producteur générera des données aléatoires – 100 messages par seconde. Par données aléatoires, nous entendons un dictionnaire composé de trois champs :

  • Branche — le nom du point de vente de l'établissement de crédit;
  • Devise - devise de la transaction;
  • Montant - Montant de la transaction. Le montant sera un nombre positif s'il s'agit d'un achat de devises par la Banque, et un nombre négatif s'il s'agit d'une vente.

Le code du producteur ressemble à ceci :

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Ensuite, en utilisant la méthode send, nous envoyons un message au serveur, au sujet dont nous avons besoin, au format JSON :

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Lors de l'exécution du script, nous recevons les messages suivants dans le terminal :

Apache Kafka et le traitement des données en streaming avec Spark Streaming

Cela signifie que tout fonctionne comme nous le souhaitions - le producteur génère et envoie des messages sur le sujet dont nous avons besoin.
L'étape suivante consiste à installer Spark et à traiter ce flux de messages.

Installation d'Apache Spark

Apache Spark est une plateforme informatique en cluster universelle et hautes performances.

Spark fonctionne mieux que les implémentations populaires du modèle MapReduce tout en prenant en charge un plus large éventail de types de calcul, notamment les requêtes interactives et le traitement de flux. La vitesse joue un rôle important lors du traitement de grandes quantités de données, car c'est la vitesse qui vous permet de travailler de manière interactive sans passer des minutes ou des heures à attendre. L'un des plus grands atouts de Spark qui le rend si rapide est sa capacité à effectuer des calculs en mémoire.

Ce framework est écrit en Scala, vous devez donc d'abord l'installer :

sudo apt-get install scala

Téléchargez la distribution Spark depuis le site officiel :

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Décompressez l'archive :

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Ajoutez le chemin d'accès à Spark au fichier bash :

vim ~/.bashrc

Ajoutez les lignes suivantes via l'éditeur :

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Exécutez la commande ci-dessous après avoir apporté des modifications à bashrc :

source ~/.bashrc

Déploiement d'AWS PostgreSQL

Il ne reste plus qu'à déployer la base de données dans laquelle nous téléchargerons les informations traitées depuis les flux. Pour cela, nous utiliserons le service AWS RDS.

Accédez à la console AWS -> AWS RDS -> Bases de données -> Créer une base de données :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Sélectionnez PostgreSQL et cliquez sur Suivant :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Parce que Cet exemple est uniquement à des fins pédagogiques ; nous utiliserons un serveur gratuit « au minimum » (Free Tier) :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Ensuite, nous cochons le bloc Free Tier, et après cela, une instance de la classe t2.micro nous sera automatiquement proposée - bien que faible, elle est gratuite et tout à fait adaptée à notre tâche :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Viennent ensuite des éléments très importants : le nom de l'instance de base de données, le nom de l'utilisateur maître et son mot de passe. Nommons l'instance : myHabrTest, utilisateur principal : habr, le mot de passe: habr12345 et cliquez sur le bouton Suivant :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Sur la page suivante se trouvent les paramètres responsables de l'accessibilité de notre serveur de base de données de l'extérieur (accessibilité publique) et de la disponibilité des ports :

Apache Kafka et le traitement des données en streaming avec Spark Streaming

Créons un nouveau paramètre pour le groupe de sécurité VPC, qui permettra l'accès externe à notre serveur de base de données via le port 5432 (PostgreSQL).
Accédez à la console AWS dans une fenêtre de navigateur distincte dans la section Tableau de bord VPC -> Groupes de sécurité -> Créer un groupe de sécurité :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Nous définissons le nom du groupe de sécurité - PostgreSQL, une description, indiquons à quel VPC ce groupe doit être associé et cliquons sur le bouton Créer :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Remplissez les règles entrantes pour le port 5432 pour le groupe nouvellement créé, comme indiqué dans l'image ci-dessous. Vous ne pouvez pas spécifier le port manuellement, mais sélectionnez PostgreSQL dans la liste déroulante Type.

À proprement parler, la valeur ::/0 signifie la disponibilité du trafic entrant vers le serveur en provenance du monde entier, ce qui n'est canoniquement pas tout à fait vrai, mais pour analyser l'exemple, permettons-nous d'utiliser cette approche :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Nous revenons à la page du navigateur, où nous avons « Configurer les paramètres avancés » ouvert et sélectionné dans la section Groupes de sécurité VPC -> Choisir les groupes de sécurité VPC existants -> PostgreSQL :
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Ensuite, dans les options de la base de données -> Nom de la base de données -> définissez le nom - habrDB.

Nous pouvons laisser les paramètres restants, à l'exception de la désactivation de la sauvegarde (durée de conservation des sauvegardes - 0 jour), de la surveillance et de Performance Insights, par défaut. Cliquez sur le bouton Créer une base de données:
Apache Kafka et le traitement des données en streaming avec Spark Streaming

Gestionnaire de threads

La dernière étape sera le développement d'un travail Spark, qui traitera les nouvelles données provenant de Kafka toutes les deux secondes et saisira le résultat dans la base de données.

Comme indiqué ci-dessus, les points de contrôle constituent un mécanisme central de SparkStreaming qui doit être configuré pour garantir la tolérance aux pannes. Nous utiliserons des points de contrôle et, si la procédure échoue, le module Spark Streaming n'aura qu'à revenir au dernier point de contrôle et à reprendre les calculs à partir de celui-ci pour récupérer les données perdues.

Le point de contrôle peut être activé en définissant un répertoire sur un système de fichiers fiable et tolérant aux pannes (tel que HDFS, S3, etc.) dans lequel les informations du point de contrôle seront stockées. Cela se fait en utilisant, par exemple :

streamingContext.checkpoint(checkpointDirectory)

Dans notre exemple, nous utiliserons l'approche suivante, à savoir que si checkpointDirectory existe, alors le contexte sera recréé à partir des données du point de contrôle. Si le répertoire n'existe pas (c'est-à-dire s'il est exécuté pour la première fois), alors functionToCreateContext est appelé pour créer un nouveau contexte et configurer DStreams :

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Nous créons un objet DirectStream pour nous connecter au sujet « transaction » en utilisant la méthode createDirectStream de la bibliothèque KafkaUtils :

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analyse des données entrantes au format JSON :

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

À l'aide de Spark SQL, nous effectuons un regroupement simple et affichons le résultat dans la console :

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Récupérer le texte de la requête et l'exécuter via Spark SQL :

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Et puis nous enregistrons les données agrégées résultantes dans une table dans AWS RDS. Pour enregistrer les résultats de l'agrégation dans une table de base de données, nous utiliserons la méthode write de l'objet DataFrame :

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Quelques mots sur la configuration d'une connexion à AWS RDS. Nous avons créé l'utilisateur et le mot de passe correspondant à l'étape « Déploiement d'AWS PostgreSQL ». Vous devez utiliser Endpoint comme URL du serveur de base de données, qui est affichée dans la section Connectivité et sécurité :

Apache Kafka et le traitement des données en streaming avec Spark Streaming

Afin de connecter correctement Spark et Kafka, vous devez exécuter le travail via smark-submit à l'aide de l'artefact étincelle-streaming-kafka-0-8_2.11. De plus, nous utiliserons également un artefact pour interagir avec la base de données PostgreSQL ; nous les transférerons via --packages.

Pour la flexibilité du script, nous inclurons également comme paramètres d'entrée le nom du serveur de messages et le sujet à partir duquel nous souhaitons recevoir des données.

Il est donc temps de lancer et de vérifier les fonctionnalités du système :

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Tout s'est bien passé ! Comme vous pouvez le voir dans l'image ci-dessous, pendant l'exécution de l'application, de nouveaux résultats d'agrégation sont générés toutes les 2 secondes, car nous avons défini l'intervalle de traitement par lots sur 2 secondes lorsque nous avons créé l'objet StreamingContext :

Apache Kafka et le traitement des données en streaming avec Spark Streaming

Ensuite, nous effectuons une simple requête à la base de données pour vérifier la présence d'enregistrements dans la table. flux_transaction:

Apache Kafka et le traitement des données en streaming avec Spark Streaming

Conclusion

Cet article a examiné un exemple de traitement de flux d'informations à l'aide de Spark Streaming en conjonction avec Apache Kafka et PostgreSQL. Avec la croissance des données provenant de diverses sources, il est difficile de surestimer la valeur pratique de Spark Streaming pour la création d'applications de streaming et en temps réel.

Vous pouvez trouver le code source complet dans mon référentiel à l'adresse GitHub.

Je suis heureux de discuter de cet article, j'attends avec impatience vos commentaires et j'espère également des critiques constructives de la part de tous les lecteurs attentionnés.

Je vous souhaite du succès!

Ps. Au départ, il était prévu d'utiliser une base de données PostgreSQL locale, mais étant donné mon amour pour AWS, j'ai décidé de déplacer la base de données vers le cloud. Dans le prochain article sur ce sujet, je montrerai comment implémenter l'ensemble du système décrit ci-dessus dans AWS à l'aide d'AWS Kinesis et d'AWS EMR. Suivez l'actualité !

Source: habr.com

Ajouter un commentaire