Apache Kafka og datastreaming med Spark Streaming

Hej Habr! I dag vil vi bygge et system, der vil behandle Apache Kafka-meddelelsesstrømme ved hjælp af Spark Streaming og skrive behandlingsresultatet til AWS RDS-skydatabasen.

Lad os forestille os, at et bestemt kreditinstitut stiller os opgaven med at behandle indgående transaktioner "on the fly" for alle sine filialer. Dette kan ske med det formål hurtigt at beregne den åbne valutaposition for statskassen, limits eller økonomisk resultat på transaktioner mv.

Sådan implementerer du denne sag uden brug af magi og magiske besværgelser - læs under klippet! Gå!

Apache Kafka og datastreaming med Spark Streaming
(Billedkilde)

Indledning

Naturligvis giver realtidsbehandling af en stor mængde data rige muligheder for brug i moderne systemer. En af de mest populære kombinationer for dette er tandem af Apache Kafka og Spark Streaming, hvor Kafka opretter en strøm af indgående beskedpakker, og Spark Streaming behandler disse pakker med et bestemt tidsinterval.

For at forbedre applikationens fejltolerance vil vi bruge checkpoints - checkpoints. Med denne mekanisme, når Spark Streaming-modulet skal gendanne tabte data, behøver det kun at vende tilbage til det sidste kontrolpunkt og genoptage beregninger derfra.

Arkitekturen af ​​det udviklede system

Apache Kafka og datastreaming med Spark Streaming

Brugte komponenter:

  • Apache Kafka er et distribueret udgiv-og-abonner-meddelelsessystem. Velegnet til både offline og online beskedforbrug. For at forhindre tab af data gemmes Kafka-meddelelser på disken og replikeres i klyngen. Kafka-systemet er bygget oven på ZooKeeper-synkroniseringstjenesten;
  • Apache Spark-streaming - en Spark-komponent til behandling af streamingdata. Spark Streaming-modulet er bygget ved hjælp af en mikro-batch-arkitektur, når en datastrøm fortolkes som en kontinuerlig sekvens af små datapakker. Spark Streaming tager data fra forskellige kilder og kombinerer dem i små batches. Nye pakker oprettes med jævne mellemrum. Ved starten af ​​hvert tidsinterval oprettes en ny pakke, og alle data, der modtages i dette interval, er inkluderet i pakken. Ved slutningen af ​​intervallet stopper pakkevæksten. Størrelsen af ​​intervallet bestemmes af en parameter kaldet batchintervallet;
  • Apache Spark SQL - Kombinerer relationel behandling med Spark funktionel programmering. Strukturerede data refererer til data, der har et skema, det vil sige et enkelt sæt felter for alle poster. Spark SQL understøtter input fra en række strukturerede datakilder, og på grund af tilstedeværelsen af ​​skemaoplysninger kan den effektivt kun udtrække de påkrævede felter af poster, og den leverer også DataFrame API'er;
  • AWS RDS er en relativt billig cloud-baseret relationsdatabase, en webservice, der forenkler opsætning, drift og skalering, administreret direkte af Amazon.

Installation og kørsel af Kafka-serveren

Før du bruger Kafka direkte, skal du sikre dig, at du har Java, pga JVM bruges til arbejde:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Lad os oprette en ny bruger til at arbejde med Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Download derefter distributionssættet fra det officielle Apache Kafka-websted:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Pak det downloadede arkiv ud:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Det næste trin er valgfrit. Faktum er, at standardindstillingerne ikke tillader dig fuldt ud at bruge alle funktionerne i Apache Kafka. Slet for eksempel et emne, kategori, gruppe, som meddelelser kan publiceres på. For at ændre dette, lad os redigere konfigurationsfilen:

vim ~/kafka/config/server.properties

Tilføj følgende til slutningen af ​​filen:

delete.topic.enable = true

Før du starter Kafka-serveren, skal du starte ZooKeeper-serveren, vi vil bruge hjælpescriptet, der følger med Kafka-distributionen:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Efter at ZooKeeper er startet, starter vi Kafka-serveren i en separat terminal:

bin/kafka-server-start.sh config/server.properties

Lad os oprette et nyt emne kaldet Transaktion:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Lad os sikre os, at der er oprettet et emne med det nødvendige antal partitioner og replikering:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka og datastreaming med Spark Streaming

Lad os gå glip af øjeblikke med at teste producenten og forbrugeren for det nyoprettede emne. Flere detaljer om, hvordan du kan teste at sende og modtage beskeder er skrevet i den officielle dokumentation - Send nogle beskeder. Nå, vi går videre til at skrive en producent i Python ved hjælp af KafkaProducer API.

Producer skriver

Producenten vil generere tilfældige data - 100 beskeder hvert sekund. Med tilfældige data mener vi en ordbog bestående af tre felter:

  • Branch — navnet på kreditinstituttets salgssted;
  • Valuta — transaktionsvaluta;
  • beløb - overførselsbeløb. Beløbet vil være positivt, hvis det er et køb af valuta fra banken, og negativt, hvis det er et salg.

Koden til producenten ser sådan ud:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Dernæst, ved hjælp af sendemetoden, sender vi en besked til serveren, til det emne, vi har brug for, i JSON-format:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Når vi kører scriptet, får vi følgende meddelelser i terminalen:

Apache Kafka og datastreaming med Spark Streaming

Det betyder, at alt fungerer, som vi ønskede – producenten genererer og sender beskeder til det emne, vi har brug for.
Det næste trin er at installere Spark og behandle dette meddelelsesflow.

Installation af Apache Spark

Apache Spark er en alsidig og højtydende klyngecomputerplatform.

Spark udkonkurrerer populære implementeringer af MapReduce-modellen med hensyn til ydeevne, samtidig med at den understøtter en bredere række af beregningstyper, herunder interaktive forespørgsler og streaming. Hastighed spiller en vigtig rolle ved behandling af store mængder data, da det er hastighed, der giver dig mulighed for at arbejde interaktivt uden at bruge minutter eller timer på at vente. En af Sparks største styrker til at levere denne hastighed er dens evne til at udføre in-memory beregninger.

Denne ramme er skrevet i Scala, så du skal installere den først:

sudo apt-get install scala

Download Spark-distributionen fra den officielle hjemmeside:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Pak arkivet ud:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Tilføj stien til Spark til bash-filen:

vim ~/.bashrc

Tilføj følgende linjer gennem editoren:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Kør kommandoen nedenfor efter at have foretaget ændringer til bashrc:

source ~/.bashrc

Implementering af AWS PostgreSQL

Det er tilbage at udvide databasen, hvor vi vil udfylde de behandlede oplysninger fra strømmene. For at gøre dette vil vi bruge AWS RDS-tjenesten.

Gå til AWS-konsollen -> AWS RDS -> Databaser -> Opret database:
Apache Kafka og datastreaming med Spark Streaming

Vælg PostgreSQL, og klik på knappen Næste:
Apache Kafka og datastreaming med Spark Streaming

Fordi dette eksempel analyseres udelukkende til uddannelsesformål, vi vil bruge en gratis server "som minimum" (Free Tier):
Apache Kafka og datastreaming med Spark Streaming

Tjek derefter Free Tier-blokken, og derefter vil vi automatisk blive tilbudt en forekomst af t2.micro-klassen - selvom det er svagt, men gratis og ganske velegnet til vores opgave:
Apache Kafka og datastreaming med Spark Streaming

Meget vigtige ting følger: navnet på DB-instansen, navnet på masterbrugeren og dennes adgangskode. Lad os kalde forekomsten: myHabrTest, hovedbruger: habr, adgangskode: habr12345 og klik på knappen Næste:
Apache Kafka og datastreaming med Spark Streaming

Den næste side indeholder de parametre, der er ansvarlige for tilgængeligheden af ​​vores databaseserver udefra (Public accessibility) og tilgængeligheden af ​​porte:

Apache Kafka og datastreaming med Spark Streaming

Lad os oprette en ny indstilling for VPC-sikkerhedsgruppen, som tillader ekstern adgang til vores databaseserver via port 5432 (PostgreSQL).
Lad os gå i et separat browservindue til AWS-konsollen i VPC Dashboard -> Sikkerhedsgrupper -> Opret sikkerhedsgruppesektion:
Apache Kafka og datastreaming med Spark Streaming

Vi sætter navnet på sikkerhedsgruppen - PostgreSQL, en beskrivelse, specificerer hvilken VPC denne gruppe skal være tilknyttet og klik på knappen Opret:
Apache Kafka og datastreaming med Spark Streaming

Vi udfylder den nyoprettede gruppe Indgående regler for port 5432, som vist på billedet nedenfor. Du kan ikke angive porten manuelt, men vælg PostgreSQL fra rullelisten Type.

Strengt taget betyder værdien ::/0 tilgængeligheden af ​​indgående trafik til serveren fra hele verden, hvilket ikke er helt sandt kanonisk, men for at analysere eksemplet, lad os bruge denne tilgang:
Apache Kafka og datastreaming med Spark Streaming

Vi vender tilbage til browsersiden, hvor vi har "Konfigurer avancerede indstillinger" åben og vælger VPC-sikkerhedsgrupper -> Vælg eksisterende VPC-sikkerhedsgrupper -> PostgreSQL i sektionen:
Apache Kafka og datastreaming med Spark Streaming

Dernæst i afsnittet Databaseindstillinger -> Databasenavn -> indstil navnet - habrDB.

Vi kan lade resten af ​​parametrene, med undtagelse af deaktivering af backup (backup-opbevaringsperiode - 0 dage), overvågning og Performance Insights, stå som standard. Klik på knappen Opret database:
Apache Kafka og datastreaming med Spark Streaming

Stream Handler

Den sidste fase vil være udviklingen af ​​et Spark-job, som vil behandle nye data fra Kafka hvert andet sekund og indtaste resultatet i databasen.

Som nævnt ovenfor er kontrolpunkter hovedmekanismen i SparkStreaming, der skal konfigureres til at give fejltolerance. Vi vil bruge kontrolpunkter, og i tilfælde af en procedurefejl skal Spark Streaming-modulet kun vende tilbage til det sidste kontrolpunkt og genoptage beregninger fra det for at gendanne de tabte data.

Et kontrolpunkt kan aktiveres ved at indstille en mappe på et fejltolerant, pålideligt filsystem (f.eks. HDFS, S3 osv.), hvor kontrolpunktinformationen vil blive gemt. Dette gøres med for eksempel:

streamingContext.checkpoint(checkpointDirectory)

I vores eksempel vil vi bruge følgende tilgang, nemlig hvis checkpointDirectory eksisterer, så vil konteksten blive genskabt fra checkpointdataene. Hvis mappen ikke eksisterer (dvs. den udføres for første gang), så kaldes functionToCreateContext-funktionen for at skabe en ny kontekst og opsætte DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Vi opretter et DirectStream-objekt for at forbinde til "transaktion"-emnet ved at bruge createDirectStream-metoden i KafkaUtils-biblioteket:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing af indgående data i JSON-format:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Ved at bruge Spark SQL laver vi en simpel gruppering og sender resultatet til konsollen:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Hentning af forespørgselsteksten og kørsel gennem Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Og så gemmer vi de modtagne aggregerede data i en tabel i AWS RDS. For at gemme resultaterne af aggregeringen til en databasetabel, vil vi bruge skrivemetoden for DataFrame-objektet:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Et par ord om at oprette en forbindelse til AWS RDS. Vi oprettede brugeren og adgangskoden til det i trinnet "Deploying AWS PostgreSQL". Som url til databaseserveren skal du bruge Endpoint, som vises i afsnittet Forbindelse og sikkerhed:

Apache Kafka og datastreaming med Spark Streaming

For at forbinde Spark og Kafka korrekt, skal du køre jobbet gennem smark-submit ved hjælp af artefakten spark-streaming-kafka-0-8_2.11. Derudover vil vi også bruge en artefakt til at interagere med PostgreSQL-databasen, vi vil sende dem gennem --pakker.

For fleksibilitet af scriptet vil vi også tage navnet på beskedserveren og det emne, som vi ønsker at modtage data fra, som inputparametre.

Så det er tid til at køre og teste systemet:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Alt fungerede! Som du kan se på billedet nedenfor, mens applikationen kører, vises nye aggregeringsresultater hvert 2. sekund, fordi vi satte bundlingintervallet til 2 sekunder, da vi oprettede StreamingContext-objektet:

Apache Kafka og datastreaming med Spark Streaming

Dernæst laver vi en simpel forespørgsel til databasen for at tjekke for poster i tabellen transaktionsflow:

Apache Kafka og datastreaming med Spark Streaming

Konklusion

I denne artikel blev et eksempel på streaming af informationsbehandling ved hjælp af Spark Streaming i forbindelse med Apache Kafka og PostgreSQL overvejet. Med væksten i mængder af data fra forskellige kilder er det svært at overvurdere den praktiske værdi af Spark Streaming til at skabe realtids- og streamingapplikationer.

Du kan finde den fulde kildekode i mit lager på GitHub.

Jeg er glad for at diskutere denne artikel, jeg ser frem til dine kommentarer, og jeg håber også på konstruktiv kritik fra alle berørte læsere.

Jeg ønsker dig succes!

Ps. Det var oprindeligt planlagt at bruge en lokal PostgreSQL-database, men på grund af min kærlighed til AWS besluttede jeg at flytte databasen til skyen. I den næste artikel om dette emne vil jeg vise dig, hvordan du implementerer hele systemet beskrevet ovenfor i AWS ved hjælp af AWS Kinesis og AWS EMR. Følg nyhederne!

Kilde: www.habr.com

Tilføj en kommentar