Apache Kafka and streaming data processing with Spark Streaming

Hi, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to the AWS RDS cloud database.

Let's imagine that a certain credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This could be done, for example, to quickly calculate an open currency position for the treasury, limits, or financial results on transactions, and so on.

How to implement this case without magic and magic spells — read under the cut! Go!

Introduction

Of course, processing large amounts of data in real time offers ample opportunities for use in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka produces a stream of incoming message packets and Spark Streaming processes these packets at a given time interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume calculations from there.

Architecture of the developed system

[System architecture diagram: producer -> Apache Kafka -> Spark Streaming -> AWS RDS (PostgreSQL)]

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small batches, which are created at regular intervals. At the beginning of each interval a new batch is created, and any data received during that interval is included in it; at the end of the interval the batch stops growing. The size of the interval is determined by a parameter called the batch interval (illustrated in the sketch after this list);
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the availability of schema information, can efficiently retrieve only the required fields of records; it also provides the DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
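To make the batch interval mentioned above concrete: in PySpark it is simply the second argument of the StreamingContext constructor. A minimal sketch (assuming sc is an already created SparkContext):

from pyspark.streaming import StreamingContext

# every micro-batch collects the data received during a 2-second window
ssc = StreamingContext(sc, 2)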

Installing and running the Kafka server

Before working with Kafka directly, you need to make sure that Java is available, since the JVM is used to run it:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The point is that the default settings do not let you use all the features of Apache Kafka — for example, deleting a topic, category or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true
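With this flag enabled, topics can also be deleted programmatically. A minimal sketch using the admin client of the same kafka-python library used later in this article (the topic name here is purely hypothetical):

from kafka.admin import KafkaAdminClient

# deleting a topic only succeeds when delete.topic.enable=true on the broker
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.delete_topics(['some_old_topic'])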

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has started successfully, launch the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that a topic with the required number of partitions and replication has been created:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

[Output of kafka-topics --describe: the transaction topic with 3 partitions and replication factor 1]

We will skip the steps of testing the producer and consumer on the newly created topic — the details of how to test sending and receiving messages are covered in the official documentation (Send some messages). Instead, we move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data — 100 messages every second. By random data we mean a dictionary with three fields:

  • branch — the name of the credit institution's point of sale;
  • currency — the transaction currency;
  • amount — the transaction amount: a positive number if it is a purchase of currency by the Bank, and a negative number if it is a sale.

The producer code looks like this:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Next, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
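The snippet above sends a single message; to reach the 100 messages per second stated earlier, the script in the repository presumably wraps the send in a loop. A minimal sketch:

import time

# roughly 100 messages per second: one send every 10 ms
while True:
    producer.send(topic=my_topic, value=get_random_value())
    time.sleep(0.01)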

When the script is run, we get messages like the following in the terminal:

[Producer terminal output: confirmations of messages sent to the transaction topic]

This means that everything works as intended: the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a universal, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large amounts of data, since it is speed that allows you to work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform computations in memory.

The framework is written in Scala, so you need to install it first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the path to Spark to the bash profile:

vim ~/.bashrc

Add the following lines in the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed information from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is considered purely for educational purposes, we will use a free, bare-bones server (Free Tier).

Next, tick the checkbox in the Free Tier block; after that we will automatically be offered a t2.micro class instance — weak, but free and quite suitable for our task.

Next come some very important things: the name of the database instance, the master user name, and the password. Let's name the instance myHabrTest, master user: habr, password: habr12345, and click the Next button.

The next page contains the parameters responsible for making our database server accessible from outside (Public accessibility) and for port availability.

Let's create a new VPC security group setting, which will allow external access to our database server via port 5432 (PostgreSQL).
Let's go, in a separate browser window, to the AWS console: VPC Dashboard -> Security Groups -> Create security group.

Set the name of the security group — PostgreSQL — and a description, indicate which VPC this group should be associated with, and click the Create button.

Fill in the Inbound rules for port 5432 for the newly created group, as shown in the screenshot below. You don't have to specify the port manually — just select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from anywhere in the world, which is not entirely correct canonically, but for the sake of the example let's allow ourselves this approach:

[Inbound rules: Type PostgreSQL, port 5432, source ::/0]

We return to the browser page with "Configure advanced settings" open and select VPC security groups -> Choose existing VPC security groups -> PostgreSQL.

Next, under Database options -> Database name, set the name — habrDB.

We can leave the remaining parameters at their defaults, except for disabling backups (backup retention period — 0 days), monitoring and Performance Insights. Click the Create database button.

Stream handler

The final stage is the development of a Spark job, which will process new data coming from Kafka every two seconds and write the result to the database.

As noted above, checkpoints are the core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints, and if a procedure fails, the Spark Streaming module will only need to return to the last checkpoint and resume calculations from it to recover the lost data.

Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done with, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context will be recreated from the checkpoint data. If the directory does not exist (i.e. this is the first run), functionToCreateContext is called to create a new context and configure the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
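functionToCreateContext itself is not shown here; its body lives in the full script. A minimal sketch of what such a factory could look like (the application name below is an assumption):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    # executed only on the first run, when no checkpoint directory exists yet
    sc = SparkContext(conf=SparkConf().setAppName("habr_transaction_stream"))
    ssc = StreamingContext(sc, 2)          # 2-second batch interval
    # ... set up the DStreams here (see the snippets below) ...
    ssc.checkpoint(checkpointDirectory)    # where checkpoint data is stored
    return ssc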

We create a DirectStream object to connect to the transaction topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# conf is the SparkConf of the job (built in the full script)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # 2-second batch interval

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})

We parse the incoming data in JSON format:

from pyspark.sql import Row

# rdd contains the deserialized JSON dictionaries of one micro-batch
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
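Note that rdd here is supplied by Spark's per-batch callback: in the full script, the raw (key, value) records coming from Kafka are presumably first deserialized from JSON, and each micro-batch is then handed to a function wrapping the logic above — roughly:

import json

# each Kafka record is a (key, value) tuple of strings; deserialize the value
parsed = directKafkaStream.map(lambda kv: json.loads(kv[1]))

# process(rdd) would wrap the Row/DataFrame/SQL logic shown in this section
parsed.foreachRDD(process)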

Using Spark SQL, we perform a simple grouping and output the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Getting the query text and running it via Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
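get_sql_query() comes from the full script in the repository; presumably it simply returns the text of the aggregation query shown above, along these lines:

def get_sql_query():
    # the grouping query from the previous snippet, as a string
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch                        as branch_name,
            t.currency                      as currency_code,
            sum(amount)                     as batch_value
        from treasury_stream t
        group by t.branch, t.currency
    """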

We then save the resulting aggregated data to a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about setting up the connection to AWS RDS. We created the user and password for it at the "Deploying AWS PostgreSQL" step. For the database server url you should use the Endpoint, which is displayed in the Connectivity & security section.

In order to connect Spark and Kafka correctly, you should run the job via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. In addition, we will also use an artifact for interacting with the PostgreSQL database; we will pass both via --packages.

For the flexibility of the script, we will also include as input parameters the name of the message server and the topic from which we want to receive data.
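A minimal sketch of that parameterization inside spark_job.py, matching the launch command below:

import sys

# usage: spark_job.py <broker_list> <topic>, e.g. localhost:9092 transaction
broker_list, topic = sys.argv[1], sys.argv[2]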

So, it's time to launch and test the system:

spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
    spark_job.py localhost:9092 transaction

Everything worked out! As you can see in the picture below, while the application is running, new aggregation results appear every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object:

[Console output: new aggregated rows printed every 2 seconds]

Next, we run a simple query against the database to check for records in the transaction_flow table:

[Query result: aggregated records present in transaction_flow]
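For reference, the same sanity check can also be run from Spark itself by reading the table back with the JDBC options used for the write — a sketch:

# read the table back through JDBC and show a few rows
spark.read \
    .format("jdbc") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .load() \
    .show(n=5)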

Conclusion

In this article, we looked at an example of stream processing of information using Spark Streaming in conjunction with Apache Kafka and PostgreSQL. With the growth of data volumes from various sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I would be happy to discuss this article in the comments; I look forward to your feedback, and I also hope for constructive criticism from all engaged readers.

I wish you success!

P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In my next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: www.habr.com
