Apache Kafka en streaminggegevensverwerking met Spark Streaming

Hallo, Habr! Vandaag gaan we een systeem bouwen dat Apache Kafka-berichtenstromen verwerkt met behulp van Spark Streaming en de verwerkingsresultaten naar de AWS RDS-clouddatabase schrijft.

Laten we ons voorstellen dat een bepaalde kredietinstelling ons de taak geeft om inkomende transacties ‘on the fly’ in al haar vestigingen te verwerken. Dit kan worden gedaan om snel een open valutapositie voor de schatkist, limieten of financiële resultaten voor transacties, enz. te berekenen.

Hoe je deze zaak kunt implementeren zonder het gebruik van magie en magische spreuken - lees onder de snede! Gaan!

Apache Kafka en streaminggegevensverwerking met Spark Streaming
(Afbeeldingsbron)

Introductie

Uiteraard biedt het realtime verwerken van een grote hoeveelheid data volop mogelijkheden voor gebruik in moderne systemen. Een van de meest populaire combinaties hiervoor is de tandem van Apache Kafka en Spark Streaming, waarbij Kafka een stroom binnenkomende berichtenpakketten creëert en Spark Streaming deze pakketten met een bepaald tijdsinterval verwerkt.

Om de fouttolerantie van de applicatie te vergroten, zullen we controlepunten gebruiken. Met dit mechanisme hoeft de Spark Streaming-engine, wanneer hij verloren gegevens moet herstellen, alleen maar terug te gaan naar het laatste controlepunt en vanaf daar de berekeningen te hervatten.

Architectuur van het ontwikkelde systeem

Apache Kafka en streaminggegevensverwerking met Spark Streaming

Gebruikte componenten:

  • Apache Kafka is een gedistribueerd berichtensysteem voor publiceren en abonneren. Geschikt voor zowel offline als online berichtconsumptie. Om gegevensverlies te voorkomen, worden Kafka-berichten op schijf opgeslagen en binnen het cluster gerepliceerd. Het Kafka-systeem is gebouwd bovenop de ZooKeeper-synchronisatieservice;
  • Apache Spark-streaming - Spark-component voor het verwerken van streaminggegevens. De Spark Streaming-module is gebouwd met behulp van een micro-batch-architectuur, waarbij de datastroom wordt geïnterpreteerd als een continue reeks kleine datapakketten. Spark Streaming haalt gegevens uit verschillende bronnen en combineert deze in kleine pakketten. Er worden met regelmatige tussenpozen nieuwe pakketten gemaakt. Aan het begin van elk tijdsinterval wordt een nieuw pakket gemaakt en alle gegevens die tijdens dat interval worden ontvangen, worden in het pakket opgenomen. Aan het einde van het interval stopt de pakketgroei. De grootte van het interval wordt bepaald door een parameter die het batchinterval wordt genoemd;
  • Apache Spark-SQL - combineert relationele verwerking met functionele Spark-programmering. Gestructureerde gegevens zijn gegevens met een schema, dat wil zeggen één enkele set velden voor alle records. Spark SQL ondersteunt invoer uit een verscheidenheid aan gestructureerde gegevensbronnen en kan, dankzij de beschikbaarheid van schema-informatie, efficiënt alleen de vereiste velden met records ophalen, en biedt ook DataFrame API's;
  • AWS-RDS is een relatief goedkope cloudgebaseerde relationele database, een webservice die de installatie, bediening en schaling vereenvoudigt, en rechtstreeks door Amazon wordt beheerd.

De Kafka-server installeren en uitvoeren

Voordat u Kafka rechtstreeks gebruikt, moet u ervoor zorgen dat u over Java beschikt, omdat... JVM wordt gebruikt voor werk:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Laten we een nieuwe gebruiker aanmaken om met Kafka te werken:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Download vervolgens de distributie van de officiële Apache Kafka-website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Pak het gedownloade archief uit:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

De volgende stap is optioneel. Feit is dat je met de standaardinstellingen niet alle mogelijkheden van Apache Kafka volledig kunt gebruiken. Verwijder bijvoorbeeld een onderwerp, categorie of groep waarnaar berichten kunnen worden gepubliceerd. Om dit te veranderen, gaan we het configuratiebestand bewerken:

vim ~/kafka/config/server.properties

Voeg het volgende toe aan het einde van het bestand:

delete.topic.enable = true

Voordat u de Kafka-server start, moet u de ZooKeeper-server starten; we zullen het hulpscript gebruiken dat bij de Kafka-distributie wordt geleverd:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Nadat ZooKeeper succesvol is gestart, start u de Kafka-server in een aparte terminal:

bin/kafka-server-start.sh config/server.properties

Laten we een nieuw onderwerp maken met de naam Transactie:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Laten we ervoor zorgen dat er een onderwerp met het vereiste aantal partities en replicatie is gemaakt:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka en streaminggegevensverwerking met Spark Streaming

Laten we de momenten missen waarin de producent en de consument worden getest op het nieuw gecreëerde onderwerp. Meer details over hoe u het verzenden en ontvangen van berichten kunt testen, vindt u in de officiële documentatie - Stuur wat berichten. Welnu, we gaan verder met het schrijven van een producer in Python met behulp van de KafkaProducer API.

Producent schrijft

De producent genereert willekeurige gegevens: 100 berichten per seconde. Met willekeurige gegevens bedoelen we een woordenboek dat uit drie velden bestaat:

  • Tak — naam van het verkooppunt van de kredietinstelling;
  • Valuta - munteenheid van de transactie;
  • Hoeveelheid — transactiebedrag. Het bedrag zal een positief getal zijn als het een aankoop van valuta door de Bank betreft, en een negatief getal als het een verkoop betreft.

De code voor de producer ziet er als volgt uit:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Vervolgens sturen we met behulp van de verzendmethode een bericht naar de server, naar het onderwerp dat we nodig hebben, in JSON-formaat:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Bij het uitvoeren van het script ontvangen we de volgende berichten in de terminal:

Apache Kafka en streaminggegevensverwerking met Spark Streaming

Dit betekent dat alles werkt zoals we wilden: de producent genereert en verzendt berichten over het onderwerp dat we nodig hebben.
De volgende stap is het installeren van Spark en het verwerken van deze berichtenstroom.

Apache Spark installeren

Apache Spark is een universeel en krachtig clustercomputerplatform.

Spark presteert beter dan populaire implementaties van het MapReduce-model en ondersteunt tegelijkertijd een breder scala aan berekeningstypen, waaronder interactieve query's en streamverwerking. Snelheid speelt een belangrijke rol bij het verwerken van grote hoeveelheden gegevens, omdat het de snelheid is waarmee u interactief kunt werken zonder minuten of uren te moeten wachten. Een van de grootste sterke punten van Spark die hem zo snel maakt, is de mogelijkheid om berekeningen in het geheugen uit te voeren.

Dit framework is geschreven in Scala, dus je moet het eerst installeren:

sudo apt-get install scala

Download de Spark-distributie van de officiële website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Pak het archief uit:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Voeg het pad naar Spark toe aan het bash-bestand:

vim ~/.bashrc

Voeg de volgende regels toe via de editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Voer de onderstaande opdracht uit nadat u wijzigingen in bashrc hebt aangebracht:

source ~/.bashrc

AWS PostgreSQL implementeren

Het enige dat overblijft is het implementeren van de database waarin we de verwerkte informatie uit de streams zullen uploaden. Hiervoor zullen wij gebruik maken van de AWS RDS service.

Ga naar de AWS-console -> AWS RDS -> Databases -> Database aanmaken:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Selecteer PostgreSQL en klik op Volgende:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Omdat Dit voorbeeld is alleen voor educatieve doeleinden; we zullen “minimaal” een gratis server gebruiken (Free Tier):
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Vervolgens zetten we een vinkje in het Free Tier-blok, en daarna krijgen we automatisch een exemplaar van de klasse t2.micro aangeboden - hoewel zwak, is het gratis en redelijk geschikt voor onze taak:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Vervolgens komen heel belangrijke dingen: de naam van de database-instantie, de naam van de hoofdgebruiker en zijn wachtwoord. Laten we de instantie een naam geven: myHabrTest, hoofdgebruiker: habr, wachtwoord: habr12345 en klik op de knop Volgende:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Op de volgende pagina staan ​​parameters die verantwoordelijk zijn voor de toegankelijkheid van onze databaseserver van buitenaf (openbare toegankelijkheid) en poortbeschikbaarheid:

Apache Kafka en streaminggegevensverwerking met Spark Streaming

Laten we een nieuwe instelling maken voor de VPC-beveiligingsgroep, die externe toegang tot onze databaseserver mogelijk maakt via poort 5432 (PostgreSQL).
Laten we naar de AWS-console gaan in een apart browservenster naar het VPC Dashboard -> Beveiligingsgroepen -> Beveiligingsgroep aanmaken sectie:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

We stellen de naam in voor de beveiligingsgroep - PostgreSQL, een beschrijving, geven aan aan welke VPC deze groep moet worden gekoppeld en klikken op de knop Maken:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Vul de regels voor inkomend verkeer voor poort 5432 in voor de nieuw aangemaakte groep, zoals weergegeven in de onderstaande afbeelding. U kunt de poort niet handmatig opgeven, maar selecteer PostgreSQL in de vervolgkeuzelijst Type.

Strikt genomen betekent de waarde ::/0 de beschikbaarheid van inkomend verkeer naar de server van over de hele wereld, wat canoniek gezien niet helemaal waar is, maar om het voorbeeld te analyseren, laten we onszelf toestaan ​​deze aanpak te gebruiken:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

We keren terug naar de browserpagina, waar we “Geavanceerde instellingen configureren” hebben geopend en selecteren in de sectie VPC-beveiligingsgroepen -> Kies bestaande VPC-beveiligingsgroepen -> PostgreSQL:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Stel vervolgens in de Database-opties -> Databasenaam -> de naam in - habrDB.

De overige parameters kunnen we standaard laten staan, met uitzondering van het uitschakelen van back-up (retentieperiode back-up - 0 dagen), monitoring en prestatie-inzichten. Klik op de knop Maak een database:
Apache Kafka en streaminggegevensverwerking met Spark Streaming

Draadafhandelaar

De laatste fase zal de ontwikkeling zijn van een Spark-taak, die elke twee seconden nieuwe gegevens uit Kafka zal verwerken en het resultaat in de database zal invoeren.

Zoals hierboven vermeld, zijn controlepunten een kernmechanisme in SparkStreaming dat moet worden geconfigureerd om fouttolerantie te garanderen. We zullen controlepunten gebruiken en als de procedure mislukt, hoeft de Spark Streaming-module alleen terug te keren naar het laatste controlepunt en de berekeningen daar te hervatten om de verloren gegevens te herstellen.

Controlepunten kunnen worden ingeschakeld door een map in te stellen op een fouttolerant, betrouwbaar bestandssysteem (zoals HDFS, S3, enz.) waarin de controlepuntinformatie wordt opgeslagen. Dit gebeurt bijvoorbeeld met behulp van:

streamingContext.checkpoint(checkpointDirectory)

In ons voorbeeld gebruiken we de volgende aanpak: als checkpointDirectory bestaat, wordt de context opnieuw gemaakt op basis van de controlepuntgegevens. Als de map niet bestaat (d.w.z. voor de eerste keer uitgevoerd), wordt functionToCreateContext aangeroepen om een ​​nieuwe context te creëren en DStreams te configureren:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

We maken een DirectStream-object om verbinding te maken met het onderwerp “transactie” met behulp van de createDirectStream-methode van de KafkaUtils-bibliotheek:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parseren van binnenkomende gegevens in JSON-formaat:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Met behulp van Spark SQL voeren we een eenvoudige groepering uit en geven we het resultaat weer in de console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

De querytekst ophalen en deze via Spark SQL uitvoeren:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

En vervolgens slaan we de resulterende geaggregeerde gegevens op in een tabel in AWS RDS. Om de aggregatieresultaten in een databasetabel op te slaan, gebruiken we de schrijfmethode van het DataFrame-object:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Een paar woorden over het opzetten van een verbinding met AWS RDS. We hebben de gebruiker en het wachtwoord ervoor aangemaakt bij de stap “Deploying AWS PostgreSQL”. U moet Endpoint gebruiken als de databaseserver-URL, die wordt weergegeven in de sectie Connectiviteit en beveiliging:

Apache Kafka en streaminggegevensverwerking met Spark Streaming

Om Spark en Kafka correct te verbinden, moet u de taak uitvoeren via smark-submit met behulp van het artefact spark-streaming-kafka-0-8_2.11. Daarnaast zullen we ook een artefact gebruiken voor interactie met de PostgreSQL-database; we zullen ze overbrengen via --packages.

Voor de flexibiliteit van het script zullen we als invoerparameters ook de naam van de berichtenserver en het onderwerp waarvan we gegevens willen ontvangen, opnemen.

Het is dus tijd om de functionaliteit van het systeem te starten en te controleren:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Alles is gelukt! Zoals u in de onderstaande afbeelding kunt zien, worden er, terwijl de toepassing actief is, elke 2 seconden nieuwe aggregatieresultaten uitgevoerd, omdat we het batchinterval hebben ingesteld op 2 seconden toen we het StreamingContext-object maakten:

Apache Kafka en streaminggegevensverwerking met Spark Streaming

Vervolgens voeren we een eenvoudige zoekopdracht uit in de database om de aanwezigheid van records in de tabel te controleren transactiestroom:

Apache Kafka en streaminggegevensverwerking met Spark Streaming

Conclusie

In dit artikel werd gekeken naar een voorbeeld van het streamen van informatie met behulp van Spark Streaming in combinatie met Apache Kafka en PostgreSQL. Met de groei van data uit verschillende bronnen is het moeilijk om de praktische waarde van Spark Streaming voor het creëren van streaming- en realtime-applicaties te overschatten.

Je kunt de volledige broncode vinden in mijn repository op GitHub.

Ik ben blij om dit artikel te bespreken, ik kijk uit naar uw opmerkingen, en ik hoop ook op opbouwende kritiek van alle zorgzame lezers.

Ik wens u veel succes!

Ps. Aanvankelijk was het de bedoeling om een ​​lokale PostgreSQL-database te gebruiken, maar gezien mijn liefde voor AWS besloot ik de database naar de cloud te verhuizen. In het volgende artikel over dit onderwerp zal ik laten zien hoe je het volledige hierboven beschreven systeem in AWS kunt implementeren met behulp van AWS Kinesis en AWS EMR. Volg het nieuws!

Bron: www.habr.com

Voeg een reactie