Hello, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to the AWS RDS cloud database.
Let's imagine that a certain credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This could be done, for example, to promptly calculate the open currency position for the treasury, transaction limits or financial results, and so on.
How to implement this use case without magic and magic spells - read on below the cut! Let's go!
Introduction
Of course, processing a large volume of data in real time offers ample opportunities for use in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka creates a stream of incoming message packets and Spark Streaming processes these packets at a specified time interval.
To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume the computations from there.
Architecture of the developed system
Components used:
- Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
- Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small batches. New batches are created at regular intervals: at the beginning of each time interval a new batch is created, any data received during that interval is added to it, and at the end of the interval the batch stops growing. The size of the interval is determined by a parameter called the batch interval (a minimal illustration follows after this list);
- Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the available schema information, can efficiently retrieve only the required fields of records; it also provides DataFrame APIs;
- AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
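To illustrate the batch interval idea, here is a minimal, self-contained PySpark sketch that counts records in 2-second micro-batches; it assumes a plain text source on localhost:9999 (e.g. nc -lk 9999) and is not part of the system built below:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Hypothetical demo: each micro-batch covers 2 seconds of incoming text lines.
sc = SparkContext("local[2]", "BatchIntervalDemo")
ssc = StreamingContext(sc, 2)  # batch interval = 2 seconds
lines = ssc.socketTextStream("localhost", 9999)
lines.count().pprint()  # print the number of records in each batch
ssc.start()
ssc.awaitTermination()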
Installing and running the Kafka server
Before using Kafka directly, you need to make sure that you have Java, since the JVM is used for its operation:
sudo apt-get update
sudo apt-get install default-jre
java -version
Let's create a new user to work with Kafka:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
Next, download the distribution from the official Apache Kafka website:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
Unpack the downloaded archive:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
The next step is optional. The fact is that the default settings do not allow you to fully use all the features of Apache Kafka. For example, you cannot delete a topic, category or group to which messages can be published. To change this, let's edit the configuration file:
vim ~/kafka/config/server.properties
Add the following to the end of the file:
delete.topic.enable = true
Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:
cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
After ZooKeeper has started successfully, launch the Kafka server in a separate terminal:
bin/kafka-server-start.sh config/server.properties
Let's create a new topic called transaction:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Let's make sure that a topic with the required number of partitions and replication has been created:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
We will skip testing the producer and consumer for the newly created topic. Details on how to test sending and receiving messages are described in the official documentation -
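As a quick sanity check, the new topic can also be exercised with the kafka-python library; this is a hypothetical snippet assuming the broker is running on localhost:9092:
from kafka import KafkaProducer, KafkaConsumer

# Send one test message to the topic and read it back.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('transaction', b'test message')
producer.flush()

consumer = KafkaConsumer('transaction',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)
for message in consumer:
    print(message.value)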
Writing the producer
The producer will generate random data - 100 messages every second. By random data we mean a dictionary consisting of three fields:
- Branch - the name of the credit institution's point of sale;
- Currency - the transaction currency;
- Amount - the transaction amount. The amount will be a positive number if it is a purchase of currency by the Bank, and a negative number if it is a sale.
The producer code looks like this:
from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict
Next, using the send method, we send a message to the server, to the topic we need, in JSON format:
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')

my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: {}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))
finally:
    producer.flush()
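The snippet above sends a single message; a sending loop that approximates the stated rate of 100 messages per second could look like the following sketch (the pacing with time.sleep is an assumption, not part of the original code):
import time

# Hypothetical send loop: roughly 100 random messages per second.
while True:
    for _ in range(100):
        producer.send(topic=my_topic, value=get_random_value())
    producer.flush()
    time.sleep(1)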
When running the script, we get the following messages in the terminal:
This means that everything works as we wanted - the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.
Installing Apache Spark
Apache Spark is a universal, high-performance cluster computing platform.
Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large volumes of data, since it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform computations in memory.
This framework is written in Scala, so you need to install it first:
sudo apt-get install scala
Download the Spark distribution from the official website:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
Unpack the archive:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Add the Spark path to the bash file:
vim ~/.bashrc
Add the following lines using the editor:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
Run the command below after making the changes to bashrc:
source ~/.bashrc
Deploying AWS PostgreSQL
All that remains is to deploy the database into which we will load the processed information from the streams. For this we will use the AWS RDS service.
Go to the AWS console -> AWS RDS -> Databases -> Create database:
Select PostgreSQL and click Next:
Since this example is purely for educational purposes, we will use a free server "at the minimum" (Free Tier):
Next, check the box in the Free Tier block, and after that we will automatically be offered a t2.micro class instance - although weak, it is free and quite suitable for our task:
Next come very important things: the name of the database instance, the name of the master user and his password. Let's name the instance: myHabrTest, master user: habr, password: habr12345 and click the Next button:
The next page contains the parameters responsible for the accessibility of our database server from outside (Public accessibility) and for port availability:
Let's create a new setting for the VPC security group, which will allow external access to our database server via port 5432 (PostgreSQL).
Let's go to the AWS console in a separate browser window, to the VPC Dashboard -> Security Groups -> Create security group section:
Set the name of the security group - PostgreSQL, a description, indicate which VPC this group should be associated with, and click the Create button:
Fill in the Inbound rules for port 5432 for the newly created group, as shown in the picture below. You do not need to specify the port manually - just select PostgreSQL from the Type drop-down list.
Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from the whole world, which is not entirely correct in general, but for the sake of this example let's allow ourselves this approach:
We return to the browser page, where "Configure advanced settings" is open, and in the VPC security groups section select Choose existing VPC security groups -> PostgreSQL:
Next, in Database options -> Database name -> set the name - habrDB.
We can leave the remaining parameters at their defaults, apart from disabling backups (backup retention period - 0 days), monitoring and Performance Insights. Click the Create database button:
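Once the instance is available, the connection can be verified, for example, with a small psycopg2 sketch (a hypothetical check; the endpoint shown is the one used later in this article and will differ in your setup):
import psycopg2

# Hypothetical connectivity check for the freshly created RDS instance.
conn = psycopg2.connect(
    host='myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com',
    port=5432,
    dbname='habrDB',
    user='habr',
    password='habr12345')

with conn.cursor() as cur:
    cur.execute('SELECT version();')
    print(cur.fetchone())

conn.close()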
Stream handler
The final stage will be the development of a Spark job that will process new data coming from Kafka every two seconds and put the result into the database.
As noted above, checkpoints are the main mechanism in SparkStreaming that must be configured to ensure fault tolerance. We will use checkpoints and, if the procedure fails, the Spark Streaming module will only need to go back to the last checkpoint and resume computations from it in order to recover the lost data.
Checkpointing can be enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done using, for example:
streamingContext.checkpoint(checkpointDirectory)
In our example, we will use the following approach, namely: if the checkpointDirectory exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e. it is being run for the first time), then functionToCreateContext is called to create a new context and set up the DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
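functionToCreateContext itself is not shown in the article; a minimal sketch of what such a factory function could look like (the checkpoint path and application name are assumptions) is:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpointDirectory = '/tmp/checkpoint'  # hypothetical checkpoint location

def functionToCreateContext():
    # Create a new context with a 2-second batch interval and register
    # the checkpoint directory; the DStream wiring also goes here.
    sc = SparkContext(appName='TransactionStream')
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpointDirectory)
    return ssc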
We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("TransactionStream")  # the app name is arbitrary
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})
Parsing the incoming data in JSON format:
from pyspark.sql import Row

# Inside the RDD handler: turn each parsed JSON dict into a Row,
# build a DataFrame and register it as a temporary view.
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
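The lines above operate on an already-parsed RDD; a sketch of how the DStream values could be deserialized from JSON and handed to such a handler (the process function and the SparkSession creation are assumptions, not shown in the original) follows:
import json
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName('TransactionStream').getOrCreate()

def process(time, rdd):
    if rdd.isEmpty():
        return
    rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                   currency=w['currency'],
                                   amount=w['amount']))
    spark.createDataFrame(rowRdd).createOrReplaceTempView("treasury_stream")

# Kafka delivers (key, value) pairs; deserialize the JSON value of each message.
parsed = directKafkaStream.map(lambda kv: json.loads(kv[1]))
parsed.foreachRDD(process)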
Using Spark SQL, we do a simple grouping and print the result to the console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Getting the query text and running it via Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
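get_sql_query is not shown in the article; a trivial sketch that simply returns the aggregation query given above as a string could look like this:
def get_sql_query():
    # Returns the aggregation query shown above as a plain string.
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch as branch_name,
            t.currency as currency_code,
            sum(amount) as batch_value
        from treasury_stream t
        group by t.branch, t.currency
    """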
Then we save the resulting aggregated data into a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:
testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()
A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. As the database server url you should use the Endpoint, which is displayed in the Connectivity & security section:
To connect Spark and Kafka correctly, you should run the job via spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. In addition, we will also use an artifact for interacting with the PostgreSQL database; we will pass both of them via --packages.
For flexibility of the script, we will also pass the message broker name and the topic from which we want to receive data as input parameters (a sketch of the argument handling follows).
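Inside spark_job.py the two positional arguments could be picked up, for example, like this (a sketch; the original argument handling is not shown in the article):
import sys

# Expected invocation: spark_job.py <broker_list> <topic>
if len(sys.argv) != 3:
    print("Usage: spark_job.py <broker_list> <topic>")
    sys.exit(1)

broker_list, topic = sys.argv[1], sys.argv[2]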
So, it's time to launch and test the system:
spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
    spark_job.py localhost:9092 transaction
Everything worked! As you can see in the picture below, while the application is running, new aggregation results appear every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object:
Next, we make a simple query to the database to check for records in the transaction_flow table:
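Such a check could, for example, look like the following psycopg2 sketch (connection details as above; the curr_time column name comes from the alias in the aggregation query):
import psycopg2

conn = psycopg2.connect(
    host='myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com',
    port=5432,
    dbname='habrDB',
    user='habr',
    password='habr12345')

with conn.cursor() as cur:
    # Show the five most recent aggregated rows written by the Spark job.
    cur.execute('SELECT * FROM transaction_flow ORDER BY curr_time DESC LIMIT 5;')
    for row in cur.fetchall():
        print(row)

conn.close()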
Conclusion
This article looked at an example of stream processing of information using Spark Streaming in combination with Apache Kafka and PostgreSQL. With the growth of data from a variety of sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.
You can find the full source code in my repository at
I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all caring readers.
I wish you success!
P.S. Initially it was planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!
Source: www.habr.com