Hei Habr! Heddiw, byddwn yn adeiladu system a fydd yn prosesu ffrydiau neges Apache Kafka gan ddefnyddio Spark Streaming ac yn ysgrifennu'r canlyniad prosesu i gronfa ddata cwmwl AWS RDS.
Dychmygwch fod sefydliad credyd penodol yn gosod y dasg i ni o brosesu trafodion sy'n dod i mewn "ar y hedfan" ar gyfer ei holl ganghennau. Gellir gwneud hyn er mwyn cyfrifo'n gyflym y sefyllfa arian agored ar gyfer y trysorlys, terfynau neu ganlyniad ariannol trafodion, ac ati.
Sut i weithredu'r achos hwn heb ddefnyddio swynion hud a hud - darllenwch o dan y toriad! Ewch!
Wrth gwrs, mae prosesu llawer iawn o ddata mewn amser real yn rhoi digon o gyfleoedd i'w defnyddio mewn systemau modern. Un o'r cyfuniadau mwyaf poblogaidd ar gyfer hyn yw tandem Apache Kafka a Spark Streaming, lle mae Kafka yn creu llif o becynnau negeseuon sy'n dod i mewn, ac mae Spark Streaming yn prosesu'r pecynnau hyn ar gyfnod amser penodol.
Er mwyn gwella goddefgarwch bai'r cais, byddwn yn defnyddio pwyntiau gwirio - pwyntiau gwirio. Gyda'r mecanwaith hwn, pan fydd angen i'r modiwl Spark Streaming adennill data coll, dim ond dychwelyd i'r pwynt gwirio diwethaf ac ailddechrau cyfrifiadau sydd ei angen arno.
Pensaernïaeth y system ddatblygedig
Cydrannau a ddefnyddir:
Apache Kafka yn system negeseuon cyhoeddi-a-tanysgrifio ddosbarthedig. Yn addas ar gyfer defnydd negeseuon all-lein ac ar-lein. Er mwyn atal colli data, mae negeseuon Kafka yn cael eu storio ar ddisg a'u hailadrodd o fewn y clwstwr. Mae system Kafka wedi'i hadeiladu ar ben gwasanaeth cydamseru ZooKeeper;
Ffrydio Gwreichionen Apache - elfen Spark ar gyfer prosesu data ffrydio. Mae'r modiwl Spark Streaming yn cael ei adeiladu gan ddefnyddio pensaernïaeth micro-swp, pan ddehonglir llif data fel dilyniant parhaus o becynnau data bach. Mae Spark Streaming yn cymryd data o wahanol ffynonellau ac yn ei gyfuno'n sypiau bach. Mae pecynnau newydd yn cael eu creu yn rheolaidd. Ar ddechrau pob egwyl amser, mae pecyn newydd yn cael ei greu, ac mae unrhyw ddata a dderbynnir yn ystod y cyfnod hwnnw yn cael ei gynnwys yn y pecyn. Ar ddiwedd yr egwyl, mae twf pecynnau yn stopio. Mae maint y cyfwng yn cael ei bennu gan baramedr a elwir yn gyfwng swp;
Apache Spark SQL - Yn cyfuno prosesu perthynol â rhaglennu swyddogaethol Spark. Mae data strwythuredig yn cyfeirio at ddata sydd â sgema, hynny yw, un set o feysydd ar gyfer pob cofnod. Mae Spark SQL yn cefnogi mewnbwn o amrywiaeth o ffynonellau data strwythuredig ac, oherwydd presenoldeb gwybodaeth sgema, gall adfer y meysydd gofynnol o gofnodion yn unig yn effeithlon, a hefyd mae'n darparu APIs DataFrame;
AWS RDS yn gronfa ddata gymharol rad sy'n seiliedig ar gwmwl, gwasanaeth gwe sy'n symleiddio gosod, gweithredu a graddio, a weinyddir yn uniongyrchol gan Amazon.
Gosod a rhedeg gweinydd Kafka
Cyn defnyddio Kafka yn uniongyrchol, mae angen i chi sicrhau bod gennych Java, oherwydd Defnyddir JVM ar gyfer gwaith:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Mae'r cam nesaf yn ddewisol. Y ffaith yw nad yw'r gosodiadau diofyn yn caniatáu ichi ddefnyddio holl nodweddion Apache Kafka yn llawn. Er enghraifft, dileu pwnc, categori, grŵp, y gellir cyhoeddi negeseuon arno. I newid hyn, gadewch i ni olygu'r ffeil ffurfweddu:
vim ~/kafka/config/server.properties
Ychwanegwch y canlynol at ddiwedd y ffeil:
delete.topic.enable = true
Cyn dechrau'r gweinydd Kafka, mae angen i chi gychwyn y gweinydd ZooKeeper, byddwn yn defnyddio'r sgript helpwr sy'n dod gyda dosbarthiad Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Ar ôl i ZooKeeper ddechrau'n llwyddiannus, rydyn ni'n cychwyn y gweinydd Kafka mewn terfynell ar wahân:
Gadewch i ni golli'r eiliadau o brofi'r cynhyrchydd a'r defnyddiwr am y pwnc sydd newydd ei greu. Mae mwy o fanylion ar sut y gallwch chi brofi anfon a derbyn negeseuon wedi'u hysgrifennu yn y ddogfennaeth swyddogol - Anfon rhai negeseuon. Wel, rydyn ni'n symud ymlaen i ysgrifennu cynhyrchydd yn Python gan ddefnyddio'r API KafkaProducer.
Ysgrifennu cynhyrchydd
Bydd y cynhyrchydd yn cynhyrchu data ar hap - 100 neges bob eiliad. Wrth ddata ar hap, rydym yn golygu geiriadur sy'n cynnwys tri maes:
Cangen — enw man gwerthu'r sefydliad credyd;
Arian cyfred — arian cyfred trafodion;
swm - swm y trafodiad. Bydd y swm yn gadarnhaol os yw'n prynu arian cyfred gan y Banc, ac yn negyddol os yw'n werthiant.
Mae'r cod ar gyfer y cynhyrchydd yn edrych fel hyn:
Nesaf, gan ddefnyddio'r dull anfon, rydym yn anfon neges at y gweinydd, i'r pwnc sydd ei angen arnom, mewn fformat JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Wrth redeg y sgript, rydym yn cael y negeseuon canlynol yn y derfynell:
Mae hyn yn golygu bod popeth yn gweithio fel y dymunwn - mae'r cynhyrchydd yn cynhyrchu ac yn anfon negeseuon i'r pwnc sydd ei angen arnom.
Y cam nesaf yw gosod Spark a phrosesu llif y neges hon.
Gosod Apache Spark
Apache Spark yn blatfform cyfrifiadura clwstwr amlbwrpas a pherfformiad uchel.
Mae Spark yn perfformio'n well na gweithrediadau poblogaidd y model MapReduce o ran perfformiad, tra'n darparu cefnogaeth ar gyfer ystod ehangach o fathau o gyfrifiannu, gan gynnwys ymholiadau rhyngweithiol a ffrydio. Mae cyflymder yn chwarae rhan bwysig wrth brosesu llawer iawn o ddata, gan mai cyflymder sy'n caniatáu ichi weithio'n rhyngweithiol heb dreulio munudau neu oriau yn aros. Un o gryfderau mwyaf Spark ar gyfer darparu'r cyflymder hwn yw ei allu i wneud cyfrifiadau er cof.
Mae'r fframwaith hwn wedi'i ysgrifennu yn Scala, felly mae angen i chi ei osod yn gyntaf:
sudo apt-get install scala
Dadlwythwch y dosbarthiad Spark o'r wefan swyddogol:
Rhedeg y gorchymyn isod ar ôl gwneud newidiadau i bashrc:
source ~/.bashrc
Defnyddio AWS PostgreSQL
Mae angen ehangu'r gronfa ddata o hyd, lle byddwn yn llenwi'r wybodaeth wedi'i phrosesu o'r ffrydiau. I wneud hyn, byddwn yn defnyddio gwasanaeth AWS RDS.
Ewch i'r consol AWS -> AWS RDS -> Cronfeydd data -> Creu cronfa ddata:
Dewiswch PostgreSQL a chliciwch ar y botwm Nesaf:
Achos dadansoddir yr enghraifft hon at ddibenion addysgol yn unig, byddwn yn defnyddio gweinydd rhad ac am ddim “o leiaf” (Haen Rydd):
Nesaf, gwiriwch y bloc Haen Rhad ac Am Ddim, ac ar ôl hynny byddwn yn cael cynnig enghraifft o'r dosbarth t2.micro yn awtomatig - er yn wan, ond yn rhad ac am ddim ac yn eithaf addas ar gyfer ein tasg:
Mae pethau pwysig iawn yn dilyn: enw'r enghraifft DB, enw'r prif ddefnyddiwr a'i gyfrinair. Gadewch i ni alw'r enghraifft: myHabrTest, prif ddefnyddiwr: habr, cyfrinair: habr12345 a chliciwch ar y botwm Nesaf:
Mae'r dudalen nesaf yn cynnwys y paramedrau sy'n gyfrifol am hygyrchedd ein gweinydd cronfa ddata o'r tu allan (Hygyrchedd cyhoeddus) ac argaeledd porthladdoedd:
Gadewch i ni greu gosodiad newydd ar gyfer y grŵp diogelwch VPC, a fydd yn caniatáu mynediad allanol i'n gweinydd cronfa ddata trwy borthladd 5432 (PostgreSQL).
Gadewch i ni fynd mewn ffenestr porwr ar wahân i'r consol AWS yn y Dangosfwrdd VPC -> Grwpiau Diogelwch -> Creu adran grŵp diogelwch:
Rydym yn gosod yr enw ar gyfer y grŵp Diogelwch - PostgreSQL, disgrifiad, nodwch pa VPC y dylai'r grŵp hwn fod yn gysylltiedig ag ef a chliciwch ar y botwm Creu:
Rydyn ni'n llenwi'r rheolau Inbound grŵp sydd newydd eu creu ar gyfer porthladd 5432, fel y dangosir yn y llun isod. Ni allwch nodi'r porthladd â llaw, ond dewiswch PostgreSQL o'r gwymplen Math.
A siarad yn fanwl gywir, mae'r gwerth ::/0 yn golygu argaeledd traffig sy'n dod i mewn ar gyfer y gweinydd o bob cwr o'r byd, nad yw'n hollol wir yn ganonaidd, ond i ddadansoddi'r enghraifft, gadewch i ni ddefnyddio'r dull hwn:
Rydym yn dychwelyd i dudalen y porwr, lle mae gennym “Ffurfweddu gosodiadau uwch” yn agor ac yn dewis grwpiau diogelwch VPC -> Dewiswch grwpiau diogelwch VPC presennol -> PostgreSQL yn yr adran:
Nesaf, yn yr adran Opsiynau cronfa ddata -> Enw cronfa ddata -> gosodwch yr enw - habrDB.
Gallwn adael gweddill y paramedrau, ac eithrio analluogi copi wrth gefn (cyfnod cadw wrth gefn - 0 diwrnod), monitro a Insights Perfformiad, yn ddiofyn. Cliciwch ar y botwm Creu cronfa ddata:
Triniwr Ffrwd
Y cam olaf fydd datblygu swydd Spark, a fydd yn prosesu data newydd o Kafka bob dwy eiliad ac yn mewnbynnu'r canlyniad i'r gronfa ddata.
Fel y nodwyd uchod, pwyntiau gwirio yw'r prif fecanwaith yn SparkStreaming y mae'n rhaid eu ffurfweddu i ddarparu goddefgarwch bai. Byddwn yn defnyddio pwyntiau gwirio ac, os bydd y weithdrefn yn methu, dim ond dychwelyd i'r pwynt gwirio diwethaf ac ailddechrau cyfrifiadau ohono i adennill y data coll y bydd angen i'r modiwl Spark Streaming ei wneud.
Gellir galluogi pwynt gwirio trwy osod cyfeiriadur ar system ffeiliau ddibynadwy sy'n gallu goddef diffygion (e.e. HDFS, S3, ac ati) lle bydd gwybodaeth y pwynt gwirio yn cael ei storio. Gwneir hyn gyda, er enghraifft:
streamingContext.checkpoint(checkpointDirectory)
Yn ein hesiampl, byddwn yn defnyddio'r dull canlynol, sef, os yw'r Cyfeiriadur pwynt gwirio yn bodoli, yna bydd y cyd-destun yn cael ei ail-greu o'r data pwynt gwirio. Os nad yw'r cyfeiriadur yn bodoli (h.y. mae'n cael ei weithredu am y tro cyntaf), yna gelwir y ffwythiant functionToCreateContext i greu cyd-destun newydd a gosod DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Rydym yn creu gwrthrych DirectStream i gysylltu â'r pwnc "trafodiad" gan ddefnyddio dull createDirectStream llyfrgell KafkaUtils:
Gan ddefnyddio Spark SQL, rydyn ni'n grwpio'n syml ac yn allbwn y canlyniad i'r consol:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Ac yna rydyn ni'n cadw'r data cyfanredol a dderbyniwyd mewn tabl yn AWS RDS. I arbed canlyniadau'r agregiad i dabl cronfa ddata, byddwn yn defnyddio dull ysgrifennu'r gwrthrych DataFrame:
Ychydig eiriau am sefydlu cysylltiad ag AWS RDS. Fe wnaethon ni greu'r defnyddiwr a'r cyfrinair ar ei gyfer yn y cam “Deploying AWS PostgreSQL”. Fel url gweinydd y gronfa ddata, dylech ddefnyddio'r Endpoint, a ddangosir yn yr adran Cysylltedd a diogelwch:
Er mwyn cysylltu Spark a Kafka yn gywir, dylech redeg y swydd trwy smark-submit gan ddefnyddio'r arteffact gwreichionen-ffrydio-kafka-0-8_2.11. Yn ogystal, byddwn hefyd yn defnyddio arteffact ar gyfer rhyngweithio â chronfa ddata PostgreSQL, byddwn yn eu pasio trwy --pecynnau.
Er mwyn hyblygrwydd y sgript, byddwn hefyd yn tynnu enw'r gweinydd neges a'r pwnc yr ydym am dderbyn data ohono fel paramedrau mewnbwn.
Gweithiodd popeth allan! Fel y gwelwch yn y llun isod, tra bod y rhaglen yn rhedeg, mae canlyniadau agregu newydd yn cael eu harddangos bob 2 eiliad, oherwydd fe wnaethom osod yr egwyl bwndelu i 2 eiliad pan wnaethom greu'r gwrthrych StreamingContext:
Nesaf, rydym yn gwneud ymholiad syml i'r gronfa ddata i wirio am gofnodion yn y tabl llif_trafodiad:
Casgliad
Yn yr erthygl hon, ystyriwyd enghraifft o brosesu gwybodaeth ffrydio gan ddefnyddio Spark Streaming ar y cyd ag Apache Kafka a PostgreSQL. Gyda'r twf mewn symiau o ddata o wahanol ffynonellau, mae'n anodd goramcangyfrif gwerth ymarferol Spark Streaming ar gyfer creu cymwysiadau amser real a ffrydio.
Gallwch ddod o hyd i'r cod ffynhonnell llawn yn fy ystorfa yn GitHub.
Rwy'n hapus i drafod yr erthygl hon, edrychaf ymlaen at eich sylwadau, a hefyd, rwy'n gobeithio am feirniadaeth adeiladol gan bob darllenydd pryderus.
Rwy'n dymuno llwyddiant i chi!
Ps. Yn wreiddiol, y bwriad oedd defnyddio cronfa ddata PostgreSQL leol, ond o ystyried fy nghariad at AWS, penderfynais symud y gronfa ddata i'r cwmwl. Yn yr erthygl nesaf ar y pwnc hwn, byddaf yn dangos i chi sut i weithredu'r system gyfan a ddisgrifir uchod yn AWS gan ddefnyddio AWS Kinesis ac AWS EMR. Dilynwch y newyddion!