Apache Kafka en Streaming Data Processing mei Spark Streaming

Hallo, Habr! Hjoed sille wy in systeem bouwe dat Apache Kafka-berjochtstreamen sil ferwurkje mei Spark Streaming en de ferwurkingsresultaten skriuwe nei de AWS RDS-wolkdatabase.

Litte wy ús foarstelle dat in bepaalde kredytynstelling ús de taak stelt om ynkommende transaksjes "op 'e flecht" te ferwurkjen oer al har tûken. Dit kin dien wurde foar it doel fan prompt berekkenjen fan in iepen faluta posysje foar de skatkiste, grinzen of finansjele resultaten foar transaksjes, etc.

Hoe kinne jo dit gefal útfiere sûnder it gebrûk fan magy en magyske spreuken - lês ûnder de besuniging! Go!

Apache Kafka en Streaming Data Processing mei Spark Streaming
(Ofbylding boarne)

Ynlieding

Fansels biedt it ferwurkjen fan in grutte hoemannichte gegevens yn echte tiid genôch mooglikheden foar gebrûk yn moderne systemen. Ien fan 'e populêrste kombinaasjes hjirfoar is de tandem fan Apache Kafka en Spark Streaming, wêrby't Kafka in stream fan ynkommende berjochtpakketten makket, en Spark Streaming ferwurket dizze pakketten op in bepaald tiidynterval.

Om de fouttolerânsje fan 'e applikaasje te fergrutsjen, sille wy kontrôlepunten brûke. Mei dit meganisme, as de Spark Streaming-motor ferlerne gegevens moat herstellen, hoecht it allinich werom te gean nei it lêste kontrolepunt en berekkeningen fan dêrút te hervatten.

Arsjitektuer fan it ûntwikkele systeem

Apache Kafka en Streaming Data Processing mei Spark Streaming

Gebrûkte komponinten:

  • Apache Kafka is in ferspraat publisearje-abonnearje berjochtensysteem. Geskikt foar sawol offline as online berjochtkonsumpsje. Om gegevensferlies te foarkommen, wurde Kafka-berjochten opslein op skiif en replikearre binnen it kluster. De Kafka systeem is boud boppe op de ZooKeeper syngronisaasje tsjinst;
  • Apache Spark Streaming - Spark-komponint foar it ferwurkjen fan streaminggegevens. De Spark Streaming-module is boud mei in mikro-batch-arsjitektuer, wêrby't de gegevensstream wurdt ynterpretearre as in trochgeande folchoarder fan lytse gegevenspakketten. Spark Streaming nimt gegevens út ferskate boarnen en kombinearret it yn lytse pakketten. Nije pakketten wurde makke mei regelmjittige yntervallen. Oan it begjin fan elk tiidynterval wurdt in nij pakket oanmakke, en alle gegevens dy't yn dat ynterval ûntfongen binne binne opnommen yn it pakket. Oan 'e ein fan it ynterval stopet pakketgroei. De grutte fan it ynterval wurdt bepaald troch in parameter neamd de batch ynterval;
  • Apache Spark SQL - kombinearret relasjonele ferwurking mei Spark funksjonele programmearring. Strukturearre gegevens betsjuttet gegevens dy't in skema hawwe, dat is in inkele set fan fjilden foar alle records. Spark SQL stipet ynfier fan in ferskaat oan strukturearre gegevens boarnen en, tank oan de beskikberens fan skema ynformaasje, it kin effisjint ophelje allinne de fereaske fjilden fan records, en ek jout DataFrame APIs;
  • AWS RDS is in relatyf goedkeape wolk-basearre relaasje databank, web tsjinst dy't simplifies opset, operaasje en skaalfergrutting, en wurdt bestjoerd direkt troch Amazon.

Ynstallearje en útfiere fan de Kafka-tsjinner

Foardat jo Kafka direkt brûke, moatte jo derfoar soargje dat jo Java hawwe, om't ... JVM wurdt brûkt foar wurk:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Litte wy in nije brûker meitsje om mei Kafka te wurkjen:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Download dan de ferdieling fan 'e offisjele Apache Kafka-webside:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Utpakke it ynladen argyf:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

De folgjende stap is opsjoneel. It feit is dat de standertynstellingen jo net tastean om alle mooglikheden fan Apache Kafka folslein te brûken. Wiskje bygelyks in ûnderwerp, kategory, groep wêryn berjochten kinne wurde publisearre. Om dit te feroarjen, litte wy it konfiguraasjetriem bewurkje:

vim ~/kafka/config/server.properties

Foegje it folgjende ta oan it ein fan it bestân:

delete.topic.enable = true

Foardat jo de Kafka-tsjinner begjinne, moatte jo de ZooKeeper-tsjinner begjinne; wy sille it helpskript brûke dat komt mei de Kafka-distribúsje:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Neidat ZooKeeper mei súkses begon is, starte de Kafka-tsjinner yn in aparte terminal:

bin/kafka-server-start.sh config/server.properties

Litte wy in nij ûnderwerp oanmeitsje mei de namme Transaksje:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Litte wy derfoar soargje dat in ûnderwerp mei it fereaske oantal partysjes en replikaasje is makke:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka en Streaming Data Processing mei Spark Streaming

Litte wy de mominten misse fan it testen fan de produsint en konsumint foar it nij oanmakke ûnderwerp. Mear details oer hoe't jo it ferstjoeren en ûntfangen fan berjochten kinne testen binne skreaun yn 'e offisjele dokumintaasje - Stjoer wat berjochten. No, wy geane troch nei it skriuwen fan in produsint yn Python mei de KafkaProducer API.

Produsint skriuwen

De produsint sil willekeurige gegevens generearje - 100 berjochten elke sekonde. Mei willekeurige gegevens bedoele wy in wurdboek dat bestiet út trije fjilden:

  • Tûke - namme fan it ferkeappunt fan 'e kredytynstelling;
  • Muntsoarte - transaksje faluta;
  • Tal - transaksje bedrach. It bedrach sil in posityf nûmer wêze as it in oankeap fan munt is troch de Bank, en in negatyf nûmer as it in ferkeap is.

De koade foar de produsint sjocht der sa út:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Folgjende, mei de ferstjoermetoade, stjoere wy in berjocht nei de tsjinner, nei it ûnderwerp dat wy nedich binne, yn JSON-formaat:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

By it útfieren fan it skript krije wy de folgjende berjochten yn 'e terminal:

Apache Kafka en Streaming Data Processing mei Spark Streaming

Dit betsjut dat alles wurket lykas wy woene - de produsint genereart en stjoert berjochten nei it ûnderwerp dat wy nedich binne.
De folgjende stap is om Spark te ynstallearjen en dizze berjochtstream te ferwurkjen.

Ynstallaasje fan Apache Spark

Apache Spark is in universele en hege-optreden kluster Computing platfoarm.

Spark prestearret better dan populêre ymplemintaasjes fan it MapReduce-model, wylst se in breder skala oan berekkeningstypen stypje, ynklusyf ynteraktive fragen en streamferwurking. Snelheid spilet in wichtige rol by it ferwurkjen fan grutte hoemannichten gegevens, om't it snelheid is wêrmei jo ynteraktyf kinne wurkje sûnder minuten of oeren te wachtsjen. Ien fan Spark's grutste sterke punten dy't it sa rap makket, is syn fermogen om berekkeningen yn it ûnthâld út te fieren.

Dit ramt is skreaun yn Scala, dus jo moatte it earst ynstallearje:

sudo apt-get install scala

Download de Spark-distribúsje fan 'e offisjele webside:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

It argyf útpakke:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Foegje it paad nei Spark ta oan it bash-bestân:

vim ~/.bashrc

Foegje de folgjende rigels ta troch de bewurker:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Rin it kommando hjirûnder út nei it meitsjen fan wizigingen oan bashrc:

source ~/.bashrc

AWS PostgreSQL ynsette

Alles wat oerbliuwt is de database yn te setten wêryn wy de ferwurke ynformaasje fan 'e streamen sille uploade. Hjirfoar sille wy de AWS RDS-tsjinst brûke.

Gean nei de AWS-konsole -> AWS RDS -> Databases -> Databank oanmeitsje:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Selektearje PostgreSQL en klikje Folgjende:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Omdat Dit foarbyld is allinich foar edukative doelen; wy sille in fergese tsjinner "minimum" brûke (Free Tier):
Apache Kafka en Streaming Data Processing mei Spark Streaming

Dêrnei sette wy in tikje yn it Free Tier-blok, en dêrnei sille wy automatysk in eksimplaar fan 'e t2.micro-klasse oanbean wurde - hoewol swak, it is fergees en frij geskikt foar ús taak:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Folgjende komme hiel wichtige dingen: de namme fan de database eksimplaar, de namme fan de master brûker en syn wachtwurd. Litte wy de eksimplaar neame: myHabrTest, master brûker: habr, wachtwurd: habr12345 en klikje op de knop Folgjende:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Op de folgjende side binne d'r parameters dy't ferantwurdlik binne foar de tagonklikens fan ús databanktsjinner fan bûten (Iepenbiere tagonklikens) en poartebeskikberens:

Apache Kafka en Streaming Data Processing mei Spark Streaming

Litte wy in nije ynstelling meitsje foar de VPC-befeiligingsgroep, dy't eksterne tagong ta ús databanktsjinner fia poarte 5432 (PostgreSQL) sil tastean.
Litte wy nei de AWS-konsole gean yn in apart browserfinster nei it VPC Dashboard -> Feiligensgroepen -> Feiligensgroepseksje oanmeitsje:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Wy sette de namme foar de Feiligensgroep yn - PostgreSQL, in beskriuwing, jouwe oan mei hokker VPC dizze groep moat wurde assosjearre en klikje op de knop Meitsje:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Folje de ynkommende regels yn foar haven 5432 foar de nij oanmakke groep, lykas werjûn yn 'e foto hjirûnder. Jo kinne de poarte net manuell oantsjutte, mar selektearje PostgreSQL út 'e útklaplist Type.

Strikt sjoen betsjut de wearde ::/0 de beskikberens fan ynkommende ferkear nei de tsjinner fan oer de hiele wrâld, wat kanonysk net hielendal wier is, mar om it foarbyld te analysearjen, litte wy ússels dizze oanpak brûke:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Wy geane werom nei de browserside, wêr't wy "Avansearre ynstellings konfigurearje" iepen hawwe en selektearje yn 'e seksje VPC-befeiligingsgroepen -> Kies besteande VPC-befeiligingsgroepen -> PostgreSQL:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Folgjende, yn 'e Database-opsjes -> Databanknamme -> set de namme yn - habrDB.

Wy kinne de oerbleaune parameters ferlitte, mei útsûndering fan it útskeakeljen fan reservekopy (reservekopybehâldperioade - 0 dagen), tafersjoch en Performance Insights, standert. Klikje op de knop Databank oanmeitsje:
Apache Kafka en Streaming Data Processing mei Spark Streaming

Thread handler

De lêste etappe sil de ûntwikkeling wêze fan in Spark-taak, dy't elke twa sekonden nije gegevens fan Kafka ferwurkje en it resultaat yn 'e database ynfiere.

Lykas hjirboppe oanjûn, binne kontrôlepunten in kearnmeganisme yn SparkStreaming dy't moatte wurde konfigureare om fouttolerânsje te garandearjen. Wy sille kontrôlepunten brûke en, as de proseduere mislearret, sil de Spark Streaming-module allinich werom moatte nei it lêste kontrôlepunt en berekkeningen derfan ferfetsje om de ferlerne gegevens te herstellen.

Checkpointing kin ynskeakele wurde troch it ynstellen fan in map op in flater-tolerant, betrouber triemsysteem (lykas HDFS, S3, ensfh) wêryn de checkpoint ynformaasje wurdt opslein. Dit wurdt dien mei bygelyks:

streamingContext.checkpoint(checkpointDirectory)

Yn ús foarbyld sille wy de folgjende oanpak brûke, nammentlik as checkpointDirectory bestiet, dan sil de kontekst opnij makke wurde fan 'e checkpointgegevens. As de map net bestiet (dus foar de earste kear útfierd), dan wurdt functionToCreateContext oproppen om in nije kontekst te meitsjen en DStreams te konfigurearjen:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Wy meitsje in DirectStream-objekt om te ferbinen mei it "transaksje"-ûnderwerp mei de createDirectStream-metoade fan 'e KafkaUtils-bibleteek:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

It parsearjen fan ynkommende gegevens yn JSON-formaat:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Mei Spark SQL dogge wy in ienfâldige groepearring en werjaan it resultaat yn 'e konsole:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

De querytekst krije en it útfiere fia Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

En dan bewarje wy de resultearjende aggregearre gegevens yn in tabel yn AWS RDS. Om de aggregaasjeresultaten te bewarjen yn in databanktabel, sille wy de skriuwmetoade fan it DataFrame-objekt brûke:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

In pear wurden oer it opsetten fan in ferbining mei AWS RDS. Wy hawwe de brûker en wachtwurd dêrfoar makke by de stap "AWS PostgreSQL ynsette". Jo moatte Endpoint brûke as de url fan de databaseserver, dy't wurdt werjûn yn 'e seksje Konnektivität en feiligens:

Apache Kafka en Streaming Data Processing mei Spark Streaming

Om Spark en Kafka korrekt te ferbinen, moatte jo de taak útfiere fia smark-submit mei it artefakt spark-streaming-kafka-0-8_2.11. Derneist sille wy ek in artefakt brûke foar ynteraksje mei de PostgreSQL-database; wy sille se oerdrage fia --pakketten.

Foar de fleksibiliteit fan it skript sille wy ek as ynfierparameters de namme fan 'e berjochttsjinner en it ûnderwerp opnimme wêrfan wy gegevens ûntfange wolle.

Dat, it is tiid om de funksjonaliteit fan it systeem te starten en te kontrolearjen:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Alles slagge! Lykas jo kinne sjen yn 'e ôfbylding hjirûnder, wylst de applikaasje rint, wurde nije aggregaasjeresultaten elke 2 sekonden útfierd, om't wy it batching-ynterval op 2 sekonden ynstelle doe't wy it StreamingContext-objekt makken:

Apache Kafka en Streaming Data Processing mei Spark Streaming

Dêrnei meitsje wy in ienfâldige fraach nei de databank om de oanwêzigens fan records yn 'e tabel te kontrolearjen transaksje_flow:

Apache Kafka en Streaming Data Processing mei Spark Streaming

konklúzje

Dit artikel seach nei in foarbyld fan streamferwurking fan ynformaasje mei Spark Streaming yn kombinaasje mei Apache Kafka en PostgreSQL. Mei de groei fan gegevens út ferskate boarnen is it lestich om de praktyske wearde fan Spark Streaming te oerskatten foar it meitsjen fan streaming en real-time applikaasjes.

Jo kinne de folsleine boarnekoade fine yn myn repository op GitHub.

Ik bin bliid om dit artikel te besprekken, ik sjoch út nei jo opmerkings, en ik hoopje ek op konstruktive krityk fan alle soarchsume lêzers.

Ik winskje jo súkses!

Ps. Yn it earstoan wie it pland om in lokale PostgreSQL-database te brûken, mar sjoen myn leafde foar AWS, besleat ik de database nei de wolk te ferpleatsen. Yn it folgjende artikel oer dit ûnderwerp sil ik sjen litte hoe't jo it heule systeem hjirboppe beskreaun yn AWS kinne ymplementearje mei AWS Kinesis en AWS EMR. Folgje it nijs!

Boarne: www.habr.com

Add a comment