Hello, Habr! Today we will build a system that processes Apache Kafka message streams with Spark Streaming and writes the processing results to an AWS RDS cloud database.
Imagine that a certain credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This might be done in order to quickly calculate the treasury's open currency position, limits, the financial results of transactions, and so on.
How to implement this case without magic and magic spells - read under the cut! Go!
Introduction
Undoubtedly, processing large volumes of data in real time opens up many possibilities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka creates a stream of incoming message packets and Spark Streaming processes those packets at a given interval.
To increase the application's fault tolerance, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume the calculations from there.
Architecture of the developed system
Components used:
- Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. The Kafka system is built on top of the ZooKeeper synchronization service;
- Apache Spark Streaming is a Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small packets, which are created at regular intervals. At the start of each interval a new packet is created, and any data received during that interval is added to it; at the end of the interval the packet stops growing. The size of the interval is determined by a parameter called the batch interval;
- Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the availability of schema information, can efficiently retrieve only the required fields of records; it also provides the DataFrame APIs;
- AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation, and scaling, and is administered directly by Amazon.
Installing and running the Kafka server
Before using Kafka directly, you need to make sure you have Java, because the JVM is used for its operation:
sudo apt-get update
sudo apt-get install default-jre
java -version
Let's create a new user to work with Kafka:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
Next, download the distribution from the official Apache Kafka website:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
Unpack the downloaded archive:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
The next step is optional. The fact is that the default settings do not let you use all of Apache Kafka's capabilities, for example, deleting a topic, category, or group to which messages can be published. To change this, let's edit the configuration file:
vim ~/kafka/config/server.properties
Add the following to the end of the file:
delete.topic.enable = true
Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:
cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
After ZooKeeper has started successfully, launch the Kafka server in a separate terminal:
bin/kafka-server-start.sh config/server.properties
Let's create a new topic called transaction:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Let's make sure that a topic with the required number of partitions and replication has been created:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
We will skip over testing the producer and consumer on the newly created topic. Details on how you can test sending and receiving messages are described in the official documentation -
Writing the producer
The producer will generate random data: 100 messages every second. By random data we mean a dictionary with three fields:
- Branch - the name of the credit institution's point of sale;
- Currency - the transaction currency;
- Amount - the transaction amount. The amount will be a positive number if it is a purchase of currency by the Bank, and a negative number if it is a sale.
The producer code looks like this:
from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict
Next, using the send method, we send a message to the server, to the topic we need, in JSON format:
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')

my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: {}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))
finally:
    producer.flush()
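The snippet above sends a single message, while the producer is supposed to generate 100 messages per second. A minimal sketch of such a rate-limited loop, with the sending function injected as a parameter so the pacing logic stands on its own (the helper name send_batch and its signature are my own illustration, not part of the original script):

```python
import time

def send_batch(send_fn, get_value_fn, rate_per_sec=100, duration_sec=1.0):
    """Call send_fn(get_value_fn()) roughly rate_per_sec times per second.

    In the real script, send_fn would wrap
    producer.send(topic=my_topic, value=...).
    """
    interval = 1.0 / rate_per_sec
    deadline = time.monotonic() + duration_sec
    sent = 0
    while time.monotonic() < deadline:
        send_fn(get_value_fn())
        sent += 1
        time.sleep(interval)  # crude pacing; good enough for a demo load
    return sent
```

In the producer this could be called as send_batch(lambda d: producer.send(topic=my_topic, value=d), get_random_value).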
When you run the script, you get the following messages in the terminal:
This means that everything works as we wanted: the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.
Installing Apache Spark
Apache Spark is a universal, high-performance cluster computing platform.
Spark performs better than popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large amounts of data, since it is speed that allows you to work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform in-memory computations.
Spark is written in Scala, so we need to install Scala first:
sudo apt-get install scala
Download the Spark distribution from the official website:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
Unpack the archive:
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Add the Spark path to the bash file:
vim ~/.bashrc
Add the following lines with an editor:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
Run the command below after making the changes to bashrc:
source ~/.bashrc
Deploying AWS PostgreSQL
All that remains is to deploy the database into which we will write the processed information from the streams. For this we will use the AWS RDS service.
Go to the AWS console -> AWS RDS -> Databases -> Create database:
Select PostgreSQL and click Next:
Since this example is for educational purposes only, we will use a free server "at the minimum" (Free Tier):
Next, we tick the Free Tier block, and after that we are automatically offered an instance of the t2.micro class, which, although weak, is free and quite suitable for our task:
Next come very important things: the name of the database instance, the name of the master user, and his password. Let's name the instance: myHabrTest, master user: habr, password: habr12345, and click the Next button:
On the next page there are parameters responsible for the accessibility of our database server from the outside (Public accessibility) and for port availability:
Let's create a new VPC security group setting, which will allow external access to our database server via port 5432 (PostgreSQL).
Let's go to the AWS console in a separate browser window, to VPC Dashboard -> Security Groups -> Create security group:
We set the Security group name - PostgreSQL, a description, indicate which VPC this group should be associated with, and click the Create button:
Fill in the Inbound rules for port 5432 for the newly created group, as shown in the picture below. You don't have to specify the port manually; just select PostgreSQL from the drop-down list.
Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from all over the world, which canonically is not entirely correct, but for the sake of analyzing the example, let's allow ourselves this approach:
We return to the browser page where we have "Configure advanced settings" open and select in the section VPC security groups -> Choose existing VPC security groups -> PostgreSQL:
Next, in Database options -> Database name -> set the name - habrDB.
We can leave the remaining parameters at their defaults, except for disabling backup (backup retention period - 0 days), monitoring, and Performance Insights. Click the Create database button:
Stream processor
The final stage is the development of a Spark job, which will process new data coming from Kafka every two seconds and write the result to the database.
As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints, and if the procedure fails, the Spark Streaming module will only need to return to the last checkpoint and resume the calculations from it in order to recover the lost data.
Checkpointing can be enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done using, for example:
streamingContext.checkpoint(checkpointDirectory)
In our example, we will use the following approach, namely: if checkpointDirectory exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., it is being run for the first time), then functionToCreateContext is called to create a new context and configure the DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
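The article references functionToCreateContext without showing its body. A minimal sketch of what such a factory could look like, assuming a pyspark 2.x installation (the imports are deferred into the function so the sketch reads standalone, and the app name is my own placeholder):

```python
def functionToCreateContext():
    """Build a fresh StreamingContext and enable checkpointing.

    StreamingContext.getOrCreate calls this only when no checkpoint
    data exists yet; on restart, the context is rebuilt from the
    checkpoint directory instead.
    """
    # Deferred imports: assumes pyspark 2.x is installed.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName("habr-transaction-stream")  # placeholder name
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval, as in the article
    # The DStream setup (KafkaUtils.createDirectStream and the
    # transformations shown below) would be registered here.
    ssc.checkpoint(checkpointDirectory)  # assumed defined at module level
    return ssc
```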
We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})
Parsing the incoming data in JSON format:
from pyspark.sql import Row

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
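One step is implied but not shown above: in the 0-8 direct stream, each record arrives as a (key, value) tuple whose value is the JSON string produced by the producer's serializer, so it has to be decoded before the Row mapping. A small pure-Python helper illustrating that decoding (the name parse_transaction is my own):

```python
import json

def parse_transaction(record):
    """Decode one Kafka record from createDirectStream.

    record is a (key, value) tuple; value is the JSON string that the
    producer serialized with dumps(...).
    """
    _key, value = record
    d = json.loads(value)
    return {'branch': d['branch'],
            'currency': d['currency'],
            'amount': int(d['amount'])}
```

In the job it would be applied before the Row mapping, e.g. parsed = directKafkaStream.map(parse_transaction).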
Using Spark SQL, we do a simple grouping and output the result to the console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
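The query is fetched below through a get_sql_query() helper that the article does not show; a trivial version simply returns the query text (a sketch under that assumption):

```python
def get_sql_query():
    """Return the aggregation query over the treasury_stream temp view."""
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch as branch_name,
            t.currency as currency_code,
            sum(amount) as batch_value
        from treasury_stream t
        group by
            t.branch,
            t.currency
    """
```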
Getting the query text and running it via Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
And then we save the resulting aggregated data to a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:
testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()
A few words about setting up the connection to AWS RDS. We created the user and password for it at the "Deploying AWS PostgreSQL" step. You should use the Endpoint as the database server url, which is displayed in the Connectivity & security section:
In order to connect Spark and Kafka correctly, you should run the job via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. In addition, we will also use an artifact for interacting with the PostgreSQL database; we will pass both via --packages.
For the flexibility of the script, we will also include as input parameters the name of the message server and the topic from which we want to receive data.
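Reading those two parameters from the command line can be done with plain sys.argv; a minimal sketch (the helper name parse_args is my own, and argparse would work equally well):

```python
import sys

def parse_args(argv):
    """Expect exactly two positional arguments: broker list and topic."""
    if len(argv) != 3:
        raise SystemExit("Usage: spark_job.py <bootstrap-servers> <topic>")
    return argv[1], argv[2]

# In the job itself:
# broker_list, topic = parse_args(sys.argv)
```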
So, it's time to launch and check the operation of the system:
spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction
Everything worked out! As you can see in the picture below, while the application is running, new aggregation results are output every 2 seconds, because we set the batching interval to 2 seconds when we created the StreamingContext object:
Next, we make a simple query to the database to check for records in the transaction_flow table:
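The same check can also be scripted; below is a hedged sketch using the psycopg2 driver (the driver choice, helper name, and defaults are my own; any SQL client works just as well, and the import is deferred so the sketch reads standalone):

```python
def count_transaction_flow(host, dbname='habrDB',
                           user='habr', password='habr12345'):
    """Return the row count of transaction_flow on the RDS instance.

    host is the RDS Endpoint from the Connectivity & security section;
    assumes the psycopg2 package is installed.
    """
    import psycopg2  # deferred: only needed when actually querying
    conn = psycopg2.connect(host=host, port=5432, dbname=dbname,
                            user=user, password=password)
    try:
        with conn.cursor() as cur:
            cur.execute("select count(*) from transaction_flow")
            return cur.fetchone()[0]
    finally:
        conn.close()
```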
Conclusion
This article looked at an example of stream processing of information using Spark Streaming in conjunction with Apache Kafka and PostgreSQL. With the growth of data from various sources, it is hard to overestimate the practical value of Spark Streaming for creating streaming and real-time applications.
You can find the full source code in my repository.
I'm happy to discuss this article; I look forward to your comments, and I also hope for constructive criticism from all caring readers.
I wish you success!
P.S. Initially it was planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Follow the news!
Source: www.habr.com