Apache Kafka a Ffrydio Data gyda Spark Streaming

Hei Habr! Heddiw, byddwn yn adeiladu system a fydd yn prosesu ffrydiau neges Apache Kafka gan ddefnyddio Spark Streaming ac yn ysgrifennu'r canlyniad prosesu i gronfa ddata cwmwl AWS RDS.

Dychmygwch fod sefydliad credyd penodol yn gosod y dasg i ni o brosesu trafodion sy'n dod i mewn "ar y hedfan" ar gyfer ei holl ganghennau. Gellir gwneud hyn er mwyn cyfrifo'n gyflym y sefyllfa arian agored ar gyfer y trysorlys, terfynau neu ganlyniad ariannol trafodion, ac ati.

Sut i weithredu'r achos hwn heb ddefnyddio swynion hud a hud - darllenwch o dan y toriad! Ewch!

Apache Kafka a Ffrydio Data gyda Spark Streaming
(Ffynhonnell llun)

Cyflwyniad

Wrth gwrs, mae prosesu llawer iawn o ddata mewn amser real yn rhoi digon o gyfleoedd i'w defnyddio mewn systemau modern. Un o'r cyfuniadau mwyaf poblogaidd ar gyfer hyn yw tandem Apache Kafka a Spark Streaming, lle mae Kafka yn creu llif o becynnau negeseuon sy'n dod i mewn, ac mae Spark Streaming yn prosesu'r pecynnau hyn ar gyfnod amser penodol.

Er mwyn gwella goddefgarwch bai'r cais, byddwn yn defnyddio pwyntiau gwirio - pwyntiau gwirio. Gyda'r mecanwaith hwn, pan fydd angen i'r modiwl Spark Streaming adennill data coll, dim ond dychwelyd i'r pwynt gwirio diwethaf ac ailddechrau cyfrifiadau sydd ei angen arno.

Pensaernïaeth y system ddatblygedig

Apache Kafka a Ffrydio Data gyda Spark Streaming

Cydrannau a ddefnyddir:

  • Apache Kafka yn system negeseuon cyhoeddi-a-tanysgrifio ddosbarthedig. Yn addas ar gyfer defnydd negeseuon all-lein ac ar-lein. Er mwyn atal colli data, mae negeseuon Kafka yn cael eu storio ar ddisg a'u hailadrodd o fewn y clwstwr. Mae system Kafka wedi'i hadeiladu ar ben gwasanaeth cydamseru ZooKeeper;
  • Ffrydio Gwreichionen Apache - elfen Spark ar gyfer prosesu data ffrydio. Mae'r modiwl Spark Streaming yn cael ei adeiladu gan ddefnyddio pensaernïaeth micro-swp, pan ddehonglir llif data fel dilyniant parhaus o becynnau data bach. Mae Spark Streaming yn cymryd data o wahanol ffynonellau ac yn ei gyfuno'n sypiau bach. Mae pecynnau newydd yn cael eu creu yn rheolaidd. Ar ddechrau pob egwyl amser, mae pecyn newydd yn cael ei greu, ac mae unrhyw ddata a dderbynnir yn ystod y cyfnod hwnnw yn cael ei gynnwys yn y pecyn. Ar ddiwedd yr egwyl, mae twf pecynnau yn stopio. Mae maint y cyfwng yn cael ei bennu gan baramedr a elwir yn gyfwng swp;
  • Apache Spark SQL - Yn cyfuno prosesu perthynol â rhaglennu swyddogaethol Spark. Mae data strwythuredig yn cyfeirio at ddata sydd â sgema, hynny yw, un set o feysydd ar gyfer pob cofnod. Mae Spark SQL yn cefnogi mewnbwn o amrywiaeth o ffynonellau data strwythuredig ac, oherwydd presenoldeb gwybodaeth sgema, gall adfer y meysydd gofynnol o gofnodion yn unig yn effeithlon, a hefyd mae'n darparu APIs DataFrame;
  • AWS RDS yn gronfa ddata gymharol rad sy'n seiliedig ar gwmwl, gwasanaeth gwe sy'n symleiddio gosod, gweithredu a graddio, a weinyddir yn uniongyrchol gan Amazon.

Gosod a rhedeg gweinydd Kafka

Cyn defnyddio Kafka yn uniongyrchol, mae angen i chi sicrhau bod gennych Java, oherwydd Defnyddir JVM ar gyfer gwaith:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Gadewch i ni greu defnyddiwr newydd i weithio gyda Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Nesaf, lawrlwythwch y pecyn dosbarthu o wefan swyddogol Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Dadbacio'r archif wedi'i lawrlwytho:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Mae'r cam nesaf yn ddewisol. Y ffaith yw nad yw'r gosodiadau diofyn yn caniatáu ichi ddefnyddio holl nodweddion Apache Kafka yn llawn. Er enghraifft, dileu pwnc, categori, grŵp, y gellir cyhoeddi negeseuon arno. I newid hyn, gadewch i ni olygu'r ffeil ffurfweddu:

vim ~/kafka/config/server.properties

Ychwanegwch y canlynol at ddiwedd y ffeil:

delete.topic.enable = true

Cyn dechrau'r gweinydd Kafka, mae angen i chi gychwyn y gweinydd ZooKeeper, byddwn yn defnyddio'r sgript helpwr sy'n dod gyda dosbarthiad Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Ar ôl i ZooKeeper ddechrau'n llwyddiannus, rydyn ni'n cychwyn y gweinydd Kafka mewn terfynell ar wahân:

bin/kafka-server-start.sh config/server.properties

Gadewch i ni greu pwnc newydd o'r enw Trafodion:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Gadewch i ni sicrhau bod pwnc gyda'r nifer gofynnol o raniadau ac atgynhyrchu wedi'i greu:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka a Ffrydio Data gyda Spark Streaming

Gadewch i ni golli'r eiliadau o brofi'r cynhyrchydd a'r defnyddiwr am y pwnc sydd newydd ei greu. Mae mwy o fanylion ar sut y gallwch chi brofi anfon a derbyn negeseuon wedi'u hysgrifennu yn y ddogfennaeth swyddogol - Anfon rhai negeseuon. Wel, rydyn ni'n symud ymlaen i ysgrifennu cynhyrchydd yn Python gan ddefnyddio'r API KafkaProducer.

Ysgrifennu cynhyrchydd

Bydd y cynhyrchydd yn cynhyrchu data ar hap - 100 neges bob eiliad. Wrth ddata ar hap, rydym yn golygu geiriadur sy'n cynnwys tri maes:

  • Cangen — enw man gwerthu'r sefydliad credyd;
  • Arian cyfred — arian cyfred trafodion;
  • swm - swm y trafodiad. Bydd y swm yn gadarnhaol os yw'n prynu arian cyfred gan y Banc, ac yn negyddol os yw'n werthiant.

Mae'r cod ar gyfer y cynhyrchydd yn edrych fel hyn:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Nesaf, gan ddefnyddio'r dull anfon, rydym yn anfon neges at y gweinydd, i'r pwnc sydd ei angen arnom, mewn fformat JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Wrth redeg y sgript, rydym yn cael y negeseuon canlynol yn y derfynell:

Apache Kafka a Ffrydio Data gyda Spark Streaming

Mae hyn yn golygu bod popeth yn gweithio fel y dymunwn - mae'r cynhyrchydd yn cynhyrchu ac yn anfon negeseuon i'r pwnc sydd ei angen arnom.
Y cam nesaf yw gosod Spark a phrosesu llif y neges hon.

Gosod Apache Spark

Apache Spark yn blatfform cyfrifiadura clwstwr amlbwrpas a pherfformiad uchel.

Mae Spark yn perfformio'n well na gweithrediadau poblogaidd y model MapReduce o ran perfformiad, tra'n darparu cefnogaeth ar gyfer ystod ehangach o fathau o gyfrifiannu, gan gynnwys ymholiadau rhyngweithiol a ffrydio. Mae cyflymder yn chwarae rhan bwysig wrth brosesu llawer iawn o ddata, gan mai cyflymder sy'n caniatáu ichi weithio'n rhyngweithiol heb dreulio munudau neu oriau yn aros. Un o gryfderau mwyaf Spark ar gyfer darparu'r cyflymder hwn yw ei allu i wneud cyfrifiadau er cof.

Mae'r fframwaith hwn wedi'i ysgrifennu yn Scala, felly mae angen i chi ei osod yn gyntaf:

sudo apt-get install scala

Dadlwythwch y dosbarthiad Spark o'r wefan swyddogol:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Dadbacio'r archif:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Ychwanegwch y llwybr i Spark at y ffeil bash:

vim ~/.bashrc

Ychwanegwch y llinellau canlynol trwy'r golygydd:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Rhedeg y gorchymyn isod ar ôl gwneud newidiadau i bashrc:

source ~/.bashrc

Defnyddio AWS PostgreSQL

Mae angen ehangu'r gronfa ddata o hyd, lle byddwn yn llenwi'r wybodaeth wedi'i phrosesu o'r ffrydiau. I wneud hyn, byddwn yn defnyddio gwasanaeth AWS RDS.

Ewch i'r consol AWS -> AWS RDS -> Cronfeydd data -> Creu cronfa ddata:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Dewiswch PostgreSQL a chliciwch ar y botwm Nesaf:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Achos dadansoddir yr enghraifft hon at ddibenion addysgol yn unig, byddwn yn defnyddio gweinydd rhad ac am ddim “o leiaf” (Haen Rydd):
Apache Kafka a Ffrydio Data gyda Spark Streaming

Nesaf, gwiriwch y bloc Haen Rhad ac Am Ddim, ac ar ôl hynny byddwn yn cael cynnig enghraifft o'r dosbarth t2.micro yn awtomatig - er yn wan, ond yn rhad ac am ddim ac yn eithaf addas ar gyfer ein tasg:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Mae pethau pwysig iawn yn dilyn: enw'r enghraifft DB, enw'r prif ddefnyddiwr a'i gyfrinair. Gadewch i ni alw'r enghraifft: myHabrTest, prif ddefnyddiwr: habr, cyfrinair: habr12345 a chliciwch ar y botwm Nesaf:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Mae'r dudalen nesaf yn cynnwys y paramedrau sy'n gyfrifol am hygyrchedd ein gweinydd cronfa ddata o'r tu allan (Hygyrchedd cyhoeddus) ac argaeledd porthladdoedd:

Apache Kafka a Ffrydio Data gyda Spark Streaming

Gadewch i ni greu gosodiad newydd ar gyfer y grŵp diogelwch VPC, a fydd yn caniatáu mynediad allanol i'n gweinydd cronfa ddata trwy borthladd 5432 (PostgreSQL).
Gadewch i ni fynd mewn ffenestr porwr ar wahân i'r consol AWS yn y Dangosfwrdd VPC -> Grwpiau Diogelwch -> Creu adran grŵp diogelwch:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Rydym yn gosod yr enw ar gyfer y grŵp Diogelwch - PostgreSQL, disgrifiad, nodwch pa VPC y dylai'r grŵp hwn fod yn gysylltiedig ag ef a chliciwch ar y botwm Creu:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Rydyn ni'n llenwi'r rheolau Inbound grŵp sydd newydd eu creu ar gyfer porthladd 5432, fel y dangosir yn y llun isod. Ni allwch nodi'r porthladd â llaw, ond dewiswch PostgreSQL o'r gwymplen Math.

A siarad yn fanwl gywir, mae'r gwerth ::/0 yn golygu argaeledd traffig sy'n dod i mewn ar gyfer y gweinydd o bob cwr o'r byd, nad yw'n hollol wir yn ganonaidd, ond i ddadansoddi'r enghraifft, gadewch i ni ddefnyddio'r dull hwn:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Rydym yn dychwelyd i dudalen y porwr, lle mae gennym “Ffurfweddu gosodiadau uwch” yn agor ac yn dewis grwpiau diogelwch VPC -> Dewiswch grwpiau diogelwch VPC presennol -> PostgreSQL yn yr adran:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Nesaf, yn yr adran Opsiynau cronfa ddata -> Enw cronfa ddata -> gosodwch yr enw - habrDB.

Gallwn adael gweddill y paramedrau, ac eithrio analluogi copi wrth gefn (cyfnod cadw wrth gefn - 0 diwrnod), monitro a Insights Perfformiad, yn ddiofyn. Cliciwch ar y botwm Creu cronfa ddata:
Apache Kafka a Ffrydio Data gyda Spark Streaming

Triniwr Ffrwd

Y cam olaf fydd datblygu swydd Spark, a fydd yn prosesu data newydd o Kafka bob dwy eiliad ac yn mewnbynnu'r canlyniad i'r gronfa ddata.

Fel y nodwyd uchod, pwyntiau gwirio yw'r prif fecanwaith yn SparkStreaming y mae'n rhaid eu ffurfweddu i ddarparu goddefgarwch bai. Byddwn yn defnyddio pwyntiau gwirio ac, os bydd y weithdrefn yn methu, dim ond dychwelyd i'r pwynt gwirio diwethaf ac ailddechrau cyfrifiadau ohono i adennill y data coll y bydd angen i'r modiwl Spark Streaming ei wneud.

Gellir galluogi pwynt gwirio trwy osod cyfeiriadur ar system ffeiliau ddibynadwy sy'n gallu goddef diffygion (e.e. HDFS, S3, ac ati) lle bydd gwybodaeth y pwynt gwirio yn cael ei storio. Gwneir hyn gyda, er enghraifft:

streamingContext.checkpoint(checkpointDirectory)

Yn ein hesiampl, byddwn yn defnyddio'r dull canlynol, sef, os yw'r Cyfeiriadur pwynt gwirio yn bodoli, yna bydd y cyd-destun yn cael ei ail-greu o'r data pwynt gwirio. Os nad yw'r cyfeiriadur yn bodoli (h.y. mae'n cael ei weithredu am y tro cyntaf), yna gelwir y ffwythiant functionToCreateContext i greu cyd-destun newydd a gosod DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Rydym yn creu gwrthrych DirectStream i gysylltu â'r pwnc "trafodiad" gan ddefnyddio dull createDirectStream llyfrgell KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Wrthi'n dosrannu data sy'n dod i mewn ar ffurf JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Gan ddefnyddio Spark SQL, rydyn ni'n grwpio'n syml ac yn allbwn y canlyniad i'r consol:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Cael corff yr ymholiad a'i redeg trwy Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Ac yna rydyn ni'n cadw'r data cyfanredol a dderbyniwyd mewn tabl yn AWS RDS. I arbed canlyniadau'r agregiad i dabl cronfa ddata, byddwn yn defnyddio dull ysgrifennu'r gwrthrych DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Ychydig eiriau am sefydlu cysylltiad ag AWS RDS. Fe wnaethon ni greu'r defnyddiwr a'r cyfrinair ar ei gyfer yn y cam “Deploying AWS PostgreSQL”. Fel url gweinydd y gronfa ddata, dylech ddefnyddio'r Endpoint, a ddangosir yn yr adran Cysylltedd a diogelwch:

Apache Kafka a Ffrydio Data gyda Spark Streaming

Er mwyn cysylltu Spark a Kafka yn gywir, dylech redeg y swydd trwy smark-submit gan ddefnyddio'r arteffact gwreichionen-ffrydio-kafka-0-8_2.11. Yn ogystal, byddwn hefyd yn defnyddio arteffact ar gyfer rhyngweithio â chronfa ddata PostgreSQL, byddwn yn eu pasio trwy --pecynnau.

Er mwyn hyblygrwydd y sgript, byddwn hefyd yn tynnu enw'r gweinydd neges a'r pwnc yr ydym am dderbyn data ohono fel paramedrau mewnbwn.

Felly, mae'n bryd rhedeg a phrofi'r system:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Gweithiodd popeth allan! Fel y gwelwch yn y llun isod, tra bod y rhaglen yn rhedeg, mae canlyniadau agregu newydd yn cael eu harddangos bob 2 eiliad, oherwydd fe wnaethom osod yr egwyl bwndelu i 2 eiliad pan wnaethom greu'r gwrthrych StreamingContext:

Apache Kafka a Ffrydio Data gyda Spark Streaming

Nesaf, rydym yn gwneud ymholiad syml i'r gronfa ddata i wirio am gofnodion yn y tabl llif_trafodiad:

Apache Kafka a Ffrydio Data gyda Spark Streaming

Casgliad

Yn yr erthygl hon, ystyriwyd enghraifft o brosesu gwybodaeth ffrydio gan ddefnyddio Spark Streaming ar y cyd ag Apache Kafka a PostgreSQL. Gyda'r twf mewn symiau o ddata o wahanol ffynonellau, mae'n anodd goramcangyfrif gwerth ymarferol Spark Streaming ar gyfer creu cymwysiadau amser real a ffrydio.

Gallwch ddod o hyd i'r cod ffynhonnell llawn yn fy ystorfa yn GitHub.

Rwy'n hapus i drafod yr erthygl hon, edrychaf ymlaen at eich sylwadau, a hefyd, rwy'n gobeithio am feirniadaeth adeiladol gan bob darllenydd pryderus.

Rwy'n dymuno llwyddiant i chi!

Ps. Yn wreiddiol, y bwriad oedd defnyddio cronfa ddata PostgreSQL leol, ond o ystyried fy nghariad at AWS, penderfynais symud y gronfa ddata i'r cwmwl. Yn yr erthygl nesaf ar y pwnc hwn, byddaf yn dangos i chi sut i weithredu'r system gyfan a ddisgrifir uchod yn AWS gan ddefnyddio AWS Kinesis ac AWS EMR. Dilynwch y newyddion!

Ffynhonnell: hab.com

Ychwanegu sylw