Apache Kafka and Streaming Data Processing with Spark Streaming

Hi, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.

Let's imagine that a certain credit institution sets us the task of processing incoming transactions "on the fly" across all of its branches. This could be done in order to quickly calculate an open currency position for the treasury, limits, or the financial results of transactions, and so on.

How to implement this case without magic and magic spells — read under the cut! Let's go!

(Image source)

Introduction

Undoubtedly, processing large amounts of data in real time opens up many opportunities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka produces a stream of incoming message packets and Spark Streaming processes those packets at a given time interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume its calculations from there.

Architecture of the developed system

(architecture diagram)

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is well suited for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. The Kafka system is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small batches. New batches are created at regular intervals. At the beginning of each interval a new batch is created, and any data arriving during that interval is included in it. At the end of the interval the batch stops growing. The size of the interval is set by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the availability of schema information, can efficiently retrieve only the required record fields; it also provides the DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
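The micro-batch model described above is easy to picture with a small plain-Python sketch (a toy simulation for illustration only, not how Spark is actually implemented): each event carries a timestamp, and every event whose timestamp falls into the same batch interval lands in the same batch.

```python
from collections import defaultdict

def micro_batch(events, batch_interval):
    """Group (timestamp, payload) events into consecutive batches.

    Toy illustration of Spark Streaming's micro-batch model: a new
    batch starts every `batch_interval` seconds, and every event
    arriving within that window belongs to that batch.
    """
    batches = defaultdict(list)
    for ts, payload in events:
        batches[int(ts // batch_interval)].append(payload)
    return [batches[k] for k in sorted(batches)]

# Events arriving at 0.5s, 1.9s, 2.1s and 3.0s with a 2-second interval
# end up in two batches: the first covers [0, 2), the second [2, 4).
stream = [(0.5, 'a'), (1.9, 'b'), (2.1, 'c'), (3.0, 'd')]
print(micro_batch(stream, 2))   # [['a', 'b'], ['c', 'd']]
```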

Installing and running a Kafka server

Before working with Kafka itself, you need to make sure that Java is installed, since the JVM is used to run it:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The fact is that the default settings do not let you use all the capabilities of Apache Kafka to the full. For example, deleting a topic, category, or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has started successfully, launch the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that a topic with the required number of partitions and replication has been created:

bin/kafka-topics.sh --describe --zookeeper localhost:2181


We will skip testing the producer and consumer for the newly created topic. Details on how you can test sending and receiving messages are given in the official documentation — Send some messages. Now we move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data — 100 messages every second. By random data we mean a dictionary consisting of three fields:

  • branch — the name of the credit institution's point of sale;
  • currency — the transaction currency;
  • amount — the transaction amount. The amount will be a positive number if it is a purchase of currency by the bank, and a negative number if it is a sale.

The code for the producer looks like this:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Next, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
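The value_serializer passed to KafkaProducer above simply turns each dictionary into UTF-8 encoded JSON bytes. The transformation is easy to check in isolation, without a broker:

```python
from json import dumps, loads

# The same lambda that the producer above passes as value_serializer
serialize = lambda x: dumps(x).encode('utf-8')

message = {'branch': 'SPB', 'currency': 'USD', 'amount': -42}
raw = serialize(message)

print(raw)                    # b'{"branch": "SPB", "currency": "USD", "amount": -42}'
print(loads(raw) == message)  # True: the round trip is lossless
```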

When the script runs, we get the following messages in the terminal:


This means that everything works as we wanted — the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a universal, high-performance cluster computing platform.

Spark performs better than popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large amounts of data, since it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform computations in memory.

This framework is written in Scala, so you need to install Scala first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Add the Spark path to your bash profile:

vim ~/.bashrc

Add the following lines in the editor:

export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed information from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database:

Select PostgreSQL and click Next:

Since this example is for educational purposes only, we will use a free server "at the minimum" (Free Tier):

Next, we check the Free Tier box, after which we are automatically offered an instance of the t2.micro class — although weak, it is free and quite suitable for our task:

Next come very important things: the name of the database instance, the name of the master user, and their password. Let's name the instance: myHabrTest, master user: habr, password: habr12345, and click the Next button:

The next page contains the parameters responsible for the accessibility of our database server from the outside (Public accessibility) and port availability:


Let's create a new setting for the VPC security group that will allow external access to our database server via port 5432 (PostgreSQL).
Let's go to the AWS console, in a separate browser window, to the VPC Dashboard -> Security Groups -> Create security group section:

We set the name for the Security group — PostgreSQL, add a description, indicate which VPC this group should be associated with, and click the Create button:

Fill in the Inbound rules for port 5432 for the newly created group, as shown in the picture below. You don't need to specify the port manually; just select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that incoming traffic is allowed to reach the server from anywhere in the world, which is not entirely correct, but for the sake of this example let's allow ourselves to use this approach:
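The claim about ::/0 is easy to verify with Python's standard ipaddress module: every IPv6 address is a member of that network (and 0.0.0.0/0 plays the same role for IPv4):

```python
import ipaddress

# ::/0 is the IPv6 "everything" network: any IPv6 address belongs to it
anywhere_v6 = ipaddress.ip_network('::/0')
print(ipaddress.ip_address('2001:db8::1') in anywhere_v6)    # True

# 0.0.0.0/0 is the IPv4 equivalent offered in the same AWS rule editor
anywhere_v4 = ipaddress.ip_network('0.0.0.0/0')
print(ipaddress.ip_address('203.0.113.7') in anywhere_v4)    # True
```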

We return to the browser page where we have "Configure advanced settings" open and select, in the VPC security groups section -> Choose existing VPC security groups -> PostgreSQL:

Next, in Database options -> Database name -> set the name — habrDB.

We can leave the remaining parameters at their defaults, except for disabling backup (backup retention period — 0 days), monitoring, and Performance Insights. Click the Create database button:

Stream handler

The final stage is the development of a Spark job, which will process new data arriving from Kafka every two seconds and load the result into the database.

As noted above, checkpoints are the core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints so that, if the procedure fails, the Spark Streaming module only needs to return to the last checkpoint and resume calculations from there in order to recover the lost data.

Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done with, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context will be recreated from the checkpoint data. If the directory does not exist (i.e., this is the first run), then functionToCreateContext is called to create a new context and configure the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# conf is the SparkConf configured for the job
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing the incoming data in JSON format:

from pyspark.sql import Row

# Runs inside foreachRDD, once per micro-batch, where rdd holds
# the already-parsed dictionaries from the Kafka stream
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
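Note one assumption hidden in this snippet: by the time rdd.map runs, each record's value (the UTF-8 JSON payload the producer sends) must already be decoded back into a dictionary. With createDirectStream each record arrives as a (key, value) pair (and KafkaUtils applies a UTF-8 decoder by default, so the value may already be a str rather than bytes). In isolation, the decoding step looks roughly like this:

```python
from json import loads

def decode_record(record):
    """Turn a (key, value) pair from the direct stream into a dict.

    The value is the JSON payload produced by the Kafka producer;
    the key is unused here (the producer above does not set one).
    """
    _key, value = record
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    return loads(value)

raw = (None, b'{"branch": "Kazan", "currency": "EUR", "amount": 55}')
parsed = decode_record(raw)
print(parsed['branch'], parsed['amount'])   # Kazan 55
```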

Using Spark SQL, we do a simple grouping and display the result in the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency
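What this query computes for each micro-batch can be mimicked in a few lines of plain Python (a toy re-implementation of the same GROUP BY, handy for sanity-checking the expected batch_value sums):

```python
from collections import defaultdict

def batch_totals(transactions):
    """Sum transaction amounts per (branch, currency),
    mirroring the GROUP BY in the Spark SQL query above."""
    totals = defaultdict(int)
    for t in transactions:
        totals[(t['branch'], t['currency'])] += t['amount']
    return dict(totals)

batch = [
    {'branch': 'SPB', 'currency': 'USD', 'amount': 100},
    {'branch': 'SPB', 'currency': 'USD', 'amount': -30},
    {'branch': 'Kazan', 'currency': 'EUR', 'amount': 7},
]
print(batch_totals(batch))
# {('SPB', 'USD'): 70, ('Kazan', 'EUR'): 7}
```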

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

And then we save the resulting aggregated data into a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. You should use Endpoint as the database server url, which is displayed in the Connectivity & security section:


In order to connect Spark and Kafka correctly, you should run the job via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. In addition, we will use an artifact for interacting with the PostgreSQL database; we will pass both of them via --packages.

For flexibility of the script, we will also pass in, as input parameters, the name of the message server and the topic from which we want to receive data.
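A minimal way to read those two parameters inside spark_job.py might look like this (a sketch; the helper name is an assumption, but the argument order — broker list first, then topic — matches the spark-submit invocation used to launch the job):

```python
import sys

def parse_args(argv):
    """Extract the broker list and topic from the command line.

    argv[0] is the script name, argv[1] the Kafka bootstrap servers,
    argv[2] the topic to subscribe to, mirroring the invocation:
        spark_job.py localhost:9092 transaction
    """
    if len(argv) != 3:
        raise SystemExit('Usage: spark_job.py <broker_list> <topic>')
    return argv[1], argv[2]

broker_list, topic = parse_args(['spark_job.py', 'localhost:9092', 'transaction'])
print(broker_list, topic)   # localhost:9092 transaction
```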

So, it's time to launch and check how the system works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything worked! As you can see in the picture below, while the application is running, new aggregation results appear every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object:


Next, we make a simple query to the database to check for the presence of records in the transaction_flow table:


Conclusion

This article looked at an example of stream processing of information using Spark Streaming in conjunction with Apache Kafka and PostgreSQL. With the growth of data from various sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my GitHub repository.

I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all engaged readers.

I wish you success!

P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire process described above in AWS using AWS Kinesis and AWS EMR. Follow the news!

Source: www.habr.com
