Apache Kafka en datastroom met Spark Streaming

Haai Habr! Ons sal vandag 'n stelsel bou wat Apache Kafka-boodskapstrome sal verwerk deur Spark Streaming te gebruik en die verwerkingsresultaat na die AWS RDS-wolkdatabasis sal skryf.

Stel jou voor dat 'n sekere kredietinstelling ons die taak stel om inkomende transaksies "on-die-vlieg" vir al sy takke te verwerk. Dit kan gedoen word met die doel om vinnig die oop geldeenheidposisie vir die tesourie, limiete of finansiële resultaat op transaksies, ens.

Hoe om hierdie geval te implementeer sonder die gebruik van towerkrag en towerspreuke - lees onder die snit! Gaan!

Apache Kafka en datastroom met Spark Streaming
(Foto bron)

Inleiding

Natuurlik bied intydse verwerking van 'n groot hoeveelheid data genoeg geleenthede vir gebruik in moderne stelsels. Een van die gewildste kombinasies hiervoor is die tandem van Apache Kafka en Spark Streaming, waar Kafka 'n stroom van inkomende boodskappakkies skep, en Spark Streaming hierdie pakkies op 'n bepaalde tydsinterval verwerk.

Om die fouttoleransie van die toepassing te verbeter, sal ons kontrolepunte - kontrolepunte gebruik. Met hierdie meganisme, wanneer die Spark Streaming-module verlore data moet herstel, hoef dit net na die laaste kontrolepunt terug te keer en van daar af berekeninge te hervat.

Die argitektuur van die ontwikkelde stelsel

Apache Kafka en datastroom met Spark Streaming

Gebruikte komponente:

  • Apache Kafka is 'n verspreide publiseer-en-teken-boodskapstelsel. Geskik vir beide vanlyn en aanlyn boodskapverbruik. Om dataverlies te voorkom, word Kafka-boodskappe op skyf gestoor en binne die groep gerepliseer. Die Kafka-stelsel is bo-op die ZooKeeper-sinchronisasiediens gebou;
  • Apache Spark Streaming - 'n Spark-komponent vir die verwerking van stroomdata. Die Spark Streaming-module word gebou deur 'n mikro-batch-argitektuur te gebruik, wanneer 'n datastroom geïnterpreteer word as 'n aaneenlopende reeks klein datapakkies. Spark Streaming neem data uit verskillende bronne en kombineer dit in klein bondels. Nuwe pakkette word met gereelde tussenposes geskep. Aan die begin van elke tydinterval word 'n nuwe pakkie geskep, en enige data wat gedurende daardie interval ontvang word, word by die pakkie ingesluit. Aan die einde van die interval stop pakkiegroei. Die grootte van die interval word bepaal deur 'n parameter wat die bondelinterval genoem word;
  • Apache Spark SQL - Kombineer relasionele verwerking met Spark funksionele programmering. Gestruktureerde data verwys na data wat 'n skema het, dit wil sê 'n enkele stel velde vir alle rekords. Spark SQL ondersteun insette van 'n verskeidenheid gestruktureerde databronne en, as gevolg van die teenwoordigheid van skema-inligting, kan dit doeltreffend net die vereiste velde van rekords ophaal, en verskaf ook DataFrame API's;
  • AWS RDS is 'n relatief goedkoop wolkgebaseerde relasionele databasis, 'n webdiens wat opstelling, werking en skaal vereenvoudig, direk deur Amazon geadministreer.

Installeer en bestuur die Kafka-bediener

Voordat jy Kafka direk gebruik, moet jy seker maak dat jy Java het, want JVM word gebruik vir werk:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Kom ons skep 'n nuwe gebruiker om met Kafka te werk:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Laai dan die verspreidingskit van die amptelike Apache Kafka-webwerf af:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Pak die afgelaaide argief uit:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Die volgende stap is opsioneel. Die feit is dat die verstekinstellings jou nie toelaat om al die kenmerke van Apache Kafka ten volle te gebruik nie. Vee byvoorbeeld 'n onderwerp, kategorie, groep uit waarop boodskappe gepubliseer kan word. Om dit te verander, kom ons wysig die konfigurasielêer:

vim ~/kafka/config/server.properties

Voeg die volgende aan die einde van die lêer:

delete.topic.enable = true

Voordat jy die Kafka-bediener begin, moet jy die ZooKeeper-bediener begin, ons sal die helper script gebruik wat saam met die Kafka verspreiding kom:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Nadat ZooKeeper suksesvol begin het, begin ons die Kafka-bediener in 'n aparte terminaal:

bin/kafka-server-start.sh config/server.properties

Kom ons skep 'n nuwe onderwerp genaamd Transaksie:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Kom ons maak seker dat 'n onderwerp met die vereiste aantal partisies en replikasie geskep is:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka en datastroom met Spark Streaming

Kom ons mis die oomblikke van toetsing van die produsent en verbruiker vir die nuutgeskepte onderwerp. Meer besonderhede oor hoe jy die stuur en ontvang van boodskappe kan toets, is in die amptelike dokumentasie geskryf - Stuur 'n paar boodskappe. Wel, ons gaan voort om 'n vervaardiger in Python te skryf met behulp van die KafkaProducer API.

Vervaardiger skryf

Die vervaardiger sal ewekansige data genereer - 100 boodskappe elke sekonde. Met ewekansige data bedoel ons 'n woordeboek wat uit drie velde bestaan:

  • Tak — naam van die verkoopspunt van die kredietinstelling;
  • Geld — transaksiegeldeenheid;
  • bedrag - Transaksie Bedrag. Die bedrag sal positief wees as dit 'n aankoop van valuta deur die Bank is, en negatief as dit 'n verkoop is.

Die kode vir die vervaardiger lyk soos volg:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Volgende, met behulp van die stuurmetode, stuur ons 'n boodskap na die bediener, na die onderwerp wat ons benodig, in JSON-formaat:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Wanneer die skrip uitgevoer word, kry ons die volgende boodskappe in die terminale:

Apache Kafka en datastroom met Spark Streaming

Dit beteken alles werk soos ons wou – die vervaardiger genereer en stuur boodskappe na die onderwerp wat ons nodig het.
Die volgende stap is om Spark te installeer en hierdie boodskapvloei te verwerk.

Installeer Apache Spark

Apache Spark is 'n veelsydige en hoë werkverrigting groeprekenaarplatform.

Spark presteer beter as gewilde implementerings van die MapReduce-model in terme van werkverrigting, terwyl dit ondersteuning bied vir 'n wyer reeks berekeningtipes, insluitend interaktiewe navrae en stroming. Spoed speel 'n belangrike rol wanneer groot hoeveelhede data verwerk word, aangesien dit spoed is wat jou toelaat om interaktief te werk sonder om minute of ure te wag. Een van Spark se grootste sterkpunte om hierdie spoed te lewer, is sy vermoë om in-geheue-berekeninge uit te voer.

Hierdie raamwerk is in Scala geskryf, so jy moet dit eers installeer:

sudo apt-get install scala

Laai die Spark-verspreiding van die amptelike webwerf af:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Pak die argief uit:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Voeg die pad na Spark by die bash-lêer:

vim ~/.bashrc

Voeg die volgende reëls by deur die redigeerder:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Voer die opdrag hieronder uit nadat jy veranderinge aan bashrc gemaak het:

source ~/.bashrc

Ontplooi AWS PostgreSQL

Dit bly om die databasis te ontplooi, waar ons die verwerkte inligting van die strome sal invul. Om dit te doen, sal ons die AWS RDS-diens gebruik.

Gaan na die AWS-konsole -> AWS RDS -> Databasisse -> Skep databasis:
Apache Kafka en datastroom met Spark Streaming

Kies PostgreSQL en klik op die volgende knoppie:
Apache Kafka en datastroom met Spark Streaming

Omdat hierdie voorbeeld word uitsluitlik vir opvoedkundige doeleindes ontleed, ons sal 'n gratis bediener gebruik "ten minste" (Free Tier):
Apache Kafka en datastroom met Spark Streaming

Merk dan die blokkie Free Tier, en daarna sal ons outomaties 'n instansie van die t2.micro-klas aangebied word - hoewel swak, maar gratis en redelik geskik vir ons taak:
Apache Kafka en datastroom met Spark Streaming

Baie belangrike dinge volg: die naam van die DB-instansie, die naam van die meestergebruiker en sy wagwoord. Kom ons noem die instansie: myHabrTest, meestergebruiker: habr, wagwoord: habr12345 en klik op die volgende knoppie:
Apache Kafka en datastroom met Spark Streaming

Die volgende bladsy bevat die parameters wat verantwoordelik is vir die toeganklikheid van ons databasisbediener van buite (Publieke toeganklikheid) en die beskikbaarheid van poorte:

Apache Kafka en datastroom met Spark Streaming

Kom ons skep 'n nuwe instelling vir die VPC-sekuriteitsgroep, wat eksterne toegang tot ons databasisbediener deur poort 5432 (PostgreSQL) sal toelaat.
Kom ons gaan in 'n aparte blaaiervenster na die AWS-konsole in die VPC Dashboard -> Sekuriteitsgroepe -> Skep sekuriteitsgroep-afdeling:
Apache Kafka en datastroom met Spark Streaming

Ons stel die naam vir die sekuriteitsgroep - PostgreSQL, 'n beskrywing, spesifiseer met watter VPC hierdie groep geassosieer moet word en klik op die Skep-knoppie:
Apache Kafka en datastroom met Spark Streaming

Ons vul in vir die nuutgeskepte groep Inkomende reëls vir poort 5432, soos in die prentjie hieronder getoon. Jy kan nie die poort met die hand spesifiseer nie, maar kies PostgreSQL uit die Tipe-aftreklys.

Streng gesproke beteken die waarde ::/0 die beskikbaarheid van inkomende verkeer vir die bediener van regoor die wêreld, wat nie heeltemal kanonies waar is nie, maar om die voorbeeld te ontleed, kom ons gebruik hierdie benadering:
Apache Kafka en datastroom met Spark Streaming

Ons keer terug na die blaaierbladsy, waar ons "Konfigureer gevorderde instellings" oop het en kies VPC-sekuriteitsgroepe -> Kies bestaande VPC-sekuriteitsgroepe -> PostgreSQL in die afdeling:
Apache Kafka en datastroom met Spark Streaming

Volgende, in die afdeling Databasis opsies -> Databasis naam -> stel die naam - habrDB.

Ons kan die res van die parameters verlaat, met die uitsondering van die deaktivering van rugsteun (rugsteunbehoudtydperk - 0 dae), monitering en prestasie-insigte, by verstek. Klik op die knoppie Skep databasis:
Apache Kafka en datastroom met Spark Streaming

Stroom hanteerder

Die laaste fase sal die ontwikkeling van 'n Spark-werk wees, wat elke twee sekondes nuwe data van Kafka sal verwerk en die resultaat in die databasis sal invoer.

Soos hierbo genoem, is kontrolepunte die hoofmeganisme in SparkStreaming wat gekonfigureer moet word om fouttoleransie te verskaf. Ons sal kontrolepunte gebruik en, in die geval van 'n prosedure mislukking, sal die Spark Streaming-module net hoef terug te keer na die laaste kontrolepunt en berekeninge daaruit te hervat om die verlore data te herwin.

'n Kontrolepunt kan geaktiveer word deur 'n gids op 'n fouttolerante, betroubare lêerstelsel (bv. HDFS, S3, ens.) te stel waar die kontrolepuntinligting gestoor sal word. Dit word gedoen met byvoorbeeld:

streamingContext.checkpoint(checkpointDirectory)

In ons voorbeeld sal ons die volgende benadering gebruik, naamlik, as die kontrolepuntDirectory bestaan, sal die konteks herskep word vanaf die kontrolepuntdata. As die gids nie bestaan ​​nie (d.w.s. dit word vir die eerste keer uitgevoer), dan word die functionToCreateContext-funksie opgeroep om 'n nuwe konteks te skep en DStreams op te stel:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Ons skep 'n DirectStream-objek om aan die "transaksie"-onderwerp te koppel deur die createDirectStream-metode van die KafkaUtils-biblioteek te gebruik:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Ontleding van inkomende data in JSON-formaat:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Deur Spark SQL te gebruik, doen ons 'n eenvoudige groepering en voer die resultaat uit na die konsole:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Kry die navraagliggaam en voer dit deur Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

En dan stoor ons die ontvangde saamgevoegde data in 'n tabel in AWS RDS. Om die resultate van die samevoeging na 'n databasistabel te stoor, sal ons die skryfmetode van die DataFrame-objek gebruik:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

'n Paar woorde oor die opstel van 'n verbinding met AWS RDS. Ons het die gebruiker en wagwoord daarvoor geskep by die “Deploying AWS PostgreSQL”-stap. As die url van die databasisbediener moet jy die Eindpunt gebruik, wat in die Verbindings- en sekuriteitsafdeling vertoon word:

Apache Kafka en datastroom met Spark Streaming

Om Spark en Kafka korrek te verbind, moet jy die taak deur smark-submit met die artefak laat loop vonk-stroom-kafka-0-8_2.11. Daarbenewens sal ons ook 'n artefak gebruik vir interaksie met die PostgreSQL-databasis, ons sal dit deur --pakkette stuur.

Vir buigsaamheid van die skrif, sal ons ook die naam van die boodskapbediener en die onderwerp waaruit ons data wil ontvang as invoerparameters uithaal.

Dit is dus tyd om die stelsel uit te voer en te toets:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Alles het uitgewerk! Soos u in die prentjie hieronder kan sien, terwyl die toepassing loop, word nuwe samevoegingsresultate elke 2 sekondes vertoon, want ons het die bondelinterval op 2 sekondes gestel toe ons die StreamingContext-objek geskep het:

Apache Kafka en datastroom met Spark Streaming

Vervolgens maak ons ​​'n eenvoudige navraag na die databasis om te kyk vir rekords in die tabel transaksie_vloei:

Apache Kafka en datastroom met Spark Streaming

Gevolgtrekking

In hierdie artikel is 'n voorbeeld van stroominligtingverwerking met behulp van Spark Streaming in samewerking met Apache Kafka en PostgreSQL oorweeg. Met die groei in volumes data uit verskeie bronne, is dit moeilik om die praktiese waarde van Spark Streaming vir die skep van intydse en stroomtoepassings te oorskat.

U kan die volledige bronkode in my bewaarplek vind by GitHub.

Ek is bly om hierdie artikel te bespreek, ek sien uit na u kommentaar, en ek hoop ook vir opbouende kritiek van alle betrokke lesers.

Ek wens jou sukses toe!

Ps. Dit was oorspronklik beplan om 'n plaaslike PostgreSQL-databasis te gebruik, maar gegewe my liefde vir AWS, het ek besluit om die databasis na die wolk te skuif. In die volgende artikel oor hierdie onderwerp, sal ek jou wys hoe om die hele stelsel wat hierbo beskryf word in AWS te implementeer deur gebruik te maak van AWS Kinesis en AWS EMR. Volg die nuus!

Bron: will.com

Voeg 'n opmerking